robertwb commented on a change in pull request #15642:
URL: https://github.com/apache/beam/pull/15642#discussion_r730021554



##########
File path: sdks/go/pkg/beam/util/execx/exec.go
##########
@@ -24,10 +24,22 @@ import (
 // Execute runs the program with the given arguments. It attaches stdio to the
 // child process.
 func Execute(prog string, args ...string) error {
+       return ExecuteEnv(nil, prog, args...)
+}
+
+// Execute runs the program with the given arguments. It attaches stdio to the

Review comment:
       Done.

##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -1559,6 +1559,11 @@ message StandardProtocols {
     // cores.
     MULTI_CORE_BUNDLE_PROCESSING = 3 [(beam_urn) = "beam:protocol:multi_core_bundle_processing:v1"];
 
+    // Indicates this SDK can cheaply spawn sibling workers (e.g. within the
+    // same container) to work around the fact that it cannot take advantage

Review comment:
       Python will likely be the only case for now, but this is a generic property that any language with similar limitations could adopt.

##########
File path: sdks/python/container/boot.go
##########
@@ -191,22 +191,31 @@ func main() {
                os.Setenv("STATUS_API_SERVICE_DESCRIPTOR", proto.MarshalTextString(info.GetStatusEndpoint()))
        }
 
-    if metadata := info.GetMetadata(); metadata != nil {
-        if jobName, nameExists := metadata["job_name"]; nameExists {
-            os.Setenv("JOB_NAME", jobName)
-        }
-        if jobID, idExists := metadata["job_id"]; idExists {
-            os.Setenv("JOB_ID", jobID)
-        }
-    }
+       if metadata := info.GetMetadata(); metadata != nil {
+               if jobName, nameExists := metadata["job_name"]; nameExists {

Review comment:
       gofmt did this, to match the indentation of the rest of the file (which uses tabs!).

##########
File path: sdks/go/pkg/beam/util/execx/exec.go
##########
@@ -24,10 +24,22 @@ import (
 // Execute runs the program with the given arguments. It attaches stdio to the
 // child process.
 func Execute(prog string, args ...string) error {
+       return ExecuteEnv(nil, prog, args...)
+}
+
+// Execute runs the program with the given arguments. It attaches stdio to the
+// child process.
+func ExecuteEnv(env map[string]string, prog string, args ...string) error {
        cmd := exec.Command(prog, args...)
        cmd.Stdin = os.Stdin
        cmd.Stdout = os.Stdout
        cmd.Stderr = os.Stderr
+       if env != nil {

Review comment:
       I don't see any existing tests for execx. Also, the logic here is simple enough that I'm fine with it being exercised as part of other tests.
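The hunk above stops at `if env != nil {`, so the branch body is not shown. Below is a minimal sketch, assuming (this is not confirmed by the diff) that a non-nil `env` is layered on top of the parent's `os.Environ()` while a nil `env` leaves `cmd.Env` unset so the child simply inherits the parent environment. `mergeEnv` is a hypothetical helper introduced only so the merge logic can be checked without spawning a process.

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
	"sort"
	"strings"
)

// mergeEnv (hypothetical helper) returns base with every key in overrides set
// to its override value. Base entries whose key is overridden are dropped, and
// overrides are appended in sorted key order so the result is deterministic.
func mergeEnv(base []string, overrides map[string]string) []string {
	out := make([]string, 0, len(base)+len(overrides))
	for _, kv := range base {
		key := kv
		if i := strings.IndexByte(kv, '='); i >= 0 {
			key = kv[:i]
		}
		if _, ok := overrides[key]; !ok {
			out = append(out, kv)
		}
	}
	keys := make([]string, 0, len(overrides))
	for k := range overrides {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		out = append(out, k+"="+overrides[k])
	}
	return out
}

// ExecuteEnv runs prog with args and attaches stdio to the child process.
// Assumption: a nil env means "inherit the parent environment unchanged".
func ExecuteEnv(env map[string]string, prog string, args ...string) error {
	cmd := exec.Command(prog, args...)
	cmd.Stdin = os.Stdin
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	if env != nil {
		cmd.Env = mergeEnv(os.Environ(), env)
	}
	return cmd.Run()
}

// Execute keeps its original signature by delegating with no extra variables.
func Execute(prog string, args ...string) error {
	return ExecuteEnv(nil, prog, args...)
}

func main() {
	// Illustrative only: print the merged environment instead of spawning a child.
	for _, kv := range mergeEnv([]string{"PATH=/usr/bin"}, map[string]string{"WORKER_ID": "1"}) {
		fmt.Println(kv)
	}
}
```

Keeping `Execute` as a thin wrapper means existing callers are untouched, which fits the reviewer's point that the logic is simple enough to be exercised indirectly by other tests.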

##########
File path: model/fn-execution/src/main/proto/beam_provision_api.proto
##########
@@ -85,4 +85,8 @@ message ProvisionInfo {
     // (optional) Runtime environment metadata that are static throughout the
     // pipeline execution.
     map<string, string> metadata = 13;
+
+    // (optional) If this environment supports SIBLING_WORKERS, used to indicate
+    // the ids of sibling workers, if any, that should be started.
+    repeated string sibling_worker_ids = 14;

Review comment:
       Good point. Clarified.
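Per the clarified proto comment, `sibling_worker_ids` only carries meaning when the environment supports SIBLING_WORKERS, and (as noted earlier in the thread) that capability is language-agnostic. A minimal runner-side sketch of that rule, under loud assumptions: the URN constant is a placeholder because the real one is cut off in the diff, and the `<primary>-sibling-<n>` naming and the `extra` count are hypothetical, not how any Beam runner actually assigns IDs.

```go
package main

import "fmt"

// siblingWorkersURN is a PLACEHOLDER; the real SIBLING_WORKERS URN is truncated
// in the diff and is deliberately not reproduced here.
const siblingWorkersURN = "example:protocol:sibling_workers"

// hasCapability reports whether caps contains urn.
func hasCapability(caps []string, urn string) bool {
	for _, c := range caps {
		if c == urn {
			return true
		}
	}
	return false
}

// planSiblingIDs (hypothetical) returns the IDs a runner might place in
// ProvisionInfo.sibling_worker_ids. It returns nil, leaving the repeated field
// empty, unless the environment advertises the sibling-workers capability.
func planSiblingIDs(caps []string, primary string, extra int) []string {
	if extra <= 0 || !hasCapability(caps, siblingWorkersURN) {
		return nil
	}
	ids := make([]string, 0, extra)
	for i := 1; i <= extra; i++ {
		// Illustrative naming only; IDs just need to be unique per worker.
		ids = append(ids, fmt.Sprintf("%s-sibling-%d", primary, i))
	}
	return ids
}

func main() {
	caps := []string{siblingWorkersURN}
	fmt.Println(planSiblingIDs(caps, "worker-1", 2))
}
```

An SDK that does not advertise the capability simply never sees sibling IDs, so older containers keep their one-process-per-environment behavior.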

##########
File path: sdks/python/apache_beam/transforms/environments.py
##########
@@ -777,6 +777,10 @@ def python_sdk_capabilities():
   return list(_python_sdk_capabilities_iter())
 
 
+def python_sdk_docker_capabilities():

Review comment:
       The limitation is that it's currently only implemented for Docker.

##########
File path: sdks/python/container/boot.go
##########
@@ -191,22 +191,31 @@ func main() {
                os.Setenv("STATUS_API_SERVICE_DESCRIPTOR", proto.MarshalTextString(info.GetStatusEndpoint()))
        }
 
-    if metadata := info.GetMetadata(); metadata != nil {
-        if jobName, nameExists := metadata["job_name"]; nameExists {
-            os.Setenv("JOB_NAME", jobName)
-        }
-        if jobID, idExists := metadata["job_id"]; idExists {
-            os.Setenv("JOB_ID", jobID)
-        }
-    }
+       if metadata := info.GetMetadata(); metadata != nil {
+               if jobName, nameExists := metadata["job_name"]; nameExists {
+                       os.Setenv("JOB_NAME", jobName)
+               }
+               if jobID, idExists := metadata["job_id"]; idExists {
+                       os.Setenv("JOB_ID", jobID)
+               }
+       }
 
        args := []string{
                "-m",
                sdkHarnessEntrypoint,
        }
-       log.Printf("Executing: python %v", strings.Join(args, " "))
 
-       log.Fatalf("Python exited: %v", execx.Execute("python", args...))
+       workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
+       var wg sync.WaitGroup
+       wg.Add(len(workerIds))
+       for _, workerId := range workerIds {
+               go func(workerId string) {
+                       log.Printf("Executing: python %v", strings.Join(args, " "))

Review comment:
       It will be exactly the same as before: each of these workers connects to the runner harness (on the control, data, and state channels) with its own unique worker ID. The only thing that changes is who starts them up.
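The fan-out in the boot.go hunk (primary ID first, then `info.GetSiblingWorkerIds()`, one goroutine per worker, joined by a `sync.WaitGroup`) can be sketched as below. This is a minimal sketch, not the PR's code: the `launch` callback stands in for the real `execx` call, and how each child learns its own ID (for example a `WORKER_ID` environment variable) is an assumption not shown in the truncated diff.

```go
package main

import (
	"fmt"
	"log"
	"sync"
)

// launchAll starts one launch call per worker ID, each in its own goroutine,
// and blocks until all have returned. The ID list mirrors
// append([]string{*id}, info.GetSiblingWorkerIds()...) from the hunk.
// The returned map records each worker's exit error (nil on success).
func launchAll(primary string, siblings []string, launch func(workerID string) error) map[string]error {
	workerIDs := append([]string{primary}, siblings...)
	results := make(map[string]error, len(workerIDs))
	var mu sync.Mutex // guards results, which every goroutine writes
	var wg sync.WaitGroup
	wg.Add(len(workerIDs))
	for _, workerID := range workerIDs {
		// Pass workerID as an argument so each goroutine gets its own copy
		// (needed before Go 1.22's per-iteration loop variables).
		go func(workerID string) {
			defer wg.Done()
			err := launch(workerID)
			mu.Lock()
			results[workerID] = err
			mu.Unlock()
		}(workerID)
	}
	wg.Wait()
	return results
}

func main() {
	// Stand-in launcher; in boot.go this would exec the Python SDK harness,
	// passing the worker ID to the child (mechanism assumed, e.g. WORKER_ID).
	fake := func(workerID string) error {
		log.Printf("starting worker %s", workerID)
		return nil
	}
	res := launchAll("1", []string{"2", "3"}, fake)
	fmt.Println(len(res))
}
```

Because each process still dials the control, data, and state channels itself, the runner sees N ordinary workers; only the boot process's role as their common parent is new.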




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
