[
https://issues.apache.org/jira/browse/BEAM-5816?focusedWorklogId=194748&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-194748
]
ASF GitHub Bot logged work on BEAM-5816:
----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Feb/19 19:04
Start Date: 05/Feb/19 19:04
Worklog Time Spent: 10m
Work Description: mxm commented on issue #7719: [BEAM-5816] Finish Flink
bundles exactly once
URL: https://github.com/apache/beam/pull/7719#issuecomment-460762728
This PR does not change the general bundle logic. The commit which introduced
the finish bundle call before the teardown does not explain why it was
necessary. Nevertheless, it seems reasonable to try to finish a pending bundle
before tearing down the DoFn. The bundle is finished in dispose() because
dispose() is guaranteed to be called, whereas close() is not called in case of
errors.
> What happens when the operator is removed after an exception? Are we going
> to "finish" partially processed bundles?
It is OK to finish bundles even when there are exceptions, because a bundle
may contain an arbitrary number of elements.
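
To illustrate the point above, here is a minimal sketch of finishing a pending
bundle exactly once when both close() and dispose() may run. It is not the
actual DoFnOperator code: the class BundleLifecycleSketch, its fields, and the
helper finishBundleIfStarted() are hypothetical names chosen for illustration,
and the sketch simply assumes that dispose() always runs while close() may be
skipped after an error.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an operator; this is NOT Beam's DoFnOperator. */
final class BundleLifecycleSketch {
  private final List<String> currentBundle = new ArrayList<>();
  private boolean bundleStarted = false;
  private int finishedBundles = 0;

  /** Adds an element, lazily starting a bundle on the first element. */
  void processElement(String element) {
    bundleStarted = true;
    currentBundle.add(element);
  }

  /** Regular shutdown path; assumed to be skipped when the task fails. */
  void close() {
    finishBundleIfStarted();
  }

  /** Cleanup path; assumed to run on both success and failure. */
  void dispose() {
    finishBundleIfStarted();
    // Teardown of the user function would follow here.
  }

  /** Finishes the pending bundle, if any; further calls are no-ops. */
  private void finishBundleIfStarted() {
    if (!bundleStarted) {
      return; // nothing pending, or already finished by an earlier call
    }
    // Reset the flag first so that a later call never finishes the same
    // bundle again and never starts a new one during disposal.
    bundleStarted = false;
    currentBundle.clear();
    finishedBundles++;
  }

  int finishedBundles() {
    return finishedBundles;
  }

  public static void main(String[] args) {
    BundleLifecycleSketch op = new BundleLifecycleSketch();
    op.processElement("a");
    op.processElement("b");
    op.close();   // finishes the pending bundle
    op.dispose(); // finds nothing pending and does nothing
    System.out.println("finished bundles: " + op.finishedBundles());
  }
}
```

The guard is cleared before the bundle is counted as finished, so a second
call from dispose() after close() is a no-op, and a bundle of any size
(including a partially processed one) is finished at most once.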
Issue Time Tracking
-------------------
Worklog Id: (was: 194748)
Time Spent: 40m (was: 0.5h)
> Flink runner starts new bundles while disposing operator
> ---------------------------------------------------------
>
> Key: BEAM-5816
> URL: https://issues.apache.org/jira/browse/BEAM-5816
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Micah Wylde
> Assignee: Maximilian Michels
> Priority: Major
> Labels: portability-flink
> Time Spent: 40m
> Remaining Estimate: 0h
>
> We sometimes see exceptions when shutting down portable flink pipelines
> (either due to cancellation or failure):
> {code}
> 2018-10-19 15:54:52,905 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
> java.lang.RuntimeException: Failed to finish remote bundle
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:241)
>     at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:87)
>     at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:674)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:391)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.dispose(ExecutableStageDoFnOperator.java:166)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Already closed.
>     at org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:95)
>     at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:251)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:238)
>     ... 9 more
>     Suppressed: java.lang.IllegalStateException: Processing bundle failed, TODO: [BEAM-3962] abort bundle.
>         at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:266)
>         ... 10 more
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)