[ https://issues.apache.org/jira/browse/BEAM-10940?focusedWorklogId=509532&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-509532 ]

ASF GitHub Bot logged work on BEAM-10940:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Nov/20 06:32
            Start Date: 10/Nov/20 06:32
    Worklog Time Spent: 10m 
      Work Description: boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r520322204



##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < 
Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
       I found that `Thread.sleep(5s)` is not a correct solution. The reason it worked occasionally is that the bundle was small enough and I got lucky with the race condition. I think we should figure out a way to fire the SDF processing-time timers when a bundle is closed within the life cycle of one `ExecutableStageDoFnOperator`.
   Please correct me if I'm understanding it wrong:
   * Does Flink start all operators at the same time and close them when the input watermark reaches MAX_TIMESTAMP, or does it close operators in reverse topological order, with `close()` being a blocking call?
   * Processing-time timers will no longer be fired by the system once `operator.close()` is invoked.
   * The assumption around `ExecutableStageDoFnOperator` is that there is only one bundle executing inside one operator. When the output watermark advances to MAX_TIMESTAMP, we consider this bundle completed.
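
   If those assumptions hold, the close-time protocol above can be sketched as a plain-Java simulation. This is only an illustration of the assumed semantics; `WatermarkCloseSketch`, the bundle counter, and `invokeFinishBundle` here are hypothetical names, not the actual Flink or Beam API:

```java
// Hypothetical simulation of the assumed close() protocol: finishing the last
// outstanding bundle lets the output watermark jump to MAX, and close() keeps
// invoking finishBundle until that happens (instead of sleeping).
class WatermarkCloseSketch {
  static final long MAX_WATERMARK = Long.MAX_VALUE;

  private long outputWatermark = 0L;
  private int bundlesRemaining;

  WatermarkCloseSketch(int bundles) {
    this.bundlesRemaining = bundles;
  }

  // Finishing the last outstanding bundle advances the watermark to MAX.
  void invokeFinishBundle() {
    bundlesRemaining--;
    if (bundlesRemaining <= 0) {
      outputWatermark = MAX_WATERMARK;
    }
  }

  // Loop until the output watermark reaches MAX, i.e. until every bundle
  // is considered complete. Returns how many finishBundle calls it took.
  int close() {
    int iterations = 0;
    while (outputWatermark < MAX_WATERMARK) {
      invokeFinishBundle();
      iterations++;
    }
    return iterations;
  }
}
```

   Under this model, "several bundles per operator life cycle" simply means `close()` loops more than once, which is exactly where pending SDF processing-time timers get stranded.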
   
   With support for SDF-initiated checkpoints, we do need several bundles invoked within one `ExecutableStageDoFnOperator` life cycle, which means we either:
   
   * Enable Flink to fire processing-time timers after `Operator.close()` is invoked; this may not be preferable.
   * Or we try to close the bundle before we reach `Operator.close()`.
   * Or we manually drain the SDF timers, sacrificing the ability of `resumeDelay()`. For example, the user may want to reschedule the SDF residuals in 5 minutes, but we have to fire them now.
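   
   The third option (eagerly draining) could look roughly like the following plain-Java sketch. Everything here is hypothetical for illustration; `TimerDrainSketch`, `schedule`, and `drainOnClose` are not Beam or Flink APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of manually draining pending processing-time timers at
// operator close: every remaining timer is fired immediately in timestamp
// order, regardless of its scheduled delay (i.e. resumeDelay() is sacrificed).
class TimerDrainSketch {
  static class Timer implements Comparable<Timer> {
    final long fireAtMillis;
    final String payload;

    Timer(long fireAtMillis, String payload) {
      this.fireAtMillis = fireAtMillis;
      this.payload = payload;
    }

    @Override
    public int compareTo(Timer other) {
      return Long.compare(fireAtMillis, other.fireAtMillis);
    }
  }

  final PriorityQueue<Timer> pending = new PriorityQueue<>();
  final List<String> fired = new ArrayList<>();

  // Schedule a timer for some future processing time (e.g. an SDF residual
  // rescheduled via resumeDelay()).
  void schedule(long fireAtMillis, String payload) {
    pending.add(new Timer(fireAtMillis, payload));
  }

  // On close, fire every remaining timer now, in timestamp order, no matter
  // how far in the future it was scheduled.
  void drainOnClose() {
    while (!pending.isEmpty()) {
      fired.add(pending.poll().payload);
    }
  }
}
```

   The trade-off is visible in the sketch: a residual the user asked to resume in 5 minutes would be fired immediately at close instead of after its delay.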
   
   Do you have any ideas/suggestions? Thanks for your help!






Issue Time Tracking
-------------------

    Worklog Id:     (was: 509532)
    Time Spent: 11h 50m  (was: 11h 40m)

> Portable Flink runner should handle DelayedBundleApplication from 
> ProcessBundleResponse.
> ----------------------------------------------------------------------------------------
>
>                 Key: BEAM-10940
>                 URL: https://issues.apache.org/jira/browse/BEAM-10940
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-flink
>            Reporter: Boyuan Zhang
>            Assignee: Boyuan Zhang
>            Priority: P2
>          Time Spent: 11h 50m
>  Remaining Estimate: 0h
>
> SDF can produce residuals by self-checkpointing, which are returned to the 
> runner via ProcessBundleResponse.DelayedBundleApplication. The portable runner 
> should be able to handle the DelayedBundleApplication and reschedule it based 
> on its timestamp.


