Repository: beam
Updated Branches:
  refs/heads/master 9ad01fb15 -> e362e6b49
[BEAM-1689] Apply changes for Flink's StatefulDoFnRunner to the primary StatefulDoFnRunner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f7621472 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f7621472 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f7621472 Branch: refs/heads/master Commit: f7621472826c82b355f9affb0bebfeef4f550004 Parents: 9ad01fb Author: Aljoscha Krettek <[email protected]> Authored: Mon Mar 13 10:20:13 2017 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Tue Mar 14 11:51:33 2017 +0100 ---------------------------------------------------------------------- .../beam/runners/core/StatefulDoFnRunner.java | 96 ++++++++++++++++ .../runners/core/StatefulDoFnRunnerTest.java | 113 ++++--------------- .../wrappers/streaming/DoFnOperator.java | 95 +--------------- .../flink/streaming/DoFnOperatorTest.java | 9 +- 4 files changed, 126 insertions(+), 187 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f7621472/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java index d27193c..4f15822 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java @@ -17,8 +17,12 @@ */ package org.apache.beam.runners.core; +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import 
org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -26,6 +30,8 @@ import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateSpec; import org.joda.time.Instant; /** @@ -168,4 +174,94 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow> void clearForWindow(W window); } + + /** + * A {@link StatefulDoFnRunner.CleanupTimer} implemented via {@link TimerInternals}. + */ + public static class TimeInternalsCleanupTimer implements StatefulDoFnRunner.CleanupTimer { + + public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId"; + + /** + * The amount of milliseconds by which to delay cleanup. We use this to ensure that state is + * still available when a user timer for {@code window.maxTimestamp()} fires. 
+ */ + public static final long GC_DELAY_MS = 1; + + private final TimerInternals timerInternals; + private final WindowingStrategy<?, ?> windowingStrategy; + private final Coder<BoundedWindow> windowCoder; + + public TimeInternalsCleanupTimer( + TimerInternals timerInternals, + WindowingStrategy<?, ?> windowingStrategy) { + this.windowingStrategy = windowingStrategy; + WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn(); + windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder(); + this.timerInternals = timerInternals; + } + + @Override + public Instant currentInputWatermarkTime() { + return timerInternals.currentInputWatermarkTime(); + } + + @Override + public void setForWindow(BoundedWindow window) { + Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); + // make sure this fires after any window.maxTimestamp() timers + gcTime = gcTime.plus(GC_DELAY_MS); + timerInternals.setTimer(StateNamespaces.window(windowCoder, window), + GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME); + } + + @Override + public boolean isForWindow( + String timerId, + BoundedWindow window, + Instant timestamp, + TimeDomain timeDomain) { + boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME); + Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); + gcTime = gcTime.plus(GC_DELAY_MS); + return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp); + } + } + + /** + * A {@link StatefulDoFnRunner.StateCleaner} implemented via {@link StateInternals}. 
+ */ + public static class StateInternalsStateCleaner<W extends BoundedWindow> + implements StatefulDoFnRunner.StateCleaner<W> { + + private final DoFn<?, ?> fn; + private final DoFnSignature signature; + private final StateInternals<?> stateInternals; + private final Coder<W> windowCoder; + + public StateInternalsStateCleaner( + DoFn<?, ?> fn, + StateInternals<?> stateInternals, + Coder<W> windowCoder) { + this.fn = fn; + this.signature = DoFnSignatures.getSignature(fn.getClass()); + this.stateInternals = stateInternals; + this.windowCoder = windowCoder; + } + + @Override + public void clearForWindow(W window) { + for (Map.Entry<String, DoFnSignature.StateDeclaration> entry : + signature.stateDeclarations().entrySet()) { + try { + StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn); + State state = stateInternals.state(StateNamespaces.window(windowCoder, window), + StateTags.tagForSpec(entry.getKey(), (StateSpec) spec)); + state.clear(); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/f7621472/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java index fd6a73c..46cbd7d 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java @@ -24,7 +24,6 @@ import static org.mockito.Mockito.when; import com.google.common.base.MoreObjects; import java.util.Collections; -import java.util.Map; import org.apache.beam.runners.core.BaseExecutionContext.StepContext; import org.apache.beam.sdk.coders.Coder; import 
org.apache.beam.sdk.coders.VarIntCoder; @@ -32,18 +31,13 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.NullSideInputReader; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.util.state.StateSpecs; import org.apache.beam.sdk.util.state.ValueState; @@ -124,8 +118,8 @@ public class StatefulDoFnRunnerTest { mockStepContext, aggregatorFactory, WINDOWING_STRATEGY, - new TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY), - new StateInternalsStateCleaner<>( + new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY), + new StatefulDoFnRunner.StateInternalsStateCleaner<>( fn, stateInternals, (Coder) WINDOWING_STRATEGY.getWindowFn().windowCoder())); runner.startBundle(); @@ -153,8 +147,8 @@ public class StatefulDoFnRunnerTest { mockStepContext, aggregatorFactory, WINDOWING_STRATEGY, - new TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY), - new StateInternalsStateCleaner<>( + new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY), + new StatefulDoFnRunner.StateInternalsStateCleaner<>( fn, stateInternals, (Coder) WINDOWING_STRATEGY.getWindowFn().windowCoder())); Instant elementTime = new 
Instant(1); @@ -179,8 +173,16 @@ public class StatefulDoFnRunnerTest { 2, (int) stateInternals.state(windowNamespace(WINDOW_2), stateTag).read()); // advance watermark past end of WINDOW_1 + allowed lateness + // the cleanup timer is set to window.maxTimestamp() + allowed lateness + 1 + // to ensure that state is still available when a user timer for window.maxTimestamp() fires advanceInputWatermark( - timerInternals, WINDOW_1.maxTimestamp().plus(ALLOWED_LATENESS + 1), runner); + timerInternals, + WINDOW_1.maxTimestamp() + .plus(ALLOWED_LATENESS) + .plus(StatefulDoFnRunner.TimeInternalsCleanupTimer.GC_DELAY_MS) + .plus(1), // so the watermark is past the GC horizon, not on it + runner); + assertTrue( stateInternals.isEmptyForTesting( stateInternals.state(windowNamespace(WINDOW_1), stateTag))); @@ -190,7 +192,13 @@ public class StatefulDoFnRunnerTest { // advance watermark past end of WINDOW_2 + allowed lateness advanceInputWatermark( - timerInternals, WINDOW_2.maxTimestamp().plus(ALLOWED_LATENESS + 1), runner); + timerInternals, + WINDOW_2.maxTimestamp() + .plus(ALLOWED_LATENESS) + .plus(StatefulDoFnRunner.TimeInternalsCleanupTimer.GC_DELAY_MS) + .plus(1), // so the watermark is past the GC horizon, not on it + runner); + assertTrue( stateInternals.isEmptyForTesting( stateInternals.state(windowNamespace(WINDOW_2), stateTag))); @@ -263,85 +271,4 @@ public class StatefulDoFnRunnerTest { return Sum.ofLongs(); } } - - /** - * A {@link StatefulDoFnRunner.CleanupTimer} implemented by TimerInternals. 
- */ - public static class TimeInternalsCleanupTimer implements StatefulDoFnRunner.CleanupTimer { - - public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId"; - - private final TimerInternals timerInternals; - private final WindowingStrategy<?, ?> windowingStrategy; - private final Coder<BoundedWindow> windowCoder; - - public TimeInternalsCleanupTimer( - TimerInternals timerInternals, - WindowingStrategy<?, ?> windowingStrategy) { - this.windowingStrategy = windowingStrategy; - WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn(); - windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder(); - this.timerInternals = timerInternals; - } - - @Override - public Instant currentInputWatermarkTime() { - return timerInternals.currentInputWatermarkTime(); - } - - @Override - public void setForWindow(BoundedWindow window) { - Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); - timerInternals.setTimer(StateNamespaces.window(windowCoder, window), - GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME); - } - - @Override - public boolean isForWindow( - String timerId, - BoundedWindow window, - Instant timestamp, - TimeDomain timeDomain) { - boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME); - Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); - return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp); - } - } - - /** - * A {@link StatefulDoFnRunner.StateCleaner} implemented by StateInternals. 
- */ - public static class StateInternalsStateCleaner<W extends BoundedWindow> - implements StatefulDoFnRunner.StateCleaner<W> { - - private final DoFn<?, ?> fn; - private final DoFnSignature signature; - private final StateInternals<?> stateInternals; - private final Coder<W> windowCoder; - - public StateInternalsStateCleaner( - DoFn<?, ?> fn, - StateInternals<?> stateInternals, - Coder<W> windowCoder) { - this.fn = fn; - this.signature = DoFnSignatures.getSignature(fn.getClass()); - this.stateInternals = stateInternals; - this.windowCoder = windowCoder; - } - - @Override - public void clearForWindow(W window) { - for (Map.Entry<String, DoFnSignature.StateDeclaration> entry : - signature.stateDeclarations().entrySet()) { - try { - StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn); - State state = stateInternals.state(StateNamespaces.window(windowCoder, window), - StateTags.tagForSpec(entry.getKey(), (StateSpec) spec)); - state.clear(); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } - } - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/f7621472/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index a8ce680..9a66a2f 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -62,18 +62,13 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import 
org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -302,7 +297,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> // It is a stateful DoFn StatefulDoFnRunner.CleanupTimer cleanupTimer = - new TimeInternalsCleanupTimer(stepContext.timerInternals(), windowingStrategy); + new StatefulDoFnRunner.TimeInternalsCleanupTimer( + stepContext.timerInternals(), windowingStrategy); // we don't know the window type @SuppressWarnings({"unchecked", "rawtypes"}) @@ -310,7 +306,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> @SuppressWarnings({"unchecked", "rawtypes"}) StatefulDoFnRunner.StateCleaner<?> stateCleaner = - new StateInternalsStateCleaner<>( + new StatefulDoFnRunner.StateInternalsStateCleaner<>( doFn, stepContext.stateInternals(), windowCoder); doFnRunner = DoFnRunners.defaultStatefulDoFnRunner( @@ -773,89 +769,4 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> return new Instant(currentOutputWatermark); } } - - - /** - * A {@link StatefulDoFnRunner.CleanupTimer} implemented by TimerInternals. 
- */ - public static class TimeInternalsCleanupTimer implements StatefulDoFnRunner.CleanupTimer { - - public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId"; - - private final TimerInternals timerInternals; - private final WindowingStrategy<?, ?> windowingStrategy; - private final Coder<BoundedWindow> windowCoder; - - public TimeInternalsCleanupTimer( - TimerInternals timerInternals, - WindowingStrategy<?, ?> windowingStrategy) { - this.windowingStrategy = windowingStrategy; - WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn(); - windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder(); - this.timerInternals = timerInternals; - } - - @Override - public Instant currentInputWatermarkTime() { - return timerInternals.currentInputWatermarkTime(); - } - - @Override - public void setForWindow(BoundedWindow window) { - Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); - // make sure this fires after any window.maxTimestamp() timers - gcTime = gcTime.plus(1L); - timerInternals.setTimer(StateNamespaces.window(windowCoder, window), - GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME); - } - - @Override - public boolean isForWindow( - String timerId, - BoundedWindow window, - Instant timestamp, - TimeDomain timeDomain) { - boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME); - Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); - gcTime = gcTime.plus(1L); - return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp); - } - } - - /** - * A {@link StatefulDoFnRunner.StateCleaner} implemented by StateInternals. 
- */ - public static class StateInternalsStateCleaner<W extends BoundedWindow> - implements StatefulDoFnRunner.StateCleaner<W> { - - private final DoFn<?, ?> fn; - private final DoFnSignature signature; - private final StateInternals<?> stateInternals; - private final Coder<W> windowCoder; - - public StateInternalsStateCleaner( - DoFn<?, ?> fn, - StateInternals<?> stateInternals, - Coder<W> windowCoder) { - this.fn = fn; - this.signature = DoFnSignatures.getSignature(fn.getClass()); - this.stateInternals = stateInternals; - this.windowCoder = windowCoder; - } - - @Override - public void clearForWindow(W window) { - for (Map.Entry<String, DoFnSignature.StateDeclaration> entry : - signature.stateDeclarations().entrySet()) { - try { - StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn); - State state = stateInternals.state(StateNamespaces.window(windowCoder, window), - StateTags.tagForSpec(entry.getKey(), (StateSpec) spec)); - state.clear(); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } - } - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/f7621472/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index bbd3428..25154fa 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableMap; import java.util.Collections; import java.util.HashMap; import javax.annotation.Nullable; +import org.apache.beam.runners.core.StatefulDoFnRunner; import 
org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; @@ -276,7 +277,7 @@ public class DoFnOperatorTest { public void testStateGCForStatefulFn() throws Exception { WindowingStrategy<Object, IntervalWindow> windowingStrategy = - WindowingStrategy.of(FixedWindows.of(new Duration(10))); + WindowingStrategy.of(FixedWindows.of(new Duration(10))).withAllowedLateness(Duration.ZERO); final String timerId = "boo"; final String stateId = "dazzle"; @@ -378,7 +379,11 @@ public class DoFnOperatorTest { // this should trigger both the window.maxTimestamp() timer and the GC timer // this tests that the GC timer fires after the user timer - testHarness.processWatermark(15); + testHarness.processWatermark( + window1.maxTimestamp() + .plus(windowingStrategy.getAllowedLateness()) + .plus(StatefulDoFnRunner.TimeInternalsCleanupTimer.GC_DELAY_MS) + .getMillis()); assertThat( this.<KV<String, Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
