This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch timermgr
in repository https://gitbox.apache.org/repos/asf/beam.git

commit a436d7dbb01f8324b1956ca2eb9248236fd7803e
Author: Robert Bradshaw <[email protected]>
AuthorDate: Tue Feb 14 13:21:45 2023 -0800

    Add retry logic to Python boot script.
    
    This will allow runners to act more quickly on failures
    rather than wait for all workers to die before exiting
    the container.
---
 sdks/python/container/boot.go | 47 ++++++++++++++++++++++++++-----------------
 1 file changed, 29 insertions(+), 18 deletions(-)

diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index a9ae893ab0c..4572f43f8a9 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -30,8 +30,8 @@ import (
        "path/filepath"
        "regexp"
        "strings"
-       "syscall"
        "sync"
+       "syscall"
        "time"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
@@ -216,10 +216,10 @@ func launchSDKProcess() error {
 
        // Keep track of child PIDs for clean shutdown without zombies
        childPids := struct {
-               v []int
+               v        []int
                canceled bool
-               mu sync.Mutex
-       } {v: make([]int, 0, len(workerIds))}
+               mu       sync.Mutex
+       }{v: make([]int, 0, len(workerIds))}
 
        // Forward trapped signals to child process groups in order to 
terminate them gracefully and avoid zombies
        go func() {
@@ -251,20 +251,31 @@ func launchSDKProcess() error {
                go func(workerId string) {
                        defer wg.Done()
 
-                       childPids.mu.Lock()
-                       if childPids.canceled {
+                       errorCount := 0
+                       for {
+                               childPids.mu.Lock()
+                               if childPids.canceled {
+                                       childPids.mu.Unlock()
+                                       return
+                               }
+                               log.Printf("Executing Python (worker %v): 
python %v", workerId, strings.Join(args, " "))
+                               cmd := 
StartCommandEnv(map[string]string{"WORKER_ID": workerId}, "python", args...)
+                               childPids.v = append(childPids.v, 
cmd.Process.Pid)
                                childPids.mu.Unlock()
-                               return
-                       }
-                       log.Printf("Executing Python (worker %v): python %v", 
workerId, strings.Join(args, " "))
-                       cmd := StartCommandEnv(map[string]string{"WORKER_ID": 
workerId}, "python", args...)
-                       childPids.v = append(childPids.v, cmd.Process.Pid)
-                       childPids.mu.Unlock()
-
-                       if err := cmd.Wait(); err != nil {
-                               log.Printf("Python (worker %v) exited: %v", 
workerId, err)
-                       } else {
-                               log.Printf("Python (worker %v) exited.", 
workerId)
+
+                               if err := cmd.Wait(); err != nil {
+                                       // Retry on fatal errors, like OOMs and 
segfaults, not just
+                                       // DoFns throwing exceptions.
+                                       errorCount += 1
+                                       if errorCount < 4 {
+                                               log.Printf("Python (worker %v) 
exited: %v", workerId, err)
+                                       } else {
+                                               log.Fatalf("Python (worker %v) 
exited: %v", workerId, err)
+                                       }
+                               } else {
+                                       log.Printf("Python (worker %v) 
exited.", workerId)
+                                       break
+                               }
                        }
                }(workerId)
        }
@@ -297,7 +308,7 @@ func StartCommandEnv(env map[string]string, prog string, 
args ...string) *exec.C
 func setupVenv(baseDir, workerId string) (string, error) {
        log.Printf("Initializing temporary Python venv ...")
 
-       dir := filepath.Join(baseDir, "beam-venv-worker-" + workerId)
+       dir := filepath.Join(baseDir, "beam-venv-worker-"+workerId)
        if _, err := os.Stat(dir); !os.IsNotExist(err) {
                // Probably leftovers from a previous run
                log.Printf("Cleaning up previous venv ...")

Reply via email to