Repository: beam Updated Branches: refs/heads/jstorm-runner d2b285122 -> ef70031b7
jstorm-runner: saves MetricResults in local JStormRunnerResult after cancelling. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/56ad7a85 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/56ad7a85 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/56ad7a85 Branch: refs/heads/jstorm-runner Commit: 56ad7a852acf66ebd1061e276317481629b270c4 Parents: d2b2851 Author: Pei He <[email protected]> Authored: Wed Sep 6 11:05:51 2017 +0800 Committer: Pei He <[email protected]> Committed: Fri Sep 8 14:42:27 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/jstorm/JStormRunnerResult.java | 38 +++++++++++++++++++- .../beam/runners/jstorm/TestJStormRunner.java | 18 ---------- .../jstorm/translation/JStormMetricResults.java | 25 +++++++++---- 3 files changed, 56 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/56ad7a85/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java index 3962ca2..8848717 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java @@ -23,10 +23,13 @@ import static com.google.common.base.Preconditions.checkNotNull; import backtype.storm.Config; import backtype.storm.LocalCluster; import com.alibaba.jstorm.common.metric.AsmGauge; +import com.alibaba.jstorm.common.metric.AsmMetric; import com.alibaba.jstorm.metric.AsmMetricRegistry; import com.alibaba.jstorm.metric.JStormMetrics; +import com.alibaba.jstorm.task.error.TaskReportErrorAndDie; import com.alibaba.jstorm.utils.JStormUtils; import java.io.IOException; +import java.util.Iterator; import java.util.Map; import org.apache.beam.runners.jstorm.translation.CommonInstance; import org.apache.beam.runners.jstorm.translation.JStormMetricResults; @@ -62,6 +65,7 @@ public abstract class JStormRunnerResult implements PipelineResult { this.topologyName = checkNotNull(topologyName, "topologyName"); } + @Override public State getState() { return null; } @@ -79,6 +83,7 @@ public abstract class JStormRunnerResult implements PipelineResult { private final LocalCluster localCluster; private final long localModeExecuteTimeSecs; private boolean cancelled; + private MetricResults savedMetricResults; LocalJStormPipelineResult( String topologyName, @@ -89,12 +94,27 @@ public abstract class JStormRunnerResult implements PipelineResult { this.localCluster = checkNotNull(localCluster, "localCluster"); this.localModeExecuteTimeSecs = localModeExecuteTimeSecs; this.cancelled = false; + this.savedMetricResults = null; + } + + @Override + public State getState() { + if (cancelled) { + return State.CANCELLED; + } else if (globalWatermark() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + return State.DONE; + } else { + return State.RUNNING; + } } @Override public State cancel() throws IOException { + savedMetricResults = metrics(); localCluster.killTopology(getTopologyName()); localCluster.shutdown(); + clearPAssertCount(); + TaskReportErrorAndDie.setExceptionRecord(null); JStormUtils.sleepMs(1000); cancelled = true; return State.CANCELLED; @@ -129,7 +149,12 @@ public abstract class JStormRunnerResult implements PipelineResult { @Override public MetricResults metrics() { - return new JStormMetricResults(); + if (savedMetricResults != null) { + return savedMetricResults; + } + AsmMetricRegistry metricRegistry = JStormMetrics.getTaskMetrics(); + return new JStormMetricResults( + metricRegistry.getCounters(), metricRegistry.getGauges(), metricRegistry.getHistograms()); } private long globalWatermark() { @@ -151,5 +176,16 @@ public abstract class JStormRunnerResult implements PipelineResult { return BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); } } + + private void clearPAssertCount() { + AsmMetricRegistry taskMetrics = JStormMetrics.getTaskMetrics(); + Iterator<Map.Entry<String, AsmMetric>> itr = taskMetrics.getMetrics().entrySet().iterator(); + while (itr.hasNext()) { + Map.Entry<String, AsmMetric> metric = itr.next(); + if (metric.getKey().contains(getTopologyName())) { + itr.remove(); + } + } + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/56ad7a85/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java index b637b7c..b28c127 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java @@ -19,15 +19,11 @@ package org.apache.beam.runners.jstorm; import static com.google.common.base.Preconditions.checkNotNull; -import com.alibaba.jstorm.common.metric.AsmMetric; -import com.alibaba.jstorm.metric.AsmMetricRegistry; -import com.alibaba.jstorm.metric.JStormMetrics; import com.alibaba.jstorm.task.error.TaskReportErrorAndDie; import com.alibaba.jstorm.utils.JStormUtils; import com.google.common.base.Optional; import com.google.common.collect.Maps; import java.io.IOException; -import java.util.Iterator; import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; @@ -111,9 +107,7 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> { throw new AssertionError("Assertion checks timed out."); } } finally { - clearPAssertCount(); cancel(result); - TaskReportErrorAndDie.setExceptionRecord(null); } } @@ -188,18 +182,6 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> { } } - private void clearPAssertCount() { - String topologyName = options.getJobName(); - AsmMetricRegistry taskMetrics = JStormMetrics.getTaskMetrics(); - Iterator<Map.Entry<String, AsmMetric>> itr = taskMetrics.getMetrics().entrySet().iterator(); - while (itr.hasNext()) { - Map.Entry<String, AsmMetric> metric = itr.next(); - if (metric.getKey().contains(topologyName)) { - itr.remove(); - } - } - } - private void cancel(JStormRunnerResult result) { try { result.cancel(); http://git-wip-us.apache.org/repos/asf/beam/blob/56ad7a85/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java index dbaa28e..986bf0c 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java @@ -17,11 +17,12 @@ */ package org.apache.beam.runners.jstorm.translation; +import static com.google.common.base.Preconditions.checkNotNull; + import com.alibaba.jstorm.common.metric.AsmCounter; import com.alibaba.jstorm.common.metric.AsmGauge; -import com.alibaba.jstorm.metric.AsmMetricRegistry; +import com.alibaba.jstorm.common.metric.AsmHistogram; import com.alibaba.jstorm.metric.AsmWindow; -import com.alibaba.jstorm.metric.JStormMetrics; import com.google.auto.value.AutoValue; import java.util.ArrayList; import java.util.List; @@ -42,12 +43,24 @@ import org.joda.time.Instant; * Implementation of {@link MetricResults} for the JStorm Runner. */ public class JStormMetricResults extends MetricResults { + + private final Map<String, AsmCounter> counterMap; + private final Map<String, AsmGauge> gaugeMap; + private final Map<String, AsmHistogram> histogramMap; + + public JStormMetricResults( + Map<String, AsmCounter> counterMap, + Map<String, AsmGauge> gaugeMap, + Map<String, AsmHistogram> histogramMap) { + this.counterMap = checkNotNull(counterMap, "counterMap"); + this.gaugeMap = checkNotNull(gaugeMap, "gaugeMap"); + this.histogramMap = checkNotNull(histogramMap, "histogramMap"); + } + @Override public MetricQueryResults queryMetrics(MetricsFilter filter) { - AsmMetricRegistry metricRegistry = JStormMetrics.getTaskMetrics(); - List<MetricResult<Long>> counters = new ArrayList<>(); - for (Map.Entry<String, AsmCounter> entry : metricRegistry.getCounters().entrySet()) { + for (Map.Entry<String, AsmCounter> entry : counterMap.entrySet()) { MetricKey metricKey = MetricsReporter.toMetricKey(entry.getKey()); if (!MetricFiltering.matches(filter, metricKey)) { continue; @@ -60,7 +73,7 @@ public class JStormMetricResults extends MetricResults { } List<MetricResult<GaugeResult>> gauges = new ArrayList<>(); - for (Map.Entry<String, AsmGauge> entry : metricRegistry.getGauges().entrySet()) { + for (Map.Entry<String, AsmGauge> entry : gaugeMap.entrySet()) { MetricKey metricKey = MetricsReporter.toMetricKey(entry.getKey()); if (!MetricFiltering.matches(filter, metricKey)) { continue;
