Repository: beam Updated Branches: refs/heads/master 2a2337460 -> 2e072a032
[BEAM-1116] Support for new Timer API in Flink Batch runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e7f825cd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e7f825cd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e7f825cd Branch: refs/heads/master Commit: e7f825cd7f78250e51326f51a0015bbc96d10d84 Parents: 2a23374 Author: JingsongLi <[email protected]> Authored: Tue Feb 28 19:00:48 2017 +0800 Committer: Aljoscha Krettek <[email protected]> Committed: Tue Feb 28 14:13:04 2017 +0100 ---------------------------------------------------------------------- runners/flink/runner/pom.xml | 1 - .../flink/FlinkBatchTransformTranslators.java | 16 ----- .../functions/FlinkStatefulDoFnFunction.java | 64 ++++++++++++++++++++ 3 files changed, 64 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e7f825cd/runners/flink/runner/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml index 8cc65b0..13d5b10 100644 --- a/runners/flink/runner/pom.xml +++ b/runners/flink/runner/pom.xml @@ -55,7 +55,6 @@ <groups>org.apache.beam.sdk.testing.RunnableOnService</groups> <excludedGroups> org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders, - org.apache.beam.sdk.testing.UsesTimersInParDo, org.apache.beam.sdk.testing.UsesSplittableParDo, org.apache.beam.sdk.testing.UsesAttemptedMetrics, org.apache.beam.sdk.testing.UsesCommittedMetrics http://git-wip-us.apache.org/repos/asf/beam/blob/e7f825cd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java index ed2f4aa..30e9d68 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java @@ -498,20 +498,6 @@ class FlinkBatchTransformTranslators { } } - private static void rejectTimers(DoFn<?, ?> doFn) { - DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); - - if (signature.timerDeclarations().size() > 0) { - throw new UnsupportedOperationException( - String.format( - "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.", - DoFn.TimerId.class.getSimpleName(), - doFn.getClass().getName(), - DoFn.class.getSimpleName(), - FlinkRunner.class.getSimpleName())); - } - } - private static class ParDoBoundTranslatorBatch<InputT, OutputT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator< ParDo.Bound<InputT, OutputT>> { @@ -524,7 +510,6 @@ class FlinkBatchTransformTranslators { FlinkBatchTranslationContext context) { DoFn<InputT, OutputT> doFn = transform.getFn(); rejectSplittable(doFn); - rejectTimers(doFn); DataSet<WindowedValue<InputT>> inputDataSet = context.getInputDataSet(context.getInput(transform)); @@ -597,7 +582,6 @@ class FlinkBatchTransformTranslators { FlinkBatchTranslationContext context) { DoFn<InputT, OutputT> doFn = transform.getFn(); rejectSplittable(doFn); - rejectTimers(doFn); DataSet<WindowedValue<InputT>> inputDataSet = context.getInputDataSet(context.getInput(transform)); http://git-wip-us.apache.org/repos/asf/beam/blob/e7f825cd/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java index fca7691..0d8399e 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java @@ -17,19 +17,26 @@ */ package org.apache.beam.runners.flink.translation.functions; +import static org.apache.flink.util.Preconditions.checkArgument; + import java.util.Collections; import java.util.Iterator; import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.InMemoryStateInternals; +import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; @@ -39,6 +46,7 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; +import org.joda.time.Instant; /** * A {@link RichGroupReduceFunction} for stateful {@link ParDo} in Flink Batch Runner. @@ -93,6 +101,14 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT> final K key = currentValue.getValue().getKey(); final InMemoryStateInternals<K> stateInternals = InMemoryStateInternals.forKey(key); + + // Used with Batch, we know that all the data is available for this key. We can't use the + // timer manager from the context because it doesn't exist. So we create one and advance + // time to the end after processing all elements. + final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); + timerInternals.advanceProcessingTime(Instant.now()); + timerInternals.advanceSynchronizedProcessingTime(Instant.now()); + DoFnRunner<KV<K, V>, OutputT> doFnRunner = DoFnRunners.simpleRunner( serializedOptions.getPipelineOptions(), dofn, new FlinkSideInputReader(sideInputs, runtimeContext), @@ -105,6 +121,10 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT> public StateInternals<?> stateInternals() { return stateInternals; } + @Override + public TimerInternals timerInternals() { + return timerInternals; + } }, new FlinkAggregatorFactory(runtimeContext), windowingStrategy); @@ -117,9 +137,53 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT> doFnRunner.processElement(currentValue); } + // Finish any pending windows by advancing the input watermark to infinity. + timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); + + // Finally, advance the processing time to infinity to fire any timers. + timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE); + timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE); + + fireEligibleTimers(timerInternals, doFnRunner); + doFnRunner.finishBundle(); } + private void fireEligibleTimers( + InMemoryTimerInternals timerInternals, DoFnRunner<KV<K, V>, OutputT> runner) + throws Exception { + + while (true) { + + TimerInternals.TimerData timer; + boolean hasFired = false; + + while ((timer = timerInternals.removeNextEventTimer()) != null) { + hasFired = true; + fireTimer(timer, runner); + } + while ((timer = timerInternals.removeNextProcessingTimer()) != null) { + hasFired = true; + fireTimer(timer, runner); + } + while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) { + hasFired = true; + fireTimer(timer, runner); + } + if (!hasFired) { + break; + } + } + } + + private void fireTimer( + TimerInternals.TimerData timer, DoFnRunner<KV<K, V>, OutputT> doFnRunner) { + StateNamespace namespace = timer.getNamespace(); + checkArgument(namespace instanceof StateNamespaces.WindowNamespace); + BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow(); + doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain()); + } + @Override public void open(Configuration parameters) throws Exception { doFnInvoker = DoFnInvokers.invokerFor(dofn);
