Reintroduces DoFn.ProcessContinuation (Dataflow worker compatibility part)

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bec32fe9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bec32fe9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bec32fe9

Branch: refs/heads/master
Commit: bec32fe93c6b5c16563d7ea4b877a2dee3352fee
Parents: 1ea1de4
Author: Eugene Kirpichov <[email protected]>
Authored: Fri Jun 16 14:56:07 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Mon Jun 26 17:25:04 2017 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/transforms/DoFn.java     | 3 +++
 .../sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java    | 6 ++++++
 .../org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java    | 4 +++-
 3 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bec32fe9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index e711ac2..fb6d0ee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -677,6 +677,9 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
   @Experimental(Kind.SPLITTABLE_DO_FN)
   public @interface UnboundedPerElement {}
 
+  /** Temporary, do not use. See https://issues.apache.org/jira/browse/BEAM-1904 */
+  public class ProcessContinuation {}
+
   /**
    * Finalize the {@link DoFn} construction to prepare for processing.
    * This method should be called by runners before any processing methods.

http://git-wip-us.apache.org/repos/asf/beam/blob/bec32fe9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 5d5887a..4f67db4 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -49,6 +49,7 @@ import net.bytebuddy.implementation.bytecode.Throw;
 import net.bytebuddy.implementation.bytecode.assign.Assigner;
 import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing;
 import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import net.bytebuddy.implementation.bytecode.constant.NullConstant;
 import net.bytebuddy.implementation.bytecode.constant.TextConstant;
 import net.bytebuddy.implementation.bytecode.member.FieldAccess;
 import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
@@ -667,6 +668,11 @@ public class ByteBuddyDoFnInvokerFactory implements 
DoFnInvokerFactory {
       }
       return new StackManipulation.Compound(pushParameters);
     }
+
+    @Override
+    protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
+      return new StackManipulation.Compound(NullConstant.INSTANCE, MethodReturn.REFERENCE);
+    }
   }
 
   private static class UserCodeMethodInvocation implements StackManipulation {

http://git-wip-us.apache.org/repos/asf/beam/blob/bec32fe9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 6fd4052..ed81f42 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -53,8 +53,10 @@ public interface DoFnInvoker<InputT, OutputT> {
    * Invoke the {@link DoFn.ProcessElement} method on the bound {@link DoFn}.
    *
    * @param extra Factory for producing extra parameter objects (such as 
window), if necessary.
+   * @return {@code null} - see <a href="https://issues.apache.org/jira/browse/BEAM-1904">JIRA</a>
+   *     tracking the complete removal of {@link DoFn.ProcessContinuation}.
    */
-  void invokeProcessElement(ArgumentProvider<InputT, OutputT> extra);
+  DoFn.ProcessContinuation invokeProcessElement(ArgumentProvider<InputT, OutputT> extra);
 
   /** Invoke the appropriate {@link DoFn.OnTimer} method on the bound {@link 
DoFn}. */
   void invokeOnTimer(String timerId, ArgumentProvider<InputT, OutputT> 
arguments);

Reply via email to