Address comments on Flink Side-Input PR
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6ae4b6a3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6ae4b6a3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6ae4b6a3 Branch: refs/heads/gearpump-runner Commit: 6ae4b6a3df5cf3b834505fcb3f21df0e90473a0f Parents: 8007bdf Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Thu Aug 25 11:00:39 2016 +0200 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Sep 12 17:40:11 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/core/SideInputHandler.java | 6 +- .../apache/beam/runners/flink/FlinkRunner.java | 86 ++++++++++++++++++-- .../wrappers/streaming/DoFnOperator.java | 13 ++- .../wrappers/streaming/WindowDoFnOperator.java | 2 - 4 files changed, 89 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ae4b6a3/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java index a97d3f3..851ed37 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java @@ -60,7 +60,11 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { /** The list of side inputs that we're handling. */ protected final Collection<PCollectionView<?>> sideInputs; - /** State internals that are scoped not to the key of a value but instead to one key group. */ + /** + * State internals that are scoped not to the key of a value but are global. 
The state can still + * be kept locally, but if side inputs are broadcast to all parallel operators then all will + * have the same view of the state. + */ private final StateInternals<Void> stateInternals; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ae4b6a3/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index 8b1f42e..d3c65c0 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -25,8 +25,13 @@ import java.net.URL; import java.net.URLClassLoader; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; @@ -35,6 +40,7 @@ import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.TransformTreeNode; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -47,6 +53,8 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; + import org.apache.flink.api.common.JobExecutionResult; import org.slf4j.Logger; import
org.slf4j.LoggerFactory; @@ -108,6 +116,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> { private FlinkRunner(FlinkPipelineOptions options) { this.options = options; + this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>(); ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.<Class<?>, Class<?>>builder(); if (options.isStreaming()) { @@ -124,6 +133,8 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> { @Override public FlinkRunnerResult run(Pipeline pipeline) { + logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline); + LOG.info("Executing pipeline using FlinkRunner."); FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options); @@ -176,6 +187,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> { PTransform<InputT, OutputT> customTransform = InstanceBuilder.ofType(customTransformClass) + .withArg(FlinkRunner.class, this) .withArg(transformClass, transform) .build(); @@ -223,6 +235,59 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> { return files; } + /** A set of {@link View}s with non-deterministic key coders. */ + Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders; + + /** + * Records that the given {@link PTransform} uses a non-deterministic key coder. + */ + private void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) { + ptransformViewsWithNonDeterministicKeyCoders.add(ptransform); + } + + /** Outputs a warning about PCollection views without deterministic key coders. */ + private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pipeline) { + // We need to wait until this point to determine the names of the transforms, since only + // at this time do we know the hierarchy of the transforms; otherwise we could + // have just recorded the full names during apply time.
+ if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) { + final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>(); + pipeline.traverseTopologically(new Pipeline.PipelineVisitor() { + @Override + public void visitValue(PValue value, TransformTreeNode producer) { + } + + @Override + public void visitPrimitiveTransform(TransformTreeNode node) { + if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) { + ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName()); + } + } + + @Override + public CompositeBehavior enterCompositeTransform(TransformTreeNode node) { + if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) { + ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName()); + } + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void leaveCompositeTransform(TransformTreeNode node) { + } + }); + + LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} " + + "because the key coder is not deterministic. Falling back to singleton implementation " + + "which may cause memory and/or performance problems. 
Future major versions of " + + "the Flink runner will require deterministic key coders.", + ptransformViewNamesWithNonDeterministicKeyCoders); + } + } + + + ///////////////////////////////////////////////////////////////////////////// + /** * Specialized implementation for * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap} @@ -231,8 +296,11 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> { private static class StreamingViewAsMap<K, V> extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { + private final FlinkRunner runner; + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() - public StreamingViewAsMap(View.AsMap<K, V> transform) { + public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) { + this.runner = runner; } @Override @@ -248,7 +316,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> { try { inputCoder.getKeyCoder().verifyDeterministic(); } catch (Coder.NonDeterministicException e) { -// runner.recordViewUsesNonDeterministicKeyCoder(this); + runner.recordViewUsesNonDeterministicKeyCoder(this); } return input @@ -270,11 +338,14 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> { private static class StreamingViewAsMultimap<K, V> extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { + private final FlinkRunner runner; + /** * Builds an instance of this class from the overridden transform. 
*/ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() - public StreamingViewAsMultimap(View.AsMultimap<K, V> transform) { + public StreamingViewAsMultimap(FlinkRunner runner, View.AsMultimap<K, V> transform) { + this.runner = runner; } @Override @@ -290,7 +361,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> { try { inputCoder.getKeyCoder().verifyDeterministic(); } catch (Coder.NonDeterministicException e) { -// runner.recordViewUsesNonDeterministicKeyCoder(this); + runner.recordViewUsesNonDeterministicKeyCoder(this); } return input @@ -315,7 +386,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> { * Builds an instance of this class from the overridden transform. */ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() - public StreamingViewAsList(View.AsList<T> transform) {} + public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {} @Override public PCollectionView<List<T>> apply(PCollection<T> input) { @@ -346,7 +417,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> { * Builds an instance of this class from the overridden transform. */ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() - public StreamingViewAsIterable(View.AsIterable<T> transform) { } + public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> transform) { } @Override public PCollectionView<Iterable<T>> apply(PCollection<T> input) { @@ -386,7 +457,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> { * Builds an instance of this class from the overridden transform. 
*/ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() - public StreamingViewAsSingleton(View.AsSingleton<T> transform) { + public StreamingViewAsSingleton(FlinkRunner runner, View.AsSingleton<T> transform) { this.transform = transform; } @@ -443,6 +514,7 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> { */ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() public StreamingCombineGloballyAsSingletonView( + FlinkRunner runner, Combine.GloballyAsSingletonView<InputT, OutputT> transform) { this.transform = transform; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ae4b6a3/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 000d69f..2c7ebc6 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -75,11 +75,12 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; /** - * Flink operator for executing {@link DoFn DoFns}. + * Flink operator for executing {@link OldDoFn DoFns}. 
* * @param <InputT> - * @param <FnOutputT> - * @param <OutputT> + * @param <InputT> the input type of the {@link OldDoFn} + * @param <FnOutputT> the output type of the {@link OldDoFn} + * @param <OutputT> the output type of the operator; this can differ from the fn output type when there are + * side outputs */ public class DoFnOperator<InputT, FnOutputT, OutputT> extends AbstractStreamOperator<OutputT> @@ -95,8 +96,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> protected final Collection<PCollectionView<?>> sideInputs; protected final Map<Integer, PCollectionView<?>> sideInputTagMapping; - protected final boolean hasSideInputs; - protected final WindowingStrategy<?, ?> windowingStrategy; protected final OutputManagerFactory<OutputT> outputManagerFactory; @@ -136,8 +135,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> this.windowingStrategy = windowingStrategy; this.outputManagerFactory = outputManagerFactory; - this.hasSideInputs = !sideInputs.isEmpty(); - this.pushedBackWatermarkDescriptor = new ReducingStateDescriptor<>( "pushed-back-elements-watermark-hold", http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ae4b6a3/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index c6dde51..01cfa5b 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -201,8 +201,6 @@ public class WindowDoFnOperator<K, InputT, OutputT> if (timer != null &&
timer.f1.getTimestamp().getMillis() < actualInputWatermark) { fire = true; - System.out.println("FIRING: " + timer); - watermarkTimersQueue.remove(); watermarkTimers.remove(timer);