lostluck commented on code in PR #31028:
URL: https://github.com/apache/beam/pull/31028#discussion_r1571072697
##########
sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go:
##########
@@ -243,21 +243,21 @@ func (s *Server) Run(ctx context.Context, req
*jobpb.RunJobRequest) (*jobpb.RunJ
// Otherwise, returns nil if Job does not exist or the Job's existing state as
part of the CancelJobResponse.
func (s *Server) Cancel(_ context.Context, req *jobpb.CancelJobRequest)
(*jobpb.CancelJobResponse, error) {
s.mu.Lock()
+ defer s.mu.Unlock()
job, ok := s.jobs[req.GetJobId()]
- s.mu.Unlock()
if !ok {
return nil, nil
}
state := job.state.Load().(jobpb.JobState_Enum)
switch state {
- case jobpb.JobState_CANCELLED, jobpb.JobState_DONE,
jobpb.JobState_DRAINED, jobpb.JobState_UPDATED, jobpb.JobState_FAILED:
+ case jobpb.JobState_CANCELLED, jobpb.JobState_DONE,
jobpb.JobState_DRAINED, jobpb.JobState_UPDATED, jobpb.JobState_FAILED,
jobpb.JobState_STOPPED:
Review Comment:
STOPPED is unfortunately not a terminal state; it's a badly named state for
a job "that has yet to start". Essentially, jobs are initially in "stopped"
after being submitted to the runner, and move to the running state once data
processing can begin.
See
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/model/jobmanagement_v1/beam_job_api.pb.go#L111
A job is cancelable if it's not already in a terminal state. STOPPED is not
terminal.
Was this added to resolve a bug or similar?
##########
sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go:
##########
@@ -243,21 +243,21 @@ func (s *Server) Run(ctx context.Context, req
*jobpb.RunJobRequest) (*jobpb.RunJ
// Otherwise, returns nil if Job does not exist or the Job's existing state as
part of the CancelJobResponse.
func (s *Server) Cancel(_ context.Context, req *jobpb.CancelJobRequest)
(*jobpb.CancelJobResponse, error) {
s.mu.Lock()
+ defer s.mu.Unlock()
job, ok := s.jobs[req.GetJobId()]
- s.mu.Unlock()
if !ok {
return nil, nil
}
state := job.state.Load().(jobpb.JobState_Enum)
switch state {
- case jobpb.JobState_CANCELLED, jobpb.JobState_DONE,
jobpb.JobState_DRAINED, jobpb.JobState_UPDATED, jobpb.JobState_FAILED:
+ case jobpb.JobState_CANCELLED, jobpb.JobState_DONE,
jobpb.JobState_DRAINED, jobpb.JobState_UPDATED, jobpb.JobState_FAILED,
jobpb.JobState_STOPPED:
// Already at terminal state.
return &jobpb.CancelJobResponse{
State: state,
}, nil
}
job.SendMsg("canceling " + job.String())
- job.Canceling()
+ job.Canceled()
Review Comment:
Nit: A job can move straight to canceled, but only if all resources are
already cleaned up (SDKs, Docker environments, etc.). Is that true at this point?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]