damccorm commented on a change in pull request #16833:
URL: https://github.com/apache/beam/pull/16833#discussion_r805003478
##########
File path: sdks/go/container/boot.go
##########
@@ -99,60 +87,87 @@ func main() {
log.Fatalf("Failed to retrieve staged files: %v", err)
}
- // TODO(BEAM-13647): Remove legacy hack once aged out.
+ name, err := getGoWorkerArtifactName(artifacts)
+ if err != nil {
+ log.Fatalf("Failed to get Go Worker Artifact Name: %v", err)
+ }
+
+ // (3) The persist dir may be on a noexec volume, so we must
+ // copy the binary to a different location to execute.
+ const prog = "/bin/worker"
+ if err := copyExe(filepath.Join(dir, name), prog); err != nil {
+ log.Fatalf("Failed to copy worker binary: %v", err)
+ }
+
+ args := []string{
+ "--worker=true",
+ "--id=" + *id,
+ "--logging_endpoint=" + *loggingEndpoint,
+ "--control_endpoint=" + *controlEndpoint,
+ "--semi_persist_dir=" + *semiPersistDir,
+ "--options=" + options,
+ }
+ if info.GetStatusEndpoint() != nil {
+ args = append(args,
"--status_endpoint="+info.GetStatusEndpoint().GetUrl())
+ }
+
+ log.Fatalf("User program exited: %v", execx.Execute(prog, args...))
+}
+
+func getGoWorkerArtifactName(artifacts []*pipepb.ArtifactInformation) (string,
error) {
Review comment:
This diff is pretty rough 😢 but lines 95-115 are unchanged, most of the
stuff in this function was just directly above it
##########
File path: sdks/go/container/boot.go
##########
@@ -99,60 +87,87 @@ func main() {
log.Fatalf("Failed to retrieve staged files: %v", err)
}
- // TODO(BEAM-13647): Remove legacy hack once aged out.
+ name, err := getGoWorkerArtifactName(artifacts)
+ if err != nil {
+ log.Fatalf("Failed to get Go Worker Artifact Name: %v", err)
+ }
+
+ // (3) The persist dir may be on a noexec volume, so we must
+ // copy the binary to a different location to execute.
+ const prog = "/bin/worker"
+ if err := copyExe(filepath.Join(dir, name), prog); err != nil {
+ log.Fatalf("Failed to copy worker binary: %v", err)
+ }
+
+ args := []string{
+ "--worker=true",
+ "--id=" + *id,
+ "--logging_endpoint=" + *loggingEndpoint,
+ "--control_endpoint=" + *controlEndpoint,
+ "--semi_persist_dir=" + *semiPersistDir,
+ "--options=" + options,
+ }
+ if info.GetStatusEndpoint() != nil {
+ args = append(args,
"--status_endpoint="+info.GetStatusEndpoint().GetUrl())
+ }
+
+ log.Fatalf("User program exited: %v", execx.Execute(prog, args...))
+}
+
+func getGoWorkerArtifactName(artifacts []*pipepb.ArtifactInformation) (string,
error) {
const worker = "worker"
name := worker
switch len(artifacts) {
case 0:
- log.Fatal("No artifacts staged")
+ return "", errors.New(fmt.Sprintf("No artifacts staged"))
case 1:
name, _ = artifact.MustExtractFilePayload(artifacts[0])
+ return name, nil
Review comment:
One question I had while refactoring this: Does this case actually need
to be split out on its own or should we just lump it in to the default case (at
that point, we could just get rid of the switch and do a simple if on `len ==
0`).
This does something slightly different than the default case (it doesn't
check if the role urn is URNGoWorkerBinaryRole or do the fallback check of if
the name equals "worker"), but I'm not sure if that behavior difference is
actually intentional. @lostluck any idea if this actually needs to be its own
case?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]