Repository: incubator-beam Updated Branches: refs/heads/master 9b71f1636 -> 03b89c065
Add DoFnInvoker for OldDoFn, for migration ease This allows any runner to use DoFnInvokers.invokerFor(Object) to be agnostic as to whether they are running a DoFn or OldDoFn. Thus, the migration of the runner can occur in advance of further changes to the SDK and deployment can be independent. For example, a backend need not know whether it is deserializing a DoFn or OldDoFn. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2a24f3c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2a24f3c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2a24f3c Branch: refs/heads/master Commit: e2a24f3c2668c7341b38cc56d331cefd3a69f27f Parents: 8462acb Author: Kenneth Knowles <k...@google.com> Authored: Tue Oct 4 13:56:13 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Oct 6 20:16:09 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/transforms/DoFn.java | 40 ++++++ .../beam/sdk/transforms/DoFnAdapters.java | 142 ++++++++++++++++++- .../sdk/transforms/reflect/DoFnInvokers.java | 86 +++++++++++ .../transforms/reflect/DoFnInvokersTest.java | 46 ++++++ .../transforms/DoFnInvokersBenchmark.java | 7 + 5 files changed, 318 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2a24f3c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 59c8323..fb7fbd4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -29,6 +29,8 @@ import 
java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import java.util.HashMap; import java.util.Map; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator; @@ -37,6 +39,7 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypeDescriptor; @@ -185,6 +188,23 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD */ public abstract <T> void sideOutputWithTimestamp( TupleTag<T> tag, T output, Instant timestamp); + + /** + * Creates an {@link Aggregator} in the {@link DoFn} context with the specified name and + * aggregation logic specified by {@link CombineFn}. This is to be overridden by a particular + * runner context with an implementation that delivers the values as appropriate. + * + * <p>The aggregators declared on the {@link DoFn} will be wired up to aggregators allocated via + * this method. 
+ * + * @param name the name of the aggregator + * @param combiner the {@link CombineFn} to use in the aggregator + * @return an aggregator for the provided name and {@link CombineFn} in this context + */ + @Experimental(Kind.AGGREGATOR) + protected abstract <AggInputT, AggOutputT> + Aggregator<AggInputT, AggOutputT> createAggregator( + String name, CombineFn<AggInputT, ?, AggOutputT> combiner); } /** @@ -306,6 +326,21 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * A placeholder for testing purposes. */ OutputReceiver<OutputT> outputReceiver(); + + /** + * For migration from {@link OldDoFn} to {@link DoFn}, provide + * a {@link WindowingInternals} so an {@link OldDoFn} can be run + * via {@link DoFnInvoker}. + * + * <p>This is <i>not</i> exposed via the reflective capabilities + * of {@link DoFn}. + * + * @deprecated Please port occurrences of {@link OldDoFn} to {@link DoFn}. If they require + * state and timers, they will need to wait for the arrival of those features. Do not introduce + * new uses of this method. + */ + @Deprecated + WindowingInternals<InputT, OutputT> windowingInternals(); } /** A placeholder for testing handling of output types during {@link DoFn} reflection. 
*/ @@ -335,6 +370,11 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD public OutputReceiver<OutputT> outputReceiver() { return null; } + + @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + return null; + } } ///////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2a24f3c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java index 77a71e9..7b259aa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms; import java.io.IOException; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; @@ -26,6 +27,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypeDescriptor; @@ -33,7 +35,7 @@ import org.joda.time.Duration; import org.joda.time.Instant; /** - * Utility class containing adapters for running a {@link DoFn} as an {@link OldDoFn}. 
+ * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}. * * @deprecated This class will go away when we start running {@link DoFn}'s directly (using * {@link DoFnInvoker}) rather than via {@link OldDoFn}. @@ -65,6 +67,113 @@ public class DoFnAdapters { } } + /** Creates an {@link OldDoFn.ProcessContext} from a {@link DoFn.ProcessContext}. */ + public static <InputT, OutputT> OldDoFn<InputT, OutputT>.ProcessContext adaptProcessContext( + OldDoFn<InputT, OutputT> fn, + final DoFn<InputT, OutputT>.ProcessContext c, + final DoFn.ExtraContextFactory<InputT, OutputT> extra) { + return fn.new ProcessContext() { + @Override + public InputT element() { + return c.element(); + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + return c.sideInput(view); + } + + @Override + public Instant timestamp() { + return c.timestamp(); + } + + @Override + public BoundedWindow window() { + return extra.window(); + } + + @Override + public PaneInfo pane() { + return c.pane(); + } + + @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + return extra.windowingInternals(); + } + + @Override + public PipelineOptions getPipelineOptions() { + return c.getPipelineOptions(); + } + + @Override + public void output(OutputT output) { + c.output(output); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + c.outputWithTimestamp(output, timestamp); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + c.sideOutput(tag, output); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + c.sideOutputWithTimestamp(tag, output, timestamp); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( + String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { + return c.createAggregator(name, combiner); + } + }; + } + + /** Creates an {@link OldDoFn.Context} from a
{@link DoFn.Context}. */ + public static <InputT, OutputT> OldDoFn<InputT, OutputT>.Context adaptContext( + OldDoFn<InputT, OutputT> fn, + final DoFn<InputT, OutputT>.Context c) { + return fn.new Context() { + @Override + public PipelineOptions getPipelineOptions() { + return c.getPipelineOptions(); + } + + @Override + public void output(OutputT output) { + c.output(output); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + c.outputWithTimestamp(output, timestamp); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + c.sideOutput(tag, output); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + c.sideOutputWithTimestamp(tag, output, timestamp); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( + String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { + return c.createAggregator(name, combiner); + } + }; + } + /** * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link * OldDoFn}. @@ -183,10 +292,26 @@ public class DoFnAdapters { } @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( + String name, + CombineFn<AggInputT, ?, AggOutputT> combiner) { + return context.createAggregatorInternal(name, combiner); + } + + @Override public BoundedWindow window() { - // The DoFn doesn't allow us to ask for these outside ProcessElements, so this + // The OldDoFn doesn't allow us to ask for these outside processElement, so this // should be unreachable. 
- throw new UnsupportedOperationException("Can only get the window in ProcessElements"); + throw new UnsupportedOperationException( + "Can only get the window in processElement; elsewhere there is no defined window."); + } + + @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + // The OldDoFn doesn't allow us to ask for these outside ProcessElements, so this + // should be unreachable. + throw new UnsupportedOperationException( + "Can only get WindowingInternals in processElement"); } @Override @@ -247,6 +372,12 @@ public class DoFnAdapters { } @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( + String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { + return context.createAggregatorInternal(name, combiner); + } + + @Override public InputT element() { return context.element(); } @@ -267,6 +398,11 @@ public class DoFnAdapters { } @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + return context.windowingInternals(); + } + + @Override public DoFn.InputProvider<InputT> inputProvider() { throw new UnsupportedOperationException("inputProvider() exists only for testing"); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2a24f3c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index edc1dc0..041eb60 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -58,7 +58,10 @@ import net.bytebuddy.jar.asm.Opcodes; import net.bytebuddy.jar.asm.Type; import net.bytebuddy.matcher.ElementMatchers; import 
org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; +import org.apache.beam.sdk.transforms.DoFnAdapters; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.UserCodeException; /** Dynamically generates {@link DoFnInvoker} instances for invoking a {@link DoFn}. */ @@ -77,6 +80,89 @@ public class DoFnInvokers { private DoFnInvokers() {} + /** + * Creates a {@link DoFnInvoker} for the given {@link Object}, which should be either a + * {@link DoFn} or an {@link OldDoFn}. The expected use would be to deserialize a user's + * function as an {@link Object} and then pass it to this method, so there is no need to + * statically specify what sort of object it is. + * + * @deprecated this is to be used only as a migration path for decoupling upgrades + */ + @Deprecated + public DoFnInvoker<?, ?> invokerFor(Object deserializedFn) { + if (deserializedFn instanceof DoFn) { + return newByteBuddyInvoker((DoFn<?, ?>) deserializedFn); + } else if (deserializedFn instanceof OldDoFn){ + return new OldDoFnInvoker<>((OldDoFn<?, ?>) deserializedFn); + } else { + throw new IllegalArgumentException(String.format( + "Cannot create a %s for %s; it should be either a %s or an %s.", + DoFnInvoker.class.getSimpleName(), + deserializedFn.toString(), + DoFn.class.getSimpleName(), + OldDoFn.class.getSimpleName())); + } + } + + static class OldDoFnInvoker<InputT, OutputT> implements DoFnInvoker<InputT, OutputT> { + + private final OldDoFn<InputT, OutputT> fn; + + public OldDoFnInvoker(OldDoFn<InputT, OutputT> fn) { + this.fn = fn; + } + + @Override + public void invokeProcessElement( + DoFn<InputT, OutputT>.ProcessContext c, ExtraContextFactory<InputT, OutputT> extra) { + OldDoFn<InputT, OutputT>.ProcessContext oldCtx = + DoFnAdapters.adaptProcessContext(fn, c, extra); + try { + fn.processElement(oldCtx); + } catch (Throwable exc) { + throw 
UserCodeException.wrap(exc); + } + } + + @Override + public void invokeStartBundle(DoFn.Context c) { + OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c); + try { + fn.startBundle(oldCtx); + } catch (Throwable exc) { + throw UserCodeException.wrap(exc); + } + } + + @Override + public void invokeFinishBundle(DoFn.Context c) { + OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c); + try { + fn.finishBundle(oldCtx); + } catch (Throwable exc) { + throw UserCodeException.wrap(exc); + } + } + + @Override + public void invokeSetup() { + try { + fn.setup(); + } catch (Throwable exc) { + throw UserCodeException.wrap(exc); + } + } + + @Override + public void invokeTeardown() { + try { + fn.teardown(); + } catch (Throwable exc) { + throw UserCodeException.wrap(exc); + } + } + } + /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */ public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker( DoFn<InputT, OutputT> fn) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2a24f3c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index e59cce8..97d810c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -18,13 +18,16 @@ package org.apache.beam.sdk.transforms.reflect; import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import 
org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowingInternals; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -43,6 +46,9 @@ public class DoFnInvokersTest { @Mock private BoundedWindow mockWindow; @Mock private DoFn.InputProvider<String> mockInputProvider; @Mock private DoFn.OutputReceiver<String> mockOutputReceiver; + @Mock private WindowingInternals<String, String> mockWindowingInternals; + + @Mock private OldDoFn<String, String> mockOldDoFn; private DoFn.ExtraContextFactory<String, String> extraContextFactory; @@ -65,6 +71,11 @@ public class DoFnInvokersTest { public DoFn.OutputReceiver<String> outputReceiver() { return mockOutputReceiver; } + + @Override + public WindowingInternals<String, String> windowingInternals() { + return mockWindowingInternals; + } }; } @@ -326,4 +337,39 @@ public class DoFnInvokersTest { thrown.expectMessage("bogus"); invoker.invokeFinishBundle(null); } + + private class OldDoFnIdentity extends OldDoFn<String, String> { + public void processElement(ProcessContext c) {} + } + + @Test + public void testOldDoFnProcessElement() throws Exception { + new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn) + .invokeProcessElement(mockContext, extraContextFactory); + verify(mockOldDoFn).processElement(any(OldDoFn.ProcessContext.class)); + } + + @Test + public void testOldDoFnStartBundle() throws Exception { + new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeStartBundle(mockContext); + verify(mockOldDoFn).startBundle(any(OldDoFn.Context.class)); + } + + @Test + public void testOldDoFnFinishBundle() throws Exception { + new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeFinishBundle(mockContext); + verify(mockOldDoFn).finishBundle(any(OldDoFn.Context.class)); + } + + @Test + public void testOldDoFnSetup() throws Exception { + new 
DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeSetup(); + verify(mockOldDoFn).setup(); + } + + @Test + public void testOldDoFnTeardown() throws Exception { + new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeTeardown(); + verify(mockOldDoFn).teardown(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2a24f3c/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java ---------------------------------------------------------------------- diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java index a574ed8..80324b9 100644 --- a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java +++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java @@ -220,5 +220,12 @@ public class DoFnInvokersBenchmark { @Override public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {} + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( + String name, + CombineFn<AggInputT, ?, AggOutputT> combiner) { + return null; + } } }