Repository: incubator-beam Updated Branches: refs/heads/master cf1464465 -> 7809f6bd2
Port runners to use GroupAlsoByWindows via StateInternalsFactory Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/902997d0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/902997d0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/902997d0 Branch: refs/heads/master Commit: 902997d040023c83d23e57362bdfb2d62c53d142 Parents: d2594e0 Author: Kenneth Knowles <[email protected]> Authored: Mon Jul 18 14:33:58 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Jul 25 09:30:32 2016 -0700 ---------------------------------------------------------------------- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 16 ++-- .../beam/sdk/util/GroupAlsoByWindowsDoFn.java | 10 ++- .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 10 +-- .../sdk/util/GroupByKeyViaGroupByKeyOnly.java | 79 +++++++++++++++++--- .../sdk/util/GroupAlsoByWindowsProperties.java | 50 +++++++++++-- ...oupAlsoByWindowsViaOutputBufferDoFnTest.java | 8 +- .../GroupAlsoByWindowEvaluatorFactory.java | 30 ++++++++ .../beam/runners/direct/ParDoEvaluator.java | 4 +- .../direct/ParDoMultiEvaluatorFactory.java | 6 ++ .../direct/ParDoSingleEvaluatorFactory.java | 6 ++ .../beam/runners/direct/ParDoEvaluatorTest.java | 1 + .../FlinkGroupAlsoByWindowWrapper.java | 31 +++++--- .../spark/translation/TransformTranslator.java | 70 ++++++++++++++++- .../apache/beam/sdk/transforms/DoFnTester.java | 41 +++++++++- 14 files changed, 309 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index 73244f7..0d320bc 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; /** @@ -44,8 +45,10 @@ public class GroupAlsoByWindowViaWindowSetDoFn< public static <K, InputT, OutputT, W extends BoundedWindow> DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create( - WindowingStrategy<?, W> strategy, SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) { - return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, reduceFn); + WindowingStrategy<?, W> strategy, + StateInternalsFactory<K> stateInternalsFactory, + SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) { + return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, stateInternalsFactory, reduceFn); } protected final Aggregator<Long, Long> droppedDueToClosedWindow = @@ -55,15 +58,18 @@ public class GroupAlsoByWindowViaWindowSetDoFn< createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn()); private final WindowingStrategy<Object, W> windowingStrategy; + private final StateInternalsFactory<K> stateInternalsFactory; private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn; private GroupAlsoByWindowViaWindowSetDoFn( WindowingStrategy<?, W> windowingStrategy, + StateInternalsFactory<K> stateInternalsFactory, SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) { @SuppressWarnings("unchecked") WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>) windowingStrategy; 
this.windowingStrategy = noWildcard; this.reduceFn = reduceFn; + this.stateInternalsFactory = stateInternalsFactory; } @Override @@ -72,11 +78,7 @@ public class GroupAlsoByWindowViaWindowSetDoFn< K key = c.element().key(); TimerInternals timerInternals = c.windowingInternals().timerInternals(); - - // It is the responsibility of the user of GroupAlsoByWindowsViaWindowSet to only - // provide a WindowingInternals instance with the appropriate key type for StateInternals. - @SuppressWarnings("unchecked") - StateInternals<K> stateInternals = (StateInternals<K>) c.windowingInternals().stateInternals(); + StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key); ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner = new ReduceFnRunner<>( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java index f5de0bc..b575559 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; /** @@ -51,9 +52,12 @@ public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends Bound * @param windowingStrategy The window function and trigger to use for grouping * @param inputCoder the input coder to use */ - public static <K, V, W extends BoundedWindow> 
GroupAlsoByWindowsDoFn<K, V, Iterable<V>, W> - createDefault(WindowingStrategy<?, W> windowingStrategy, Coder<V> inputCoder) { + public static <K, V, W extends BoundedWindow> + GroupAlsoByWindowsDoFn<K, V, Iterable<V>, W> createDefault( + WindowingStrategy<?, W> windowingStrategy, + StateInternalsFactory<K> stateInternalsFactory, + Coder<V> inputCoder) { return new GroupAlsoByWindowsViaOutputBufferDoFn<>( - windowingStrategy, SystemReduceFn.<K, V, W>buffering(inputCoder)); + windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, V, W>buffering(inputCoder)); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java index d364168..d185a24 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.util; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; import com.google.common.collect.Iterables; @@ -37,13 +38,16 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> { private final WindowingStrategy<?, W> strategy; + private final StateInternalsFactory<K> stateInternalsFactory; private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn; public 
GroupAlsoByWindowsViaOutputBufferDoFn( WindowingStrategy<?, W> windowingStrategy, + StateInternalsFactory<K> stateInternalsFactory, SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) { this.strategy = windowingStrategy; this.reduceFn = reduceFn; + this.stateInternalsFactory = stateInternalsFactory; } @Override @@ -55,11 +59,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends // timer manager from the context because it doesn't exist. So we create one and emulate the // watermark, knowing that we have all data and it is in timestamp order. BatchTimerInternals timerInternals = new BatchTimerInternals(Instant.now()); - - // It is the responsibility of the user of GroupAlsoByWindowsViaOutputBufferDoFn to only - // provide a WindowingInternals instance with the appropriate key type for StateInternals. - @SuppressWarnings("unchecked") - StateInternals<K> stateInternals = (StateInternals<K>) c.windowingInternals().stateInternals(); + StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key); ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner = new ReduceFnRunner<K, InputT, OutputT, W>( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java index 9450495..8a0152e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.util; +import static com.google.common.base.Preconditions.checkArgument; + import org.apache.beam.sdk.coders.Coder; import 
org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -26,6 +28,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; +import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -68,7 +71,7 @@ import java.util.List; public class GroupByKeyViaGroupByKeyOnly<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> { - private GroupByKey<K, V> gbkTransform; + private final GroupByKey<K, V> gbkTransform; public GroupByKeyViaGroupByKeyOnly(GroupByKey<K, V> originalTransform) { this.gbkTransform = originalTransform; @@ -161,13 +164,12 @@ public class GroupByKeyViaGroupByKeyOnly<K, V> } /** - * Helper transform that takes a collection of timestamp-ordered - * values associated with each key, groups the values by window, - * combines windows as needed, and for each window in each key, - * outputs a collection of key/value-list pairs implicitly assigned - * to the window and with the timestamp derived from that window. + * Runner-specific primitive that takes a collection of timestamp-ordered values associated with + * each key, groups the values by window, merges windows as needed, and for each window in each + * key, outputs a collection of key/value-list pairs implicitly assigned to the window and with + * the timestamp derived from that window. 
*/ - private static class GroupAlsoByWindow<K, V> + public static class GroupAlsoByWindow<K, V> extends PTransform< PCollection<KV<K, Iterable<WindowedValue<V>>>>, PCollection<KV<K, Iterable<V>>>> { private final WindowingStrategy<?, ?> windowingStrategy; @@ -176,8 +178,57 @@ public class GroupByKeyViaGroupByKeyOnly<K, V> this.windowingStrategy = windowingStrategy; } + public WindowingStrategy<?, ?> getWindowingStrategy() { + return windowingStrategy; + } + + private KvCoder<K, Iterable<WindowedValue<V>>> getKvCoder( + Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) { + // Coder<KV<...>> --> KvCoder<...> + checkArgument(inputCoder instanceof KvCoder, + "%s requires a %s<...> but got %s", + getClass().getSimpleName(), + KvCoder.class.getSimpleName(), + inputCoder); + @SuppressWarnings("unchecked") + KvCoder<K, Iterable<WindowedValue<V>>> kvCoder = + (KvCoder<K, Iterable<WindowedValue<V>>>) inputCoder; + return kvCoder; + } + + public Coder<K> getKeyCoder(Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) { + return getKvCoder(inputCoder).getKeyCoder(); + } + + public Coder<V> getValueCoder(Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) { + // Coder<Iterable<...>> --> IterableCoder<...> + Coder<Iterable<WindowedValue<V>>> iterableWindowedValueCoder = + getKvCoder(inputCoder).getValueCoder(); + checkArgument(iterableWindowedValueCoder instanceof IterableCoder, + "%s requires a %s<..., %s> but got a %s", + getClass().getSimpleName(), + KvCoder.class.getSimpleName(), + IterableCoder.class.getSimpleName(), + iterableWindowedValueCoder); + IterableCoder<WindowedValue<V>> iterableCoder = + (IterableCoder<WindowedValue<V>>) iterableWindowedValueCoder; + + // Coder<WindowedValue<...>> --> WindowedValueCoder<...> + Coder<WindowedValue<V>> iterableElementCoder = iterableCoder.getElemCoder(); + checkArgument(iterableElementCoder instanceof WindowedValueCoder, + "%s requires a %s<..., %s<%s>> but got a %s", + getClass().getSimpleName(), + 
KvCoder.class.getSimpleName(), + IterableCoder.class.getSimpleName(), + WindowedValueCoder.class.getSimpleName(), + iterableElementCoder); + WindowedValueCoder<V> windowedValueCoder = + (WindowedValueCoder<V>) iterableElementCoder; + + return windowedValueCoder.getValueCoder(); + } + @Override - @SuppressWarnings("unchecked") public PCollection<KV<K, Iterable<V>>> apply( PCollection<KV<K, Iterable<WindowedValue<V>>>> input) { @SuppressWarnings("unchecked") @@ -197,16 +248,20 @@ public class GroupByKeyViaGroupByKeyOnly<K, V> Coder<Iterable<V>> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder); Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder); - return input - .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder))) + return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal( + input.getPipeline(), windowingStrategy, input.isBounded()) .setCoder(outputKvCoder); } private <W extends BoundedWindow> GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn( - WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) { + WindowingStrategy<?, W> strategy, + StateInternalsFactory<K> stateInternalsFactory, + Coder<V> inputIterableElementValueCoder) { return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>( - strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder)); + strategy, + stateInternalsFactory, + SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder)); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java index 
fe2a495..43c287e 100644 --- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java +++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertThat; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -32,10 +33,15 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.util.state.InMemoryStateInternals; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.beam.sdk.values.TupleTag; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -61,7 +67,7 @@ public class GroupAlsoByWindowsProperties { */ public interface GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> { <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> - forStrategy(WindowingStrategy<?, W> strategy); + forStrategy(WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory); } /** @@ -77,10 +83,15 @@ public class GroupAlsoByWindowsProperties { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(FixedWindows.of(Duration.millis(10))); + // This 
key should never actually be used, though it is eagerly passed to the + // StateInternalsFactory so must be non-null + @SuppressWarnings("unchecked") + K fakeKey = (K) "this key should never be used"; + DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> result = runGABW( gabwFactory, windowingStrategy, - (K) null, // key should never be used + fakeKey, Collections.<WindowedValue<InputT>>emptyList()); assertThat(result.peekOutputElements(), hasSize(0)); @@ -599,11 +610,14 @@ public class GroupAlsoByWindowsProperties { K key, Collection<WindowedValue<InputT>> values) throws Exception { - TupleTag<KV<K, OutputT>> outputTag = new TupleTag<>(); - DoFnRunnerBase.ListOutputManager outputManager = new DoFnRunnerBase.ListOutputManager(); + final StateInternalsFactory<K> stateInternalsCache = new CachingStateInternalsFactory<K>(); DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> tester = - DoFnTester.of(gabwFactory.forStrategy(windowingStrategy)); + DoFnTester.of(gabwFactory.forStrategy(windowingStrategy, stateInternalsCache)); + + // Though we use a DoFnTester, the function itself is instantiated directly by the + // runner and should not be serialized; it may not even be serializable. 
+ tester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); tester.startBundle(); tester.processElement(KV.<K, Iterable<WindowedValue<InputT>>>of(key, values)); tester.finishBundle(); @@ -620,4 +634,28 @@ public class GroupAlsoByWindowsProperties { return new IntervalWindow(new Instant(start), new Instant(end)); } + private static final class CachingStateInternalsFactory<K> implements StateInternalsFactory<K> { + private final LoadingCache<K, StateInternals<K>> stateInternalsCache; + + private CachingStateInternalsFactory() { + this.stateInternalsCache = CacheBuilder.newBuilder().build(new StateInternalsLoader<K>()); + } + + @Override + @SuppressWarnings("unchecked") + public StateInternals<K> stateInternalsForKey(K key) { + try { + return stateInternalsCache.get(key); + } catch (Exception exc) { + throw new RuntimeException(exc); + } + } + } + + private static class StateInternalsLoader<K> extends CacheLoader<K, StateInternals<K>> { + @Override + public StateInternals<K> load(K key) throws Exception { + return InMemoryStateInternals.forKey(key); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java index 4ac6164..1f02a8f 100644 --- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import 
org.apache.beam.sdk.util.GroupAlsoByWindowsProperties.GroupAlsoByWindowsDoFnFactory; +import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.junit.Test; import org.junit.runner.RunWith; @@ -42,10 +43,13 @@ public class GroupAlsoByWindowsViaOutputBufferDoFnTest { } @Override - public <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W> - forStrategy(WindowingStrategy<?, W> windowingStrategy) { + public <W extends BoundedWindow> + GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W> forStrategy( + WindowingStrategy<?, W> windowingStrategy, + StateInternalsFactory<K> stateInternalsFactory) { return new GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W>( windowingStrategy, + stateInternalsFactory, SystemReduceFn.<K, InputT, W>buffering(inputCoder)); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 9782ab1..5d3ab3f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.direct; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; +import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.coders.Coder; @@ -26,11 +27,14 @@ import 
org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.SystemReduceFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -94,9 +98,19 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { WindowingStrategy<?, BoundedWindow> windowingStrategy = (WindowingStrategy<?, BoundedWindow>) application.getTransform().getWindowingStrategy(); + DirectStepContext stepContext = + evaluationContext + .getExecutionContext(application, inputBundle.getKey()) + .getOrCreateStepContext( + evaluationContext.getStepName(application), application.getTransform().getName()); + + StateInternals<K> stateInternals = (StateInternals<K>) stepContext.stateInternals(); + DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> gabwDoFn = GroupAlsoByWindowViaWindowSetDoFn.create( windowingStrategy, + // new DirectStateInternalsFactory<K, V>(stepContext), + new ConstantStateInternalsFactory<K>(stateInternals), SystemReduceFn.<K, V, BoundedWindow>buffering(valueCoder)); TupleTag<KV<K, Iterable<V>>> mainOutputTag = new TupleTag<KV<K, Iterable<V>>>() {}; @@ -105,6 +119,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { this.gabwParDoEvaluator = ParDoEvaluator.create( evaluationContext, + stepContext, inputBundle, application, gabwDoFn, @@ -124,4 +139,19 @@ class 
GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { return gabwParDoEvaluator.finishBundle(); } } + + private static final class ConstantStateInternalsFactory<K> + implements StateInternalsFactory<K> { + private final StateInternals<K> stateInternals; + + private ConstantStateInternalsFactory(StateInternals<K> stateInternals) { + this.stateInternals = stateInternals; + } + + @Override + @SuppressWarnings("unchecked") + public StateInternals<K> stateInternalsForKey(K key) { + return stateInternals; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 58cee4d..485cf4b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -46,6 +46,7 @@ import java.util.Map; class ParDoEvaluator<T> implements TransformEvaluator<T> { public static <InputT, OutputT> ParDoEvaluator<InputT> create( EvaluationContext evaluationContext, + DirectStepContext stepContext, CommittedBundle<InputT> inputBundle, AppliedPTransform<PCollection<InputT>, ?, ?> application, DoFn<InputT, OutputT> fn, @@ -55,9 +56,6 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> { Map<TupleTag<?>, PCollection<?>> outputs) { DirectExecutionContext executionContext = evaluationContext.getExecutionContext(application, inputBundle.getKey()); - String stepName = evaluationContext.getStepName(application); - DirectStepContext stepContext = - executionContext.getOrCreateStepContext(stepName, stepName); CounterSet counters = evaluationContext.createCounterSet(); 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java index b87cd3e..eda3db4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.direct; +import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFn; @@ -77,10 +78,15 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { @SuppressWarnings({"unchecked", "rawtypes"}) ThreadLocal<DoFn<InT, OuT>> fnLocal = (ThreadLocal) fnClones.getUnchecked((AppliedPTransform) application); + String stepName = evaluationContext.getStepName(application); + DirectStepContext stepContext = + evaluationContext.getExecutionContext(application, inputBundle.getKey()) + .getOrCreateStepContext(stepName, stepName); try { TransformEvaluator<InT> parDoEvaluator = ParDoEvaluator.create( evaluationContext, + stepContext, inputBundle, application, fnLocal.get(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java index e9c7dd6..044abdc 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.direct; +import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFn; @@ -73,6 +74,10 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { CommittedBundle<InputT> inputBundle, EvaluationContext evaluationContext) { TupleTag<OutputT> mainOutputTag = new TupleTag<>("out"); + String stepName = evaluationContext.getStepName(application); + DirectStepContext stepContext = + evaluationContext.getExecutionContext(application, inputBundle.getKey()) + .getOrCreateStepContext(stepName, stepName); @SuppressWarnings({"unchecked", "rawtypes"}) ThreadLocal<DoFn<InputT, OutputT>> fnLocal = @@ -81,6 +86,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { ParDoEvaluator<InputT> parDoEvaluator = ParDoEvaluator.create( evaluationContext, + stepContext, inputBundle, application, fnLocal.get(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 3c9c9ee..bce37e4 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -160,6 +160,7 @@ public class ParDoEvaluatorTest { return ParDoEvaluator.create( evaluationContext, + stepContext, inputBundle, (AppliedPTransform<PCollection<Integer>, ?, ?>) output.getProducingTransformInternal(), fn, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java index 3f845cf..0e977db 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming; import static com.google.common.base.Preconditions.checkNotNull; +import java.io.Serializable; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; @@ -47,6 +48,8 @@ import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; 
import org.apache.beam.sdk.values.PCollectionView; @@ -262,12 +265,15 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> */ private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() { if (this.operator == null) { + + StateInternalsFactory<K> stateInternalsFactory = new GroupAlsoByWindowWrapperStateInternalsFactory(); + if (this.combineFn == null) { // Thus VOUT == Iterable<VIN> Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder(); this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create( - (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder)); + (WindowingStrategy<?, W>) this.windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder)); } else { Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder(); @@ -275,14 +281,14 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> .withInputCoder(combineFn, coderRegistry, inputKvCoder); this.operator = GroupAlsoByWindowViaWindowSetDoFn.create( - (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn)); + (WindowingStrategy<?, W>) this.windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn)); } } return this.operator; } private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception { - context.setElement(workItem, getStateInternalsForKey(workItem.key())); + context.setElement(workItem); operator.processElement(context); } @@ -438,8 +444,6 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> private final TimestampedCollector<WindowedValue<KV<K, VOUT>>> collector; - private FlinkStateInternals<K> stateInternals; - private KeyedWorkItem<K, VIN> element; public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function, @@ -452,10 +456,8 @@ public class 
FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> this.timerInternals = checkNotNull(timerInternals); } - public void setElement(KeyedWorkItem<K, VIN> element, - FlinkStateInternals<K> stateForKey) { + public void setElement(KeyedWorkItem<K, VIN> element) { this.element = element; - this.stateInternals = stateForKey; } public void setCurrentInputWatermark(Instant watermark) { @@ -509,8 +511,8 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> return new WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>>() { @Override - public org.apache.beam.sdk.util.state.StateInternals stateInternals() { - return stateInternals; + public StateInternals stateInternals() { + throw new UnsupportedOperationException("stateInternals() is not available"); } @Override @@ -628,4 +630,13 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> // restore the timerInternals. this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder); } + + private class GroupAlsoByWindowWrapperStateInternalsFactory implements + StateInternalsFactory<K>, Serializable { + + @Override + public StateInternals<K> stateInternalsForKey(K key) { + return getStateInternalsForKey(key); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 34a0ede..c5d5802 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -16,6 +16,7 @@ * limitations under the License. 
*/ + package org.apache.beam.runners.spark.translation; import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory; @@ -32,6 +33,7 @@ import org.apache.beam.runners.spark.util.BroadcastHelper; import org.apache.beam.runners.spark.util.ByteArray; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.io.TextIO; @@ -47,8 +49,16 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.AssignWindowsDoFn; +import org.apache.beam.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; +import org.apache.beam.sdk.util.SystemReduceFn; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.InMemoryStateInternals; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -78,12 +88,12 @@ import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; import java.io.IOException; +import java.io.Serializable; import java.lang.reflect.Field; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; - import scala.Tuple2; /** @@ -161,6 +171,55 @@ public final class TransformTranslator { }; } + private static <K, 
V, W extends BoundedWindow> + TransformEvaluator<GroupAlsoByWindow<K, V>> gabw() { + return new TransformEvaluator<GroupAlsoByWindow<K, V>>() { + @Override + public void evaluate(GroupAlsoByWindow<K, V> transform, EvaluationContext context) { + @SuppressWarnings("unchecked") + JavaRDDLike<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>, ?> inRDD = + (JavaRDDLike<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>, ?>) + context.getInputRDD(transform); + + Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder = + context.getInput(transform).getCoder(); + Coder<K> keyCoder = transform.getKeyCoder(inputCoder); + Coder<V> valueCoder = transform.getValueCoder(inputCoder); + + @SuppressWarnings("unchecked") + KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder = + (KvCoder<K, Iterable<WindowedValue<V>>>) context.getInput(transform).getCoder(); + Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder(); + + IterableCoder<WindowedValue<V>> inputIterableValueCoder = + (IterableCoder<WindowedValue<V>>) inputValueCoder; + Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder(); + WindowedValueCoder<V> inputIterableWindowedValueCoder = + (WindowedValueCoder<V>) inputIterableElementCoder; + + Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder(); + + @SuppressWarnings("unchecked") + WindowingStrategy<?, W> windowingStrategy = + (WindowingStrategy<?, W>) transform.getWindowingStrategy(); + + DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>> gabwDoFn = + new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>( + windowingStrategy, + new InMemoryStateInternalsFactory<K>(), + SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder)); + + // GroupAlsoByWindow currently uses a dummy in-memory StateInternals + JavaRDDLike<WindowedValue<KV<K, Iterable<V>>>, ?> outRDD = + inRDD.mapPartitions( + new DoFnFunction<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>>(
gabwDoFn, context.getRuntimeContext(), null)); + + context.setOutputRDD(transform, outRDD); + } + }; + } + private static final FieldGetter GROUPED_FG = new FieldGetter(Combine.GroupedValues.class); private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>> @@ -815,6 +874,7 @@ public final class TransformTranslator { EVALUATORS.put(ParDo.Bound.class, parDo()); EVALUATORS.put(ParDo.BoundMulti.class, multiDo()); EVALUATORS.put(GroupByKeyOnly.class, gbk()); + EVALUATORS.put(GroupAlsoByWindow.class, gabw()); EVALUATORS.put(Combine.GroupedValues.class, grouped()); EVALUATORS.put(Combine.Globally.class, combineGlobally()); EVALUATORS.put(Combine.PerKey.class, combinePerKey()); @@ -853,4 +913,12 @@ public final class TransformTranslator { return getTransformEvaluator(clazz); } } + + private static class InMemoryStateInternalsFactory<K> implements StateInternalsFactory<K>, + Serializable { + @Override + public StateInternals<K> stateInternalsForKey(K key) { + return InMemoryStateInternals.forKey(key); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index c38f0ab..c8bd5de 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -131,6 +131,28 @@ public class DoFnTester<InputT, OutputT> { } /** + * Whether or not a {@link DoFnTester} should clone the {@link DoFn} under test. + */ + public enum CloningBehavior { + CLONE, + DO_NOT_CLONE; + } + + /** + * Instruct this {@link DoFnTester} whether or not to clone the {@link DoFn} under test. 
+ */ + public void setCloningBehavior(CloningBehavior newValue) { + this.cloningBehavior = newValue; + } + + /** + * Indicates whether this {@link DoFnTester} will clone the {@link DoFn} under test. + */ + public CloningBehavior getCloningBehavior() { + return cloningBehavior; + } + + /** * A convenience operation that first calls {@link #startBundle}, * then calls {@link #processElement} on each of the input elements, then * calls {@link #finishBundle}, then returns the result of @@ -644,6 +666,13 @@ public class DoFnTester<InputT, OutputT> { /** The original DoFn under test. */ private final DoFn<InputT, OutputT> origFn; + /** + * Whether to clone the original {@link DoFn} or just use it as-is. + * + * <p>Worker-side {@link DoFn DoFns} may not be serializable, and are not required to be. + */ + private CloningBehavior cloningBehavior = CloningBehavior.CLONE; + /** The side input values to provide to the DoFn under test. */ private Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs = new HashMap<>(); @@ -676,10 +705,14 @@ public class DoFnTester<InputT, OutputT> { @SuppressWarnings("unchecked") private void initializeState() { - fn = (DoFn<InputT, OutputT>) - SerializableUtils.deserializeFromByteArray( - SerializableUtils.serializeToByteArray(origFn), - origFn.toString()); + if (cloningBehavior.equals(CloningBehavior.DO_NOT_CLONE)) { + fn = origFn; + } else { + fn = (DoFn<InputT, OutputT>) + SerializableUtils.deserializeFromByteArray( + SerializableUtils.serializeToByteArray(origFn), + origFn.toString()); + } outputs = new HashMap<>(); accumulators = new HashMap<>(); }
