jstorm-runner: support SourceMetrics.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fec423e5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fec423e5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fec423e5 Branch: refs/heads/jstorm-runner Commit: fec423e51148d26495fb8e6d17ac204b161f3069 Parents: 0c38844 Author: Pei He <[email protected]> Authored: Wed Sep 6 12:37:21 2017 +0800 Committer: Pei He <[email protected]> Committed: Fri Sep 8 14:42:28 2017 +0800 ---------------------------------------------------------------------- runners/jstorm/pom.xml | 1 - .../translation/UnboundedSourceSpout.java | 30 ++++++++++++++------ 2 files changed, 22 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fec423e5/runners/jstorm/pom.xml ---------------------------------------------------------------------- diff --git a/runners/jstorm/pom.xml b/runners/jstorm/pom.xml index 681adb5..a433fcb 100644 --- a/runners/jstorm/pom.xml +++ b/runners/jstorm/pom.xml @@ -90,7 +90,6 @@ <excludedGroups> org.apache.beam.sdk.testing.UsesSetState, org.apache.beam.sdk.testing.UsesSplittableParDo, - org.apache.beam.sdk.testing.UsesAttemptedMetrics, org.apache.beam.sdk.testing.UsesCommittedMetrics, org.apache.beam.sdk.testing.UsesTestStream </excludedGroups> http://git-wip-us.apache.org/repos/asf/beam/blob/fec423e5/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java index 73f1f0d..92d2f24 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java @@ -26,12 +26,14 @@ import backtype.storm.tuple.Values; import com.alibaba.jstorm.metric.MetricClient; import com.alibaba.jstorm.metrics.Gauge; import com.alibaba.jstorm.utils.KryoSerializer; +import java.io.Closeable; import java.io.IOException; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.runners.jstorm.JStormPipelineOptions; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -49,7 +51,7 @@ import org.slf4j.LoggerFactory; public class UnboundedSourceSpout extends AbstractComponent implements IRichSpout { private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class); - private final String name; + private final String stepName; private final String description; private final UnboundedSource source; private final SerializedPipelineOptions serializedOptions; @@ -58,6 +60,7 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou private transient JStormPipelineOptions pipelineOptions; private transient UnboundedSource.UnboundedReader reader; private transient SpoutOutputCollector collector; + private transient MetricsReporter metricsReporter; private volatile boolean hasNextRecord; private AtomicBoolean activated = new AtomicBoolean(); @@ -67,12 +70,12 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou private long lastWaterMark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); public UnboundedSourceSpout( - String name, + String stepName, String description, UnboundedSource source, JStormPipelineOptions options, TupleTag<?> outputTag) { - this.name = name; + this.stepName = checkNotNull(stepName, "stepName"); this.description = checkNotNull(description, "description"); this.source = checkNotNull(source, "source"); this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options")); @@ -121,18 +124,23 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class); this.serializer = new KryoSerializer<>(conf); - createSourceReader(null); - new MetricClient(context).registerGauge( + // init metrics + MetricClient metricClient = new MetricClient(context); + metricClient.registerGauge( context.getThisComponentId() + CommonInstance.BEAM_SOURCE_WATERMARK_METRICS, new Gauge<Double>() { @Override public Double getValue() { return (double) reader.getWatermark().getMillis(); }}); + metricsReporter = MetricsReporter.create(metricClient); + + createSourceReader(null); } public synchronized void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) { - try { + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( + metricsReporter.getMetricsContainer(stepName))) { if (reader != null) { reader.close(); } @@ -148,11 +156,13 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou if (!activated.get()) { return; } - try { + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer( + metricsReporter.getMetricsContainer(stepName))) { if (!hasNextRecord) { hasNextRecord = reader.advance(); } + boolean emitted = false; while (hasNextRecord && activated.get()) { Object value = reader.getCurrent(); Instant timestamp = reader.getCurrentTimestamp(); @@ -169,10 +179,14 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou byte[] immutableValue = serializer.serialize(wv); collector.emit(outputTag.getId(), new Values(immutableValue)); } + emitted = true; // move to next record hasNextRecord = reader.advance(); } + if (emitted) { + metricsReporter.updateMetrics(); + } Instant waterMark = reader.getWatermark(); if (waterMark != null && lastWaterMark < waterMark.getMillis()) { @@ -187,7 +201,7 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou } public String getName() { - return name; + return stepName; } public TupleTag getOutputTag() {
