[BEAM-773] Implement Metrics support for Flink runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0a7e6c3d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0a7e6c3d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0a7e6c3d Branch: refs/heads/master Commit: 0a7e6c3d5937a19d33c9bf1945dd4af162d1e51b Parents: b382795 Author: JingsongLi <[email protected]> Authored: Wed Mar 22 15:56:50 2017 +0800 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Apr 21 11:21:41 2017 +0200 ---------------------------------------------------------------------- runners/flink/pom.xml | 8 +- .../flink/FlinkBatchTransformTranslators.java | 4 +- .../flink/FlinkBatchTranslationContext.java | 4 + .../runners/flink/FlinkPipelineOptions.java | 5 + .../beam/runners/flink/FlinkRunnerResult.java | 3 +- .../FlinkStreamingTransformTranslators.java | 13 + .../flink/FlinkStreamingTranslationContext.java | 3 + .../metrics/DoFnRunnerWithMetricsUpdate.java | 91 ++++++ .../flink/metrics/FlinkMetricContainer.java | 315 +++++++++++++++++++ .../flink/metrics/FlinkMetricResults.java | 146 +++++++++ .../runners/flink/metrics/package-info.java | 22 ++ .../functions/FlinkDoFnFunction.java | 10 + .../functions/FlinkStatefulDoFnFunction.java | 10 + .../wrappers/streaming/DoFnOperator.java | 13 +- .../streaming/SplittableDoFnOperator.java | 2 + .../wrappers/streaming/WindowDoFnOperator.java | 2 + .../beam/runners/flink/PipelineOptionsTest.java | 10 + .../flink/streaming/DoFnOperatorTest.java | 5 + 18 files changed, 661 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index 6e1d3c5..e88b68a 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -58,7 +58,6 @@ <excludedGroups> 
org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders, org.apache.beam.sdk.testing.UsesSplittableParDo, - org.apache.beam.sdk.testing.UsesAttemptedMetrics, org.apache.beam.sdk.testing.UsesCommittedMetrics, org.apache.beam.sdk.testing.UsesTestStream </excludedGroups> @@ -92,7 +91,6 @@ org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders, org.apache.beam.sdk.testing.UsesSetState, org.apache.beam.sdk.testing.UsesMapState, - org.apache.beam.sdk.testing.UsesAttemptedMetrics, org.apache.beam.sdk.testing.UsesCommittedMetrics, org.apache.beam.sdk.testing.UsesTestStream, org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs @@ -179,6 +177,12 @@ <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-core</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java index ff9521c..57f677c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java @@ -571,7 +571,8 @@ class FlinkBatchTransformTranslators { (KvCoder<?, InputT>) context.getInput(transform).getCoder(); FlinkStatefulDoFnFunction<?, ?, OutputT> doFnWrapper = new FlinkStatefulDoFnFunction<>( - (DoFn) doFn, windowingStrategy, sideInputStrategies, context.getPipelineOptions(), + (DoFn) doFn, context.getCurrentTransform().getFullName(), + windowingStrategy, 
sideInputStrategies, context.getPipelineOptions(), outputMap, transform.getMainOutputTag() ); @@ -585,6 +586,7 @@ class FlinkBatchTransformTranslators { FlinkDoFnFunction<InputT, RawUnionValue> doFnWrapper = new FlinkDoFnFunction( doFn, + context.getCurrentTransform().getFullName(), windowingStrategy, sideInputStrategies, context.getPipelineOptions(), http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java index 98dd0fb..bb86cd9 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java @@ -103,6 +103,10 @@ class FlinkBatchTranslationContext { this.currentTransform = currentTransform; } + public AppliedPTransform<?, ?, ?> getCurrentTransform() { + return currentTransform; + } + @SuppressWarnings("unchecked") public <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) { return (DataSet<T>) broadcastDataSets.get(value); http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index ef9afea..b769a6f 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -98,4 +98,9 @@ public interface 
FlinkPipelineOptions AbstractStateBackend getStateBackend(); void setStateBackend(AbstractStateBackend stateBackend); + @Description("Enable/disable Beam metrics in Flink Runner") + @Default.Boolean(true) + Boolean getEnableMetrics(); + void setEnableMetrics(Boolean enableMetrics); + } http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java index 0682b56..0f2462d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.flink; import java.io.IOException; import java.util.Collections; import java.util.Map; +import org.apache.beam.runners.flink.metrics.FlinkMetricResults; import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.PipelineResult; @@ -93,6 +94,6 @@ public class FlinkRunnerResult implements PipelineResult { @Override public MetricResults metrics() { - throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics."); + return new FlinkMetricResults(aggregators); } } http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 71f315d..2730236 100644 --- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -286,6 +286,7 @@ class FlinkStreamingTransformTranslators { interface DoFnOperatorFactory<InputT, OutputT> { DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator( DoFn<InputT, OutputT> doFn, + String stepName, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, @@ -300,6 +301,7 @@ class FlinkStreamingTransformTranslators { static <InputT, OutputT> void translateParDo( String transformName, DoFn<InputT, OutputT> doFn, + String stepName, PCollection<InputT> input, List<PCollectionView<?>> sideInputs, Map<TupleTag<?>, PValue> outputs, @@ -340,6 +342,7 @@ class FlinkStreamingTransformTranslators { DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator = doFnOperatorFactory.createDoFnOperator( doFn, + context.getCurrentTransform().getFullName(), sideInputs, mainOutputTag, additionalOutputTags, @@ -365,6 +368,7 @@ class FlinkStreamingTransformTranslators { DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator = doFnOperatorFactory.createDoFnOperator( doFn, + context.getCurrentTransform().getFullName(), sideInputs, mainOutputTag, additionalOutputTags, @@ -483,6 +487,7 @@ class FlinkStreamingTransformTranslators { ParDoTranslationHelper.translateParDo( transform.getName(), transform.getFn(), + context.getCurrentTransform().getFullName(), (PCollection<InputT>) context.getInput(transform), transform.getSideInputs(), context.getOutputs(transform), @@ -493,6 +498,7 @@ class FlinkStreamingTransformTranslators { @Override public DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator( DoFn<InputT, OutputT> doFn, + String stepName, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, @@ -504,6 +510,7 @@ class 
FlinkStreamingTransformTranslators { Map<Integer, PCollectionView<?>> transformedSideInputs) { return new DoFnOperator<>( doFn, + stepName, inputCoder, mainOutputTag, additionalOutputTags, @@ -531,6 +538,7 @@ class FlinkStreamingTransformTranslators { ParDoTranslationHelper.translateParDo( transform.getName(), transform.newProcessFn(transform.getFn()), + context.getCurrentTransform().getFullName(), (PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>) context.getInput(transform), transform.getSideInputs(), @@ -548,6 +556,7 @@ class FlinkStreamingTransformTranslators { DoFn< KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> doFn, + String stepName, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, @@ -563,6 +572,7 @@ class FlinkStreamingTransformTranslators { Map<Integer, PCollectionView<?>> transformedSideInputs) { return new SplittableDoFnOperator<>( doFn, + stepName, inputCoder, mainOutputTag, additionalOutputTags, @@ -700,6 +710,7 @@ class FlinkStreamingTransformTranslators { WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator = new WindowDoFnOperator<>( reduceFn, + context.getCurrentTransform().getFullName(), (Coder) windowedWorkItemCoder, new TupleTag<KV<K, Iterable<InputT>>>("main output"), Collections.<TupleTag<?>>emptyList(), @@ -800,6 +811,7 @@ class FlinkStreamingTransformTranslators { WindowDoFnOperator<K, InputT, OutputT> doFnOperator = new WindowDoFnOperator<>( reduceFn, + context.getCurrentTransform().getFullName(), (Coder) windowedWorkItemCoder, new TupleTag<KV<K, OutputT>>("main output"), Collections.<TupleTag<?>>emptyList(), @@ -825,6 +837,7 @@ class FlinkStreamingTransformTranslators { WindowDoFnOperator<K, InputT, OutputT> doFnOperator = new WindowDoFnOperator<>( reduceFn, + context.getCurrentTransform().getFullName(), (Coder) windowedWorkItemCoder, new TupleTag<KV<K, OutputT>>("main output"), 
Collections.<TupleTag<?>>emptyList(), http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java index 1a943a3..45ee14d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java @@ -107,6 +107,9 @@ class FlinkStreamingTranslationContext { return new CoderTypeInformation<>(windowedValueCoder); } + public AppliedPTransform<?, ?, ?> getCurrentTransform() { + return currentTransform; + } @SuppressWarnings("unchecked") public <T extends PValue> T getInput(PTransform<T, ?> transform) { http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java new file mode 100644 index 0000000..29a1a52 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.metrics; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.joda.time.Instant; + +/** + * {@link DoFnRunner} decorator which registers + * {@link org.apache.beam.sdk.metrics.MetricsContainer}. It updates metrics to Flink metrics and + * accumulators in {@link #finishBundle()}. 
+ */ +public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { + + private final FlinkMetricContainer container; + private final DoFnRunner<InputT, OutputT> delegate; + + public DoFnRunnerWithMetricsUpdate( + String stepName, + DoFnRunner<InputT, OutputT> delegate, + RuntimeContext runtimeContext) { + this.delegate = delegate; + container = new FlinkMetricContainer(stepName, runtimeContext); + } + + @Override + public void startBundle() { + try (Closeable ignored = + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + delegate.startBundle(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void processElement(final WindowedValue<InputT> elem) { + try (Closeable ignored = + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + delegate.processElement(elem); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onTimer(final String timerId, final BoundedWindow window, final Instant timestamp, + final TimeDomain timeDomain) { + try (Closeable ignored = + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + delegate.onTimer(timerId, window, timestamp, timeDomain); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void finishBundle() { + try (Closeable ignored = + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + delegate.finishBundle(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // update metrics + container.updateMetrics(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java new file mode 100644 index 0000000..d020f69 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.metrics; + +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.metrics.DistributionData; +import org.apache.beam.sdk.metrics.GaugeData; +import org.apache.beam.sdk.metrics.MetricKey; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricUpdates; +import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; + +/** + * Helper class for holding a {@link MetricsContainer} and forwarding Beam metrics to + * Flink accumulators and metrics. 
+ */ +public class FlinkMetricContainer { + + private static final String METRIC_KEY_SEPARATOR = "__"; + static final String COUNTER_PREFIX = "__counter"; + static final String DISTRIBUTION_PREFIX = "__distribution"; + static final String GAUGE_PREFIX = "__gauge"; + + private final MetricsContainer metricsContainer; + private final RuntimeContext runtimeContext; + private final Map<String, Counter> flinkCounterCache; + private final Map<String, FlinkDistributionGauge> flinkDistributionGaugeCache; + private final Map<String, FlinkGauge> flinkGaugeCache; + + public FlinkMetricContainer(String stepName, RuntimeContext runtimeContext) { + metricsContainer = new MetricsContainer(stepName); + this.runtimeContext = runtimeContext; + flinkCounterCache = new HashMap<>(); + flinkDistributionGaugeCache = new HashMap<>(); + flinkGaugeCache = new HashMap<>(); + } + + public MetricsContainer getMetricsContainer() { + return metricsContainer; + } + + public void updateMetrics() { + // update metrics + MetricUpdates updates = metricsContainer.getUpdates(); + if (updates != null) { + updateCounters(updates.counterUpdates()); + updateDistributions(updates.distributionUpdates()); + updateGauge(updates.gaugeUpdates()); + metricsContainer.commitUpdates(); + } + } + + private void updateCounters(Iterable<MetricUpdates.MetricUpdate<Long>> updates) { + + for (MetricUpdates.MetricUpdate<Long> metricUpdate : updates) { + + String flinkMetricName = getFlinkMetricNameString(COUNTER_PREFIX, metricUpdate.getKey()); + Long update = metricUpdate.getUpdate(); + + // update flink metric + Counter counter = flinkCounterCache.get(flinkMetricName); + if (counter == null) { + counter = runtimeContext.getMetricGroup().counter(flinkMetricName); + flinkCounterCache.put(flinkMetricName, counter); + } + counter.dec(counter.getCount()); + counter.inc(update); + + // update flink accumulator + Accumulator<Long, Long> accumulator = runtimeContext.getAccumulator(flinkMetricName); + if (accumulator == null) { + 
accumulator = new LongCounter(update); + runtimeContext.addAccumulator(flinkMetricName, accumulator); + } else { + accumulator.resetLocal(); + accumulator.add(update); + } + } + } + + private void updateDistributions(Iterable<MetricUpdates.MetricUpdate<DistributionData>> updates) { + + for (MetricUpdates.MetricUpdate<DistributionData> metricUpdate : updates) { + + String flinkMetricName = + getFlinkMetricNameString(DISTRIBUTION_PREFIX, metricUpdate.getKey()); + DistributionData update = metricUpdate.getUpdate(); + + // update flink metric + FlinkDistributionGauge gauge = flinkDistributionGaugeCache.get(flinkMetricName); + if (gauge == null) { + gauge = runtimeContext.getMetricGroup() + .gauge(flinkMetricName, new FlinkDistributionGauge(update)); + flinkDistributionGaugeCache.put(flinkMetricName, gauge); + } else { + gauge.update(update); + } + + // update flink accumulator + Accumulator<DistributionData, DistributionData> accumulator = + runtimeContext.getAccumulator(flinkMetricName); + if (accumulator == null) { + accumulator = new FlinkDistributionDataAccumulator(update); + runtimeContext.addAccumulator(flinkMetricName, accumulator); + } else { + accumulator.resetLocal(); + accumulator.add(update); + } + } + } + + private void updateGauge(Iterable<MetricUpdates.MetricUpdate<GaugeData>> updates) { + for (MetricUpdates.MetricUpdate<GaugeData> metricUpdate : updates) { + + String flinkMetricName = + getFlinkMetricNameString(GAUGE_PREFIX, metricUpdate.getKey()); + GaugeData update = metricUpdate.getUpdate(); + + // update flink metric + FlinkGauge gauge = flinkGaugeCache.get(flinkMetricName); + if (gauge == null) { + gauge = runtimeContext.getMetricGroup() + .gauge(flinkMetricName, new FlinkGauge(update)); + flinkGaugeCache.put(flinkMetricName, gauge); + } else { + gauge.update(update); + } + + // update flink accumulator + Accumulator<GaugeData, GaugeData> accumulator = + runtimeContext.getAccumulator(flinkMetricName); + if (accumulator == null) { + accumulator = 
new FlinkGaugeAccumulator(update); + runtimeContext.addAccumulator(flinkMetricName, accumulator); + } + accumulator.resetLocal(); + accumulator.add(update); + } + } + + private static String getFlinkMetricNameString(String prefix, MetricKey key) { + return prefix + + METRIC_KEY_SEPARATOR + key.stepName() + + METRIC_KEY_SEPARATOR + key.metricName().namespace() + + METRIC_KEY_SEPARATOR + key.metricName().name(); + } + + static MetricKey parseMetricKey(String flinkMetricName) { + String[] arr = flinkMetricName.split(METRIC_KEY_SEPARATOR); + return MetricKey.create(arr[2], MetricName.named(arr[3], arr[4])); + } + + /** + * Flink {@link Gauge} for {@link DistributionData}. + */ + public static class FlinkDistributionGauge implements Gauge<DistributionData> { + + DistributionData data; + + FlinkDistributionGauge(DistributionData data) { + this.data = data; + } + + void update(DistributionData data) { + this.data = data; + } + + @Override + public DistributionData getValue() { + return data; + } + } + + /** + * Flink {@link Gauge} for {@link GaugeData}. + */ + public static class FlinkGauge implements Gauge<GaugeData> { + + GaugeData data; + + FlinkGauge(GaugeData data) { + this.data = data; + } + + void update(GaugeData update) { + this.data = data.combine(update); + } + + @Override + public GaugeData getValue() { + return data; + } + } + + /** + * Flink {@link Accumulator} for {@link DistributionData}. 
+ */ + public static class FlinkDistributionDataAccumulator implements + Accumulator<DistributionData, DistributionData> { + + private static final long serialVersionUID = 1L; + + private DistributionData data; + + public FlinkDistributionDataAccumulator(DistributionData data) { + this.data = data; + } + + @Override + public void add(DistributionData value) { + if (data == null) { + this.data = value; + } else { + this.data = this.data.combine(value); + } + } + + @Override + public DistributionData getLocalValue() { + return data; + } + + @Override + public void resetLocal() { + data = null; + } + + @Override + public void merge(Accumulator<DistributionData, DistributionData> other) { + data = data.combine(other.getLocalValue()); + } + + @Override + public Accumulator<DistributionData, DistributionData> clone() { + try { + super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException(e); + } + + return new FlinkDistributionDataAccumulator( + DistributionData.create(data.sum(), data.count(), data.min(), data.max())); + } + } + + /** + * Flink {@link Accumulator} for {@link GaugeData}. 
+ */ + public static class FlinkGaugeAccumulator implements Accumulator<GaugeData, GaugeData> { + + private GaugeData data; + + public FlinkGaugeAccumulator(GaugeData data) { + this.data = data; + } + + @Override + public void add(GaugeData value) { + if (data == null) { + this.data = value; + } else { + this.data = this.data.combine(value); + } + } + + @Override + public GaugeData getLocalValue() { + return data; + } + + @Override + public void resetLocal() { + this.data = null; + } + + @Override + public void merge(Accumulator<GaugeData, GaugeData> other) { + data = data.combine(other.getLocalValue()); + } + + @Override + public Accumulator<GaugeData, GaugeData> clone() { + try { + super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException(e); + } + + return new FlinkGaugeAccumulator( + GaugeData.create(data.value())); + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java new file mode 100644 index 0000000..263a68e --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.metrics; + + +import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.COUNTER_PREFIX; +import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.DISTRIBUTION_PREFIX; +import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.GAUGE_PREFIX; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.metrics.DistributionData; +import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.GaugeData; +import org.apache.beam.sdk.metrics.GaugeResult; +import org.apache.beam.sdk.metrics.MetricFiltering; +import org.apache.beam.sdk.metrics.MetricKey; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricsFilter; + +/** + * Implementation of {@link MetricResults} for the Flink Runner. 
+ */ +public class FlinkMetricResults extends MetricResults { + + private Map<String, Object> aggregators; + + public FlinkMetricResults(Map<String, Object> aggregators) { + this.aggregators = aggregators; + } + + @Override + public MetricQueryResults queryMetrics(MetricsFilter filter) { + return new FlinkMetricQueryResults(filter); + } + + private class FlinkMetricQueryResults implements MetricQueryResults { + + private MetricsFilter filter; + + FlinkMetricQueryResults(MetricsFilter filter) { + this.filter = filter; + } + + @Override + public Iterable<MetricResult<Long>> counters() { + List<MetricResult<Long>> result = new ArrayList<>(); + for (Map.Entry<String, Object> entry : aggregators.entrySet()) { + if (entry.getKey().startsWith(COUNTER_PREFIX)) { + MetricKey metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey()); + if (MetricFiltering.matches(filter, metricKey)) { + result.add(new FlinkMetricResult<>( + metricKey.metricName(), metricKey.stepName(), (Long) entry.getValue())); + } + } + } + return result; + } + + @Override + public Iterable<MetricResult<DistributionResult>> distributions() { + List<MetricResult<DistributionResult>> result = new ArrayList<>(); + for (Map.Entry<String, Object> entry : aggregators.entrySet()) { + if (entry.getKey().startsWith(DISTRIBUTION_PREFIX)) { + MetricKey metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey()); + DistributionData data = (DistributionData) entry.getValue(); + if (MetricFiltering.matches(filter, metricKey)) { + result.add(new FlinkMetricResult<>( + metricKey.metricName(), metricKey.stepName(), data.extractResult())); + } + } + } + return result; + } + + @Override + public Iterable<MetricResult<GaugeResult>> gauges() { + List<MetricResult<GaugeResult>> result = new ArrayList<>(); + for (Map.Entry<String, Object> entry : aggregators.entrySet()) { + if (entry.getKey().startsWith(GAUGE_PREFIX)) { + MetricKey metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey()); + GaugeData data = 
(GaugeData) entry.getValue(); + if (MetricFiltering.matches(filter, metricKey)) { + result.add(new FlinkMetricResult<>( + metricKey.metricName(), metricKey.stepName(), data.extractResult())); + } + } + } + return result; + } + + } + + private static class FlinkMetricResult<T> implements MetricResult<T> { + private final MetricName name; + private final String step; + private final T result; + + FlinkMetricResult(MetricName name, String step, T result) { + this.name = name; + this.step = step; + this.result = result; + } + + @Override + public MetricName name() { + return name; + } + + @Override + public String step() { + return step; + } + + @Override + public T committed() { + throw new UnsupportedOperationException("Flink runner does not currently support committed" + + " metrics results. Please use 'attempted' instead."); + } + + @Override + public T attempted() { + return result; + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/package-info.java new file mode 100644 index 0000000..cfe77e4 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal metrics implementation of the Beam runner for Apache Flink. + */ +package org.apache.beam.runners.flink.metrics; http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index 51582af..68ac780 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -21,6 +21,8 @@ import java.util.Collections; import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; @@ -50,6 +52,7 @@ public class FlinkDoFnFunction<InputT, OutputT> private final SerializedPipelineOptions serializedOptions; private final DoFn<InputT, OutputT> doFn; + private final String stepName; private final Map<PCollectionView<?>, WindowingStrategy<?, 
?>> sideInputs; private final WindowingStrategy<?, ?> windowingStrategy; @@ -61,6 +64,7 @@ public class FlinkDoFnFunction<InputT, OutputT> public FlinkDoFnFunction( DoFn<InputT, OutputT> doFn, + String stepName, WindowingStrategy<?, ?> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, PipelineOptions options, @@ -68,6 +72,7 @@ public class FlinkDoFnFunction<InputT, OutputT> TupleTag<OutputT> mainOutputTag) { this.doFn = doFn; + this.stepName = stepName; this.sideInputs = sideInputs; this.serializedOptions = new SerializedPipelineOptions(options); this.windowingStrategy = windowingStrategy; @@ -103,6 +108,11 @@ public class FlinkDoFnFunction<InputT, OutputT> new FlinkAggregatorFactory(runtimeContext), windowingStrategy); + if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class)) + .getEnableMetrics()) { + doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext()); + } + doFnRunner.startBundle(); for (WindowedValue<InputT> value : values) { http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java index c8193d2..3e02bee 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java @@ -30,6 +30,8 @@ import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import 
org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; @@ -55,6 +57,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT> extends RichGroupReduceFunction<WindowedValue<KV<K, V>>, WindowedValue<OutputT>> { private final DoFn<KV<K, V>, OutputT> dofn; + private String stepName; private final WindowingStrategy<?, ?> windowingStrategy; private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; private final SerializedPipelineOptions serializedOptions; @@ -64,6 +67,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT> public FlinkStatefulDoFnFunction( DoFn<KV<K, V>, OutputT> dofn, + String stepName, WindowingStrategy<?, ?> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, PipelineOptions pipelineOptions, @@ -71,6 +75,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT> TupleTag<OutputT> mainOutputTag) { this.dofn = dofn; + this.stepName = stepName; this.windowingStrategy = windowingStrategy; this.sideInputs = sideInputs; this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); @@ -129,6 +134,11 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT> new FlinkAggregatorFactory(runtimeContext), windowingStrategy); + if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class)) + .getEnableMetrics()) { + doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext()); + } + doFnRunner.startBundle(); doFnRunner.processElement(currentValue); http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java 
---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 8a09286..d3d9078 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -47,6 +47,8 @@ import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.StatefulDoFnRunner; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate; import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; @@ -139,7 +141,9 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> protected transient FlinkStateInternals<?> stateInternals; - private Coder<WindowedValue<InputT>> inputCoder; + private final String stepName; + + private final Coder<WindowedValue<InputT>> inputCoder; private final Coder<?> keyCoder; @@ -155,6 +159,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> public DoFnOperator( DoFn<InputT, FnOutputT> doFn, + String stepName, Coder<WindowedValue<InputT>> inputCoder, TupleTag<FnOutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, @@ -165,6 +170,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> PipelineOptions options, Coder<?> keyCoder) { this.doFn = doFn; + this.stepName = stepName; this.inputCoder = inputCoder; this.mainOutputTag = mainOutputTag; 
this.additionalOutputTags = additionalOutputTags; @@ -321,6 +327,11 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> stateCleaner); } + if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class)) + .getEnableMetrics()) { + doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext()); + } + pushbackDoFnRunner = SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); } http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java index 40f70e4..fb6762d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -59,6 +59,7 @@ public class SplittableDoFnOperator< public SplittableDoFnOperator( DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT> doFn, + String stepName, Coder< WindowedValue< KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder, @@ -72,6 +73,7 @@ public class SplittableDoFnOperator< Coder<?> keyCoder) { super( doFn, + stepName, inputCoder, mainOutputTag, additionalOutputTags, http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 9b2136c..9718734 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -52,6 +52,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> public WindowDoFnOperator( SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn, + String stepName, Coder<WindowedValue<KeyedWorkItem<K, InputT>>> inputCoder, TupleTag<KV<K, OutputT>> mainOutputTag, List<TupleTag<?>> additionalOutputTags, @@ -63,6 +64,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> Coder<K> keyCoder) { super( null, + stepName, inputCoder, mainOutputTag, additionalOutputTags, http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index 06187f6..9bc2c3d 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.flink; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -94,6 +95,13 @@ public class PipelineOptionsTest { } @Test + public void testEnableMetrics() { + FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class); + options.setEnableMetrics(false); + assertFalse(options.getEnableMetrics()); + } + + @Test public void testCaching() { PipelineOptions deserializedOptions = serializedOptions.getPipelineOptions().as(PipelineOptions.class); @@ -113,6 +121,7 @@ public class PipelineOptionsTest { public void parDoBaseClassPipelineOptionsNullTest() { DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>( new TestDoFn(), + "stepName", WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), new TupleTag<String>("main-output"), Collections.<TupleTag<?>>emptyList(), @@ -133,6 +142,7 @@ public class PipelineOptionsTest { DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>( new TestDoFn(), + "stepName", WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), new TupleTag<String>("main-output"), Collections.<TupleTag<?>>emptyList(), http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index 4c826d1..4e18ac2 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -112,6 +112,7 @@ public class DoFnOperatorTest { DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>( new IdentityDoFn<String>(), + "stepName", windowedValueCoder, outputTag, Collections.<TupleTag<?>>emptyList(), @@ -154,6 +155,7 @@ public class DoFnOperatorTest { DoFnOperator<String, String, RawUnionValue> doFnOperator = new DoFnOperator<>( new MultiOutputDoFn(additionalOutput1, additionalOutput2), + "stepName", 
windowedValueCoder, mainOutput, ImmutableList.<TupleTag<?>>of(additionalOutput1, additionalOutput2), @@ -212,6 +214,7 @@ public class DoFnOperatorTest { DoFnOperator<Integer, String, WindowedValue<String>> doFnOperator = new DoFnOperator<>( fn, + "stepName", windowedValueCoder, outputTag, Collections.<TupleTag<?>>emptyList(), @@ -325,6 +328,7 @@ public class DoFnOperatorTest { KV<String, Integer>, KV<String, Integer>, WindowedValue<KV<String, Integer>>> doFnOperator = new DoFnOperator<>( fn, + "stepName", windowedValueCoder, outputTag, Collections.<TupleTag<?>>emptyList(), @@ -420,6 +424,7 @@ public class DoFnOperatorTest { DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>( new IdentityDoFn<String>(), + "stepName", windowedValueCoder, outputTag, Collections.<TupleTag<?>>emptyList(),
