This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch includeFailures in repository https://gitbox.apache.org/repos/asf/beam.git
commit be8a7b54dbbcca5e2fc1e33180c1d66beaca8718
Author: Robert Burke <[email protected]>
AuthorDate: Wed Jun 28 16:56:54 2023 +0200

    Maintain job on requirement failure.
---
 .../pkg/beam/runners/prism/internal/jobservices/job.go |  7 ++++++-
 .../runners/prism/internal/jobservices/management.go   | 17 ++++++++++-------
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
index 4ac37c5db59..ea7b09c8441 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -31,6 +31,7 @@ import (
 	"strings"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
 	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
@@ -77,6 +78,8 @@ type Job struct {
 	msgs []string
 	stateIdx int
 	state atomic.Value // jobpb.JobState_Enum
+	stateTime time.Time
+	failureErr error
 
 	// Context used to terminate this job.
 	RootCtx context.Context
@@ -134,6 +137,7 @@ func (j *Job) SendMsg(msg string) {
 func (j *Job) sendState(state jobpb.JobState_Enum) {
 	j.streamCond.L.Lock()
 	defer j.streamCond.L.Unlock()
+	j.stateTime = time.Now()
 	j.stateIdx++
 	j.state.Store(state)
 	j.streamCond.Broadcast()
@@ -155,6 +159,7 @@ func (j *Job) Done() {
 }
 
 // Failed indicates that the job completed unsuccessfully.
-func (j *Job) Failed() {
+func (j *Job) Failed(err error) {
 	j.sendState(jobpb.JobState_FAILED)
+	j.failureErr = err
 }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
index 2b077c3a661..f65d2eb070f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -25,6 +25,7 @@ import (
 	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
 	"golang.org/x/exp/slog"
+	"google.golang.org/protobuf/types/known/timestamppb"
 )
 
 func (s *Server) nextId() string {
@@ -81,8 +82,10 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
 
 	// Queue initial state of the job.
 	job.state.Store(jobpb.JobState_STOPPED)
+	s.jobs[job.key] = job
 
 	if err := isSupported(job.Pipeline.GetRequirements()); err != nil {
+		job.Failed(err)
 		slog.Error("unable to run job", slog.String("error", err.Error()), slog.String("jobname", req.GetJobName()))
 		return nil, err
 	}
@@ -136,11 +139,12 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
 		}
 	}
 	if len(errs) > 0 {
-		err := &joinError{errs: errs}
-		slog.Error("unable to run job", slog.String("cause", "unimplemented features"), slog.String("jobname", req.GetJobName()), slog.String("errors", err.Error()))
-		return nil, fmt.Errorf("found %v uses of features unimplemented in prism in job %v: %v", len(errs), req.GetJobName(), err)
+		jErr := &joinError{errs: errs}
+		slog.Error("unable to run job", slog.String("cause", "unimplemented features"), slog.String("jobname", req.GetJobName()), slog.String("errors", jErr.Error()))
+		err := fmt.Errorf("found %v uses of features unimplemented in prism in job %v: %v", len(errs), req.GetJobName(), jErr)
+		job.Failed(err)
+		return nil, err
 	}
-	s.jobs[job.key] = job
 	return &jobpb.PrepareJobResponse{
 		PreparationId: job.key,
 		StagingSessionToken: job.key,
@@ -178,8 +182,6 @@ func (s *Server) GetMessageStream(req *jobpb.JobMessagesRequest, stream jobpb.Jo
 	curMsg := job.minMsg
 	curState := job.stateIdx
 
-	stream.Context()
-
 	state := job.state.Load().(jobpb.JobState_Enum)
 	for {
 		for (curMsg >= job.maxMsg || len(job.msgs) == 0) && curState > job.stateIdx {
@@ -283,6 +285,7 @@ func (s *Server) GetState(_ context.Context, req *jobpb.GetJobStateRequest) (*jo
 		return nil, fmt.Errorf("job with id %v not found", req.GetJobId())
 	}
 	return &jobpb.JobStateEvent{
-		State: j.state.Load().(jobpb.JobState_Enum),
+		State:     j.state.Load().(jobpb.JobState_Enum),
+		Timestamp: timestamppb.New(j.stateTime),
 	}, nil
 }
