Introduce Flink-specific state GC implementations. We now set the GC timer for window.maxTimestamp() + 1 to ensure that a user timer set for window.maxTimestamp() still has access to all of its state when it fires.
This also adds tests for late data dropping and state GC specifically for the Flink DoFnOperator. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8fa718db Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8fa718db Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8fa718db Branch: refs/heads/release-0.6.0 Commit: 8fa718db5bc14efd1beefc2c757c331a5bdbf927 Parents: a18b5b1 Author: Aljoscha Krettek <[email protected]> Authored: Fri Mar 10 11:07:00 2017 +0100 Committer: Ahmet Altay <[email protected]> Committed: Fri Mar 10 17:13:49 2017 -0800 ---------------------------------------------------------------------- .../apache/beam/runners/core/DoFnRunners.java | 15 +- .../beam/runners/core/StatefulDoFnRunner.java | 87 ------- .../runners/core/StatefulDoFnRunnerTest.java | 110 ++++++++- .../wrappers/streaming/DoFnOperator.java | 111 ++++++++- .../flink/streaming/DoFnOperatorTest.java | 225 +++++++++++++++++++ 5 files changed, 439 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8fa718db/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index 9455eea..a1b7c8b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -21,9 +21,6 @@ import java.util.List; import org.apache.beam.runners.core.ExecutionContext.StepContext; import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer; import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner; -import 
org.apache.beam.runners.core.StatefulDoFnRunner.StateInternalsStateCleaner; -import org.apache.beam.runners.core.StatefulDoFnRunner.TimeInternalsCleanupTimer; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; @@ -135,18 +132,13 @@ public class DoFnRunners { DoFnRunner<InputT, OutputT> doFnRunner, StepContext stepContext, AggregatorFactory aggregatorFactory, - WindowingStrategy<?, ?> windowingStrategy) { + WindowingStrategy<?, ?> windowingStrategy, + CleanupTimer cleanupTimer, + StateCleaner<W> stateCleaner) { Aggregator<Long, Long> droppedDueToLateness = aggregatorFactory.createAggregatorForDoFn( fn.getClass(), stepContext, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); - CleanupTimer cleanupTimer = - new TimeInternalsCleanupTimer(stepContext.timerInternals(), windowingStrategy); - - Coder<W> windowCoder = (Coder<W>) windowingStrategy.getWindowFn().windowCoder(); - StateCleaner<W> stateCleaner = - new StateInternalsStateCleaner<>(fn, stepContext.stateInternals(), windowCoder); - return new StatefulDoFnRunner<>( doFnRunner, windowingStrategy, @@ -154,5 +146,4 @@ public class DoFnRunners { stateCleaner, droppedDueToLateness); } - } http://git-wip-us.apache.org/repos/asf/beam/blob/8fa718db/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java index 926345e..c672902 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java @@ -17,12 +17,8 @@ */ package org.apache.beam.runners.core; -import 
java.util.Map; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -30,8 +26,6 @@ import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateSpec; import org.joda.time.Instant; /** @@ -45,7 +39,6 @@ import org.joda.time.Instant; public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow> implements DoFnRunner<InputT, OutputT> { - public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId"; public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "StatefulParDoDropped"; private final DoFnRunner<InputT, OutputT> doFnRunner; @@ -167,84 +160,4 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow> void clearForWindow(W window); } - - /** - * A {@link CleanupTimer} implemented by TimerInternals. 
- */ - public static class TimeInternalsCleanupTimer implements CleanupTimer { - - private final TimerInternals timerInternals; - private final WindowingStrategy<?, ?> windowingStrategy; - private final Coder<BoundedWindow> windowCoder; - - public TimeInternalsCleanupTimer( - TimerInternals timerInternals, - WindowingStrategy<?, ?> windowingStrategy) { - this.windowingStrategy = windowingStrategy; - WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn(); - windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder(); - this.timerInternals = timerInternals; - } - - @Override - public Instant currentInputWatermarkTime() { - return timerInternals.currentInputWatermarkTime(); - } - - @Override - public void setForWindow(BoundedWindow window) { - Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); - timerInternals.setTimer(StateNamespaces.window(windowCoder, window), - GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME); - } - - @Override - public boolean isForWindow( - String timerId, - BoundedWindow window, - Instant timestamp, - TimeDomain timeDomain) { - boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME); - Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); - return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp); - } - } - - /** - * A {@link StateCleaner} implemented by StateInternals. 
- */ - public static class StateInternalsStateCleaner<W extends BoundedWindow> - implements StateCleaner<W> { - - private final DoFn<?, ?> fn; - private final DoFnSignature signature; - private final StateInternals<?> stateInternals; - private final Coder<W> windowCoder; - - public StateInternalsStateCleaner( - DoFn<?, ?> fn, - StateInternals<?> stateInternals, - Coder<W> windowCoder) { - this.fn = fn; - this.signature = DoFnSignatures.getSignature(fn.getClass()); - this.stateInternals = stateInternals; - this.windowCoder = windowCoder; - } - - @Override - public void clearForWindow(W window) { - for (Map.Entry<String, DoFnSignature.StateDeclaration> entry : - signature.stateDeclarations().entrySet()) { - try { - StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn); - State state = stateInternals.state(StateNamespaces.window(windowCoder, window), - StateTags.tagForSpec(entry.getKey(), (StateSpec) spec)); - state.clear(); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } - } - } - } - } http://git-wip-us.apache.org/repos/asf/beam/blob/8fa718db/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java index 54ac77e..fd6a73c 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.when; import com.google.common.base.MoreObjects; import java.util.Collections; +import java.util.Map; import org.apache.beam.runners.core.BaseExecutionContext.StepContext; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarIntCoder; @@ 
-31,14 +32,18 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.util.state.StateSpecs; import org.apache.beam.sdk.util.state.ValueState; @@ -114,7 +119,14 @@ public class StatefulDoFnRunnerTest { DoFn<KV<String, Integer>, Integer> fn = new MyDoFn(); DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner( - fn, getDoFnRunner(fn), mockStepContext, aggregatorFactory, WINDOWING_STRATEGY); + fn, + getDoFnRunner(fn), + mockStepContext, + aggregatorFactory, + WINDOWING_STRATEGY, + new TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY), + new StateInternalsStateCleaner<>( + fn, stateInternals, (Coder) WINDOWING_STRATEGY.getWindowFn().windowCoder())); runner.startBundle(); @@ -125,13 +137,6 @@ public class StatefulDoFnRunnerTest { WindowedValue.of(KV.of("hello", 1), timestamp, window, PaneInfo.NO_FIRING)); assertEquals(1L, droppedDueToLateness.sum); - runner.onTimer("processTimer", window, timestamp, TimeDomain.PROCESSING_TIME); - assertEquals(2L, droppedDueToLateness.sum); - - runner.onTimer("synchronizedProcessTimer", window, timestamp, - 
TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - assertEquals(3L, droppedDueToLateness.sum); - runner.finishBundle(); } @@ -143,7 +148,14 @@ public class StatefulDoFnRunnerTest { StateTag<Object, ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId, fn.intState); DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner( - fn, getDoFnRunner(fn), mockStepContext, aggregatorFactory, WINDOWING_STRATEGY); + fn, + getDoFnRunner(fn), + mockStepContext, + aggregatorFactory, + WINDOWING_STRATEGY, + new TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY), + new StateInternalsStateCleaner<>( + fn, stateInternals, (Coder) WINDOWING_STRATEGY.getWindowFn().windowCoder())); Instant elementTime = new Instant(1); @@ -252,4 +264,84 @@ public class StatefulDoFnRunnerTest { } } + /** + * A {@link StatefulDoFnRunner.CleanupTimer} implemented by TimerInternals. + */ + public static class TimeInternalsCleanupTimer implements StatefulDoFnRunner.CleanupTimer { + + public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId"; + + private final TimerInternals timerInternals; + private final WindowingStrategy<?, ?> windowingStrategy; + private final Coder<BoundedWindow> windowCoder; + + public TimeInternalsCleanupTimer( + TimerInternals timerInternals, + WindowingStrategy<?, ?> windowingStrategy) { + this.windowingStrategy = windowingStrategy; + WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn(); + windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder(); + this.timerInternals = timerInternals; + } + + @Override + public Instant currentInputWatermarkTime() { + return timerInternals.currentInputWatermarkTime(); + } + + @Override + public void setForWindow(BoundedWindow window) { + Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); + timerInternals.setTimer(StateNamespaces.window(windowCoder, window), + GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME); + } + + @Override + public boolean isForWindow( + 
String timerId, + BoundedWindow window, + Instant timestamp, + TimeDomain timeDomain) { + boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME); + Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); + return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp); + } + } + + /** + * A {@link StatefulDoFnRunner.StateCleaner} implemented by StateInternals. + */ + public static class StateInternalsStateCleaner<W extends BoundedWindow> + implements StatefulDoFnRunner.StateCleaner<W> { + + private final DoFn<?, ?> fn; + private final DoFnSignature signature; + private final StateInternals<?> stateInternals; + private final Coder<W> windowCoder; + + public StateInternalsStateCleaner( + DoFn<?, ?> fn, + StateInternals<?> stateInternals, + Coder<W> windowCoder) { + this.fn = fn; + this.signature = DoFnSignatures.getSignature(fn.getClass()); + this.stateInternals = stateInternals; + this.windowCoder = windowCoder; + } + + @Override + public void clearForWindow(W window) { + for (Map.Entry<String, DoFnSignature.StateDeclaration> entry : + signature.stateDeclarations().entrySet()) { + try { + StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn); + State state = stateInternals.state(StateNamespaces.window(windowCoder, window), + StateTags.tagForSpec(entry.getKey(), (StateSpec) spec)); + state.clear(); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/8fa718db/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index c4622ba..a8ce680 100644 
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -43,6 +43,7 @@ import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.core.StatefulDoFnRunner; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; @@ -61,13 +62,18 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -286,6 +292,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> // // for some K, V + doFnRunner = DoFnRunners.lateDataDroppingRunner( (DoFnRunner) doFnRunner, stepContext, @@ -293,8 +300,27 @@ public class DoFnOperator<InputT, 
FnOutputT, OutputT> ((GroupAlsoByWindowViaWindowSetNewDoFn) doFn).getDroppedDueToLatenessAggregator()); } else if (keyCoder != null) { // It is a stateful DoFn + + StatefulDoFnRunner.CleanupTimer cleanupTimer = + new TimeInternalsCleanupTimer(stepContext.timerInternals(), windowingStrategy); + + // we don't know the window type + @SuppressWarnings({"unchecked", "rawtypes"}) + Coder windowCoder = windowingStrategy.getWindowFn().windowCoder(); + + @SuppressWarnings({"unchecked", "rawtypes"}) + StatefulDoFnRunner.StateCleaner<?> stateCleaner = + new StateInternalsStateCleaner<>( + doFn, stepContext.stateInternals(), windowCoder); + doFnRunner = DoFnRunners.defaultStatefulDoFnRunner( - doFn, doFnRunner, stepContext, aggregatorFactory, windowingStrategy); + doFn, + doFnRunner, + stepContext, + aggregatorFactory, + windowingStrategy, + cleanupTimer, + stateCleaner); } pushbackDoFnRunner = @@ -746,7 +772,90 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> public Instant currentOutputWatermarkTime() { return new Instant(currentOutputWatermark); } + } + + + /** + * A {@link StatefulDoFnRunner.CleanupTimer} implemented by TimerInternals. 
+ */ + public static class TimeInternalsCleanupTimer implements StatefulDoFnRunner.CleanupTimer { + + public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId"; + + private final TimerInternals timerInternals; + private final WindowingStrategy<?, ?> windowingStrategy; + private final Coder<BoundedWindow> windowCoder; + + public TimeInternalsCleanupTimer( + TimerInternals timerInternals, + WindowingStrategy<?, ?> windowingStrategy) { + this.windowingStrategy = windowingStrategy; + WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn(); + windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder(); + this.timerInternals = timerInternals; + } + @Override + public Instant currentInputWatermarkTime() { + return timerInternals.currentInputWatermarkTime(); + } + + @Override + public void setForWindow(BoundedWindow window) { + Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); + // make sure this fires after any window.maxTimestamp() timers + gcTime = gcTime.plus(1L); + timerInternals.setTimer(StateNamespaces.window(windowCoder, window), + GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME); + } + + @Override + public boolean isForWindow( + String timerId, + BoundedWindow window, + Instant timestamp, + TimeDomain timeDomain) { + boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME); + Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); + gcTime = gcTime.plus(1L); + return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp); + } } + /** + * A {@link StatefulDoFnRunner.StateCleaner} implemented by StateInternals. 
+ */ + public static class StateInternalsStateCleaner<W extends BoundedWindow> + implements StatefulDoFnRunner.StateCleaner<W> { + + private final DoFn<?, ?> fn; + private final DoFnSignature signature; + private final StateInternals<?> stateInternals; + private final Coder<W> windowCoder; + + public StateInternalsStateCleaner( + DoFn<?, ?> fn, + StateInternals<?> stateInternals, + Coder<W> windowCoder) { + this.fn = fn; + this.signature = DoFnSignatures.getSignature(fn.getClass()); + this.stateInternals = stateInternals; + this.windowCoder = windowCoder; + } + + @Override + public void clearForWindow(W window) { + for (Map.Entry<String, DoFnSignature.StateDeclaration> entry : + signature.stateDeclarations().entrySet()) { + try { + StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn); + State state = stateInternals.state(StateNamespaces.window(windowCoder, window), + StateTags.tagForSpec(entry.getKey(), (StateSpec) spec)); + state.clear(); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/8fa718db/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index 7d14a87..bbd3428 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -17,7 +17,9 @@ */ package org.apache.beam.runners.flink.streaming; +import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; +import static org.junit.Assert.assertEquals; import static 
org.junit.Assert.assertThat; import com.google.common.base.Function; @@ -29,9 +31,12 @@ import java.util.Collections; import java.util.HashMap; import javax.annotation.Nullable; import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PCollectionViewTesting; import org.apache.beam.sdk.transforms.DoFn; @@ -40,14 +45,23 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.TimerSpec; +import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateSpec; +import org.apache.beam.sdk.util.state.StateSpecs; +import org.apache.beam.sdk.util.state.ValueState; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import 
org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; @@ -169,6 +183,217 @@ public class DoFnOperatorTest { testHarness.close(); } + @Test + public void testLateDroppingForStatefulFn() throws Exception { + + WindowingStrategy<Object, IntervalWindow> windowingStrategy = + WindowingStrategy.of(FixedWindows.of(new Duration(10))); + + DoFn<Integer, String> fn = new DoFn<Integer, String>() { + + @StateId("state") + private final StateSpec<Object, ValueState<String>> stateSpec = + StateSpecs.value(StringUtf8Coder.of()); + + @ProcessElement + public void processElement(ProcessContext context) { + context.output(context.element().toString()); + } + }; + + WindowedValue.FullWindowedValueCoder<Integer> windowedValueCoder = + WindowedValue.getFullCoder( + VarIntCoder.of(), + windowingStrategy.getWindowFn().windowCoder()); + + TupleTag<String> outputTag = new TupleTag<>("main-output"); + + DoFnOperator<Integer, String, WindowedValue<String>> doFnOperator = new DoFnOperator<>( + fn, + windowedValueCoder, + outputTag, + Collections.<TupleTag<?>>emptyList(), + new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<String>>(), + windowingStrategy, + new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ + Collections.<PCollectionView<?>>emptyList(), /* side inputs */ + PipelineOptionsFactory.as(FlinkPipelineOptions.class), + VarIntCoder.of() /* key coder */); + + OneInputStreamOperatorTestHarness<WindowedValue<Integer>, WindowedValue<String>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + doFnOperator, + new KeySelector<WindowedValue<Integer>, Integer>() { + @Override + public Integer getKey(WindowedValue<Integer> integerWindowedValue) throws Exception { + return integerWindowedValue.getValue(); + } + }, + new CoderTypeInformation<>(VarIntCoder.of())); + + testHarness.open(); + + testHarness.processWatermark(0); + + IntervalWindow window1 = new IntervalWindow(new Instant(0), Duration.millis(10)); + + // this should not be late + 
testHarness.processElement( + new StreamRecord<>(WindowedValue.of(13, new Instant(0), window1, PaneInfo.NO_FIRING))); + + assertThat( + this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains(WindowedValue.of("13", new Instant(0), window1, PaneInfo.NO_FIRING))); + + testHarness.getOutput().clear(); + + testHarness.processWatermark(9); + + // this should still not be considered late + testHarness.processElement( + new StreamRecord<>(WindowedValue.of(17, new Instant(0), window1, PaneInfo.NO_FIRING))); + + assertThat( + this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains(WindowedValue.of("17", new Instant(0), window1, PaneInfo.NO_FIRING))); + + testHarness.getOutput().clear(); + + testHarness.processWatermark(10); + + // this should now be considered late + testHarness.processElement( + new StreamRecord<>(WindowedValue.of(17, new Instant(0), window1, PaneInfo.NO_FIRING))); + + assertThat( + this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + emptyIterable()); + + testHarness.close(); + } + + @Test + public void testStateGCForStatefulFn() throws Exception { + + WindowingStrategy<Object, IntervalWindow> windowingStrategy = + WindowingStrategy.of(FixedWindows.of(new Duration(10))); + + final String timerId = "boo"; + final String stateId = "dazzle"; + + final int offset = 5000; + final int timerOutput = 4093; + + DoFn<KV<String, Integer>, KV<String, Integer>> fn = + new DoFn<KV<String, Integer>, KV<String, Integer>>() { + + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @StateId(stateId) + private final StateSpec<Object, ValueState<String>> stateSpec = + StateSpecs.value(StringUtf8Coder.of()); + + @ProcessElement + public void processElement( + ProcessContext context, + @TimerId(timerId) Timer timer, + @StateId(stateId) ValueState<String> state, + BoundedWindow window) { + timer.set(window.maxTimestamp()); + 
state.write(context.element().getKey()); + context.output( + KV.of(context.element().getKey(), context.element().getValue() + offset)); + } + + @OnTimer(timerId) + public void onTimer(OnTimerContext context, @StateId(stateId) ValueState<String> state) { + context.output(KV.of(state.read(), timerOutput)); + } + }; + + WindowedValue.FullWindowedValueCoder<KV<String, Integer>> windowedValueCoder = + WindowedValue.getFullCoder( + KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), + windowingStrategy.getWindowFn().windowCoder()); + + TupleTag<KV<String, Integer>> outputTag = new TupleTag<>("main-output"); + + DoFnOperator< + KV<String, Integer>, KV<String, Integer>, WindowedValue<KV<String, Integer>>> doFnOperator = + new DoFnOperator<>( + fn, + windowedValueCoder, + outputTag, + Collections.<TupleTag<?>>emptyList(), + new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<String, Integer>>>(), + windowingStrategy, + new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ + Collections.<PCollectionView<?>>emptyList(), /* side inputs */ + PipelineOptionsFactory.as(FlinkPipelineOptions.class), + StringUtf8Coder.of() /* key coder */); + + KeyedOneInputStreamOperatorTestHarness< + String, + WindowedValue<KV<String, Integer>>, + WindowedValue<KV<String, Integer>>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + doFnOperator, + new KeySelector<WindowedValue<KV<String, Integer>>, String>() { + @Override + public String getKey( + WindowedValue<KV<String, Integer>> kvWindowedValue) throws Exception { + return kvWindowedValue.getValue().getKey(); + } + }, + new CoderTypeInformation<>(StringUtf8Coder.of())); + + testHarness.open(); + + testHarness.processWatermark(0); + + assertEquals(0, testHarness.numKeyedStateEntries()); + + IntervalWindow window1 = new IntervalWindow(new Instant(0), Duration.millis(10)); + + testHarness.processElement( + new StreamRecord<>( + WindowedValue.of(KV.of("key1", 5), new Instant(1), window1, PaneInfo.NO_FIRING))); 
+ + testHarness.processElement( + new StreamRecord<>( + WindowedValue.of(KV.of("key2", 7), new Instant(3), window1, PaneInfo.NO_FIRING))); + + assertThat( + this.<KV<String, Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + WindowedValue.of( + KV.of("key1", 5 + offset), new Instant(1), window1, PaneInfo.NO_FIRING), + WindowedValue.of( + KV.of("key2", 7 + offset), new Instant(3), window1, PaneInfo.NO_FIRING))); + + assertEquals(2, testHarness.numKeyedStateEntries()); + + testHarness.getOutput().clear(); + + // this should trigger both the window.maxTimestamp() timer and the GC timer + // this tests that the GC timer fires after the user timer + testHarness.processWatermark(15); + + assertThat( + this.<KV<String, Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + WindowedValue.of( + KV.of("key1", timerOutput), new Instant(9), window1, PaneInfo.NO_FIRING), + WindowedValue.of( + KV.of("key2", timerOutput), new Instant(9), window1, PaneInfo.NO_FIRING))); + + // ensure the state was garbage collected + assertEquals(0, testHarness.numKeyedStateEntries()); + + testHarness.close(); + } + public void testSideInputs(boolean keyed) throws Exception { WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
