[
https://issues.apache.org/jira/browse/BEAM-6796?focusedWorklogId=210697&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-210697
]
ASF GitHub Bot logged work on BEAM-6796:
----------------------------------------
Author: ASF GitHub Bot
Created on: 10/Mar/19 19:52
Start Date: 10/Mar/19 19:52
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #8023:
[BEAM-6796][flink] Fully finish bundle before timer callback
URL: https://github.com/apache/beam/pull/8023#discussion_r264055437
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -679,6 +685,12 @@ protected final void invokeFinishBundle() {
pushbackDoFnRunner.finishBundle();
elementCount = 0L;
lastFinishBundleTime =
getProcessingTimeService().getCurrentProcessingTime();
+ // callback only after current bundle was fully finalized
+ // it could start a new bundle, for example resulting from timer processing
+ if (bundleFinishedCallback != null) {
+ bundleFinishedCallback.run();
+ bundleFinishedCallback = null;
Review comment:
On our 2.11 branch the callback fails because the current bundle isn't
finished and a new one won't be started (stack trace below). It will fail on
master as well, though it is probably even harder to reproduce there for the
reason you mention.
I would like the callback to live in `DoFnOperator`; otherwise the behavior is
hard to understand and the issue is easy to reintroduce. As a side effect, the
callback can also be used elsewhere for other purposes.
```
TimerException{java.lang.RuntimeException: Failed to process pushed back watermark after finished bundle.}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to process pushed back watermark after finished bundle.
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.lambda$processWatermark$0(ExecutableStageDoFnOperator.java:488)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:632)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:87)
    at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:679)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:673)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:378)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
    ... 7 more
Caused by: java.lang.NullPointerException
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.onTimer(ExecutableStageDoFnOperator.java:597)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.onTimer(DoFnRunnerWithMetricsUpdate.java:77)
    at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.onTimer(SimplePushbackSideInputDoFnRunner.java:112)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimer(DoFnOperator.java:720)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.fireTimer(ExecutableStageDoFnOperator.java:394)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:703)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:572)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:542)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.lambda$processWatermark$0(ExecutableStageDoFnOperator.java:486)
    ... 14 more
```
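To illustrate the ordering the diff above aims for, here is a minimal sketch in plain Java. It is not Beam code: `BundleSketch`, `ensureBundleStarted`, `setBundleFinishedCallback`, and the `log` list are hypothetical stand-ins, and only the names `invokeFinishBundle`, `fireTimer`, and `bundleFinishedCallback` mirror the diff and stack trace. The idea is that the one-shot callback runs only after the bundle state has been fully reset, so timer processing triggered by the callback can open a fresh bundle.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a bundle-managing operator; not Beam's DoFnOperator. */
class BundleSketch {
    private boolean bundleStarted = false;
    // One-shot callback to run after the current bundle is fully finished; may be null.
    private Runnable bundleFinishedCallback;
    // Records the order of lifecycle events so the sequence can be inspected.
    final List<String> log = new ArrayList<>();

    void setBundleFinishedCallback(Runnable callback) {
        this.bundleFinishedCallback = callback;
    }

    private void ensureBundleStarted() {
        if (!bundleStarted) {
            bundleStarted = true;
            log.add("startBundle");
        }
    }

    /** Timer processing needs an open bundle, so it starts one if necessary. */
    void fireTimer(String timerId) {
        ensureBundleStarted();
        log.add("onTimer:" + timerId);
    }

    void invokeFinishBundle() {
        if (!bundleStarted) {
            return;
        }
        log.add("finishBundle");
        // Reset the bundle state first, so that the callback sees a finished bundle.
        bundleStarted = false;
        // Sketch-only choice: clear the field before running, so a callback that
        // registers a follow-up callback does not have it overwritten with null.
        Runnable callback = bundleFinishedCallback;
        bundleFinishedCallback = null;
        if (callback != null) {
            callback.run();
        }
    }
}
```

In this sketch, a callback that fires a timer after `invokeFinishBundle()` produces a `finishBundle` entry followed by a new `startBundle` entry and then the timer entry, instead of processing the timer against a half-finished bundle.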
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 210697)
Time Spent: 0.5h (was: 20m)
> Portable Flink runner error in finish bundle timer processing
> -------------------------------------------------------------
>
> Key: BEAM-6796
> URL: https://issues.apache.org/jira/browse/BEAM-6796
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.11.0
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> The timer callback fails since the bundle was already finished and no new
> bundle was started.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)