This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch includeFailures
in repository https://gitbox.apache.org/repos/asf/beam.git

commit be8a7b54dbbcca5e2fc1e33180c1d66beaca8718
Author: Robert Burke <[email protected]>
AuthorDate: Wed Jun 28 16:56:54 2023 +0200

    Maintain job on requirement failure.
---
 .../pkg/beam/runners/prism/internal/jobservices/job.go  |  7 ++++++-
 .../runners/prism/internal/jobservices/management.go    | 17 ++++++++++-------
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go 
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
index 4ac37c5db59..ea7b09c8441 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -31,6 +31,7 @@ import (
        "strings"
        "sync"
        "sync/atomic"
+       "time"
 
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
        jobpb 
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
@@ -77,6 +78,8 @@ type Job struct {
        msgs           []string
        stateIdx       int
        state          atomic.Value // jobpb.JobState_Enum
+       stateTime      time.Time
+       failureErr     error
 
        // Context used to terminate this job.
        RootCtx  context.Context
@@ -134,6 +137,7 @@ func (j *Job) SendMsg(msg string) {
 func (j *Job) sendState(state jobpb.JobState_Enum) {
        j.streamCond.L.Lock()
        defer j.streamCond.L.Unlock()
+       j.stateTime = time.Now()
        j.stateIdx++
        j.state.Store(state)
        j.streamCond.Broadcast()
@@ -155,6 +159,7 @@ func (j *Job) Done() {
 }
 
 // Failed indicates that the job completed unsuccessfully.
-func (j *Job) Failed() {
+func (j *Job) Failed(err error) {
        j.sendState(jobpb.JobState_FAILED)
+       j.failureErr = err
 }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go 
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
index 2b077c3a661..f65d2eb070f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -25,6 +25,7 @@ import (
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
        "golang.org/x/exp/slog"
+       "google.golang.org/protobuf/types/known/timestamppb"
 )
 
 func (s *Server) nextId() string {
@@ -81,8 +82,10 @@ func (s *Server) Prepare(ctx context.Context, req 
*jobpb.PrepareJobRequest) (*jo
 
        // Queue initial state of the job.
        job.state.Store(jobpb.JobState_STOPPED)
+       s.jobs[job.key] = job
 
        if err := isSupported(job.Pipeline.GetRequirements()); err != nil {
+               job.Failed(err)
                slog.Error("unable to run job", slog.String("error", 
err.Error()), slog.String("jobname", req.GetJobName()))
                return nil, err
        }
@@ -136,11 +139,12 @@ func (s *Server) Prepare(ctx context.Context, req 
*jobpb.PrepareJobRequest) (*jo
                }
        }
        if len(errs) > 0 {
-               err := &joinError{errs: errs}
-               slog.Error("unable to run job", slog.String("cause", 
"unimplemented features"), slog.String("jobname", req.GetJobName()), 
slog.String("errors", err.Error()))
-               return nil, fmt.Errorf("found %v uses of features unimplemented 
in prism in job %v: %v", len(errs), req.GetJobName(), err)
+               jErr := &joinError{errs: errs}
+               slog.Error("unable to run job", slog.String("cause", 
"unimplemented features"), slog.String("jobname", req.GetJobName()), 
slog.String("errors", jErr.Error()))
+               err := fmt.Errorf("found %v uses of features unimplemented in 
prism in job %v: %v", len(errs), req.GetJobName(), jErr)
+               job.Failed(err)
+               return nil, err
        }
-       s.jobs[job.key] = job
        return &jobpb.PrepareJobResponse{
                PreparationId:       job.key,
                StagingSessionToken: job.key,
@@ -178,8 +182,6 @@ func (s *Server) GetMessageStream(req 
*jobpb.JobMessagesRequest, stream jobpb.Jo
        curMsg := job.minMsg
        curState := job.stateIdx
 
-       stream.Context()
-
        state := job.state.Load().(jobpb.JobState_Enum)
        for {
                for (curMsg >= job.maxMsg || len(job.msgs) == 0) && curState > 
job.stateIdx {
@@ -283,6 +285,7 @@ func (s *Server) GetState(_ context.Context, req 
*jobpb.GetJobStateRequest) (*jo
                return nil, fmt.Errorf("job with id %v not found", 
req.GetJobId())
        }
        return &jobpb.JobStateEvent{
-               State: j.state.Load().(jobpb.JobState_Enum),
+               State:     j.state.Load().(jobpb.JobState_Enum),
+               Timestamp: timestamppb.New(j.stateTime),
        }, nil
 }

Reply via email to