[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=169505&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-169505 ]

ASF GitHub Bot logged work on BEAM-2939:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Nov/18 21:28
            Start Date: 26/Nov/18 21:28
    Worklog Time Spent: 10m 
    Work Description: lukecwik closed pull request #7094: [BEAM-2939] Bundle finalization API changes for SplittableDoFn Java SDK.
URL: https://github.com/apache/beam/pull/7094
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 88556eaf4863..b737a9ffef8a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -517,10 +517,16 @@ public Duration getAllowedTimestampSkew() {
    * method annotated with this must satisfy the following constraints:
    *
    * <ul>
-   *   <li>It must have exactly zero or one arguments.
-   *   <li>If it has any arguments, its only argument must be a {@link DoFn.StartBundleContext}.
+   *   <li>If one of the parameters is of type {@link DoFn.StartBundleContext}, then it will be
+   *       passed a context object for the current execution.
+   *   <li>If one of the parameters is of type {@link BundleFinalizer}, then it will be passed a
+   *       mechanism to register a callback that will be invoked after the runner successfully
+   *       commits the output of this bundle. See <a
+   *       href="https://s.apache.org/beam-finalizing-bundles">Apache Beam Portability API: How to
+   *       Finalize Bundles</a> for further details.
    * </ul>
    */
+  // TODO: Add support for bundle finalization parameter.
   @Documented
   @Retention(RetentionPolicy.RUNTIME)
   @Target(ElementType.METHOD)
@@ -555,6 +561,11 @@ public Duration getAllowedTimestampSkew() {
    *       output receiver for outputting elements to the default output.
    *   <li>If one of the parameters is of type {@link MultiOutputReceiver}, then it will be passed
    *       an output receiver for outputting to multiple tagged outputs.
+   *   <li>If one of the parameters is of type {@link BundleFinalizer}, then it will be passed a
+   *       mechanism to register a callback that will be invoked after the runner successfully
+   *       commits the output of this bundle. See <a
+   *       href="https://s.apache.org/beam-finalizing-bundles">Apache Beam Portability API: How to
+   *       Finalize Bundles</a> for further details.
    *   <li>It must return {@code void}.
    * </ul>
    *
@@ -639,9 +650,18 @@ public Duration getAllowedTimestampSkew() {
    * with this must satisfy the following constraints:
    *
    * <ul>
-   *   <li>It must have exactly zero or one arguments.
-   *   <li>If it has any arguments, its only argument must be a {@link DoFn.FinishBundleContext}.
+   *   <li>If one of the parameters is of type {@link DoFn.FinishBundleContext}, then it will be
+   *       passed a context object for the current execution.
+   *   <li>If one of the parameters is of type {@link BundleFinalizer}, then it will be passed a
+   *       mechanism to register a callback that will be invoked after the runner successfully
+   *       commits the output of this bundle. See <a
+   *       href="https://s.apache.org/beam-finalizing-bundles">Apache Beam Portability API: How to
+   *       Finalize Bundles</a> for further details.
    * </ul>
+   *
+   * <p>Note that {@link FinishBundle @FinishBundle} is invoked before the runner commits the output
+   * while {@link BundleFinalizer.Callback bundle finalizer callbacks} are invoked after the runner
+   * has committed the output of a successful bundle.
    */
   @Documented
   @Retention(RetentionPolicy.RUNTIME)
@@ -694,9 +714,8 @@ public Duration getAllowedTimestampSkew() {
    * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
    *
    * <p>Signature: {@code RestrictionT getInitialRestriction(InputT element);}
-   *
-   * <p>TODO: Make the InputT parameter optional.
    */
+  // TODO: Make the InputT parameter optional.
   @Documented
   @Retention(RetentionPolicy.RUNTIME)
   @Target(ElementType.METHOD)
@@ -730,10 +749,8 @@ public Duration getAllowedTimestampSkew() {
    *
    * <p>Optional: if this method is omitted, the restriction will not be split (equivalent to
    * defining the method and returning {@code Collections.singletonList(restriction)}).
-   *
-   * <p>TODO: Introduce a parameter for controlling granularity of splitting, e.g. numParts. TODO:
-   * Make the InputT parameter optional.
    */
+  // TODO: Make the InputT parameter optional.
   @Documented
   @Retention(RetentionPolicy.RUNTIME)
   @Target(ElementType.METHOD)
@@ -838,4 +855,56 @@ public final void prepareForProcessing() {}
    */
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {}
+
+  /**
+   * A parameter that is accessible during {@link StartBundle @StartBundle}, {@link
+   * ProcessElement @ProcessElement} and {@link FinishBundle @FinishBundle} that allows the caller
+   * to register a callback that will be invoked after the bundle has been successfully completed
+   * and the runner has committed the output.
+   *
+   * <p>A common usage would be to perform any acknowledgements required by an external system such
+   * as acking messages from a message queue since this callback is only invoked after the output of
+   * the bundle has been durably persisted by the runner.
+   *
+   * <p>Note that a runner may make the output of the bundle available immediately to downstream
+   * consumers without waiting for finalization to succeed. Pipelines that are sensitive to
+   * duplicate messages must perform output deduplication themselves.
+   */
+  // TODO: Add support for a deduplication PTransform.
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public interface BundleFinalizer {
+    /**
+     * The provided function will be called after the runner successfully commits the output of a
+     * successful bundle. Throwing during finalization represents that bundle finalization may have
+     * failed and the runner may choose to attempt finalization again. The provided {@code
+     * callbackExpiry} controls how long the finalization is valid for before it is garbage
+     * collected and no longer able to be invoked.
+     *
+     * <p>Note that finalization is best effort and it is expected that the external system will
+     * self-recover state if finalization never happens or consistently fails. For example, a
+     * queue-based system that requires message acknowledgement would replay messages if that
+     * acknowledgement was never received within the provided time bound.
+     *
+     * <p>See <a href="https://s.apache.org/beam-finalizing-bundles">Apache Beam Portability API:
+     * How to Finalize Bundles</a> for further details.
+     *
+     * @param callbackExpiry When the finalization callback expires. If the runner cannot commit
+     *     results and execute the callback before this instant, the callback will not be invoked.
+     * @param callback The finalization callback method for the runner to invoke after processing
+     *     results have been successfully committed.
+     */
+    void afterBundleCommit(Instant callbackExpiry, Callback callback);
+
+    /**
+     * An instance of a function that will be invoked after bundle finalization.
+     *
+     * <p>Note that this function should maintain all state necessary outside of a DoFn's context to
+     * be able to perform bundle finalization and should not rely on mutable state stored within a
+     * DoFn instance.
+     */
+    @FunctionalInterface
+    interface Callback {
+      void onBundleSuccess() throws Exception;
+    }
+  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 3b4591a88de4..d1c7b9dbe3d3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -103,6 +103,9 @@
  *       provided, will be called on the {@link DoFn} instance.
  *   <li>If a runner will no longer use a {@link DoFn}, the {@link DoFn.Teardown} method, if
  *       provided, will be called on the discarded instance.
+ *   <li>If a bundle requested bundle finalization by registering a {@link
+ *       DoFn.BundleFinalizer.Callback bundle finalization callback}, the callback will be invoked
+ *       after the runner has successfully committed the output of a successful bundle.
  * </ol>
  *
  * <p>Note also that calls to {@link DoFn.Teardown} are best effort, and may not be called before a

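The callback contract spelled out in the javadoc of this diff (callbacks run only after the runner commits the bundle's output, `@FinishBundle` runs before that commit, a throwing callback may be attempted again, and a callback is not invoked once its `callbackExpiry` has passed) can be illustrated with a small, runner-agnostic model. This is a hypothetical sketch, not Beam's runner implementation: `ToyRunner`, `finalizeBundle`, `runDemo` and the message ids are invented for illustration, and the two nested interfaces only mirror the shapes of `DoFn.BundleFinalizer` and `BundleFinalizer.Callback`. The `Instant` in the diff's signature is presumably Joda-Time's, as used elsewhere in Beam's `DoFn`; the sketch substitutes `java.time.Instant` so it runs with the JDK alone.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of the bundle finalization contract; not Beam's runner code.
public class BundleFinalizerSketch {

  // Mirrors the shape of DoFn.BundleFinalizer.Callback from the diff.
  @FunctionalInterface
  interface Callback {
    void onBundleSuccess() throws Exception;
  }

  // Mirrors DoFn.BundleFinalizer, with java.time.Instant substituted for Joda-Time.
  interface BundleFinalizer {
    void afterBundleCommit(Instant callbackExpiry, Callback callback);
  }

  // Toy runner: collects callbacks during a bundle and runs them only after "commit".
  static final class ToyRunner implements BundleFinalizer {
    private static final class Registration {
      final Instant expiry;
      final Callback callback;

      Registration(Instant expiry, Callback callback) {
        this.expiry = expiry;
        this.callback = callback;
      }
    }

    private final List<Registration> pending = new ArrayList<>();

    @Override
    public void afterBundleCommit(Instant callbackExpiry, Callback callback) {
      pending.add(new Registration(callbackExpiry, callback));
    }

    // One finalization attempt, made after the bundle's output is committed.
    // Expired callbacks are dropped without being invoked; a callback that throws
    // stays pending so a later attempt can retry it. Returns the success count.
    int finalizeBundle(Instant now) {
      int succeeded = 0;
      Iterator<Registration> it = pending.iterator();
      while (it.hasNext()) {
        Registration r = it.next();
        if (now.isAfter(r.expiry)) {
          it.remove();
          continue;
        }
        try {
          r.callback.onBundleSuccess();
          it.remove();
          succeeded++;
        } catch (Exception e) {
          // Finalization may have failed; keep the callback for another attempt.
        }
      }
      return succeeded;
    }

    int pendingCount() {
      return pending.size();
    }
  }

  static List<String> runDemo() {
    ToyRunner runner = new ToyRunner();
    Instant start = Instant.parse("2018-11-26T21:28:00Z");
    List<String> log = new ArrayList<>();
    int[] attempts = {0};

    // Registrations a DoFn might make from @ProcessElement, e.g. acks for hypothetical ids.
    runner.afterBundleCommit(start.plusSeconds(300), () -> log.add("ack msg-1"));
    runner.afterBundleCommit(start.plusSeconds(300), () -> {
      if (attempts[0]++ == 0) {
        throw new IllegalStateException("transient ack failure");
      }
      log.add("ack msg-2");
    });
    runner.afterBundleCommit(start.plusSeconds(10), () -> log.add("ack msg-3"));

    log.add("finishBundle"); // @FinishBundle runs before the commit; no callback has run yet.

    // Output is committed 20s in, so msg-3's callback has already expired.
    runner.finalizeBundle(start.plusSeconds(20)); // msg-1 succeeds, msg-2 throws, msg-3 dropped
    runner.finalizeBundle(start.plusSeconds(30)); // msg-2 is retried and succeeds
    log.add("pending=" + runner.pendingCount());
    return log;
  }

  public static void main(String[] args) {
    System.out.println(runDemo());
  }
}
```

Running `main` prints `[finishBundle, ack msg-1, ack msg-2, pending=0]`. `msg-3` is never acknowledged because the simulated commit happened after its expiry, which is exactly the case the javadoc expects the external system to recover from on its own.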

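The queue example in the `afterBundleCommit` javadoc (finalization is best effort, so a system that never receives an acknowledgement replays the message) can be modeled in the same way. The class below is a hypothetical in-memory queue, not a real messaging client: `AckDeadlineQueueSketch`, `pull`, `ack`, `runDemo` and the 30-second ack timeout are all invented for illustration. In a pipeline, the `ack` call is what a `BundleFinalizer.Callback` would perform, so only messages whose bundle output was committed get acknowledged, and anything left unacknowledged is redelivered after its deadline.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory queue with ack deadlines; not a real messaging client API.
public class AckDeadlineQueueSketch {
  private final Deque<String> ready = new ArrayDeque<>();
  private final Map<String, Instant> inFlight = new HashMap<>(); // message id -> ack deadline
  private final Duration ackTimeout;

  AckDeadlineQueueSketch(Duration ackTimeout) {
    this.ackTimeout = ackTimeout;
  }

  void publish(String id) {
    ready.addLast(id);
  }

  // Delivers up to max messages; each must be acked before now + ackTimeout.
  List<String> pull(Instant now, int max) {
    requeueExpired(now);
    List<String> delivered = new ArrayList<>();
    while (delivered.size() < max && !ready.isEmpty()) {
      String id = ready.pollFirst();
      inFlight.put(id, now.plus(ackTimeout));
      delivered.add(id);
    }
    return delivered;
  }

  // An ack counts only if it arrives by the deadline; late or unknown acks are ignored.
  boolean ack(String id, Instant now) {
    Instant deadline = inFlight.get(id);
    if (deadline == null || now.isAfter(deadline)) {
      return false;
    }
    inFlight.remove(id);
    return true;
  }

  // Unacked messages past their deadline go back on the queue: the "self recovery".
  private void requeueExpired(Instant now) {
    List<String> expired = new ArrayList<>();
    for (Map.Entry<String, Instant> e : inFlight.entrySet()) {
      if (now.isAfter(e.getValue())) {
        expired.add(e.getKey());
      }
    }
    Collections.sort(expired); // HashMap order is unspecified; sort for determinism.
    for (String id : expired) {
      inFlight.remove(id);
      ready.addLast(id);
    }
  }

  static List<String> runDemo() {
    AckDeadlineQueueSketch queue = new AckDeadlineQueueSketch(Duration.ofSeconds(30));
    Instant t0 = Instant.parse("2018-11-26T21:28:00Z");
    queue.publish("msg-1");
    queue.publish("msg-2");

    List<String> firstBundle = queue.pull(t0, 10);
    // The finalization callback for msg-1 ran within the deadline...
    boolean acked = queue.ack("msg-1", t0.plusSeconds(5));
    // ...but msg-2's callback never ran, so the queue redelivers it later.
    List<String> redelivered = queue.pull(t0.plusSeconds(60), 10);

    List<String> result = new ArrayList<>(firstBundle);
    result.add("acked=" + acked);
    result.addAll(redelivered);
    return result;
  }

  public static void main(String[] args) {
    System.out.println(runDemo());
  }
}
```

Running `main` prints `[msg-1, msg-2, acked=true, msg-2]`: `msg-2` is delivered a second time. One design consequence, offered as a suggestion rather than something the diff states: in this model there is little point in choosing a `callbackExpiry` later than the queue's own ack deadline, since a late ack is ignored and the message is redelivered anyway. That redelivery is also why the `BundleFinalizer` javadoc asks duplicate-sensitive pipelines to deduplicate their output.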
 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 169505)
    Time Spent: 9h 50m  (was: 9h 40m)

> Fn API streaming SDF support
> ----------------------------
>
>                 Key: BEAM-2939
>                 URL: https://issues.apache.org/jira/browse/BEAM-2939
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model
>            Reporter: Henning Rohde
>            Assignee: Luke Cwik
>            Priority: Major
>              Labels: portability
>          Time Spent: 9h 50m
>  Remaining Estimate: 0h
>
> The Fn API should support streaming SDF. Detailed design TBD.
> Once design is ready, expand subtasks similarly to BEAM-2822.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
