[
https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=169505&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-169505
]
ASF GitHub Bot logged work on BEAM-2939:
----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Nov/18 21:28
Start Date: 26/Nov/18 21:28
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #7094: [BEAM-2939] Bundle
finalization API changes for SplittableDoFn Java SDK.
URL: https://github.com/apache/beam/pull/7094
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 88556eaf4863..b737a9ffef8a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -517,10 +517,16 @@ public Duration getAllowedTimestampSkew() {
* method annotated with this must satisfy the following constraints:
*
* <ul>
- * <li>It must have exactly zero or one arguments.
- * <li>If it has any arguments, its only argument must be a {@link
DoFn.StartBundleContext}.
+ * <li>If one of the parameters is of type {@link
DoFn.StartBundleContext}, then it will be
+ * passed a context object for the current execution.
+ * <li>If one of the parameters is of type {@link BundleFinalizer}, then
it will be passed a
+ * mechanism to register a callback that will be invoked after the
runner successfully
+ * commits the output of this bundle. See <a
+ * href="https://s.apache.org/beam-finalizing-bundles">Apache Beam
Portability API: How to
+ * Finalize Bundles</a> for further details.
* </ul>
*/
+ // TODO: Add support for bundle finalization parameter.
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@@ -555,6 +561,11 @@ public Duration getAllowedTimestampSkew() {
* output receiver for outputting elements to the default output.
* <li>If one of the parameters is of type {@link MultiOutputReceiver},
then it will be passed
* an output receiver for outputting to multiple tagged outputs.
+ * <li>If one of the parameters is of type {@link BundleFinalizer}, then
it will be passed a
+ * mechanism to register a callback that will be invoked after the
runner successfully
+ * commits the output of this bundle. See <a
+ * href="https://s.apache.org/beam-finalizing-bundles">Apache Beam
Portability API: How to
+ * Finalize Bundles</a> for further details.
* <li>It must return {@code void}.
* </ul>
*
@@ -639,9 +650,18 @@ public Duration getAllowedTimestampSkew() {
* with this must satisfy the following constraints:
*
* <ul>
- * <li>It must have exactly zero or one arguments.
- * <li>If it has any arguments, its only argument must be a {@link
DoFn.FinishBundleContext}.
+ * <li>If one of the parameters is of type {@link
DoFn.FinishBundleContext}, then it will be
+ * passed a context object for the current execution.
+ * <li>If one of the parameters is of type {@link BundleFinalizer}, then
it will be passed a
+ * mechanism to register a callback that will be invoked after the
runner successfully
+ * commits the output of this bundle. See <a
+ * href="https://s.apache.org/beam-finalizing-bundles">Apache Beam
Portability API: How to
+ * Finalize Bundles</a> for further details.
* </ul>
+ *
+ * <p>Note that {@link FinishBundle @FinishBundle} is invoked before the
runner commits the output
+ * while {@link BundleFinalizer.Callback bundle finalizer callbacks} are
invoked after the runner
+ * has committed the output of a successful bundle.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@@ -694,9 +714,8 @@ public Duration getAllowedTimestampSkew() {
* href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
*
* <p>Signature: {@code RestrictionT getInitialRestriction(InputT element);}
- *
- * <p>TODO: Make the InputT parameter optional.
*/
+ // TODO: Make the InputT parameter optional.
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@@ -730,10 +749,8 @@ public Duration getAllowedTimestampSkew() {
*
* <p>Optional: if this method is omitted, the restriction will not be split
(equivalent to
* defining the method and returning {@code
Collections.singletonList(restriction)}).
- *
- * <p>TODO: Introduce a parameter for controlling granularity of splitting,
e.g. numParts. TODO:
- * Make the InputT parameter optional.
*/
+ // TODO: Make the InputT parameter optional.
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@@ -838,4 +855,56 @@ public final void prepareForProcessing() {}
*/
@Override
public void populateDisplayData(DisplayData.Builder builder) {}
+
+ /**
+ * A parameter that is accessible during {@link StartBundle @StartBundle},
{@link
+ * ProcessElement @ProcessElement} and {@link FinishBundle @FinishBundle}
that allows the caller
+ * to register a callback that will be invoked after the bundle has been
successfully completed
+ * and the runner has committed the output.
+ *
+ * <p>A common usage would be to perform any acknowledgements required by an
external system such
+ * as acking messages from a message queue since this callback is only
invoked after the output of
+ * the bundle has been durably persisted by the runner.
+ *
+ * <p>Note that a runner may make the output of the bundle available
immediately to downstream
+ * consumers without waiting for finalization to succeed. Pipelines that
are sensitive to
+ * duplicate messages must perform output deduplication in the
pipeline.
+ */
+ // TODO: Add support for a deduplication PTransform.
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public interface BundleFinalizer {
+ /**
+ * The provided function will be called after the runner successfully
commits the output of a
+ * successful bundle. Throwing during finalization indicates that bundle
finalization may have
+ * failed and the runner may choose to attempt finalization again. The
provided {@code
+ * callbackExpiry} controls how long the finalization is valid for before
it is garbage
+ * collected and no longer able to be invoked.
+ *
+ * <p>Note that finalization is best effort and it is expected that the
external system will
+ * self recover state if finalization never happens or consistently fails.
For example, a queue
+ * based system that requires message acknowledgement would replay
messages if that
+ * acknowledgement was never received within the provided time bound.
+ *
+ * <p>See <a href="https://s.apache.org/beam-finalizing-bundles">Apache
Beam Portability API:
+ * How to Finalize Bundles</a> for further details.
+ *
+ * @param callbackExpiry When the finalization callback expires. If the
runner cannot commit
+ * results and execute the callback within this duration, the callback
will not be invoked.
+ * @param callback The finalization callback method for the runner to
invoke after processing
+ * results have been successfully committed.
+ */
+ void afterBundleCommit(Instant callbackExpiry, Callback callback);
+
+ /**
+ * An instance of a function that will be invoked after bundle
finalization.
+ *
+ * <p>Note that this function should maintain all state necessary outside
of a DoFn's context to
+ * be able to perform bundle finalization and should not rely on mutable
state stored within a
+ * DoFn instance.
+ */
+ @FunctionalInterface
+ interface Callback {
+ void onBundleSuccess() throws Exception;
+ }
+ }
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 3b4591a88de4..d1c7b9dbe3d3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -103,6 +103,9 @@
* provided, will be called on the {@link DoFn} instance.
* <li>If a runner will no longer use a {@link DoFn}, the {@link
DoFn.Teardown} method, if
* provided, will be called on the discarded instance.
+ * <li>If a bundle requested bundle finalization by registering a {@link
+ * DoFn.BundleFinalizer.Callback bundle finalization callback}, the
callback will be invoked
+ * after the runner has successfully committed the output of a
successful bundle.
* </ol>
*
* <p>Note also that calls to {@link DoFn.Teardown} are best effort, and may
not be called before a
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 169505)
Time Spent: 9h 50m (was: 9h 40m)
> Fn API streaming SDF support
> ----------------------------
>
> Key: BEAM-2939
> URL: https://issues.apache.org/jira/browse/BEAM-2939
> Project: Beam
> Issue Type: Improvement
> Components: beam-model
> Reporter: Henning Rohde
> Assignee: Luke Cwik
> Priority: Major
> Labels: portability
> Time Spent: 9h 50m
> Remaining Estimate: 0h
>
> The Fn API should support streaming SDF. Detailed design TBD.
> Once design is ready, expand subtasks similarly to BEAM-2822.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)