This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch timermgr in repository https://gitbox.apache.org/repos/asf/beam.git
commit a436d7dbb01f8324b1956ca2eb9248236fd7803e Author: Robert Bradshaw <[email protected]> AuthorDate: Tue Feb 14 13:21:45 2023 -0800 Add retry logic to Python boot script. This will allow runners to act more quickly on failures rather than wait for all workers to die before exiting the container. --- sdks/python/container/boot.go | 47 ++++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index a9ae893ab0c..4572f43f8a9 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -30,8 +30,8 @@ import ( "path/filepath" "regexp" "strings" - "syscall" "sync" + "syscall" "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact" @@ -216,10 +216,10 @@ func launchSDKProcess() error { // Keep track of child PIDs for clean shutdown without zombies childPids := struct { - v []int + v []int canceled bool - mu sync.Mutex - } {v: make([]int, 0, len(workerIds))} + mu sync.Mutex + }{v: make([]int, 0, len(workerIds))} // Forward trapped signals to child process groups in order to terminate them gracefully and avoid zombies go func() { @@ -251,20 +251,31 @@ func launchSDKProcess() error { go func(workerId string) { defer wg.Done() - childPids.mu.Lock() - if childPids.canceled { + errorCount := 0 + for { + childPids.mu.Lock() + if childPids.canceled { + childPids.mu.Unlock() + return + } + log.Printf("Executing Python (worker %v): python %v", workerId, strings.Join(args, " ")) + cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, "python", args...) + childPids.v = append(childPids.v, cmd.Process.Pid) childPids.mu.Unlock() - return - } - log.Printf("Executing Python (worker %v): python %v", workerId, strings.Join(args, " ")) - cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, "python", args...) 
- childPids.v = append(childPids.v, cmd.Process.Pid) - childPids.mu.Unlock() - - if err := cmd.Wait(); err != nil { - log.Printf("Python (worker %v) exited: %v", workerId, err) - } else { - log.Printf("Python (worker %v) exited.", workerId) + + if err := cmd.Wait(); err != nil { + // Retry on fatal errors, like OOMs and segfaults, not just + // DoFns throwing exceptions. + errorCount += 1 + if errorCount < 4 { + log.Printf("Python (worker %v) exited: %v", workerId, err) + } else { + log.Fatalf("Python (worker %v) exited: %v", workerId, err) + } + } else { + log.Printf("Python (worker %v) exited.", workerId) + break + } } }(workerId) } @@ -297,7 +308,7 @@ func StartCommandEnv(env map[string]string, prog string, args ...string) *exec.C func setupVenv(baseDir, workerId string) (string, error) { log.Printf("Initializing temporary Python venv ...") - dir := filepath.Join(baseDir, "beam-venv-worker-" + workerId) + dir := filepath.Join(baseDir, "beam-venv-worker-"+workerId) if _, err := os.Stat(dir); !os.IsNotExist(err) { // Probably leftovers from a previous run log.Printf("Cleaning up previous venv ...")
