This is an automated email from the ASF dual-hosted git repository.
riteshghorse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fd3f3974f7a [prism] Universal runner improvements. (#27570)
fd3f3974f7a is described below
commit fd3f3974f7ada8ee74609bd2edb30a0122864756
Author: Robert Burke <[email protected]>
AuthorDate: Thu Jul 20 11:17:00 2023 -0700
[prism] Universal runner improvements. (#27570)
Co-authored-by: lostluck <[email protected]>
---
.../beam/runners/universal/runnerlib/execute.go | 7 ++++-
.../go/pkg/beam/runners/universal/runnerlib/job.go | 30 +++++++++++++++++++---
sdks/go/pkg/beam/runners/universal/universal.go | 1 +
3 files changed, 33 insertions(+), 5 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
index 68db9b0ee76..5b49d2f9473 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
@@ -41,7 +41,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline,
endpoint string, opt *JobO
presult := &universalPipelineResult{}
bin := opt.Worker
- if bin == "" {
+ if bin == "" && !opt.Loopback {
if self, ok := IsWorkerCompatibleBinary(); ok {
bin = self
log.Infof(ctx, "Using running binary as worker binary:
'%v'", bin)
@@ -56,6 +56,11 @@ func Execute(ctx context.Context, p *pipepb.Pipeline,
endpoint string, opt *JobO
bin = worker
}
+ } else if opt.Loopback {
+ // TODO(https://github.com/apache/beam/issues/27569): determine
the canonical location for Beam temp files.
+ // In loopback mode, the binary is unused, so we can avoid an
unnecessary compile step.
+ f, _ := os.CreateTemp(os.TempDir(), "beamloopbackworker-*")
+ bin = f.Name()
} else {
log.Infof(ctx, "Using specified worker binary: '%v'", bin)
}
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
index daa6896da40..5752b33892b 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
@@ -39,15 +39,17 @@ type JobOptions struct {
// Experiments are additional experiments.
Experiments []string
- // TODO(herohde) 3/17/2018: add further parametrization as needed
-
// Worker is the worker binary override.
Worker string
- // RetainDocker is an option to pass to the runner.
+ // RetainDocker is an option to pass to the runner indicating the
docker containers should be cached.
RetainDocker bool
+ // Indicates a limit on parallelism the runner should impose.
Parallelism int
+
+ // Loopback indicates this job is running in loopback mode and will
reconnect to the local process.
+ Loopback bool
}
// Prepare prepares a job to the given job service. It returns the preparation
id
@@ -101,10 +103,17 @@ func WaitForCompletion(ctx context.Context, client
jobpb.JobServiceClient, jobID
return errors.Wrap(err, "failed to get job stream")
}
+ mostRecentError := errors.New("<no error received, see runner logs>")
+ var errReceived, jobFailed bool
+
for {
msg, err := stream.Recv()
if err != nil {
if err == io.EOF {
+ if jobFailed {
+ // Connection finished with a failed
status, so produce what we have.
+ return errors.Errorf("job %v
failed:\n%w", jobID, mostRecentError)
+ }
return nil
}
return err
@@ -120,7 +129,11 @@ func WaitForCompletion(ctx context.Context, client
jobpb.JobServiceClient, jobID
case jobpb.JobState_DONE, jobpb.JobState_CANCELLED:
return nil
case jobpb.JobState_FAILED:
- return errors.Errorf("job %v failed", jobID)
+ jobFailed = true
+ if errReceived {
+ return errors.Errorf("job %v
failed:\n%w", jobID, mostRecentError)
+ }
+ // Otherwise, wait for at least one error log
from the runner, or the connection to close.
}
case msg.GetMessageResponse() != nil:
@@ -129,6 +142,15 @@ func WaitForCompletion(ctx context.Context, client
jobpb.JobServiceClient, jobID
text := fmt.Sprintf("%v (%v): %v", resp.GetTime(),
resp.GetMessageId(), resp.GetMessageText())
log.Output(ctx, messageSeverity(resp.GetImportance()),
1, text)
+ if resp.GetImportance() >=
jobpb.JobMessage_JOB_MESSAGE_ERROR {
+ errReceived = true
+ mostRecentError =
errors.New(resp.GetMessageText())
+
+ if jobFailed {
+ return errors.Errorf("job %v
failed:\n%w", jobID, mostRecentError)
+ }
+ }
+
default:
return errors.Errorf("unexpected job update: %v",
proto.MarshalTextString(msg))
}
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go
b/sdks/go/pkg/beam/runners/universal/universal.go
index 299a64acdd6..8af9e91e1e1 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -101,6 +101,7 @@ func Execute(ctx context.Context, p *beam.Pipeline)
(beam.PipelineResult, error)
Worker: *jobopts.WorkerBinary,
RetainDocker: *jobopts.RetainDockerContainers,
Parallelism: *jobopts.Parallelism,
+ Loopback: jobopts.IsLoopback(),
}
return runnerlib.Execute(ctx, pipeline, endpoint, opt, *jobopts.Async)
}