[
https://issues.apache.org/jira/browse/BEAM-11998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17311703#comment-17311703
]
Boyuan Zhang commented on BEAM-11998:
-------------------------------------
The easiest way is to issue ProcessBundleSplitRequest(fraction_of_remainder =
0) from the runner in an output-bounded or time-bounded manner:
* The runner should have a mechanism to issue ProcessBundleSplitRequest
regularly. If the requests are issued in a time-bounded and output-bounded
manner, the logic could be similar to
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
and the implementation can be shared among the different Java runners.
* A custom BundleSplitHandler should be created to handle the split response.
If residuals are rescheduled using timers and state, the logic could be
similar to StateAndTimerBundleCheckpointHandler:
https://github.com/apache/beam/blob/a16bbf78bb5b3d3a14d13fb39ed442c612d0b493/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleCheckpointHandlers.java#L53
and the implementation can be shared among the different Java runners.
* Each runner needs to be hooked up with the two components mentioned above,
which includes holding the watermark correctly and persisting and
rescheduling residuals as expected.
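
As a rough illustration of the first bullet, here is a minimal sketch of the
"output-bounded or time-bounded" decision on its own. CheckpointTrigger and all
of its members are hypothetical names made up for this example; they are not
Beam classes, and the sketch does not send a real ProcessBundleSplitRequest or
handle the response. It only shows when a runner might decide to issue one with
fraction_of_remainder = 0.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper, not part of Beam: tells the runner when to checkpoint
 * the active bundle, i.e. send ProcessBundleSplitRequest with
 * fraction_of_remainder = 0.
 */
final class CheckpointTrigger {
  /** A fraction of 0 keeps none of the remainder in the bundle: a checkpoint. */
  static final double CHECKPOINT_FRACTION = 0.0;

  private final long maxOutputs;
  private final long maxNanos;
  private final LongSupplier nanoClock; // injectable so the logic can be tested
  private long bundleStartNanos;
  private long outputsSeen;

  CheckpointTrigger(long maxOutputs, Duration maxDuration, LongSupplier nanoClock) {
    this.maxOutputs = maxOutputs;
    this.maxNanos = maxDuration.toNanos();
    this.nanoClock = nanoClock;
  }

  /** Call when the runner starts a new bundle; resets both bounds. */
  void startBundle() {
    bundleStartNanos = nanoClock.getAsLong();
    outputsSeen = 0;
  }

  /** Call once for every element the bundle has produced so far. */
  void onOutput() {
    outputsSeen++;
  }

  /** True once the output bound or the time bound has been reached. */
  boolean shouldCheckpoint() {
    long elapsedNanos = nanoClock.getAsLong() - bundleStartNanos;
    return outputsSeen >= maxOutputs || elapsedNanos >= maxNanos;
  }

  public static void main(String[] args) {
    // Assumed bounds for illustration: 10,000 outputs or 10 seconds.
    CheckpointTrigger trigger =
        new CheckpointTrigger(10_000, Duration.ofSeconds(10), System::nanoTime);
    trigger.startBundle();
    for (int i = 0; i < 10_000 && !trigger.shouldCheckpoint(); i++) {
      trigger.onOutput();
    }
    if (trigger.shouldCheckpoint()) {
      // A real runner would issue the split request here and pass the
      // response to its BundleSplitHandler.
      System.out.println("checkpoint with fraction " + CHECKPOINT_FRACTION);
    }
  }
}
```

A runner would presumably poll shouldCheckpoint() from whatever periodic
mechanism it uses to issue split requests, then hand the returned residuals to
its checkpoint handler for rescheduling.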
> Portable runners should be able to issue checkpoints to Splittable DoFn
> -----------------------------------------------------------------------
>
> Key: BEAM-11998
> URL: https://issues.apache.org/jira/browse/BEAM-11998
> Project: Beam
> Issue Type: New Feature
> Components: cross-language, runner-flink, runner-spark
> Reporter: Boyuan Zhang
> Priority: P2
>
> To execute an unbounded Splittable DoFn over the Fn API in streaming mode
> properly, portable runners should regularly issue a split
> (ProcessBundleSplitRequest with fraction_of_remainder > 0) or simply a
> checkpoint (ProcessBundleSplitRequest with fraction_of_remainder == 0) to
> the SDK, so that the current bundle finishes processing instead of running
> forever.