[https://issues.apache.org/jira/browse/BEAM-5797?focusedWorklogId=157207&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-157207]
ASF GitHub Bot logged work on BEAM-5797:
----------------------------------------
Author: ASF GitHub Bot
Created on: 22/Oct/18 21:26
Start Date: 22/Oct/18 21:26
Worklog Time Spent: 10m
Work Description: mwylde opened a new pull request #6779: [BEAM-5797]
Ensure bundle factory and stage context are closed on dispose()
URL: https://github.com/apache/beam/pull/6779
This PR fixes BEAM-5797 (Python processes are not always cleaned up when the
job finishes).
The `super.dispose()` call in `ExecutableStageDoFnOperator` can fail, for
example:
```
2018-10-19 15:54:52,905 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
java.lang.RuntimeException: Failed to finish remote bundle
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:241)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:87)
    at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:674)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:391)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.dispose(ExecutableStageDoFnOperator.java:166)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Already closed.
    at org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:95)
    at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:251)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:238)
    ... 9 more
    Suppressed: java.lang.IllegalStateException: Processing bundle failed, TODO: [BEAM-3962] abort bundle.
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:266)
        ... 10 more
```
When that happens, `stageBundleFactory` and `stageContext` are currently never
closed. The `StageContext` (by default an instance of
`ReferenceCountingFlinkExecutableStageContextFactory.WrappedContext`) is
responsible for tracking usage of the SDK harness processes. If it is not
closed for a stage, the reference count never reaches 0 and the harness
process is never killed.
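To illustrate why a skipped `close()` leaks the harness, here is a minimal sketch of a reference-counted context under a simplified model: each stage acquires the shared context, and the process is killed only when the last holder releases it. `RefCountedContext` and `HarnessProcess` are hypothetical names used for illustration; they are not the actual API of `ReferenceCountingFlinkExecutableStageContextFactory`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for an SDK harness process. */
final class HarnessProcess {
  private volatile boolean alive = true;

  void kill() {
    alive = false;
  }

  boolean isAlive() {
    return alive;
  }
}

/**
 * Hypothetical reference-counted wrapper (not Beam's real class): the harness
 * is killed only when every stage that acquired the context has released it.
 */
final class RefCountedContext {
  private final HarnessProcess process;
  private final AtomicInteger refCount = new AtomicInteger();

  RefCountedContext(HarnessProcess process) {
    this.process = process;
  }

  /** Called once per stage that starts using the harness. */
  RefCountedContext acquire() {
    refCount.incrementAndGet();
    return this;
  }

  /** Called from a stage's dispose(); the last release kills the harness. */
  void release() {
    if (refCount.decrementAndGet() == 0) {
      process.kill();
    }
  }

  int refCount() {
    return refCount.get();
  }
}
```

In this model, if two stages acquire the context but only one calls `release()` (for example, because the other stage's `dispose()` threw before reaching it), the count stays at 1 and `isAlive()` keeps returning `true`, which matches the leaked-worker symptom.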
With this change, the cleanup logic in `ExecutableStageDoFnOperator.dispose()`
always runs, regardless of whether `super.dispose()` fails. This PR only
ensures that resources are cleaned up properly; the underlying error itself
still needs to be investigated (tracked in BEAM-5816).
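The shape of the fix can be sketched as a `try`/`finally` around the parent's `dispose()`. This is a simplified, self-contained model, not the actual diff in #6779: `BaseOperator` and `StageOperator` are hypothetical stand-ins for `DoFnOperator` and `ExecutableStageDoFnOperator`, and the two `AutoCloseable` fields stand in for `stageBundleFactory` and `stageContext`.

```java
/** Hypothetical parent whose dispose() flushes a bundle and may throw. */
class BaseOperator {
  void dispose() throws Exception {
    // Models DoFnOperator.dispose() -> finishBundle() failing as in the trace above.
    throw new RuntimeException("Failed to finish remote bundle");
  }
}

/** Hypothetical subclass that owns resources which must always be closed. */
class StageOperator extends BaseOperator {
  private final AutoCloseable bundleFactory;
  private final AutoCloseable stageContext;

  StageOperator(AutoCloseable bundleFactory, AutoCloseable stageContext) {
    this.bundleFactory = bundleFactory;
    this.stageContext = stageContext;
  }

  @Override
  void dispose() throws Exception {
    try {
      super.dispose();
    } finally {
      // Runs even when super.dispose() throws, so the context is always
      // released. The nested try/finally closes stageContext even if
      // closing the bundle factory fails.
      try {
        bundleFactory.close();
      } finally {
        stageContext.close();
      }
    }
  }
}
```

The original exception from `super.dispose()` still propagates to the caller, so the failure remains visible in the logs. One caveat of plain `finally`: if a `close()` call also throws, that exception replaces the one from `super.dispose()`; collecting it with `Throwable.addSuppressed` is one way to keep both.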
------------------------
Follow this checklist to help us incorporate your contribution quickly and
easily:
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA
issue, if applicable. This will automatically link the pull request to the
issue.
- [ ] If this contribution is large, please file an Apache [Individual
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
It will help us expedite review of your Pull Request if you tag someone
(e.g. `@username`) to look at it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 157207)
Time Spent: 10m
Remaining Estimate: 0h
> SDK workers are not always killed when Flink pipeline finishes
> --------------------------------------------------------------
>
> Key: BEAM-5797
> URL: https://issues.apache.org/jira/browse/BEAM-5797
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Micah Wylde
> Priority: Major
> Labels: portability-flink
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Beam Python workers are spun up as part of a pipeline execution and killed
> once that pipeline has been cancelled or has failed. However, in some
> situations we see the workers hanging around indefinitely until they are
> manually killed or the taskmanager is restarted. The behavior seems to occur
> only with streaming pipelines and appears non-deterministic.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)