[ 
https://issues.apache.org/jira/browse/BEAM-5797?focusedWorklogId=157207&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-157207
 ]

ASF GitHub Bot logged work on BEAM-5797:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Oct/18 21:26
            Start Date: 22/Oct/18 21:26
    Worklog Time Spent: 10m 
      Work Description: mwylde opened a new pull request #6779: [BEAM-5797] 
Ensure bundle factory and stage context are closed on dispose()
URL: https://github.com/apache/beam/pull/6779
 
 
   This PR fixes BEAM-5797 (Python processes are not always cleaned up when a 
job finishes).
   
   The `super.dispose()` call in `ExecutableStageDoFnOperator` can fail, for 
example:
   
   ```
   2018-10-19 15:54:52,905 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
   java.lang.RuntimeException: Failed to finish remote bundle
        at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:241)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:87)
        at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:674)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:391)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.dispose(ExecutableStageDoFnOperator.java:166)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.IllegalStateException: Already closed.
        at org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:95)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:251)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:238)
        ... 9 more
        Suppressed: java.lang.IllegalStateException: Processing bundle failed, TODO: [BEAM-3962] abort bundle.
                at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:266)
                ... 10 more
   
   ```
   
   When that happens, the `stageBundleFactory` and `stageContext` are not 
closed. The StageContext (by default an instance of 
`ReferenceCountingFlinkExecutableStageContextFactory.WrappedContext`) is 
responsible for tracking usage of the SDK harness processes. If it isn't 
closed for a stage, the reference count never reaches 0 and the harness 
process is never killed.
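
To illustrate why a single missed close keeps the harness alive, here is a 
minimal reference-counting sketch. It is not the Beam implementation: 
`RefCountSketch`, `Harness`, `CountingFactory`, and `Handle` are hypothetical 
names invented for this example, and the real `WrappedContext` may behave 
differently in detail.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; none of these classes exist in Beam or Flink.
class RefCountSketch {
  /** Stand-in for an SDK harness process shared by several stages. */
  static class Harness {
    boolean running = true;

    void kill() {
      running = false;
    }
  }

  /** A handle held by one stage; closing it releases one reference. */
  static class Handle implements AutoCloseable {
    private final CountingFactory owner;
    private boolean closed = false;

    Handle(CountingFactory owner) {
      this.owner = owner;
    }

    @Override
    public void close() {
      // Guard against double close so the count cannot drop below zero.
      if (!closed) {
        closed = true;
        owner.release();
      }
    }
  }

  /** Hands out handles and kills the harness when the last one is closed. */
  static class CountingFactory {
    private final Harness harness = new Harness();
    private final AtomicInteger refs = new AtomicInteger();

    Handle acquire() {
      refs.incrementAndGet();
      return new Handle(this);
    }

    void release() {
      if (refs.decrementAndGet() == 0) {
        harness.kill();
      }
    }

    boolean harnessRunning() {
      return harness.running;
    }
  }
}
```

In this sketch, if two stages each call `acquire()` and only one handle is 
closed, `harnessRunning()` stays `true` indefinitely, which mirrors the leak 
described above.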
   
   With this change, the cleanup in `ExecutableStageDoFnOperator.dispose()` 
always runs, regardless of whether `super.dispose()` fails. This PR only 
ensures that resources are cleaned up properly; the underlying error still 
needs to be investigated (tracked in BEAM-5816).
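
One way to guarantee cleanup when the parent `dispose()` throws is to put the 
resources in a try-with-resources block around the `super.dispose()` call. 
The sketch below is an illustration under assumptions, not the code in this 
PR: `DisposeSketch`, `BaseOperator`, `StageOperator`, and the `events` list 
are hypothetical stand-ins, and the two `AutoCloseable` fields only borrow 
the names `stageBundleFactory` and `stageContext` from the description above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; these are not the actual Beam or Flink classes.
class DisposeSketch {
  /** Records the order of cleanup steps so the behaviour can be inspected. */
  static final List<String> events = new ArrayList<>();

  /** Stand-in for the parent operator, whose dispose() fails. */
  static class BaseOperator {
    void dispose() throws Exception {
      events.add("super.dispose");
      throw new RuntimeException("Failed to finish remote bundle");
    }
  }

  /** Stand-in for the stage operator that owns two closeable resources. */
  static class StageOperator extends BaseOperator {
    private final AutoCloseable stageBundleFactory =
        () -> events.add("bundleFactory.close");
    private final AutoCloseable stageContext =
        () -> events.add("stageContext.close");

    @Override
    void dispose() throws Exception {
      // try-with-resources closes both resources even if the body throws,
      // in reverse order of declaration (factory first, then context).
      try (AutoCloseable context = stageContext;
          AutoCloseable factory = stageBundleFactory) {
        super.dispose();
      }
    }
  }
}
```

Calling `new DisposeSketch.StageOperator().dispose()` still propagates the 
`RuntimeException`, but both close steps are recorded in `events` before it 
reaches the caller.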
   
   ------------------------
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
    - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   It will help us expedite review of your Pull Request if you tag someone 
(e.g. `@username`) to look at it.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 157207)
            Time Spent: 10m
    Remaining Estimate: 0h

> SDK workers are not always killed when Flink pipeline finishes
> --------------------------------------------------------------
>
>                 Key: BEAM-5797
>                 URL: https://issues.apache.org/jira/browse/BEAM-5797
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Micah Wylde
>            Priority: Major
>              Labels: portability-flink
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Beam python workers are spun up as part of a pipeline execution, and killed 
> once that pipeline has been cancelled or failed. However, in some situations 
> we see the workers hanging around indefinitely until they are manually killed 
> or the taskmanager is restarted. The behavior seems to only occur with 
> streaming pipelines, and appears non-deterministic



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
