Add IO metrics to Flink runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2c69d25e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2c69d25e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2c69d25e Branch: refs/heads/master Commit: 2c69d25e3fda4ac0d9503da7cee3835e4f705506 Parents: 0a7e6c3 Author: JingsongLi <[email protected]> Authored: Wed Mar 29 23:35:44 2017 +0800 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Apr 21 11:21:41 2017 +0200 ---------------------------------------------------------------------- .../flink/FlinkBatchTransformTranslators.java | 3 +- .../FlinkStreamingTransformTranslators.java | 2 + .../flink/metrics/ReaderInvocationUtil.java | 71 ++++++++++++++++++++ .../translation/wrappers/SourceInputFormat.java | 20 ++++-- .../streaming/io/BoundedSourceWrapper.java | 17 +++-- .../streaming/io/UnboundedSourceWrapper.java | 18 +++-- .../streaming/UnboundedSourceWrapperTest.java | 12 ++-- 7 files changed, 124 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java index 57f677c..cb33fc1 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java @@ -136,7 +136,8 @@ class FlinkBatchTransformTranslators { DataSource<WindowedValue<T>> dataSource = new DataSource<>( context.getExecutionEnvironment(), - new SourceInputFormat<>(source, context.getPipelineOptions()), + new 
SourceInputFormat<>( + context.getCurrentTransform().getFullName(), source, context.getPipelineOptions()), typeInformation, name); http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 2730236..c024493 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -155,6 +155,7 @@ class FlinkStreamingTransformTranslators { try { UnboundedSourceWrapper<T, ?> sourceWrapper = new UnboundedSourceWrapper<>( + context.getCurrentTransform().getFullName(), context.getPipelineOptions(), transform.getSource(), context.getExecutionEnvironment().getParallelism()); @@ -187,6 +188,7 @@ class FlinkStreamingTransformTranslators { try { BoundedSourceWrapper<T> sourceWrapper = new BoundedSourceWrapper<>( + context.getCurrentTransform().getFullName(), context.getPipelineOptions(), transform.getSource(), context.getExecutionEnvironment().getParallelism()); http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java new file mode 100644 index 0000000..38263d9 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java @@ -0,0 +1,71 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.metrics; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Util for invoking {@link Source.Reader} methods that might require a + * {@link org.apache.beam.sdk.metrics.MetricsContainer} to be active. + * Source.Reader decorator which registers {@link org.apache.beam.sdk.metrics.MetricsContainer}. + * It updates the Flink metrics and accumulators in start and advance. 
+ */ +public class ReaderInvocationUtil<OutputT, ReaderT extends Source.Reader<OutputT>> { + + private final FlinkMetricContainer container; + private final Boolean enableMetrics; + + public ReaderInvocationUtil( + PipelineOptions options, + FlinkMetricContainer container) { + FlinkPipelineOptions flinkPipelineOptions = options.as(FlinkPipelineOptions.class); + enableMetrics = flinkPipelineOptions.getEnableMetrics(); + this.container = container; + } + + public boolean invokeStart(ReaderT reader) throws IOException { + if (enableMetrics) { + try (Closeable ignored = + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + boolean result = reader.start(); + container.updateMetrics(); + return result; + } + } else { + return reader.start(); + } + + } + public boolean invokeAdvance(ReaderT reader) throws IOException { + if (enableMetrics) { + try (Closeable ignored = + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + boolean result = reader.advance(); + container.updateMetrics(); + return result; + } + } else { + return reader.advance(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index 12be8eb..f2b81fc 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -19,6 +19,8 @@ package org.apache.beam.runners.flink.translation.wrappers; import java.io.IOException; import java.util.List; +import 
org.apache.beam.runners.flink.metrics.FlinkMetricContainer; +import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; @@ -28,6 +30,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.flink.api.common.io.DefaultInputSplitAssigner; import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplitAssigner; @@ -40,9 +43,10 @@ import org.slf4j.LoggerFactory; * Wrapper for executing a {@link Source} as a Flink {@link InputFormat}. */ public class SourceInputFormat<T> - implements InputFormat<WindowedValue<T>, SourceInputSplit<T>> { + extends RichInputFormat<WindowedValue<T>, SourceInputSplit<T>> { private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class); + private final String stepName; private final BoundedSource<T> initialSource; private transient PipelineOptions options; @@ -51,7 +55,11 @@ public class SourceInputFormat<T> private transient BoundedSource.BoundedReader<T> reader; private boolean inputAvailable = false; - public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) { + private transient ReaderInvocationUtil<T, BoundedSource.BoundedReader<T>> readerInvoker; + + public SourceInputFormat( + String stepName, BoundedSource<T> initialSource, PipelineOptions options) { + this.stepName = stepName; this.initialSource = initialSource; this.serializedOptions = new SerializedPipelineOptions(options); } @@ -63,8 +71,12 @@ public class SourceInputFormat<T> @Override public void open(SourceInputSplit<T> sourceInputSplit) throws IOException { + FlinkMetricContainer 
metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext()); + readerInvoker = + new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer); + reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options); - inputAvailable = reader.start(); + inputAvailable = readerInvoker.invokeStart(reader); } @Override @@ -129,7 +141,7 @@ public class SourceInputFormat<T> final T current = reader.getCurrent(); final Instant timestamp = reader.getCurrentTimestamp(); // advance reader to have a record ready next time - inputAvailable = reader.advance(); + inputAvailable = readerInvoker.invokeAdvance(reader); return WindowedValue.of( current, timestamp, http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java index 2ed5024..a142685 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java @@ -20,6 +20,8 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.io; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.List; +import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; +import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.options.PipelineOptions; @@ 
-42,6 +44,7 @@ public class BoundedSourceWrapper<OutputT> private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class); + private String stepName; /** * Keep the options so that we can initialize the readers. */ @@ -66,9 +69,11 @@ public class BoundedSourceWrapper<OutputT> @SuppressWarnings("unchecked") public BoundedSourceWrapper( + String stepName, PipelineOptions pipelineOptions, BoundedSource<OutputT> source, int parallelism) throws Exception { + this.stepName = stepName; this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); long desiredBundleSize = source.getEstimatedSizeBytes(pipelineOptions) / parallelism; @@ -99,6 +104,10 @@ public class BoundedSourceWrapper<OutputT> numSubtasks, localSources); + FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext()); + ReaderInvocationUtil<OutputT, BoundedSource.BoundedReader<OutputT>> readerInvoker = + new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer); + readers = new ArrayList<>(); // initialize readers from scratch for (BoundedSource<OutputT> source : localSources) { @@ -109,13 +118,13 @@ public class BoundedSourceWrapper<OutputT> // the easy case, we just read from one reader BoundedSource.BoundedReader<OutputT> reader = readers.get(0); - boolean dataAvailable = reader.start(); + boolean dataAvailable = readerInvoker.invokeStart(reader); if (dataAvailable) { emitElement(ctx, reader); } while (isRunning) { - dataAvailable = reader.advance(); + dataAvailable = readerInvoker.invokeAdvance(reader); if (dataAvailable) { emitElement(ctx, reader); @@ -131,7 +140,7 @@ public class BoundedSourceWrapper<OutputT> // start each reader and emit data if immediately available for (BoundedSource.BoundedReader<OutputT> reader : readers) { - boolean dataAvailable = reader.start(); + boolean dataAvailable = readerInvoker.invokeStart(reader); if (dataAvailable) { emitElement(ctx, reader); } @@ -142,7 +151,7 @@ public class 
BoundedSourceWrapper<OutputT> boolean hadData = false; while (isRunning && !readers.isEmpty()) { BoundedSource.BoundedReader<OutputT> reader = readers.get(currentReader); - boolean dataAvailable = reader.advance(); + boolean dataAvailable = readerInvoker.invokeAdvance(reader); if (dataAvailable) { emitElement(ctx, reader); http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index bb9b58a..ee20fd5 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -22,6 +22,8 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; +import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; +import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.coders.Coder; @@ -64,6 +66,7 @@ public class UnboundedSourceWrapper< private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class); + private final String stepName; /** * Keep the options so that we can initialize the localReaders. 
*/ @@ -131,9 +134,11 @@ public class UnboundedSourceWrapper< @SuppressWarnings("unchecked") public UnboundedSourceWrapper( + String stepName, PipelineOptions pipelineOptions, UnboundedSource<OutputT, CheckpointMarkT> source, int parallelism) throws Exception { + this.stepName = stepName; this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); if (source.requiresDeduping()) { @@ -209,6 +214,11 @@ public class UnboundedSourceWrapper< context = ctx; + FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext()); + ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>> readerInvoker = + new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer); + + if (localReaders.size() == 0) { // do nothing, but still look busy ... // also, output a Long.MAX_VALUE watermark since we know that we're not @@ -238,7 +248,7 @@ public class UnboundedSourceWrapper< // the easy case, we just read from one reader UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0); - boolean dataAvailable = reader.start(); + boolean dataAvailable = readerInvoker.invokeStart(reader); if (dataAvailable) { emitElement(ctx, reader); } @@ -246,7 +256,7 @@ public class UnboundedSourceWrapper< setNextWatermarkTimer(this.runtimeContext); while (isRunning) { - dataAvailable = reader.advance(); + dataAvailable = readerInvoker.invokeAdvance(reader); if (dataAvailable) { emitElement(ctx, reader); @@ -263,7 +273,7 @@ public class UnboundedSourceWrapper< // start each reader and emit data if immediately available for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) { - boolean dataAvailable = reader.start(); + boolean dataAvailable = readerInvoker.invokeStart(reader); if (dataAvailable) { emitElement(ctx, reader); } @@ -274,7 +284,7 @@ public class UnboundedSourceWrapper< boolean hadData = false; while (isRunning) { UnboundedSource.UnboundedReader<OutputT> reader = 
localReaders.get(currentReader); - boolean dataAvailable = reader.advance(); + boolean dataAvailable = readerInvoker.invokeAdvance(reader); if (dataAvailable) { emitElement(ctx, reader); http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java index 90f95d6..0cb528a 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java @@ -111,7 +111,7 @@ public class UnboundedSourceWrapperTest { // elements later. TestCountingSource source = new TestCountingSource(numElements); UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = - new UnboundedSourceWrapper<>(options, source, numSplits); + new UnboundedSourceWrapper<>("stepName", options, source, numSplits); assertEquals(numSplits, flinkWrapper.getSplitSources().size()); @@ -179,7 +179,7 @@ public class UnboundedSourceWrapperTest { // elements later. 
TestCountingSource source = new TestCountingSource(numElements); UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = - new UnboundedSourceWrapper<>(options, source, numSplits); + new UnboundedSourceWrapper<>("stepName", options, source, numSplits); assertEquals(numSplits, flinkWrapper.getSplitSources().size()); @@ -270,7 +270,7 @@ public class UnboundedSourceWrapperTest { TestCountingSource restoredSource = new TestCountingSource(numElements); UnboundedSourceWrapper< KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper = - new UnboundedSourceWrapper<>(options, restoredSource, numSplits); + new UnboundedSourceWrapper<>("stepName", options, restoredSource, numSplits); assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size()); @@ -343,7 +343,7 @@ public class UnboundedSourceWrapperTest { } }; UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = - new UnboundedSourceWrapper<>(options, source, numSplits); + new UnboundedSourceWrapper<>("stepName", options, source, numSplits); OperatorStateStore backend = mock(OperatorStateStore.class); @@ -370,7 +370,7 @@ public class UnboundedSourceWrapperTest { UnboundedSourceWrapper< KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper = - new UnboundedSourceWrapper<>(options, new TestCountingSource(numElements), + new UnboundedSourceWrapper<>("stepName", options, new TestCountingSource(numElements), numSplits); StreamSource restoredSourceOperator = new StreamSource<>(flinkWrapper); @@ -429,7 +429,7 @@ public class UnboundedSourceWrapperTest { TestCountingSource source = new TestCountingSource(numElements); UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = - new UnboundedSourceWrapper<>(options, source, parallelism); + new UnboundedSourceWrapper<>("stepName", options, source, parallelism); InstantiationUtil.serializeObject(flinkWrapper); }
