[BEAM-843] Use New DoFn Directly in Flink Runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4aaaf8fb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4aaaf8fb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4aaaf8fb Branch: refs/heads/master Commit: 4aaaf8fb94222e6e283fa79d9c7dc8d9a730d278 Parents: 27cf68e Author: JingsongLi <lzljs3620...@aliyun.com> Authored: Wed Jan 18 11:34:06 2017 +0800 Committer: Davor Bonaci <da...@google.com> Committed: Mon Jan 30 12:38:38 2017 -0800 ---------------------------------------------------------------------- .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 156 +++++++++++++++++++ .../wrappers/streaming/DoFnOperator.java | 69 ++++---- .../wrappers/streaming/WindowDoFnOperator.java | 143 +++++++++-------- 3 files changed, 264 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4aaaf8fb/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java new file mode 100644 index 0000000..cff6e00 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.util.Collection; +import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; +import org.apache.beam.runners.core.triggers.TriggerStateMachines; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateInternalsFactory; +import org.apache.beam.sdk.util.state.TimerInternalsFactory; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; + +/** + * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the + * {@link ReduceFnRunner}. + */ +@SystemDoFnInternal +public class GroupAlsoByWindowViaWindowSetNewDoFn< + K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>> + extends DoFn<RinT, KV<K, OutputT>> { + + private static final long serialVersionUID = 1L; + + public static <K, InputT, OutputT, W extends BoundedWindow> + DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create( + WindowingStrategy<?, W> strategy, + StateInternalsFactory<K> stateInternalsFactory, + TimerInternalsFactory<K> timerInternalsFactory, + SideInputReader sideInputReader, + SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn, + DoFnRunners.OutputManager outputManager, + TupleTag<KV<K, OutputT>> mainTag) { + return new GroupAlsoByWindowViaWindowSetNewDoFn<>( + strategy, stateInternalsFactory, timerInternalsFactory, sideInputReader, + reduceFn, outputManager, mainTag); + } + + protected final Aggregator<Long, Long> droppedDueToClosedWindow = + createAggregator( + GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); + protected final Aggregator<Long, Long> droppedDueToLateness = + createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); + private final WindowingStrategy<Object, W> windowingStrategy; + private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn; + private transient StateInternalsFactory<K> stateInternalsFactory; + private transient TimerInternalsFactory<K> timerInternalsFactory; + private transient SideInputReader sideInputReader; + private transient DoFnRunners.OutputManager outputManager; + private TupleTag<KV<K, OutputT>> mainTag; + + public GroupAlsoByWindowViaWindowSetNewDoFn( + WindowingStrategy<?, W> windowingStrategy, + StateInternalsFactory<K> stateInternalsFactory, + TimerInternalsFactory<K> timerInternalsFactory, + SideInputReader sideInputReader, + SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn, + DoFnRunners.OutputManager outputManager, + TupleTag<KV<K, OutputT>> mainTag) { + this.timerInternalsFactory = timerInternalsFactory; + this.sideInputReader = sideInputReader; + this.outputManager = outputManager; + this.mainTag = mainTag; + @SuppressWarnings("unchecked") + WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>) windowingStrategy; + this.windowingStrategy = noWildcard; + this.reduceFn = reduceFn; + this.stateInternalsFactory = stateInternalsFactory; + } + + private OutputWindowedValue<KV<K, OutputT>> outputWindowedValue() { + return new OutputWindowedValue<KV<K, OutputT>>() { + @Override + public void outputWindowedValue( + KV<K, OutputT> output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + outputManager.output(mainTag, + WindowedValue.of(output, timestamp, windows, pane)); + } + + @Override + public <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + outputManager.output(tag, + WindowedValue.of(output, timestamp, windows, pane)); + } + }; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + KeyedWorkItem<K, InputT> keyedWorkItem = c.element(); + + K key = keyedWorkItem.key(); + StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key); + TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key); + + ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner = + new ReduceFnRunner<>( + key, + windowingStrategy, + ExecutableTriggerStateMachine.create( + TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())), + stateInternals, + timerInternals, + outputWindowedValue(), + sideInputReader, + droppedDueToClosedWindow, + reduceFn, + c.getPipelineOptions()); + + reduceFnRunner.processElements(keyedWorkItem.elementsIterable()); + reduceFnRunner.onTimers(keyedWorkItem.timersIterable()); + reduceFnRunner.persist(); + } + + public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() { + throw new RuntimeException("Not implement!"); + } + + public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() { + return droppedDueToLateness; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/4aaaf8fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index ac85b3c..de0264a 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -28,12 +28,11 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; + import org.apache.beam.runners.core.AggregatorFactory; -import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.ExecutionContext; -import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; @@ -45,6 +44,8 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.NullSideInputReader; @@ -78,10 +79,10 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; /** - * Flink operator for executing {@link OldDoFn DoFns}. + * Flink operator for executing {@link DoFn DoFns}. * - * @param <InputT> the input type of the {@link OldDoFn} - * @param <FnOutputT> the output type of the {@link OldDoFn} + * @param <InputT> the input type of the {@link DoFn} + * @param <FnOutputT> the output type of the {@link DoFn} * @param <OutputT> the output type of the operator, this can be different from the fn output * type when we have side outputs */ @@ -90,7 +91,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>, TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT> { - protected OldDoFn<InputT, FnOutputT> oldDoFn; + protected DoFn<InputT, FnOutputT> doFn; protected final SerializedPipelineOptions serializedOptions; @@ -108,6 +109,12 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> protected transient SideInputHandler sideInputHandler; + protected transient SideInputReader sideInputReader; + + protected transient DoFnRunners.OutputManager outputManager; + + private transient DoFnInvoker<InputT, FnOutputT> doFnInvoker; + protected transient long currentInputWatermark; protected transient long currentOutputWatermark; @@ -120,9 +127,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> private transient Map<String, KvStateSnapshot<?, ?, ?, ?, ?>> restoredSideInputState; - @Deprecated public DoFnOperator( - OldDoFn<InputT, FnOutputT> oldDoFn, + DoFn<InputT, FnOutputT> doFn, TypeInformation<WindowedValue<InputT>> inputType, TupleTag<FnOutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, @@ -131,7 +137,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> Map<Integer, PCollectionView<?>> sideInputTagMapping, Collection<PCollectionView<?>> sideInputs, PipelineOptions options) { - this.oldDoFn = oldDoFn; + this.doFn = doFn; this.mainOutputTag = mainOutputTag; this.sideOutputTags = sideOutputTags; this.sideInputTagMapping = sideInputTagMapping; @@ -152,44 +158,20 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> setChainingStrategy(ChainingStrategy.ALWAYS); } - public DoFnOperator( - DoFn<InputT, FnOutputT> doFn, - TypeInformation<WindowedValue<InputT>> inputType, - TupleTag<FnOutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - OutputManagerFactory<OutputT> outputManagerFactory, - WindowingStrategy<?, ?> windowingStrategy, - Map<Integer, PCollectionView<?>> sideInputTagMapping, - Collection<PCollectionView<?>> sideInputs, - PipelineOptions options) { - this( - DoFnAdapters.toOldDoFn(doFn), - inputType, - mainOutputTag, - sideOutputTags, - outputManagerFactory, - windowingStrategy, - sideInputTagMapping, - sideInputs, - options); - } - protected ExecutionContext.StepContext createStepContext() { return new StepContext(); } // allow overriding this in WindowDoFnOperator because this one dynamically creates // the DoFn - protected OldDoFn<InputT, FnOutputT> getOldDoFn() { - return oldDoFn; + protected DoFn<InputT, FnOutputT> getDoFn() { + return doFn; } @Override public void open() throws Exception { super.open(); - this.oldDoFn = getOldDoFn(); - currentInputWatermark = Long.MIN_VALUE; currentOutputWatermark = currentInputWatermark; @@ -214,7 +196,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> } }; - SideInputReader sideInputReader = NullSideInputReader.of(sideInputs); + sideInputReader = NullSideInputReader.of(sideInputs); + if (!sideInputs.isEmpty()) { String operatorIdentifier = this.getClass().getSimpleName() + "_" @@ -244,11 +227,18 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> sideInputReader = sideInputHandler; } + outputManager = outputManagerFactory.create(output); + + this.doFn = getDoFn(); + doFnInvoker = DoFnInvokers.invokerFor(doFn); + + doFnInvoker.invokeSetup(); + DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.simpleRunner( serializedOptions.getPipelineOptions(), - oldDoFn, + doFn, sideInputReader, - outputManagerFactory.create(output), + outputManager, mainOutputTag, sideOutputTags, createStepContext(), @@ -258,13 +248,12 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> pushbackDoFnRunner = PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); - oldDoFn.setup(); } @Override public void close() throws Exception { super.close(); - oldDoFn.teardown(); + doFnInvoker.invokeTeardown(); } protected final long getPushbackWatermarkHold() { http://git-wip-us.apache.org/repos/asf/beam/blob/4aaaf8fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index d4273b2..74614ad 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -38,11 +38,11 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ScheduledFuture; import javax.annotation.Nullable; + import org.apache.beam.runners.core.ExecutionContext; -import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; +import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; -import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; import org.apache.beam.sdk.coders.Coder; @@ -56,6 +56,7 @@ import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.TimerInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -91,6 +92,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> private transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures; private transient FlinkStateInternals<K> stateInternals; + private transient FlinkTimerInternals timerInternals; private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn; @@ -106,7 +108,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> PipelineOptions options, Coder<K> keyCoder) { super( - (OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) null, + null, inputType, mainOutputTag, sideOutputTags, @@ -124,7 +126,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> } @Override - protected OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getOldDoFn() { + protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() { StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() { @Override public StateInternals<K> stateInternalsForKey(K key) { @@ -133,15 +135,23 @@ public class WindowDoFnOperator<K, InputT, OutputT> return stateInternals; } }; + TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() { + @Override + public TimerInternals timerInternalsForKey(K key) { + //this will implicitly be keyed like the StateInternalsFactory + return timerInternals; + } + }; // we have to do the unchecked cast because GroupAlsoByWindowViaWindowSetDoFn.create // has the window type as generic parameter while WindowingStrategy is almost always // untyped. @SuppressWarnings("unchecked") - OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> oldDoFn = - GroupAlsoByWindowViaWindowSetDoFn.create( - windowingStrategy, stateInternalsFactory, (SystemReduceFn) systemReduceFn); - return oldDoFn; + DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn = + GroupAlsoByWindowViaWindowSetNewDoFn.create( + windowingStrategy, stateInternalsFactory, timerInternalsFactory, sideInputReader, + (SystemReduceFn) systemReduceFn, outputManager, mainOutputTag); + return doFn; } @@ -183,6 +193,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> processingTimeTimerFutures = new HashMap<>(); stateInternals = new FlinkStateInternals<>(getStateBackend(), keyCoder); + timerInternals = new FlinkTimerInternals(); // call super at the end because this will call getDoFn() which requires stateInternals // to be set @@ -448,75 +459,79 @@ public class WindowDoFnOperator<K, InputT, OutputT> @Override public TimerInternals timerInternals() { - return new TimerInternals() { - @Override - public void setTimer( + return timerInternals; + } + } + + private class FlinkTimerInternals implements TimerInternals { + + @Override + public void setTimer( StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) { - throw new UnsupportedOperationException("Setting a timer by ID is not yet supported."); - } + throw new UnsupportedOperationException("Setting a timer by ID is not yet supported."); + } - @Deprecated - @Override - public void setTimer(TimerData timerKey) { - if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) { - registerEventTimeTimer(timerKey); - } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) { - registerProcessingTimeTimer(timerKey); - } else { - throw new UnsupportedOperationException( + @Deprecated + @Override + public void setTimer(TimerData timerKey) { + if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) { + registerEventTimeTimer(timerKey); + } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) { + registerProcessingTimeTimer(timerKey); + } else { + throw new UnsupportedOperationException( "Unsupported time domain: " + timerKey.getDomain()); - } - } + } + } - @Override - public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { - throw new UnsupportedOperationException( + @Deprecated + @Override + public void deleteTimer(StateNamespace namespace, String timerId) { + throw new UnsupportedOperationException( "Canceling of a timer by ID is not yet supported."); - } + } - @Deprecated - @Override - public void deleteTimer(StateNamespace namespace, String timerId) { - throw new UnsupportedOperationException( - "Canceling of a timer by ID is not yet supported."); - } + @Override + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { + throw new UnsupportedOperationException( + "Canceling of a timer by ID is not yet supported."); + } - @Deprecated - @Override - public void deleteTimer(TimerData timerKey) { - if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) { - deleteEventTimeTimer(timerKey); - } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) { - deleteProcessingTimeTimer(timerKey); - } else { - throw new UnsupportedOperationException( + @Deprecated + @Override + public void deleteTimer(TimerData timerKey) { + if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) { + deleteEventTimeTimer(timerKey); + } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) { + deleteProcessingTimeTimer(timerKey); + } else { + throw new UnsupportedOperationException( "Unsupported time domain: " + timerKey.getDomain()); - } - } + } + } - @Override - public Instant currentProcessingTime() { - return new Instant(getCurrentProcessingTime()); - } + @Override + public Instant currentProcessingTime() { + return new Instant(getCurrentProcessingTime()); + } - @Nullable - @Override - public Instant currentSynchronizedProcessingTime() { - return new Instant(getCurrentProcessingTime()); - } + @Nullable + @Override + public Instant currentSynchronizedProcessingTime() { + return new Instant(getCurrentProcessingTime()); + } - @Override - public Instant currentInputWatermarkTime() { - return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold())); - } + @Override + public Instant currentInputWatermarkTime() { + return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold())); + } - @Nullable - @Override - public Instant currentOutputWatermarkTime() { - return new Instant(currentOutputWatermark); - } - }; + @Nullable + @Override + public Instant currentOutputWatermarkTime() { + return new Instant(currentOutputWatermark); } + } }