robertwb commented on a change in pull request #15642:
URL: https://github.com/apache/beam/pull/15642#discussion_r730021554
##########
File path: sdks/go/pkg/beam/util/execx/exec.go
##########
@@ -24,10 +24,22 @@ import (
// Execute runs the program with the given arguments. It attaches stdio to the
// child process.
func Execute(prog string, args ...string) error {
+ return ExecuteEnv(nil, prog, args...)
+}
+
+// Execute runs the program with the given arguments. It attaches stdio to the
Review comment:
Done.
##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -1559,6 +1559,11 @@ message StandardProtocols {
// cores.
MULTI_CORE_BUNDLE_PROCESSING = 3 [(beam_urn) =
"beam:protocol:multi_core_bundle_processing:v1"];
+ // Indicates this SDK can cheaply spawn sibling workers (e.g. within the
+ // same container) to work around the fact that it cannot take advantage
Review comment:
Python will likely be the only case for now, but this is a generic property
that any language with similar limitations could adopt.
##########
File path: sdks/python/container/boot.go
##########
@@ -191,22 +191,31 @@ func main() {
os.Setenv("STATUS_API_SERVICE_DESCRIPTOR",
proto.MarshalTextString(info.GetStatusEndpoint()))
}
- if metadata := info.GetMetadata(); metadata != nil {
- if jobName, nameExists := metadata["job_name"]; nameExists {
- os.Setenv("JOB_NAME", jobName)
- }
- if jobID, idExists := metadata["job_id"]; idExists {
- os.Setenv("JOB_ID", jobID)
- }
- }
+ if metadata := info.GetMetadata(); metadata != nil {
+ if jobName, nameExists := metadata["job_name"]; nameExists {
Review comment:
gofmt did this, to line up with the indentation of the rest of the
file (which uses tabs!)
##########
File path: sdks/go/pkg/beam/util/execx/exec.go
##########
@@ -24,10 +24,22 @@ import (
// Execute runs the program with the given arguments. It attaches stdio to the
// child process.
func Execute(prog string, args ...string) error {
+ return ExecuteEnv(nil, prog, args...)
+}
+
+// Execute runs the program with the given arguments. It attaches stdio to the
+// child process.
+func ExecuteEnv(env map[string]string, prog string, args ...string) error {
cmd := exec.Command(prog, args...)
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
+ if env != nil {
Review comment:
I don't see any tests for exec. Also, the logic is simple enough here
that I'm fine with it being exercised as part of other tests.
##########
File path: model/fn-execution/src/main/proto/beam_provision_api.proto
##########
@@ -85,4 +85,8 @@ message ProvisionInfo {
// (optional) Runtime environment metadata that are static throughout the
// pipeline execution.
map<string, string> metadata = 13;
+
+ // (optional) If this environment supports SIBLING_WORKERS, used to indicate
+ // the ids of sibling workers, if any, that should be started.
+ repeated string sibling_worker_ids = 14;
Review comment:
Good point. Clarified.
##########
File path: sdks/python/apache_beam/transforms/environments.py
##########
@@ -777,6 +777,10 @@ def python_sdk_capabilities():
return list(_python_sdk_capabilities_iter())
+def python_sdk_docker_capabilities():
Review comment:
The limitation is that it's currently only implemented for docker.
##########
File path: sdks/python/container/boot.go
##########
@@ -191,22 +191,31 @@ func main() {
os.Setenv("STATUS_API_SERVICE_DESCRIPTOR",
proto.MarshalTextString(info.GetStatusEndpoint()))
}
- if metadata := info.GetMetadata(); metadata != nil {
- if jobName, nameExists := metadata["job_name"]; nameExists {
- os.Setenv("JOB_NAME", jobName)
- }
- if jobID, idExists := metadata["job_id"]; idExists {
- os.Setenv("JOB_ID", jobID)
- }
- }
+ if metadata := info.GetMetadata(); metadata != nil {
+ if jobName, nameExists := metadata["job_name"]; nameExists {
+ os.Setenv("JOB_NAME", jobName)
+ }
+ if jobID, idExists := metadata["job_id"]; idExists {
+ os.Setenv("JOB_ID", jobID)
+ }
+ }
args := []string{
"-m",
sdkHarnessEntrypoint,
}
- log.Printf("Executing: python %v", strings.Join(args, " "))
- log.Fatalf("Python exited: %v", execx.Execute("python", args...))
+ workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
+ var wg sync.WaitGroup
+ wg.Add(len(workerIds))
+ for _, workerId := range workerIds {
+ go func(workerId string) {
+ log.Printf("Executing: python %v", strings.Join(args, " "))
Review comment:
It will be exactly the same as before--each of these workers will
connect to the runner harness (on the control, data, state channels) with their
unique worker ids. The only thing that changes is who starts them up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.