jstorm-runner: support distribution metrics, which are required by Nexmark.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0c388447 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0c388447 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0c388447 Branch: refs/heads/jstorm-runner Commit: 0c388447813a93ab9afc45af591417a82f8c4b1b Parents: 56ad7a8 Author: Pei He <[email protected]> Authored: Wed Sep 6 11:05:05 2017 +0800 Committer: Pei He <[email protected]> Committed: Fri Sep 8 14:42:28 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/jstorm/TestJStormRunner.java | 1 + .../jstorm/translation/JStormMetricResults.java | 38 +++++++++++-- .../jstorm/translation/MetricsReporter.java | 56 ++++++++++++++++++++ 3 files changed, 92 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0c388447/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java index b28c127..e9f6337 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java @@ -61,6 +61,7 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> { private TestJStormRunner(JStormPipelineOptions options) { this.options = options; Map conf = Maps.newHashMap(); + conf.put("topology.metric.sample.rate", 1); // Default state backend is RocksDB, for the users who could not run RocksDB on local testing // env, following config is used to configure state backend to memory. 
// conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString()); http://git-wip-us.apache.org/repos/asf/beam/blob/0c388447/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java index 986bf0c..01d4441 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java @@ -22,7 +22,9 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.alibaba.jstorm.common.metric.AsmCounter; import com.alibaba.jstorm.common.metric.AsmGauge; import com.alibaba.jstorm.common.metric.AsmHistogram; +import com.alibaba.jstorm.common.metric.snapshot.AsmHistogramSnapshot; import com.alibaba.jstorm.metric.AsmWindow; +import com.alibaba.jstorm.metrics.Snapshot; import com.google.auto.value.AutoValue; import java.util.ArrayList; import java.util.List; @@ -87,7 +89,35 @@ public class JStormMetricResults extends MetricResults { new Instant(0)))); } - return JStormMetricQueryResults.create(counters, gauges); + List<MetricResult<DistributionResult>> distributions = new ArrayList<>(); + for (Map.Entry<String, AsmHistogram> entry : histogramMap.entrySet()) { + MetricKey metricKey = MetricsReporter.toMetricKey(entry.getKey()); + if (!MetricFiltering.matches(filter, metricKey)) { + continue; + } + AsmHistogram histogram = entry.getValue(); + histogram.forceFlush(); + + Snapshot snapshot = + ((AsmHistogramSnapshot) histogram.getSnapshots().get(AsmWindow.M10_WINDOW)).getSnapshot(); + // TODO: Sum and count might be under estimated, because JStorm histogram only store a fixed + // number 
of values. + long sum = 0; + for (long v : snapshot.getValues()) { + sum += v; + } + distributions.add( + JStormMetricResult.create( + metricKey.metricName(), + metricKey.stepName(), + DistributionResult.create( + sum, + snapshot.size(), + snapshot.getMin(), + snapshot.getMax()))); + } + + return JStormMetricQueryResults.create(counters, gauges, distributions); } @AutoValue @@ -97,8 +127,10 @@ public class JStormMetricResults extends MetricResults { public static MetricQueryResults create( Iterable<MetricResult<Long>> counters, - Iterable<MetricResult<GaugeResult>> gauges) { - return new AutoValue_JStormMetricResults_JStormMetricQueryResults(counters, gauges, null); + Iterable<MetricResult<GaugeResult>> gauges, + Iterable<MetricResult<DistributionResult>> distributions) { + return new AutoValue_JStormMetricResults_JStormMetricQueryResults( + counters, gauges, distributions); } } http://git-wip-us.apache.org/repos/asf/beam/blob/0c388447/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java index 1e38d1c..7867a83 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java @@ -21,12 +21,14 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; import com.alibaba.jstorm.common.metric.AsmCounter; +import com.alibaba.jstorm.common.metric.AsmHistogram; import com.alibaba.jstorm.metric.MetricClient; import com.alibaba.jstorm.metrics.Gauge; import com.google.common.collect.Maps; import java.util.Map; import 
org.apache.beam.runners.core.metrics.MetricKey; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; +import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.GaugeResult; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricQueryResults; @@ -45,6 +47,7 @@ class MetricsReporter { private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap(); private final Map<String, Long> reportedCounters = Maps.newHashMap(); + private final Map<String, DistributionResult> reportedDistributions = Maps.newHashMap(); private final MetricClient metricClient; public static MetricsReporter create(MetricClient metricClient) { @@ -77,6 +80,7 @@ class MetricsReporter { metricResults.queryMetrics(MetricsFilter.builder().build()); updateCounters(metricQueryResults.counters()); updateGauges(metricQueryResults.gauges()); + updateDistributions(metricQueryResults.distributions()); } private void updateCounters(Iterable<MetricResult<Long>> counters) { @@ -105,6 +109,58 @@ class MetricsReporter { } } + private void updateDistributions(Iterable<MetricResult<DistributionResult>> distributions) { + for (final MetricResult<DistributionResult> distributionResult : distributions) { + String metricName = getMetricNameString(COUNTER_PREFIX, distributionResult); + AsmHistogram histogram = metricClient.registerHistogram(metricName); + DistributionResult distribution = distributionResult.attempted(); + if (distribution.count() == 0) { + return; + } + DistributionResult oldDistribution = reportedDistributions.get(metricName); + reportedDistributions.put(metricName, distribution); + Long newMin; + Long newMax; + long restCount; + long restSum; + if (oldDistribution == null) { + newMin = distribution.min(); + newMax = distribution.min() != distribution.max() ? 
distribution.max() : null; + restCount = distribution.count(); + restSum = distribution.sum(); + } else { + newMin = distribution.min() < oldDistribution.min() ? distribution.min() : null; + newMax = + (distribution.max() > oldDistribution.max() && distribution.min() != distribution.max()) + ? distribution.max() : null; + restCount = distribution.count() - oldDistribution.count(); + restSum = distribution.sum() - oldDistribution.sum(); + } + if (newMin != null) { + histogram.update(newMin); + restCount--; + restSum -= newMin; + } + if (newMax != null) { + histogram.update(newMax); + restCount--; + restSum -= newMax; + } + if (restCount == 0) { + return; + } + long restAvg = restSum / restCount; + long restMod = restSum % restCount; + for (long i = 0; i < restCount; ++i) { + if (i == 0) { + histogram.update(restAvg + restMod); + } else { + histogram.update(restAvg); + } + } + } + } + private String getMetricNameString(String prefix, MetricResult<?> metricResult) { return prefix + CommonInstance.METRIC_KEY_SEPARATOR + metricResult.step()
