Reintroduces DoFn.ProcessContinuation (Dataflow worker compatibility part)
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bec32fe9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bec32fe9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bec32fe9

Branch: refs/heads/master
Commit: bec32fe93c6b5c16563d7ea4b877a2dee3352fee
Parents: 1ea1de4
Author: Eugene Kirpichov <[email protected]>
Authored: Fri Jun 16 14:56:07 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Mon Jun 26 17:25:04 2017 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/transforms/DoFn.java | 3 +++
 .../sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java | 6 ++++++
 .../org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java | 4 +++-
 3 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/bec32fe9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index e711ac2..fb6d0ee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -677,6 +677,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
   @Experimental(Kind.SPLITTABLE_DO_FN)
   public @interface UnboundedPerElement {}
 
+  /** Temporary, do not use. See https://issues.apache.org/jira/browse/BEAM-1904 */
+  public class ProcessContinuation {}
+
   /**
    * Finalize the {@link DoFn} construction to prepare for processing.
    * This method should be called by runners before any processing methods.

http://git-wip-us.apache.org/repos/asf/beam/blob/bec32fe9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 5d5887a..4f67db4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -49,6 +49,7 @@ import net.bytebuddy.implementation.bytecode.Throw;
 import net.bytebuddy.implementation.bytecode.assign.Assigner;
 import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing;
 import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import net.bytebuddy.implementation.bytecode.constant.NullConstant;
 import net.bytebuddy.implementation.bytecode.constant.TextConstant;
 import net.bytebuddy.implementation.bytecode.member.FieldAccess;
 import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
@@ -667,6 +668,11 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
       }
       return new StackManipulation.Compound(pushParameters);
     }
+
+    @Override
+    protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
+      return new StackManipulation.Compound(NullConstant.INSTANCE, MethodReturn.REFERENCE);
+    }
   }
 
   private static class UserCodeMethodInvocation implements StackManipulation {

http://git-wip-us.apache.org/repos/asf/beam/blob/bec32fe9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 6fd4052..ed81f42 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -53,8 +53,10 @@ public interface DoFnInvoker<InputT, OutputT> {
    * Invoke the {@link DoFn.ProcessElement} method on the bound {@link DoFn}.
    *
    * @param extra Factory for producing extra parameter objects (such as window), if necessary.
+   * @return {@code null} - see <a href="https://issues.apache.org/jira/browse/BEAM-1904">JIRA</a>
+   *     tracking the complete removal of {@link DoFn.ProcessContinuation}.
    */
-  void invokeProcessElement(ArgumentProvider<InputT, OutputT> extra);
+  DoFn.ProcessContinuation invokeProcessElement(ArgumentProvider<InputT, OutputT> extra);
 
   /** Invoke the appropriate {@link DoFn.OnTimer} method on the bound {@link DoFn}. */
   void invokeOnTimer(String timerId, ArgumentProvider<InputT, OutputT> arguments);
