Repository: samza Updated Branches: refs/heads/master b22909855 -> ff7bc452f
SAMZA-733: added metrics for the Elasticsearch Producer Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ff7bc452 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ff7bc452 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ff7bc452 Branch: refs/heads/master Commit: ff7bc452f261e6c1f18b964568d544ee4f925756 Parents: b229098 Author: Roger Hoover <[email protected]> Authored: Wed Jul 22 11:29:23 2015 -0700 Committer: Yan Fang <[email protected]> Committed: Wed Jul 22 11:29:23 2015 -0700 ---------------------------------------------------------------------- .../org/apache/samza/metrics/MetricGroup.java | 68 ++++++++++++++++++++ .../org/apache/samza/metrics/MetricsBase.java | 53 +++++++++++++++ .../apache/samza/metrics/MetricsHelper.scala | 22 +++---- .../ElasticsearchSystemFactory.java | 3 +- .../ElasticsearchSystemProducer.java | 24 ++++++- .../ElasticsearchSystemProducerMetrics.java | 37 +++++++++++ .../ElasticsearchSystemProducerMetricsTest.java | 53 +++++++++++++++ .../ElasticsearchSystemProducerTest.java | 5 +- 8 files changed, 251 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java new file mode 100644 index 0000000..53526d8 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java @@ -0,0 +1,68 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.metrics; + +/** + * MetricGroup is a little helper class to make it easy to register and + * manage a group of counters, gauges and timers. It's shared between Java + * and Scala + */ +public class MetricGroup { + + public interface ValueFunction<T> { + T getValue(); + } + + protected final MetricsRegistry registry; + protected final String groupName; + protected final String prefix; + + public MetricGroup(String groupName, String prefix, MetricsRegistry registry) { + this.groupName = groupName; + this.registry = registry; + this.prefix = prefix; + } + + public Counter newCounter(String name) { + return registry.newCounter(groupName, (prefix + name).toLowerCase()); + } + + public <T> Gauge<T> newGauge(String name, T value) { + return registry.newGauge(groupName, new Gauge<T>((prefix + name).toLowerCase(), value)); + } + + /* + * Specify a dynamic gauge that always returns the latest value when polled. + * The value closure/object must be thread safe, since metrics reporters may access + * it from another thread. + */ + public <T> Gauge<T> newGauge(String name, final ValueFunction<T> valueFunc) { + return registry.newGauge(groupName, new Gauge<T>((prefix + name).toLowerCase(), valueFunc.getValue()) { + @Override + public T getValue() { + return valueFunc.getValue(); + } + }); + } + + public Timer newTimer(String name) { + return registry.newTimer(groupName, (prefix + name).toLowerCase()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java b/samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java new file mode 100644 index 0000000..7db3b65 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/metrics/MetricsBase.java @@ -0,0 +1,53 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.metrics; + +/** + * A base class for metrics. The name of the class that extends the + * base class will be used as the metric group name + */ +public abstract class MetricsBase { + protected final MetricGroup group; + + public MetricsBase(String prefix, MetricsRegistry registry) { + String groupName = this.getClass().getName(); + group = new MetricGroup(groupName, prefix, registry); + } + + public MetricsBase(MetricsRegistry registry) { + this("", registry); + } + + public Counter newCounter(String name) { + return group.newCounter(name); + } + + public <T> Gauge<T> newGauge(String name, T value) { + return group.newGauge(name, value); + } + + public <T> Gauge<T> newGauge(String name, final MetricGroup.ValueFunction<T> valueFunc) { + return group.newGauge(name, valueFunc); + } + + public Timer newTimer(String name) { + return group.newTimer(name); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala index 8eac8ef..1520b0e 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala @@ -19,22 +19,26 @@ package org.apache.samza.metrics +import org.apache.samza.metrics.MetricGroup.ValueFunction + /** * MetricsHelper is a little helper class to make it easy to register and * manage counters, gauges and timers. + * + * The name of the class that extends this trait will be used as the + * metric group name */ trait MetricsHelper { val group = this.getClass.getName val registry: MetricsRegistry + val metricGroup = new MetricGroup(group, getPrefix, registry) - def newCounter(name: String) = { - registry.newCounter(group, (getPrefix + name).toLowerCase) - } + def newCounter(name: String) = metricGroup.newCounter(name) - def newGauge[T](name: String, value: T) = { - registry.newGauge(group, new Gauge((getPrefix + name).toLowerCase, value)) - } + def newTimer(name: String) = metricGroup.newTimer(name) + + def newGauge[T](name: String, value: T) = metricGroup.newGauge[T](name,value) /** * Specify a dynamic gauge that always returns the latest value when polled. @@ -42,15 +46,11 @@ trait MetricsHelper { * it from another thread. */ def newGauge[T](name: String, value: () => T) = { - registry.newGauge(group, new Gauge((getPrefix + name).toLowerCase, value()) { + metricGroup.newGauge(name, new ValueFunction[T] { override def getValue = value() }) } - def newTimer(name: String) = { - registry.newTimer(group, (getPrefix + name).toLowerCase) - } - /** * Returns a prefix for metric names. */ http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java index a277b69..d8ca70e 100644 --- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java +++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java @@ -55,7 +55,8 @@ public class ElasticsearchSystemFactory implements SystemFactory { return new ElasticsearchSystemProducer(name, getBulkProcessorFactory(elasticsearchConfig), getClient(elasticsearchConfig), - getIndexRequestFactory(elasticsearchConfig)); + getIndexRequestFactory(elasticsearchConfig), + new ElasticsearchSystemProducerMetrics(name, metricsRegistry)); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java index 7eb14a2..f61bd36 100644 --- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java +++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java @@ -23,10 +23,13 @@ import org.apache.samza.SamzaException; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemProducer; import org.apache.samza.system.elasticsearch.indexrequest.IndexRequestFactory; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,16 +64,19 @@ public class ElasticsearchSystemProducer implements SystemProducer { private final IndexRequestFactory indexRequestFactory; private final BulkProcessorFactory bulkProcessorFactory; + private final ElasticsearchSystemProducerMetrics metrics; private Client client; public ElasticsearchSystemProducer(String system, BulkProcessorFactory bulkProcessorFactory, - Client client, IndexRequestFactory indexRequestFactory) { + Client client, IndexRequestFactory indexRequestFactory, + ElasticsearchSystemProducerMetrics metrics) { this.system = system; this.sourceBulkProcessor = new HashMap<>(); this.bulkProcessorFactory = bulkProcessorFactory; this.client = client; this.indexRequestFactory = indexRequestFactory; + this.metrics = metrics; } @@ -102,6 +108,7 @@ public class ElasticsearchSystemProducer implements SystemProducer { if (response.hasFailures()) { sendFailed.set(true); } else { + updateSuccessMetrics(response); LOGGER.info(String.format("Written %s messages from %s to %s.", response.getItems().length, source, system)); } @@ -142,4 +149,19 @@ public class ElasticsearchSystemProducer implements SystemProducer { LOGGER.info(String.format("Flushed %s to %s.", source, system)); } + + private void updateSuccessMetrics(BulkResponse response) { + metrics.bulkSendSuccess.inc(); + for (BulkItemResponse itemResp: response.getItems()) { + ActionResponse resp = itemResp.getResponse(); + if (resp instanceof IndexResponse) { + if (((IndexResponse)resp).isCreated()) { + metrics.inserts.inc(); + } + else { + metrics.updates.inc(); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java ---------------------------------------------------------------------- diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java new file mode 100644 index 0000000..e3b635b --- /dev/null +++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system.elasticsearch; + +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.MetricsBase; +import org.apache.samza.metrics.MetricsRegistry; + +public class ElasticsearchSystemProducerMetrics extends MetricsBase { + public final Counter bulkSendSuccess; + public final Counter inserts; + public final Counter updates; + + public ElasticsearchSystemProducerMetrics(String systemName, MetricsRegistry registry) { + super(systemName + "-", registry); + + bulkSendSuccess = newCounter("bulk-send-success"); + inserts = newCounter("docs-inserted"); + updates = newCounter("docs-updated"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java ---------------------------------------------------------------------- diff --git a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java new file mode 100644 index 0000000..980964f --- /dev/null +++ b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.elasticsearch; + +import org.apache.samza.metrics.*; +import org.junit.Test; + +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.*; + +public class ElasticsearchSystemProducerMetricsTest { + + public final static String GRP_NAME = "org.apache.samza.system.elasticsearch.ElasticsearchSystemProducerMetrics"; + + @Test + public void testMetrics() { + ReadableMetricsRegistry registry = new MetricsRegistryMap(); + ElasticsearchSystemProducerMetrics metrics = new ElasticsearchSystemProducerMetrics("es", registry); + metrics.bulkSendSuccess.inc(29L); + metrics.inserts.inc(); + metrics.updates.inc(7L); + + Set<String> groups = registry.getGroups(); + assertEquals(1, groups.size()); + assertEquals(GRP_NAME, groups.toArray()[0]); + + Map<String, Metric> metricMap = registry.getGroup(GRP_NAME); + assertEquals(3, metricMap.size()); + assertEquals(29L, ((Counter) metricMap.get("es-bulk-send-success")).getCount()); + assertEquals(1L, ((Counter) metricMap.get("es-docs-inserted")).getCount()); + assertEquals(7L, ((Counter) metricMap.get("es-docs-updated")).getCount()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/ff7bc452/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java ---------------------------------------------------------------------- diff --git a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java index e63d62c..684d7f6 100644 --- a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java +++ b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java @@ -20,6 +20,7 @@ package org.apache.samza.system.elasticsearch; import org.apache.samza.SamzaException; +import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemProducer; import org.apache.samza.system.elasticsearch.indexrequest.IndexRequestFactory; @@ -43,6 +44,7 @@ public class ElasticsearchSystemProducerTest { private static final BulkProcessorFactory BULK_PROCESSOR_FACTORY = mock(BulkProcessorFactory.class); private static final Client CLIENT = mock(Client.class); private static final IndexRequestFactory INDEX_REQUEST_FACTORY = mock(IndexRequestFactory.class); + private static final ElasticsearchSystemProducerMetrics METRICS = new ElasticsearchSystemProducerMetrics("es", new MetricsRegistryMap()); public static final String SOURCE_ONE = "one"; public static final String SOURCE_TWO = "two"; private SystemProducer producer; @@ -54,7 +56,8 @@ public class ElasticsearchSystemProducerTest { producer = new ElasticsearchSystemProducer(SYSTEM_NAME, BULK_PROCESSOR_FACTORY, CLIENT, - INDEX_REQUEST_FACTORY); + INDEX_REQUEST_FACTORY, + METRICS); processorOne = mock(BulkProcessor.class); processorTwo = mock(BulkProcessor.class);
