Repository: beam Updated Branches: refs/heads/master 8cfb3d125 -> 038ebd3bf
[BEAM-1116] Support for new Timer API in Flink runner (streaming) Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f86facb2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f86facb2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f86facb2 Branch: refs/heads/master Commit: f86facb27170c34cc9ef2d702a51bdcd7e53836d Parents: 0e9173d Author: JingsongLi <[email protected]> Authored: Fri Feb 17 17:48:23 2017 +0800 Committer: Aljoscha Krettek <[email protected]> Committed: Tue Feb 21 09:53:05 2017 +0100 ---------------------------------------------------------------------- runners/flink/runner/pom.xml | 1 - .../FlinkStreamingTransformTranslators.java | 16 ------ .../wrappers/streaming/DoFnOperator.java | 58 +++++++++++++++----- .../wrappers/streaming/WindowDoFnOperator.java | 48 ---------------- 4 files changed, 44 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f86facb2/runners/flink/runner/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml index fbe2686..a7fae5d 100644 --- a/runners/flink/runner/pom.xml +++ b/runners/flink/runner/pom.xml @@ -90,7 +90,6 @@ org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders, org.apache.beam.sdk.testing.UsesSetState, org.apache.beam.sdk.testing.UsesMapState, - org.apache.beam.sdk.testing.UsesTimersInParDo, org.apache.beam.sdk.testing.UsesSplittableParDo, org.apache.beam.sdk.testing.UsesAttemptedMetrics, org.apache.beam.sdk.testing.UsesCommittedMetrics http://git-wip-us.apache.org/repos/asf/beam/blob/f86facb2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index 757cdd2..cd0ef03 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -320,20 +320,6 @@ public class FlinkStreamingTransformTranslators { } } - private static void rejectTimers(DoFn<?, ?> doFn) { - DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); - - if (signature.timerDeclarations().size() > 0) { - throw new UnsupportedOperationException( - String.format( - "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.", - DoFn.TimerId.class.getSimpleName(), - doFn.getClass().getName(), - DoFn.class.getSimpleName(), - FlinkRunner.class.getSimpleName())); - } - } - private static class ParDoBoundStreamingTranslator<InputT, OutputT> extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< ParDo.Bound<InputT, OutputT>> { @@ -345,7 +331,6 @@ public class FlinkStreamingTransformTranslators { DoFn<InputT, OutputT> doFn = transform.getFn(); rejectSplittable(doFn); - rejectTimers(doFn); WindowingStrategy<?, ?> windowingStrategy = context.getOutput(transform).getWindowingStrategy(); @@ -531,7 +516,6 @@ public class FlinkStreamingTransformTranslators { DoFn<InputT, OutputT> doFn = transform.getFn(); rejectSplittable(doFn); - rejectTimers(doFn); // we assume that the transformation does not change the windowing strategy. WindowingStrategy<?, ?> windowingStrategy = http://git-wip-us.apache.org/repos/asf/beam/blob/f86facb2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 8a3dad2..29b6fbc 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -29,7 +29,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.core.DoFnRunner; @@ -41,6 +40,7 @@ import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.TimerInternals; @@ -179,7 +179,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder()); } - protected ExecutionContext.StepContext createStepContext() { + private ExecutionContext.StepContext createStepContext() { return new StepContext(); } @@ -306,7 +306,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> protected final long getPushbackWatermarkHold() { // if we don't have side inputs we never hold the watermark if (sideInputs.isEmpty()) { - return Long.MAX_VALUE; + return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); } try { @@ -325,7 +325,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> BagState<WindowedValue<InputT>> pushedBack = pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); - long min = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE); + long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); for (WindowedValue<InputT> value : pushedBack.read()) { min = Math.min(min, value.getTimestamp().getMillis()); } @@ -398,7 +398,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> } pushedBack.clear(); - long min = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE); + long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); for (WindowedValue<InputT> pushedBackValue : newPushedBack) { min = Math.min(min, pushedBackValue.getTimestamp().getMillis()); pushedBack.add(pushedBackValue); @@ -418,12 +418,36 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> @Override public void processWatermark1(Watermark mark) throws Exception { - this.currentInputWatermark = mark.getTimestamp(); - long potentialOutputWatermark = - Math.min(getPushbackWatermarkHold(), currentInputWatermark); - if (potentialOutputWatermark > currentOutputWatermark) { - currentOutputWatermark = potentialOutputWatermark; - output.emitWatermark(new Watermark(currentOutputWatermark)); + if (keyCoder == null) { + this.currentInputWatermark = mark.getTimestamp(); + long potentialOutputWatermark = + Math.min(getPushbackWatermarkHold(), currentInputWatermark); + if (potentialOutputWatermark > currentOutputWatermark) { + currentOutputWatermark = potentialOutputWatermark; + output.emitWatermark(new Watermark(currentOutputWatermark)); + } + } else { + // fireTimers, so we need startBundle. + pushbackDoFnRunner.startBundle(); + + this.currentInputWatermark = mark.getTimestamp(); + + // hold back by the pushed back values waiting for side inputs + long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp()); + + timerService.advanceWatermark(actualInputWatermark); + + Instant watermarkHold = stateInternals.watermarkHold(); + + long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold()); + + long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold); + + if (potentialOutputWatermark > currentOutputWatermark) { + currentOutputWatermark = potentialOutputWatermark; + output.emitWatermark(new Watermark(currentOutputWatermark)); + } + pushbackDoFnRunner.finishBundle(); } } @@ -538,9 +562,15 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> fireTimer(timer); } + // allow overriding this in WindowDoFnOperator public void fireTimer(InternalTimer<?, TimerData> timer) { - // Now not implement timers in StatefulPardo - throw new RuntimeException("The fireTimer should not be invoke in DoFnOperator."); + TimerInternals.TimerData timerData = timer.getNamespace(); + StateNamespace namespace = timerData.getNamespace(); + // This is a user timer, so namespace must be WindowNamespace + checkArgument(namespace instanceof WindowNamespace); + BoundedWindow window = ((WindowNamespace) namespace).getWindow(); + pushbackDoFnRunner.onTimer(timerData.getTimerId(), window, + timerData.getTimestamp(), timerData.getDomain()); } /** @@ -638,7 +668,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> @Override public TimerInternals timerInternals() { - throw new UnsupportedOperationException("Not supported for regular DoFns."); + return timerInternals; } } http://git-wip-us.apache.org/repos/asf/beam/blob/f86facb2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index d5594e6..b015f66 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; @@ -42,8 +41,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.flink.streaming.api.operators.InternalTimer; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.joda.time.Instant; /** * Flink operator for executing window {@link DoFn DoFns}. @@ -81,7 +78,6 @@ public class WindowDoFnOperator<K, InputT, OutputT> this.systemReduceFn = systemReduceFn; - } @Override @@ -114,11 +110,6 @@ public class WindowDoFnOperator<K, InputT, OutputT> } @Override - protected ExecutionContext.StepContext createStepContext() { - return new WindowDoFnOperator.StepContext(); - } - - @Override public void fireTimer(InternalTimer<?, TimerData> timer) { pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow( KeyedWorkItems.<K, InputT>timersWorkItem( @@ -126,43 +117,4 @@ public class WindowDoFnOperator<K, InputT, OutputT> Collections.singletonList(timer.getNamespace())))); } - @Override - public void processWatermark1(Watermark mark) throws Exception { - pushbackDoFnRunner.startBundle(); - - this.currentInputWatermark = mark.getTimestamp(); - - // hold back by the pushed back values waiting for side inputs - long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp()); - - timerService.advanceWatermark(actualInputWatermark); - - Instant watermarkHold = stateInternals.watermarkHold(); - - long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold()); - - long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold); - - if (potentialOutputWatermark > currentOutputWatermark) { - currentOutputWatermark = potentialOutputWatermark; - output.emitWatermark(new Watermark(currentOutputWatermark)); - } - pushbackDoFnRunner.finishBundle(); - - } - - /** - * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does now allow - * accessing state or timer internals. - */ - protected class StepContext extends DoFnOperator.StepContext { - - @Override - public TimerInternals timerInternals() { - return timerInternals; - } - } - - - }
