Repository: beam Updated Branches: refs/heads/master 3a8b0b68c -> 724eda37e
[BEAM-1612] Support real Bundle in Flink runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ceec7ce5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ceec7ce5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ceec7ce5 Branch: refs/heads/master Commit: ceec7ce5ba287ab40ee1f7c87129b72d4db1c1c7 Parents: 3a8b0b6 Author: JingsongLi <[email protected]> Authored: Thu Jun 15 17:48:59 2017 +0800 Committer: Pei He <[email protected]> Committed: Wed Aug 16 11:44:49 2017 +0800 ---------------------------------------------------------------------- .../runners/flink/FlinkPipelineOptions.java | 11 + .../FlinkStreamingTransformTranslators.java | 77 ++-- .../wrappers/streaming/DoFnOperator.java | 412 ++++++++++++++----- .../streaming/SplittableDoFnOperator.java | 4 +- .../wrappers/streaming/WindowDoFnOperator.java | 4 +- .../state/FlinkSplitStateInternals.java | 8 +- .../beam/runners/flink/PipelineOptionsTest.java | 21 +- .../flink/streaming/DoFnOperatorTest.java | 161 ++++++-- 8 files changed, 535 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index c255672..2432394 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -127,4 +127,15 @@ public interface FlinkPipelineOptions @Default.Boolean(false) Boolean getRetainExternalizedCheckpointsOnCancellation(); void setRetainExternalizedCheckpointsOnCancellation(Boolean 
retainOnCancellation); + + @Description("The maximum number of elements in a bundle.") + @Default.Long(1000) + Long getMaxBundleSize(); + void setMaxBundleSize(Long size); + + @Description("The maximum time to wait before finalising a bundle (in milliseconds).") + @Default.Long(1000) + Long getMaxBundleTimeMills(); + void setMaxBundleTimeMills(Long time); + } http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 3d7e81f..058e195 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -339,7 +339,9 @@ class FlinkStreamingTransformTranslators { List<TupleTag<?>> additionalOutputTags, FlinkStreamingTranslationContext context, WindowingStrategy<?, ?> windowingStrategy, - Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToLabels, + Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags, + Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders, + Map<TupleTag<?>, Integer> tagsToIds, Coder<WindowedValue<InputT>> inputCoder, Coder keyCoder, Map<Integer, PCollectionView<?>> transformedSideInputs); @@ -360,15 +362,27 @@ class FlinkStreamingTransformTranslators { WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags = Maps.newHashMap(); + Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders = Maps.newHashMap(); + + // We associate output tags with ids, the Integer is easier to serialize than TupleTag. 
+ // The return map of AppliedPTransform.getOutputs() is an ImmutableMap, its implementation is + // RegularImmutableMap, its entrySet order is the same with the order of insertion. + // So we can use the original AppliedPTransform.getOutputs() to produce deterministic ids. + Map<TupleTag<?>, Integer> tagsToIds = Maps.newHashMap(); + int idCount = 0; + tagsToIds.put(mainOutputTag, idCount++); for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) { if (!tagsToOutputTags.containsKey(entry.getKey())) { tagsToOutputTags.put( entry.getKey(), - new OutputTag<WindowedValue<?>>( + new OutputTag<>( entry.getKey().getId(), (TypeInformation) context.getTypeInfo((PCollection<?>) entry.getValue()) ) ); + tagsToCoders.put(entry.getKey(), + (Coder) context.getCoder((PCollection<OutputT>) entry.getValue())); + tagsToIds.put(entry.getKey(), idCount++); } } @@ -409,6 +423,8 @@ class FlinkStreamingTransformTranslators { context, windowingStrategy, tagsToOutputTags, + tagsToCoders, + tagsToIds, inputCoder, keyCoder, new HashMap<Integer, PCollectionView<?>>() /* side-input mapping */); @@ -430,6 +446,8 @@ class FlinkStreamingTransformTranslators { context, windowingStrategy, tagsToOutputTags, + tagsToCoders, + tagsToIds, inputCoder, keyCoder, transformedSideInputs.f0); @@ -506,6 +524,8 @@ class FlinkStreamingTransformTranslators { FlinkStreamingTranslationContext context, WindowingStrategy<?, ?> windowingStrategy, Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags, + Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders, + Map<TupleTag<?>, Integer> tagsToIds, Coder<WindowedValue<InputT>> inputCoder, Coder keyCoder, Map<Integer, PCollectionView<?>> transformedSideInputs) { @@ -515,7 +535,8 @@ class FlinkStreamingTransformTranslators { inputCoder, mainOutputTag, additionalOutputTags, - new DoFnOperator.MultiOutputOutputManagerFactory(mainOutputTag, tagsToOutputTags), + new DoFnOperator.MultiOutputOutputManagerFactory<>( + mainOutputTag, tagsToOutputTags, 
tagsToCoders, tagsToIds), windowingStrategy, transformedSideInputs, sideInputs, @@ -551,25 +572,28 @@ class FlinkStreamingTransformTranslators { @Override public DoFnOperator<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> createDoFnOperator( - DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> doFn, - String stepName, - List<PCollectionView<?>> sideInputs, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> additionalOutputTags, - FlinkStreamingTranslationContext context, - WindowingStrategy<?, ?> windowingStrategy, - Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags, - Coder<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>> - inputCoder, - Coder keyCoder, - Map<Integer, PCollectionView<?>> transformedSideInputs) { + DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> doFn, + String stepName, + List<PCollectionView<?>> sideInputs, + TupleTag<OutputT> mainOutputTag, + List<TupleTag<?>> additionalOutputTags, + FlinkStreamingTranslationContext context, + WindowingStrategy<?, ?> windowingStrategy, + Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags, + Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders, + Map<TupleTag<?>, Integer> tagsToIds, + Coder<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>> + inputCoder, + Coder keyCoder, + Map<Integer, PCollectionView<?>> transformedSideInputs) { return new SplittableDoFnOperator<>( doFn, stepName, inputCoder, mainOutputTag, additionalOutputTags, - new DoFnOperator.MultiOutputOutputManagerFactory(mainOutputTag, tagsToOutputTags), + new DoFnOperator.MultiOutputOutputManagerFactory<>( + mainOutputTag, tagsToOutputTags, tagsToCoders, tagsToIds), windowingStrategy, transformedSideInputs, sideInputs, @@ -693,20 +717,21 @@ class FlinkStreamingTransformTranslators { SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, BoundedWindow> reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder()); + Coder<WindowedValue<KV<K, 
Iterable<InputT>>>> outputCoder = + context.getCoder(context.getOutput(transform)); TypeInformation<WindowedValue<KV<K, Iterable<InputT>>>> outputTypeInfo = context.getTypeInfo(context.getOutput(transform)); - DoFnOperator.DefaultOutputManagerFactory<KV<K, Iterable<InputT>>> outputManagerFactory = - new DoFnOperator.DefaultOutputManagerFactory<>(); + TupleTag<KV<K, Iterable<InputT>>> mainTag = new TupleTag<>("main output"); WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator = new WindowDoFnOperator<>( reduceFn, context.getCurrentTransform().getFullName(), (Coder) windowedWorkItemCoder, - new TupleTag<KV<K, Iterable<InputT>>>("main output"), + mainTag, Collections.<TupleTag<?>>emptyList(), - outputManagerFactory, + new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder), windowingStrategy, new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ Collections.<PCollectionView<?>>emptyList(), /* side inputs */ @@ -793,6 +818,8 @@ class FlinkStreamingTransformTranslators { AppliedCombineFn.withInputCoder( transform.getFn(), input.getPipeline().getCoderRegistry(), inputKvCoder)); + Coder<WindowedValue<KV<K, OutputT>>> outputCoder = + context.getCoder(context.getOutput(transform)); TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo = context.getTypeInfo(context.getOutput(transform)); @@ -800,14 +827,15 @@ class FlinkStreamingTransformTranslators { if (sideInputs.isEmpty()) { + TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output"); WindowDoFnOperator<K, InputT, OutputT> doFnOperator = new WindowDoFnOperator<>( reduceFn, context.getCurrentTransform().getFullName(), (Coder) windowedWorkItemCoder, - new TupleTag<KV<K, OutputT>>("main output"), + mainTag, Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory<KV<K, OutputT>>(), + new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder), windowingStrategy, new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping 
*/ Collections.<PCollectionView<?>>emptyList(), /* side inputs */ @@ -826,14 +854,15 @@ class FlinkStreamingTransformTranslators { Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs = transformSideInputs(sideInputs, context); + TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output"); WindowDoFnOperator<K, InputT, OutputT> doFnOperator = new WindowDoFnOperator<>( reduceFn, context.getCurrentTransform().getFullName(), (Coder) windowedWorkItemCoder, - new TupleTag<KV<K, OutputT>>("main output"), + mainTag, Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory<KV<K, OutputT>>(), + new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder), windowingStrategy, transformSideInputs.f0, sideInputs, http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 62de423..0bf860a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -21,14 +21,20 @@ import static org.apache.flink.util.Preconditions.checkArgument; import com.google.common.base.Joiner; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.io.Serializable; import java.util.ArrayList; import 
java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledFuture; import javax.annotation.Nullable; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; @@ -45,6 +51,7 @@ import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.StatefulDoFnRunner; +import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; @@ -57,6 +64,8 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkS import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals; import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.TimeDomain; @@ -66,6 +75,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; @@ -87,6 +97,7 @@ import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.util.OutputTag; import org.joda.time.Instant; @@ -95,8 +106,6 @@ import org.joda.time.Instant; * * @param <InputT> the input type of the {@link DoFn} * @param <OutputT> the output type of the {@link DoFn} - * @param <OutputT> the output type of the operator, this can be different from the fn output - * type when we have additional tagged outputs */ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<WindowedValue<OutputT>> @@ -125,7 +134,7 @@ public class DoFnOperator<InputT, OutputT> protected transient SideInputReader sideInputReader; - protected transient DoFnRunners.OutputManager outputManager; + protected transient BufferedOutputManager<OutputT> outputManager; private transient DoFnInvoker<InputT, OutputT> doFnInvoker; @@ -137,7 +146,7 @@ public class DoFnOperator<InputT, OutputT> private transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag; - protected transient FlinkStateInternals<?> stateInternals; + protected transient FlinkStateInternals<?> keyedStateInternals; private final String stepName; @@ -147,14 +156,24 @@ public class DoFnOperator<InputT, OutputT> private final TimerInternals.TimerDataCoder timerCoder; + private final long maxBundleSize; + + private final long maxBundleTimeMills; + protected transient HeapInternalTimerService<?, TimerInternals.TimerData> timerService; protected transient FlinkTimerInternals timerInternals; - private transient StateInternals pushbackStateInternals; + private transient StateInternals nonKeyedStateInternals; private transient Optional<Long> pushedBackWatermark; + // bundle control + private transient boolean bundleStarted = false; + private transient long elementCount; + private transient long lastFinishBundleTime; + private transient ScheduledFuture<?> checkFinishBundleTimer; + public DoFnOperator( DoFn<InputT, OutputT> doFn, 
String stepName, @@ -184,10 +203,11 @@ public class DoFnOperator<InputT, OutputT> this.timerCoder = TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder()); - } - private org.apache.beam.runners.core.StepContext createStepContext() { - return new StepContext(); + FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class); + + this.maxBundleSize = flinkOptions.getMaxBundleSize(); + this.maxBundleTimeMills = flinkOptions.getMaxBundleTimeMills(); } // allow overriding this in WindowDoFnOperator because this one dynamically creates @@ -204,8 +224,21 @@ public class DoFnOperator<InputT, OutputT> setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); + FlinkPipelineOptions options = + serializedOptions.get().as(FlinkPipelineOptions.class); sideInputReader = NullSideInputReader.of(sideInputs); + // maybe init by initializeState + if (nonKeyedStateInternals == null) { + if (keyCoder != null) { + nonKeyedStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder, + getKeyedStateBackend()); + } else { + nonKeyedStateInternals = + new FlinkSplitStateInternals<>(getOperatorStateBackend()); + } + } + if (!sideInputs.isEmpty()) { pushedBackTag = StateTags.bag("pushed-back-values", inputCoder); @@ -217,26 +250,14 @@ public class DoFnOperator<InputT, OutputT> sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals); sideInputReader = sideInputHandler; - // maybe init by initializeState - if (pushbackStateInternals == null) { - if (keyCoder != null) { - pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder, - getKeyedStateBackend()); - } else { - pushbackStateInternals = - new FlinkSplitStateInternals<Object>(getOperatorStateBackend()); - } - } - pushedBackWatermark = Optional.absent(); - } - outputManager = outputManagerFactory.create(output); + outputManager = outputManagerFactory.create(output, 
nonKeyedStateInternals); // StatefulPardo or WindowDoFn if (keyCoder != null) { - stateInternals = new FlinkStateInternals<>((KeyedStateBackend) getKeyedStateBackend(), + keyedStateInternals = new FlinkStateInternals<>((KeyedStateBackend) getKeyedStateBackend(), keyCoder); timerService = (HeapInternalTimerService<?, TimerInternals.TimerData>) @@ -253,10 +274,10 @@ public class DoFnOperator<InputT, OutputT> doFnInvoker.invokeSetup(); - org.apache.beam.runners.core.StepContext stepContext = createStepContext(); + StepContext stepContext = new FlinkStepContext(); doFnRunner = DoFnRunners.simpleRunner( - serializedOptions.get(), + options, doFn, sideInputReader, outputManager, @@ -301,11 +322,24 @@ public class DoFnOperator<InputT, OutputT> stateCleaner); } - if ((serializedOptions.get().as(FlinkPipelineOptions.class)) - .getEnableMetrics()) { + if (options.getEnableMetrics()) { doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext()); } + elementCount = 0L; + lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime(); + + // Schedule timer to check timeout of finish bundle. 
+ long bundleCheckPeriod = (maxBundleTimeMills + 1) / 2; + checkFinishBundleTimer = getProcessingTimeService().scheduleAtFixedRate( + new ProcessingTimeCallback() { + @Override + public void onProcessingTime(long timestamp) throws Exception { + checkInvokeFinishBundleByTime(); + } + }, + bundleCheckPeriod, bundleCheckPeriod); + pushbackDoFnRunner = SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); } @@ -315,9 +349,9 @@ public class DoFnOperator<InputT, OutputT> super.close(); // sanity check: these should have been flushed out by +Inf watermarks - if (pushbackStateInternals != null) { + if (!sideInputs.isEmpty() && nonKeyedStateInternals != null) { BagState<WindowedValue<InputT>> pushedBack = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag); Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read(); if (pushedBackContents != null) { @@ -328,10 +362,11 @@ public class DoFnOperator<InputT, OutputT> } } } + checkFinishBundleTimer.cancel(true); doFnInvoker.invokeTeardown(); } - protected final long getPushbackWatermarkHold() { + private long getPushbackWatermarkHold() { // if we don't have side inputs we never hold the watermark if (sideInputs.isEmpty()) { return Long.MAX_VALUE; @@ -351,7 +386,7 @@ public class DoFnOperator<InputT, OutputT> if (!pushedBackWatermark.isPresent()) { BagState<WindowedValue<InputT>> pushedBack = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag); long min = Long.MAX_VALUE; for (WindowedValue<InputT> value : pushedBack.read()) { @@ -364,9 +399,9 @@ public class DoFnOperator<InputT, OutputT> @Override public final void processElement( StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception { - doFnRunner.startBundle(); + checkInvokeStartBundle(); doFnRunner.processElement(streamRecord.getValue()); - 
doFnRunner.finishBundle(); + checkInvokeFinishBundleByCount(); } private void setPushedBackWatermark(long watermark) { @@ -376,12 +411,12 @@ public class DoFnOperator<InputT, OutputT> @Override public final void processElement1( StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception { - pushbackDoFnRunner.startBundle(); + checkInvokeStartBundle(); Iterable<WindowedValue<InputT>> justPushedBack = pushbackDoFnRunner.processElementInReadyWindows(streamRecord.getValue()); BagState<WindowedValue<InputT>> pushedBack = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag); checkInitPushedBackWatermark(); @@ -391,13 +426,13 @@ public class DoFnOperator<InputT, OutputT> pushedBack.add(pushedBackValue); } setPushedBackWatermark(min); - pushbackDoFnRunner.finishBundle(); + checkInvokeFinishBundleByCount(); } @Override public final void processElement2( StreamRecord<RawUnionValue> streamRecord) throws Exception { - pushbackDoFnRunner.startBundle(); + checkInvokeStartBundle(); @SuppressWarnings("unchecked") WindowedValue<Iterable<?>> value = @@ -407,7 +442,7 @@ public class DoFnOperator<InputT, OutputT> sideInputHandler.addSideInputValue(sideInput, value); BagState<WindowedValue<InputT>> pushedBack = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag); List<WindowedValue<InputT>> newPushedBack = new ArrayList<>(); @@ -433,7 +468,7 @@ public class DoFnOperator<InputT, OutputT> } setPushedBackWatermark(min); - pushbackDoFnRunner.finishBundle(); + checkInvokeFinishBundleByCount(); // maybe output a new watermark processWatermark1(new Watermark(currentInputWatermark)); @@ -446,6 +481,9 @@ public class DoFnOperator<InputT, OutputT> @Override public void processWatermark1(Watermark mark) throws Exception { + + checkInvokeStartBundle(); + // We do the check here because we are guaranteed to 
at least get the +Inf watermark on the // main input when the job finishes. if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { @@ -461,12 +499,9 @@ public class DoFnOperator<InputT, OutputT> Math.min(getPushbackWatermarkHold(), currentInputWatermark); if (potentialOutputWatermark > currentOutputWatermark) { setCurrentOutputWatermark(potentialOutputWatermark); - output.emitWatermark(new Watermark(currentOutputWatermark)); + emitWatermark(currentOutputWatermark); } } else { - // fireTimers, so we need startBundle. - pushbackDoFnRunner.startBundle(); - setCurrentInputWatermark(mark.getTimestamp()); // hold back by the pushed back values waiting for side inputs @@ -474,7 +509,7 @@ public class DoFnOperator<InputT, OutputT> timerService.advanceWatermark(toFlinkRuntimeWatermark(pushedBackInputWatermark)); - Instant watermarkHold = stateInternals.watermarkHold(); + Instant watermarkHold = keyedStateInternals.watermarkHold(); long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold()); @@ -482,14 +517,23 @@ public class DoFnOperator<InputT, OutputT> if (potentialOutputWatermark > currentOutputWatermark) { setCurrentOutputWatermark(potentialOutputWatermark); - output.emitWatermark(new Watermark(currentOutputWatermark)); + emitWatermark(currentOutputWatermark); } - pushbackDoFnRunner.finishBundle(); } } + private void emitWatermark(long watermark) { + // Must invoke finishBatch before emit the +Inf watermark otherwise there are some late events. 
+ if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + invokeFinishBundle(); + } + output.emitWatermark(new Watermark(watermark)); + } + @Override public void processWatermark2(Watermark mark) throws Exception { + checkInvokeStartBundle(); + setCurrentSideInputWatermark(mark.getTimestamp()); if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { // this means we will never see any more side input @@ -498,6 +542,7 @@ public class DoFnOperator<InputT, OutputT> // maybe output a new watermark processWatermark1(new Watermark(currentInputWatermark)); } + } /** @@ -516,10 +561,9 @@ public class DoFnOperator<InputT, OutputT> * any future side input, i.e. that there is no point in waiting. */ private void emitAllPushedBackData() throws Exception { - pushbackDoFnRunner.startBundle(); BagState<WindowedValue<InputT>> pushedBack = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag); Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read(); if (pushedBackContents != null) { @@ -537,11 +581,65 @@ public class DoFnOperator<InputT, OutputT> setPushedBackWatermark(Long.MAX_VALUE); - pushbackDoFnRunner.finishBundle(); + } + + /** + * Check whether invoke startBundle, if it is, need to output elements that were + * buffered as part of finishing a bundle in snapshot() first. + * + * <p>In order to avoid having {@link DoFnRunner#processElement(WindowedValue)} or + * {@link DoFnRunner#onTimer(String, BoundedWindow, Instant, TimeDomain)} not between + * StartBundle and FinishBundle, this method needs to be called in each processElement + * and each processWatermark and onProcessingTime. Do not need to call in onEventTime, + * because it has been guaranteed in the processWatermark. 
+ */ + private void checkInvokeStartBundle() { + if (!bundleStarted) { + outputManager.flushBuffer(); + pushbackDoFnRunner.startBundle(); + bundleStarted = true; + } + } + + /** + * Check whether invoke finishBundle by elements count. Called in processElement. + */ + private void checkInvokeFinishBundleByCount() { + elementCount++; + if (elementCount >= maxBundleSize) { + invokeFinishBundle(); + } + } + + /** + * Check whether invoke finishBundle by timeout. + */ + private void checkInvokeFinishBundleByTime() { + long now = getProcessingTimeService().getCurrentProcessingTime(); + if (now - lastFinishBundleTime >= maxBundleTimeMills) { + invokeFinishBundle(); + } + } + + private void invokeFinishBundle() { + if (bundleStarted) { + pushbackDoFnRunner.finishBundle(); + bundleStarted = false; + elementCount = 0L; + lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime(); + } } @Override public void snapshotState(StateSnapshotContext context) throws Exception { + + // Forced finish a bundle in checkpoint barrier otherwise may lose data. + // Careful, it use OperatorState or KeyGroupState to store outputs, So it + // must be called before their snapshot. 
+ outputManager.openBuffer(); + invokeFinishBundle(); + outputManager.closeBuffer(); + // copy from AbstractStreamOperator if (getKeyedStateBackend() != null) { KeyedStateCheckpointOutputStream out; @@ -587,8 +685,8 @@ public class DoFnOperator<InputT, OutputT> @Override public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception { - if (!sideInputs.isEmpty() && keyCoder != null) { - ((FlinkKeyGroupStateInternals) pushbackStateInternals).snapshotKeyGroupState( + if (keyCoder != null) { + ((FlinkKeyGroupStateInternals) nonKeyedStateInternals).snapshotKeyGroupState( keyGroupIndex, out); } } @@ -626,23 +724,26 @@ public class DoFnOperator<InputT, OutputT> @Override public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception { - if (!sideInputs.isEmpty() && keyCoder != null) { - if (pushbackStateInternals == null) { - pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder, + if (keyCoder != null) { + if (nonKeyedStateInternals == null) { + nonKeyedStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder, getKeyedStateBackend()); } - ((FlinkKeyGroupStateInternals) pushbackStateInternals) + ((FlinkKeyGroupStateInternals) nonKeyedStateInternals) .restoreKeyGroupState(keyGroupIndex, in, getUserCodeClassloader()); } } @Override public void onEventTime(InternalTimer<Object, TimerData> timer) throws Exception { + // We don't have to cal checkInvokeStartBundle() because it's already called in + // processWatermark*(). fireTimer(timer); } @Override public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws Exception { + checkInvokeStartBundle(); fireTimer(timer); } @@ -670,71 +771,186 @@ public class DoFnOperator<InputT, OutputT> } /** - * Factory for creating an {@link DoFnRunners.OutputManager} from + * Factory for creating an {@link BufferedOutputManager} from * a Flink {@link Output}. 
*/ interface OutputManagerFactory<OutputT> extends Serializable { - DoFnRunners.OutputManager create(Output<StreamRecord<WindowedValue<OutputT>>> output); + BufferedOutputManager<OutputT> create( + Output<StreamRecord<WindowedValue<OutputT>>> output, + StateInternals stateInternals); } /** - * Default implementation of {@link OutputManagerFactory} that creates an - * {@link DoFnRunners.OutputManager} that only writes to - * a single logical output. + * A {@link DoFnRunners.OutputManager} that can buffer its outputs. + * Use {@link FlinkSplitStateInternals} or {@link FlinkKeyGroupStateInternals} + * to keep buffer data. */ - public static class DefaultOutputManagerFactory<OutputT> - implements OutputManagerFactory<OutputT> { + public static class BufferedOutputManager<OutputT> implements + DoFnRunners.OutputManager { + + private TupleTag<OutputT> mainTag; + private Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags; + private Map<TupleTag<?>, Integer> tagsToIds; + private Map<Integer, TupleTag<?>> idsToTags; + protected Output<StreamRecord<WindowedValue<OutputT>>> output; + + private boolean openBuffer = false; + private BagState<KV<Integer, WindowedValue<?>>> bufferState; + + BufferedOutputManager( + Output<StreamRecord<WindowedValue<OutputT>>> output, + TupleTag<OutputT> mainTag, + Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags, + final Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders, + Map<TupleTag<?>, Integer> tagsToIds, + StateInternals stateInternals) { + this.output = output; + this.mainTag = mainTag; + this.tagsToOutputTags = tagsToOutputTags; + this.tagsToIds = tagsToIds; + this.idsToTags = new HashMap<>(); + for (Map.Entry<TupleTag<?>, Integer> entry : tagsToIds.entrySet()) { + idsToTags.put(entry.getValue(), entry.getKey()); + } + + ImmutableMap.Builder<Integer, Coder<WindowedValue<?>>> idsToCodersBuilder = + ImmutableMap.builder(); + for (Map.Entry<TupleTag<?>, Integer> entry : tagsToIds.entrySet()) { + 
idsToCodersBuilder.put(entry.getValue(), tagsToCoders.get(entry.getKey())); + } + + StateTag<BagState<KV<Integer, WindowedValue<?>>>> bufferTag = + StateTags.bag("bundle-buffer-tag", + new TaggedKvCoder(idsToCodersBuilder.build())); + bufferState = stateInternals.state(StateNamespaces.global(), bufferTag); + } + + void openBuffer() { + this.openBuffer = true; + } + + void closeBuffer() { + this.openBuffer = false; + } + @Override - public DoFnRunners.OutputManager create( - final Output<StreamRecord<WindowedValue<OutputT>>> output) { - return new DoFnRunners.OutputManager() { - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> value) { - // with tagged outputs we can't get around this because we don't - // know our own output type... - @SuppressWarnings("unchecked") - WindowedValue<OutputT> castValue = (WindowedValue<OutputT>) value; - output.collect(new StreamRecord<>(castValue)); - } - }; + public <T> void output(TupleTag<T> tag, WindowedValue<T> value) { + if (!openBuffer) { + emit(tag, value); + } else { + bufferState.add(KV.<Integer, WindowedValue<?>>of(tagsToIds.get(tag), value)); + } + } + + /** + * Flush elements of bufferState to Flink Output. This method can't be invoke in + * {@link #snapshotState(StateSnapshotContext)} + */ + void flushBuffer() { + for (KV<Integer, WindowedValue<?>> taggedElem : bufferState.read()) { + emit(idsToTags.get(taggedElem.getKey()), (WindowedValue) taggedElem.getValue()); + } + bufferState.clear(); + } + + private <T> void emit(TupleTag<T> tag, WindowedValue<T> value) { + if (tag.equals(mainTag)) { + // with tagged outputs we can't get around this because we don't + // know our own output type... 
+ @SuppressWarnings("unchecked") + WindowedValue<OutputT> castValue = (WindowedValue<OutputT>) value; + output.collect(new StreamRecord<>(castValue)); + } else { + @SuppressWarnings("unchecked") + OutputTag<WindowedValue<T>> outputTag = (OutputTag) tagsToOutputTags.get(tag); + output.collect(outputTag, new StreamRecord<>(value)); + } + } + } + + /** + * Coder for KV of id and value. It will be serialized in Flink checkpoint. + */ + private static class TaggedKvCoder extends StructuredCoder<KV<Integer, WindowedValue<?>>> { + + private Map<Integer, Coder<WindowedValue<?>>> idsToCoders; + + TaggedKvCoder(Map<Integer, Coder<WindowedValue<?>>> idsToCoders) { + this.idsToCoders = idsToCoders; + } + + @Override + public void encode(KV<Integer, WindowedValue<?>> kv, OutputStream out) + throws IOException { + Coder<WindowedValue<?>> coder = idsToCoders.get(kv.getKey()); + VarIntCoder.of().encode(kv.getKey(), out); + coder.encode(kv.getValue(), out); + } + + @Override + public KV<Integer, WindowedValue<?>> decode(InputStream in) + throws IOException { + Integer id = VarIntCoder.of().decode(in); + Coder<WindowedValue<?>> coder = idsToCoders.get(id); + WindowedValue<?> value = coder.decode(in); + return KV.<Integer, WindowedValue<?>>of(id, value); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return new ArrayList<>(idsToCoders.values()); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + for (Coder<?> coder : idsToCoders.values()) { + verifyDeterministic(this, "Coder must be deterministic", coder); + } } } /** * Implementation of {@link OutputManagerFactory} that creates an - * {@link DoFnRunners.OutputManager} that can write to multiple logical - * outputs by unioning them in a {@link RawUnionValue}. + * {@link BufferedOutputManager} that can write to multiple logical + * outputs by Flink side output. 
*/ public static class MultiOutputOutputManagerFactory<OutputT> implements OutputManagerFactory<OutputT> { - private TupleTag<?> mainTag; - Map<TupleTag<?>, OutputTag<WindowedValue<?>>> mapping; + private TupleTag<OutputT> mainTag; + private Map<TupleTag<?>, Integer> tagsToIds; + private Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags; + private Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders; + + // Constructor for the case where there are no side outputs. + @SuppressWarnings("unchecked") + public MultiOutputOutputManagerFactory( + TupleTag<OutputT> mainTag, Coder<WindowedValue<OutputT>> mainCoder) { + this(mainTag, + new HashMap<TupleTag<?>, OutputTag<WindowedValue<?>>>(), + ImmutableMap.<TupleTag<?>, Coder<WindowedValue<?>>>builder() + .put(mainTag, (Coder) mainCoder).build(), + ImmutableMap.<TupleTag<?>, Integer>builder() + .put(mainTag, 0).build()); + } public MultiOutputOutputManagerFactory( - TupleTag<?> mainTag, - Map<TupleTag<?>, OutputTag<WindowedValue<?>>> mapping) { + TupleTag<OutputT> mainTag, + Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags, + Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders, + Map<TupleTag<?>, Integer> tagsToIds) { this.mainTag = mainTag; - this.mapping = mapping; + this.tagsToOutputTags = tagsToOutputTags; + this.tagsToCoders = tagsToCoders; + this.tagsToIds = tagsToIds; } @Override - public DoFnRunners.OutputManager create( - final Output<StreamRecord<WindowedValue<OutputT>>> output) { - return new DoFnRunners.OutputManager() { - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> value) { - if (tag.equals(mainTag)) { - @SuppressWarnings("unchecked") - WindowedValue<OutputT> outputValue = (WindowedValue<OutputT>) value; - output.collect(new StreamRecord<>(outputValue)); - } else { - @SuppressWarnings("unchecked") - OutputTag<WindowedValue<T>> outputTag = (OutputTag) mapping.get(tag); - output.<WindowedValue<T>>collect(outputTag, new StreamRecord<>(value)); - } - } - }; + public 
BufferedOutputManager<OutputT> create( + Output<StreamRecord<WindowedValue<OutputT>>> output, + StateInternals stateInternals) { + return new BufferedOutputManager<>( + output, mainTag, tagsToOutputTags, tagsToCoders, tagsToIds, stateInternals); } } @@ -742,11 +958,11 @@ public class DoFnOperator<InputT, OutputT> * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow * accessing state or timer internals. */ - protected class StepContext implements org.apache.beam.runners.core.StepContext { + protected class FlinkStepContext implements StepContext { @Override public StateInternals stateInternals() { - return stateInternals; + return keyedStateInternals; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java index be758a6..b255bb4 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -97,7 +97,7 @@ public class SplittableDoFnOperator< public StateInternals stateInternalsForKey(String key) { //this will implicitly be keyed by the key of the incoming // element or by the key of a firing timer - return (StateInternals) stateInternals; + return (StateInternals) keyedStateInternals; } }; TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() { @@ -148,7 +148,7 @@ public class SplittableDoFnOperator< public void fireTimer(InternalTimer<?, TimerInternals.TimerData> 
timer) { doFnRunner.processElement(WindowedValue.valueInGlobalWindow( KeyedWorkItems.<String, KV<InputT, RestrictionT>>timersWorkItem( - (String) stateInternals.getKey(), + (String) keyedStateInternals.getKey(), Collections.singletonList(timer.getNamespace())))); } http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 78d585e..b1fb398 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -86,7 +86,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> public StateInternals stateInternalsForKey(K key) { //this will implicitly be keyed by the key of the incoming // element or by the key of a firing timer - return (StateInternals) stateInternals; + return (StateInternals) keyedStateInternals; } }; TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() { @@ -112,7 +112,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> public void fireTimer(InternalTimer<?, TimerData> timer) { doFnRunner.processElement(WindowedValue.valueInGlobalWindow( KeyedWorkItems.<K, InputT>timersWorkItem( - (K) stateInternals.getKey(), + (K) keyedStateInternals.getKey(), Collections.singletonList(timer.getNamespace())))); } http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java 
---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java index bb2a9ff..09e59fd 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java @@ -167,7 +167,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals { @Override public void add(T input) { try { - flinkStateBackend.getOperatorState(descriptor).add(input); + flinkStateBackend.getListState(descriptor).add(input); } catch (Exception e) { throw new RuntimeException("Error updating state.", e); } @@ -181,7 +181,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals { @Override public Iterable<T> read() { try { - Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get(); + Iterable<T> result = flinkStateBackend.getListState(descriptor).get(); return result != null ? result : Collections.<T>emptyList(); } catch (Exception e) { throw new RuntimeException("Error updating state.", e); @@ -194,7 +194,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals { @Override public Boolean read() { try { - Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get(); + Iterable<T> result = flinkStateBackend.getListState(descriptor).get(); // PartitionableListState.get() return empty collection When there is no element, // KeyedListState different. 
(return null) return result == null || Iterators.size(result.iterator()) == 0; @@ -214,7 +214,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals { @Override public void clear() { try { - flinkStateBackend.getOperatorState(descriptor).clear(); + flinkStateBackend.getListState(descriptor).clear(); } catch (Exception e) { throw new RuntimeException("Error reading state.", e); } http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index eb06026..57086df 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.flink; import java.util.Collections; import java.util.HashMap; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; @@ -60,13 +61,15 @@ public class PipelineOptionsTest { @Test(expected = Exception.class) public void parDoBaseClassPipelineOptionsNullTest() { - new DoFnOperator<>( + TupleTag<String> mainTag = new TupleTag<>("main-output"); + Coder<WindowedValue<String>> coder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()); + DoFnOperator<String, String> doFnOperator = new DoFnOperator<>( new TestDoFn(), "stepName", - WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), - new TupleTag<String>("main-output"), + coder, + mainTag, Collections.<TupleTag<?>>emptyList(), - new 
DoFnOperator.DefaultOutputManagerFactory<String>(), + new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, coder), WindowingStrategy.globalDefault(), new HashMap<Integer, PCollectionView<?>>(), Collections.<PCollectionView<?>>emptyList(), @@ -81,13 +84,16 @@ public class PipelineOptionsTest { @Test public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception { + TupleTag<String> mainTag = new TupleTag<>("main-output"); + + Coder<WindowedValue<String>> coder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()); DoFnOperator<String, String> doFnOperator = new DoFnOperator<>( new TestDoFn(), "stepName", - WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), - new TupleTag<String>("main-output"), + coder, + mainTag, Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory<String>(), + new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, coder), WindowingStrategy.globalDefault(), new HashMap<Integer, PCollectionView<?>>(), Collections.<PCollectionView<?>>emptyList(), @@ -105,7 +111,6 @@ public class PipelineOptionsTest { OneInputStreamOperatorTestHarness<WindowedValue<Object>, WindowedValue<Object>> testHarness = new OneInputStreamOperatorTestHarness<>(deserialized, typeInformation.createSerializer(new ExecutionConfig())); - testHarness.open(); // execute once to access options http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index 4d2a912..ad17de8 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ 
-52,6 +52,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -62,6 +63,7 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -107,18 +109,17 @@ public class DoFnOperatorTest { @SuppressWarnings("unchecked") public void testSingleOutput() throws Exception { - WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder = - WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()); + Coder<WindowedValue<String>> coder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()); TupleTag<String> outputTag = new TupleTag<>("main-output"); DoFnOperator<String, String> doFnOperator = new DoFnOperator<>( new IdentityDoFn<String>(), "stepName", - windowedValueCoder, + coder, outputTag, Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory(), + new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder), WindowingStrategy.globalDefault(), new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ Collections.<PCollectionView<?>>emptyList(), /* side inputs */ @@ -143,26 +144,38 @@ public class DoFnOperatorTest { 
@SuppressWarnings("unchecked") public void testMultiOutputOutput() throws Exception { - WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder = + WindowedValue.ValueOnlyWindowedValueCoder<String> coder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()); TupleTag<String> mainOutput = new TupleTag<>("main-output"); TupleTag<String> additionalOutput1 = new TupleTag<>("output-1"); TupleTag<String> additionalOutput2 = new TupleTag<>("output-2"); - ImmutableMap<TupleTag<?>, OutputTag<?>> outputMapping = + ImmutableMap<TupleTag<?>, OutputTag<?>> tagsToOutputTags = ImmutableMap.<TupleTag<?>, OutputTag<?>>builder() - .put(mainOutput, new OutputTag<String>(mainOutput.getId()){}) .put(additionalOutput1, new OutputTag<String>(additionalOutput1.getId()){}) .put(additionalOutput2, new OutputTag<String>(additionalOutput2.getId()){}) .build(); + ImmutableMap<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders = + ImmutableMap.<TupleTag<?>, Coder<WindowedValue<?>>>builder() + .put(mainOutput, (Coder) coder) + .put(additionalOutput1, coder) + .put(additionalOutput2, coder) + .build(); + ImmutableMap<TupleTag<?>, Integer> tagsToIds = + ImmutableMap.<TupleTag<?>, Integer>builder() + .put(mainOutput, 0) + .put(additionalOutput1, 1) + .put(additionalOutput2, 2) + .build(); DoFnOperator<String, String> doFnOperator = new DoFnOperator<>( new MultiOutputDoFn(additionalOutput1, additionalOutput2), "stepName", - windowedValueCoder, + coder, mainOutput, ImmutableList.<TupleTag<?>>of(additionalOutput1, additionalOutput2), - new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, outputMapping), + new DoFnOperator.MultiOutputOutputManagerFactory( + mainOutput, tagsToOutputTags, tagsToCoders, tagsToIds), WindowingStrategy.globalDefault(), new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ Collections.<PCollectionView<?>>emptyList(), /* side inputs */ @@ -184,13 +197,13 @@ public class DoFnOperatorTest { WindowedValue.valueInGlobalWindow("got: hello"))); 
assertThat( - this.stripStreamRecord(testHarness.getSideOutput(outputMapping.get(additionalOutput1))), + this.stripStreamRecord(testHarness.getSideOutput(tagsToOutputTags.get(additionalOutput1))), contains( WindowedValue.valueInGlobalWindow("extra: one"), WindowedValue.valueInGlobalWindow("got: hello"))); assertThat( - this.stripStreamRecord(testHarness.getSideOutput(outputMapping.get(additionalOutput2))), + this.stripStreamRecord(testHarness.getSideOutput(tagsToOutputTags.get(additionalOutput2))), contains( WindowedValue.valueInGlobalWindow("extra: two"), WindowedValue.valueInGlobalWindow("got: hello"))); @@ -255,7 +268,7 @@ public class DoFnOperatorTest { inputCoder, outputTag, Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory<String>(), + new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, outputCoder), windowingStrategy, new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ Collections.<PCollectionView<?>>emptyList(), /* side inputs */ @@ -329,20 +342,20 @@ public class DoFnOperatorTest { } }; - WindowedValue.FullWindowedValueCoder<Integer> windowedValueCoder = - WindowedValue.getFullCoder( - VarIntCoder.of(), - windowingStrategy.getWindowFn().windowCoder()); + Coder<WindowedValue<Integer>> inputCoder = WindowedValue.getFullCoder( + VarIntCoder.of(), windowingStrategy.getWindowFn().windowCoder()); + Coder<WindowedValue<String>> outputCoder = WindowedValue.getFullCoder( + StringUtf8Coder.of(), windowingStrategy.getWindowFn().windowCoder()); TupleTag<String> outputTag = new TupleTag<>("main-output"); DoFnOperator<Integer, String> doFnOperator = new DoFnOperator<>( fn, "stepName", - windowedValueCoder, + inputCoder, outputTag, Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory<String>(), + new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, outputCoder), windowingStrategy, new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ 
Collections.<PCollectionView<?>>emptyList(), /* side inputs */ @@ -441,7 +454,7 @@ public class DoFnOperatorTest { } }; - WindowedValue.FullWindowedValueCoder<KV<String, Integer>> windowedValueCoder = + WindowedValue.FullWindowedValueCoder<KV<String, Integer>> coder = WindowedValue.getFullCoder( KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), windowingStrategy.getWindowFn().windowCoder()); @@ -452,10 +465,10 @@ public class DoFnOperatorTest { new DoFnOperator<>( fn, "stepName", - windowedValueCoder, + coder, outputTag, Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory<KV<String, Integer>>(), + new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder), windowingStrategy, new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ Collections.<PCollectionView<?>>emptyList(), /* side inputs */ @@ -531,8 +544,7 @@ public class DoFnOperatorTest { public void testSideInputs(boolean keyed) throws Exception { - WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder = - WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()); + Coder<WindowedValue<String>> coder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()); TupleTag<String> outputTag = new TupleTag<>("main-output"); @@ -550,10 +562,10 @@ public class DoFnOperatorTest { DoFnOperator<String, String> doFnOperator = new DoFnOperator<>( new IdentityDoFn<String>(), "stepName", - windowedValueCoder, + coder, outputTag, Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory<String>(), + new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder), WindowingStrategy.globalDefault(), sideInputMapping, /* side-input mapping */ ImmutableList.<PCollectionView<?>>of(view1, view2), /* side inputs */ @@ -631,6 +643,105 @@ public class DoFnOperatorTest { testSideInputs(true); } + @Test + @SuppressWarnings("unchecked") + public void testBundle() throws Exception { + + WindowedValue.ValueOnlyWindowedValueCoder<String> 
windowedValueCoder = + WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()); + + TupleTag<String> outputTag = new TupleTag<>("main-output"); + FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); + options.setMaxBundleSize(2L); + options.setMaxBundleTimeMills(10L); + + IdentityDoFn<String> doFn = new IdentityDoFn<String>() { + @FinishBundle + public void finishBundle(FinishBundleContext context) { + context.output( + "finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE); + } + }; + + DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory = + new DoFnOperator.MultiOutputOutputManagerFactory( + outputTag, + WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)); + + DoFnOperator<String, String> doFnOperator = new DoFnOperator<>( + doFn, + "stepName", + windowedValueCoder, + outputTag, + Collections.<TupleTag<?>>emptyList(), + outputManagerFactory, + WindowingStrategy.globalDefault(), + new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ + Collections.<PCollectionView<?>>emptyList(), /* side inputs */ + options, + null); + + OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness = + new OneInputStreamOperatorTestHarness<>(doFnOperator); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("a"))); + testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("b"))); + testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("c"))); + + // draw a snapshot + OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + + // There is a finishBundle in snapshot() + // Elements will be buffered as part of finishing a bundle in snapshot() + assertThat( + this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("a"), + WindowedValue.valueInGlobalWindow("b"), + 
WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("c"))); + + testHarness.close(); + + DoFnOperator<String, String> newDoFnOperator = new DoFnOperator<>( + doFn, + "stepName", + windowedValueCoder, + outputTag, + Collections.<TupleTag<?>>emptyList(), + outputManagerFactory, + WindowingStrategy.globalDefault(), + new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ + Collections.<PCollectionView<?>>emptyList(), /* side inputs */ + options, + null); + + OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> newHarness = + new OneInputStreamOperatorTestHarness<>(newDoFnOperator); + + // restore snapshot + newHarness.initializeState(snapshot); + + newHarness.open(); + + // startBundle will output the buffered elements. + newHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("d"))); + + // check finishBundle by timeout + newHarness.setProcessingTime(10); + + assertThat( + this.<String>stripStreamRecordFromWindowedValue(newHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("d"), + WindowedValue.valueInGlobalWindow("finishBundle"))); + + newHarness.close(); + } + private <T> Iterable<WindowedValue<T>> stripStreamRecordFromWindowedValue( Iterable<Object> input) {
