[ 
https://issues.apache.org/jira/browse/BEAM-5816?focusedWorklogId=194748&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-194748
 ]

ASF GitHub Bot logged work on BEAM-5816:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Feb/19 19:04
            Start Date: 05/Feb/19 19:04
    Worklog Time Spent: 10m 
      Work Description: mxm commented on issue #7719: [BEAM-5816] Finish Flink 
bundles exactly once
URL: https://github.com/apache/beam/pull/7719#issuecomment-460762728
 
 
   There is nothing about the general bundle logic that changed in this PR. The 
commit which introduced the finish bundle call before the teardown does not 
explain why it was necessary. Nevertheless, it seems reasonable to try to 
finish a pending bundle before tearing down the DoFn. The bundle is finished in 
dispose() because there it is guaranteed to be called because in case of errors 
close() will not be called.
   
   >What happens when the operator is removed after an exception? Are we going 
to "finish" partially processed bundles?
   
   It is ok to finish bundles even when there are exceptions. Bundles can be of 
arbitrary element size.
   
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 194748)
    Time Spent: 40m  (was: 0.5h)

> Flink runner starts new bundles while disposing operator 
> ---------------------------------------------------------
>
>                 Key: BEAM-5816
>                 URL: https://issues.apache.org/jira/browse/BEAM-5816
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Micah Wylde
>            Assignee: Maximilian Michels
>            Priority: Major
>              Labels: portability-flink
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> We sometimes see exceptions when shutting down portable flink pipelines 
> (either due to cancellation or failure):
> {code}
> 2018-10-19 15:54:52,905 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during 
> disposal of stream operator.
> java.lang.RuntimeException: Failed to finish remote bundle
>       at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:241)
>       at 
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:87)
>       at 
> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>       at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:674)
>       at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:391)
>       at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.dispose(ExecutableStageDoFnOperator.java:166)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Already closed.
>       at 
> org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:95)
>       at 
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:251)
>       at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:238)
>       ... 9 more
>       Suppressed: java.lang.IllegalStateException: Processing bundle failed, 
> TODO: [BEAM-3962] abort bundle.
>               at 
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:266)
>               ... 10 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to