[ https://issues.apache.org/jira/browse/BEAM-12792?focusedWorklogId=737059&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-737059 ]
ASF GitHub Bot logged work on BEAM-12792:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Mar/22 10:13
Start Date: 05/Mar/22 10:13
Worklog Time Spent: 10m
Work Description: phoerious commented on a change in pull request #16658:
URL: https://github.com/apache/beam/pull/16658#discussion_r820078506
##########
File path: sdks/python/container/boot.go
##########
@@ -214,41 +211,105 @@ func mainError() error {
}
}
+ workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
+
+ // Keep track of child PIDs for clean shutdown without zombies
+ childPids := struct {
+ v []int
+ canceled bool
+ mu sync.Mutex
+ } {v: make([]int, 0, len(workerIds))}
+
+ // Forward trapped signals to child process groups in order to terminate them gracefully and avoid zombies
+ go func() {
+ log.Printf("Received signal: %v", <-signalChannel)
+ childPids.mu.Lock()
+ childPids.canceled = true
+ for _, pid := range childPids.v {
+ syscall.Kill(-pid, syscall.SIGTERM)
+ go func() {
+ // This goroutine will be canceled if the main process exits before the 5 seconds
+ // have elapsed, i.e., as soon as all subprocesses have returned from Wait().
+ time.Sleep(5 * time.Second)
+ log.Printf("Worker process did not respond, killing it.")
+ syscall.Kill(-pid, syscall.SIGKILL)
+ }()
+ }
+ childPids.mu.Unlock()
+ }()
+
args := []string{
"-m",
sdkHarnessEntrypoint,
}
- workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
var wg sync.WaitGroup
wg.Add(len(workerIds))
for _, workerId := range workerIds {
go func(workerId string) {
defer wg.Done()
+
+ childPids.mu.Lock()
+ if childPids.canceled {
+ childPids.mu.Unlock()
+ return
+ }
log.Printf("Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
- log.Printf("Python (worker %v) exited with code: %v", workerId, execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "python", args...))
+ cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, "python", args...)
+ childPids.v = append(childPids.v, cmd.Process.Pid)
+ childPids.mu.Unlock()
+
+ if err := cmd.Wait(); err != nil {
+ log.Printf("Python (worker %v) exited: %v", workerId, err)
Review comment:
No, since Python workers are often terminated by SIGTERM even if they
were successful.
Issue Time Tracking
-------------------
Worklog Id: (was: 737059)
Time Spent: 12h 40m (was: 12.5h)
> Multiple jobs running on Flink session cluster reuse the persistent Python
> environment.
> ---------------------------------------------------------------------------------------
>
> Key: BEAM-12792
> URL: https://issues.apache.org/jira/browse/BEAM-12792
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-harness
> Affects Versions: 2.27.0, 2.28.0, 2.29.0, 2.30.0, 2.31.0
> Environment: Kubernetes 1.20 on Ubuntu 18.04.
> Reporter: Jens Wiren
> Priority: P1
> Labels: FlinkRunner, beam
> Time Spent: 12h 40m
> Remaining Estimate: 0h
>
> I'm running TFX pipelines on a Flink cluster using Beam in k8s. However,
> extra python packages passed to the Flink runner (or rather beam worker
> side-car) are only installed once per deployment cycle. Example:
> # Flink is deployed and is up and running
> # A TFX pipeline starts, submits a job to Flink along with a python whl of
> custom code and beam ops.
> # The beam worker installs the package and the pipeline finishes successfully.
> # A new TFX pipeline is built where a new beam fn is introduced, the pipeline
> is started and the new whl is submitted as in step 2).
> # This time, the new package is not being installed in the beam worker
> causing the job to fail due to a reference which does not exist in the beam
> worker, since it didn't install the new package.
>
> I started using Flink with Beam version 2.27, and this has been an issue
> ever since.