lostluck commented on code in PR #16658:
URL: https://github.com/apache/beam/pull/16658#discussion_r991574892


##########
sdks/python/container/boot.go:
##########
@@ -73,13 +74,20 @@ const (
 )
 
 func main() {
+       if err := mainError(); err != nil {
+               log.Print(err)
+               os.Exit(1)

Review Comment:
   This is already what happens when log.Fatal is called, so it's unclear why this whole change was made.
   
   https://pkg.go.dev/log#Fatal
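   For reference, the linked docs say log.Fatal "is equivalent to Print() followed by a call to os.Exit(1)", and os.Exit runs no deferred functions. So the exit behaviour of the new main matches log.Fatal. The only difference is that deferred calls *inside* the extracted function now run. The sketch below illustrates this. `runWithCleanup` and `cleanupLog` are hypothetical stand-ins, not code from the PR:

```go
package main

import (
	"errors"
	"log"
	"os"
)

// cleanupLog records which deferred cleanups ran (hypothetical, for illustration).
var cleanupLog []string

// runWithCleanup stands in for the extracted body of main. Because it returns
// an error instead of calling log.Fatal, its deferred cleanup still runs.
func runWithCleanup() error {
	defer func() { cleanupLog = append(cleanupLog, "temp dir removed") }()
	return errors.New("failed to convert pipeline options")
}

func main() {
	if err := runWithCleanup(); err != nil {
		// Same observable effect as log.Fatal(err): print, then exit with status 1.
		// Any defers registered in main itself would still be skipped by os.Exit.
		log.Print(err)
		os.Exit(1)
	}
}
```

   In other words, the refactor only matters for code that registers defers. That is the basis for the follow-up suggestion to scope the change to those parts.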



##########
sdks/python/container/boot.go:
##########
@@ -137,46 +145,49 @@ func main() {
 
        options, err := provision.ProtoToJSON(info.GetPipelineOptions())
        if err != nil {
-               log.Fatalf("Failed to convert pipeline options: %v", err)
+               return fmt.Errorf("Failed to convert pipeline options: %v", err)
        }
 
        // (2) Retrieve and install the staged packages.
        //
-       // Guard from concurrent artifact retrieval and installation,
-       // when called by child processes in a worker pool.
+       // No log.Fatalf() from here on, otherwise deferred cleanups will not be called!

Review Comment:
   Then why not simply put the parts that require deferred cleanups in a function, instead of heavy-handedly moving all of main into a new function just to replace log.Fatal?
   
   In this case, I'd put the latter half into a function "launchWorkerProcesses" that handles the portions requiring defer, and handle the artifact/provision contract portions in a "setupBeamRequirements" or similarly named function. This would make the "phases" obvious in main.
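   A rough sketch of the two-phase shape suggested above. The function names come from the comment. `beamRequirements`, `cleanupRuns`, the placeholder bodies and the worker IDs are hypothetical, and none of this is the actual boot.go logic:

```go
package main

import (
	"flag"
	"fmt"
	"log"
	"os"
)

// beamRequirements is a hypothetical holder for what the setup phase produces.
type beamRequirements struct {
	options string
}

// cleanupRuns counts how often the deferred cleanup ran (for illustration only).
var cleanupRuns int

// setupBeamRequirements covers the provision/artifact contract. It registers no
// defers, so callers may still report its failures with log.Fatal.
func setupBeamRequirements() (*beamRequirements, error) {
	// Placeholder for fetching provision info, converting pipeline options
	// and staging packages.
	return &beamRequirements{options: `{"options":{}}`}, nil
}

// launchWorkerProcesses owns everything that needs deferred cleanup. It returns
// errors instead of exiting, so its defers always run.
func launchWorkerProcesses(req *beamRequirements, workerIDs []string) error {
	defer func() { cleanupRuns++ }() // e.g. remove temp files, reap children
	if len(workerIDs) == 0 {
		return fmt.Errorf("no worker IDs to launch")
	}
	// Placeholder: start one SDK worker per ID using req.options, then wait.
	return nil
}

func main() {
	flag.Parse()
	req, err := setupBeamRequirements()
	if err != nil {
		log.Fatalf("setting up Beam requirements: %v", err)
	}
	if err := launchWorkerProcesses(req, []string{"worker-1"}); err != nil {
		log.Print(err)
		os.Exit(1)
	}
}
```

   With this split, log.Fatal stays usable in the setup phase, and only the phase that registers defers needs the return-an-error style.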



##########
sdks/python/container/boot.go:
##########
@@ -73,13 +74,20 @@ const (
 )
 
 func main() {
+       if err := mainError(); err != nil {
+               log.Print(err)
+               os.Exit(1)
+       }
+}
+
+func mainError() error {

Review Comment:
   mainError isn't a good name. Convention would call it "tryMain", but that's also a bad name.
   If anything, it should be "launchSDKProcess" or similar, which documents its purpose.



##########
sdks/python/container/boot.go:
##########
@@ -137,46 +145,49 @@ func main() {
 
        options, err := provision.ProtoToJSON(info.GetPipelineOptions())
        if err != nil {
-               log.Fatalf("Failed to convert pipeline options: %v", err)
+               return fmt.Errorf("Failed to convert pipeline options: %v", err)
        }
 
        // (2) Retrieve and install the staged packages.
        //
-       // Guard from concurrent artifact retrieval and installation,
-       // when called by child processes in a worker pool.
+       // No log.Fatalf() from here on, otherwise deferred cleanups will not be called!

Review Comment:
   As an aside, the boot.go mains are long overdue for refactoring to make them more testable and to let them share code. They all duplicate very common portions, which means repeated work whenever a feature or capability is implemented in any of the preambles.
   
   That's out of scope for this PR though.



##########
sdks/python/container/boot.go:
##########
@@ -73,13 +74,20 @@ const (
 )
 
 func main() {
+       if err := mainError(); err != nil {
+               log.Print(err)
+               os.Exit(1)
+       }
+}
+
+func mainError() error {
        flag.Parse()

Review Comment:
   Conventionally, flag.Parse() should remain in main(), rather than in some called-out function.
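   A minimal sketch of that convention, assuming the extracted function is renamed along the lines suggested earlier. main parses the flags and passes plain values down, so the function can be called from a test without touching global flag state. The flag names here are illustrative, not copied from boot.go:

```go
package main

import (
	"flag"
	"fmt"
	"log"
	"os"
)

// Illustrative flags; boot.go defines its own set.
var (
	id                = flag.String("id", "", "Local identifier (required).")
	provisionEndpoint = flag.String("provision_endpoint", "", "Provision endpoint (required).")
)

// launchSDKProcess receives already-parsed values, so tests can call it
// directly with arbitrary inputs.
func launchSDKProcess(workerID, endpoint string) error {
	if workerID == "" {
		return fmt.Errorf("no id provided")
	}
	if endpoint == "" {
		return fmt.Errorf("no provision endpoint provided")
	}
	// Provisioning and worker launch would go here.
	return nil
}

func main() {
	flag.Parse() // stays in main, per convention
	if err := launchSDKProcess(*id, *provisionEndpoint); err != nil {
		log.Print(err)
		os.Exit(1)
	}
}
```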



##########
sdks/python/container/boot.go:
##########
@@ -200,24 +211,112 @@ func main() {
                }
        }
 
+       workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
+
+       // Keep track of child PIDs for clean shutdown without zombies
+       childPids := struct {
+               v []int
+               canceled bool
+               mu sync.Mutex
+       } {v: make([]int, 0, len(workerIds))}
+
+       // Forward trapped signals to child process groups in order to terminate them gracefully and avoid zombies
+       go func() {
+               log.Printf("Received signal: %v", <-signalChannel)
+               childPids.mu.Lock()
+               childPids.canceled = true
+               for _, pid := range childPids.v {
+                       go func(pid int) {
+                               // This goroutine will be canceled if the main process exits before the 5 seconds
+                               // have elapsed, i.e., as soon as all subprocesses have returned from Wait().
+                               time.Sleep(5 * time.Second)
+                               if err := syscall.Kill(-pid, syscall.SIGKILL); err == nil {
+                                       log.Printf("Worker process %v did not respond, killed it.", pid)
+                               }
+                       }(pid)
+                       syscall.Kill(-pid, syscall.SIGTERM)

Review Comment:
   Q: Why is the negative pid being used here? I'd have expected simply `pid`.
   
   I haven't used syscall.Kill before, and it has no doc comment (https://pkg.go.dev/syscall#Kill), so this looks very odd to me. If there's some magic here, please add a comment explaining it.
   



-- 
This is an automated message from the Apache Git Service.