This is an automated email from the ASF dual-hosted git repository.

riteshghorse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fd3f3974f7a [prism] Universal runner improvements. (#27570)
fd3f3974f7a is described below

commit fd3f3974f7ada8ee74609bd2edb30a0122864756
Author: Robert Burke <[email protected]>
AuthorDate: Thu Jul 20 11:17:00 2023 -0700

    [prism] Universal runner improvements. (#27570)
    
    Co-authored-by: lostluck <[email protected]>
---
 .../beam/runners/universal/runnerlib/execute.go    |  7 ++++-
 .../go/pkg/beam/runners/universal/runnerlib/job.go | 30 +++++++++++++++++++---
 sdks/go/pkg/beam/runners/universal/universal.go    |  1 +
 3 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go 
b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
index 68db9b0ee76..5b49d2f9473 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
@@ -41,7 +41,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, 
endpoint string, opt *JobO
        presult := &universalPipelineResult{}
 
        bin := opt.Worker
-       if bin == "" {
+       if bin == "" && !opt.Loopback {
                if self, ok := IsWorkerCompatibleBinary(); ok {
                        bin = self
                        log.Infof(ctx, "Using running binary as worker binary: 
'%v'", bin)
@@ -56,6 +56,11 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, 
endpoint string, opt *JobO
 
                        bin = worker
                }
+       } else if opt.Loopback {
+               // TODO(https://github.com/apache/beam/issues/27569): determine 
the canonical location for Beam temp files.
+               // In loopback mode, the binary is unused, so we can avoid an 
unnecessary compile step.
+               f, _ := os.CreateTemp(os.TempDir(), "beamloopbackworker-*")
+               bin = f.Name()
        } else {
                log.Infof(ctx, "Using specified worker binary: '%v'", bin)
        }
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go 
b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
index daa6896da40..5752b33892b 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
@@ -39,15 +39,17 @@ type JobOptions struct {
        // Experiments are additional experiments.
        Experiments []string
 
-       // TODO(herohde) 3/17/2018: add further parametrization as needed
-
        // Worker is the worker binary override.
        Worker string
 
-       // RetainDocker is an option to pass to the runner.
+       // RetainDocker is an option to pass to the runner indicating the 
docker containers should be cached.
        RetainDocker bool
 
+       // Parallelism indicates a limit on parallelism the runner should impose.
        Parallelism int
+
+       // Loopback indicates this job is running in loopback mode and will 
reconnect to the local process.
+       Loopback bool
 }
 
 // Prepare prepares a job to the given job service. It returns the preparation 
id
@@ -101,10 +103,17 @@ func WaitForCompletion(ctx context.Context, client 
jobpb.JobServiceClient, jobID
                return errors.Wrap(err, "failed to get job stream")
        }
 
+       mostRecentError := errors.New("<no error received, see runner logs>")
+       var errReceived, jobFailed bool
+
        for {
                msg, err := stream.Recv()
                if err != nil {
                        if err == io.EOF {
+                               if jobFailed {
+                                       // Connection finished with a failed 
status, so produce what we have.
+                                       return errors.Errorf("job %v 
failed:\n%w", jobID, mostRecentError)
+                               }
                                return nil
                        }
                        return err
@@ -120,7 +129,11 @@ func WaitForCompletion(ctx context.Context, client 
jobpb.JobServiceClient, jobID
                        case jobpb.JobState_DONE, jobpb.JobState_CANCELLED:
                                return nil
                        case jobpb.JobState_FAILED:
-                               return errors.Errorf("job %v failed", jobID)
+                               jobFailed = true
+                               if errReceived {
+                                       return errors.Errorf("job %v 
failed:\n%w", jobID, mostRecentError)
+                               }
+                               // Otherwise, wait for at least one error log 
from the runner, or the connection to close.
                        }
 
                case msg.GetMessageResponse() != nil:
@@ -129,6 +142,15 @@ func WaitForCompletion(ctx context.Context, client 
jobpb.JobServiceClient, jobID
                        text := fmt.Sprintf("%v (%v): %v", resp.GetTime(), 
resp.GetMessageId(), resp.GetMessageText())
                        log.Output(ctx, messageSeverity(resp.GetImportance()), 
1, text)
 
+                       if resp.GetImportance() >= 
jobpb.JobMessage_JOB_MESSAGE_ERROR {
+                               errReceived = true
+                               mostRecentError = 
errors.New(resp.GetMessageText())
+
+                               if jobFailed {
+                                       return errors.Errorf("job %v 
failed:\n%w", jobID, mostRecentError)
+                               }
+                       }
+
                default:
                        return errors.Errorf("unexpected job update: %v", 
proto.MarshalTextString(msg))
                }
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go 
b/sdks/go/pkg/beam/runners/universal/universal.go
index 299a64acdd6..8af9e91e1e1 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -101,6 +101,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) 
(beam.PipelineResult, error)
                Worker:       *jobopts.WorkerBinary,
                RetainDocker: *jobopts.RetainDockerContainers,
                Parallelism:  *jobopts.Parallelism,
+               Loopback:     jobopts.IsLoopback(),
        }
        return runnerlib.Execute(ctx, pipeline, endpoint, opt, *jobopts.Async)
 }

Reply via email to