[ 
https://issues.apache.org/jira/browse/BEAM-3194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16287091#comment-16287091
 ] 

Kenneth Knowles edited comment on BEAM-3194 at 1/30/18 5:00 AM:
----------------------------------------------------------------

kennknowles closed pull request #4135: [BEAM-3194] Add @RequiresStableInput 
annotation
URL: https://github.com/apache/beam/pull/4135


was (Author: githubbot):
kennknowles closed pull request #4135: [BEAM-3194] Add @RequiresStableInput 
annotation
URL: https://github.com/apache/beam/pull/4135
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a pull request from a fork once it is merged, the diff
is reproduced below for the sake of provenance:

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 3e023db679d..9f8dd45d110 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -567,6 +567,29 @@ public Duration getAllowedTimestampSkew() {
   @Target(ElementType.METHOD)
   public @interface ProcessElement {}
 
+  /**
+   * <b><i>Experimental - no backwards compatibility guarantees. The exact 
name or usage of this
+   * feature may change.</i></b>
+   *
+   * <p>Annotation that may be added to a {@link ProcessElement} or {@link 
OnTimer} method to
+   * indicate that the runner must ensure that the observable contents of the 
input {@link
+   * PCollection} or mutable state are stable upon retries.
+   *
+   * <p>This is important for sinks, which must ensure exactly-once semantics 
when writing to a
+   * storage medium outside of your pipeline. A general pattern for a basic 
sink is to write a
+   * {@link DoFn} that can perform an idempotent write, and annotate that it 
requires stable input.
+   * Combined, these allow the write to be freely retried until success.
+   *
+   * <p>An example of an unstable input would be anything computed using 
nondeterministic logic. In
+   * Beam, any user-defined function is permitted to be nondeterministic, and 
any {@link
+   * PCollection} is permitted to be recomputed in any manner.
+   */
+  @Documented
+  @Experimental
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  public @interface RequiresStableInput {}
+
   /**
    * Annotation for the method to use to finish processing a batch of elements.
    * The method annotated with this must satisfy the following constraints:
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index bfad69ea776..1e126611452 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -426,6 +426,12 @@ public static TimerParameter 
timerParameter(TimerDeclaration decl) {
     @Override
     public abstract List<Parameter> extraParameters();
 
+    /**
+     * Whether this method requires stable input, expressed via {@link
+     * org.apache.beam.sdk.transforms.DoFn.RequiresStableInput}.
+     */
+    public abstract boolean requiresStableInput();
+
     /** Concrete type of the {@link RestrictionTracker} parameter, if present. 
*/
     @Nullable
     public abstract TypeDescriptor<?> trackerT();
@@ -440,12 +446,14 @@ public static TimerParameter 
timerParameter(TimerDeclaration decl) {
     static ProcessElementMethod create(
         Method targetMethod,
         List<Parameter> extraParameters,
+        boolean requiresStableInput,
         TypeDescriptor<?> trackerT,
         @Nullable TypeDescriptor<? extends BoundedWindow> windowT,
         boolean hasReturnValue) {
       return new AutoValue_DoFnSignature_ProcessElementMethod(
           targetMethod,
           Collections.unmodifiableList(extraParameters),
+          requiresStableInput,
           trackerT,
           windowT,
           hasReturnValue);
@@ -487,6 +495,13 @@ public boolean isSplittable() {
     @Override
     public abstract Method targetMethod();
 
+    /**
+     * Whether this method requires stable input, expressed via {@link
+     * org.apache.beam.sdk.transforms.DoFn.RequiresStableInput}. For timers, 
this means that any
+     * state must be stably persisted prior to calling it.
+     */
+    public abstract boolean requiresStableInput();
+
     /** The window type used by this method, if any. */
     @Nullable
     public abstract TypeDescriptor<? extends BoundedWindow> windowT();
@@ -498,10 +513,15 @@ public boolean isSplittable() {
     static OnTimerMethod create(
         Method targetMethod,
         String id,
+        boolean requiresStableInput,
         TypeDescriptor<? extends BoundedWindow> windowT,
         List<Parameter> extraParameters) {
       return new AutoValue_DoFnSignature_OnTimerMethod(
-          id, targetMethod, windowT, 
Collections.unmodifiableList(extraParameters));
+          id,
+          targetMethod,
+          requiresStableInput,
+          windowT,
+          Collections.unmodifiableList(extraParameters));
     }
   }
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 52607833f71..98742be3cbb 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -679,6 +679,8 @@ private static void verifyUnsplittableMethods(ErrorReporter 
errors, DoFnSignatur
 
     MethodAnalysisContext methodContext = MethodAnalysisContext.create();
 
+    boolean requiresStableInput = 
m.isAnnotationPresent(DoFn.RequiresStableInput.class);
+
     @Nullable TypeDescriptor<? extends BoundedWindow> windowT = 
getWindowType(fnClass, m);
 
     List<DoFnSignature.Parameter> extraParameters = new ArrayList<>();
@@ -706,7 +708,8 @@ private static void verifyUnsplittableMethods(ErrorReporter 
errors, DoFnSignatur
       extraParameters.add(parameter);
     }
 
-    return DoFnSignature.OnTimerMethod.create(m, timerId, windowT, 
extraParameters);
+    return DoFnSignature.OnTimerMethod.create(
+        m, timerId, requiresStableInput, windowT, extraParameters);
   }
 
   @VisibleForTesting
@@ -723,9 +726,10 @@ private static void 
verifyUnsplittableMethods(ErrorReporter errors, DoFnSignatur
         "Must return void or %s",
         DoFn.ProcessContinuation.class.getSimpleName());
 
-
     MethodAnalysisContext methodContext = MethodAnalysisContext.create();
 
+    boolean requiresStableInput = 
m.isAnnotationPresent(DoFn.RequiresStableInput.class);
+
     Type[] params = m.getGenericParameterTypes();
 
     TypeDescriptor<?> trackerT = getTrackerType(fnClass, m);
@@ -763,6 +767,7 @@ private static void verifyUnsplittableMethods(ErrorReporter 
errors, DoFnSignatur
     return DoFnSignature.ProcessElementMethod.create(
         m,
         methodContext.getExtraParameters(),
+        requiresStableInput,
         trackerT,
         windowT,
         DoFn.ProcessContinuation.class.equals(m.getReturnType()));
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index a961203ffed..d7b5cad48c8 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -22,6 +22,7 @@
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
@@ -77,6 +78,17 @@ public void process(ProcessContext c) {}
         sig.processElement().extraParameters().get(0), 
instanceOf(ProcessContextParameter.class));
   }
 
+  @Test
+  public void testRequiresStableInputProcessElement() throws Exception {
+    DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>() 
{
+      @ProcessElement
+      @RequiresStableInput
+      public void process(ProcessContext c) {}
+    }.getClass());
+
+    assertThat(sig.processElement().requiresStableInput(), is(true));
+  }
+
   @Test
   public void testBadExtraContext() throws Exception {
     thrown.expect(IllegalArgumentException.class);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Support annotating that a DoFn requires stable / deterministic input for 
> replay/retry
> -------------------------------------------------------------------------------------
>
>                 Key: BEAM-3194
>                 URL: https://issues.apache.org/jira/browse/BEAM-3194
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Kenneth Knowles
>            Assignee: Eugene Kirpichov
>            Priority: Major
>
> See the thread: 
> https://lists.apache.org/thread.html/5fd81ce371aeaf642665348f8e6940e308e04275dd7072f380f9f945@%3Cdev.beam.apache.org%3E
> We need this in order to have truly cross-runner end-to-end exactly once via 
> replay + idempotence.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to