[
https://issues.apache.org/jira/browse/BEAM-14487?focusedWorklogId=772546&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-772546
]
ASF GitHub Bot logged work on BEAM-14487:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 19/May/22 17:58
Start Date: 19/May/22 17:58
Worklog Time Spent: 10m
Work Description: damccorm commented on code in PR #17710:
URL: https://github.com/apache/beam/pull/17710#discussion_r877366617
##########
sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go:
##########
@@ -240,29 +240,46 @@ func WaitForCompletion(ctx context.Context, client *df.Service, project, region,
return errors.Wrap(err, "failed to get job")
}
- switch j.CurrentState {
- case "JOB_STATE_DONE":
- log.Info(ctx, "Job succeeded!")
- return nil
-
- case "JOB_STATE_CANCELLED":
- log.Info(ctx, "Job cancelled")
+ terminal, msg, err := currentStateMessage(j.CurrentState, jobID)
+ if err != nil {
+ return err
+ }
+ log.Infof(ctx, msg)
+ if terminal {
return nil
-
- case "JOB_STATE_FAILED":
- return errors.Errorf("job %s failed", jobID)
-
- case "JOB_STATE_RUNNING":
- log.Info(ctx, "Job still running ...")
-
- default:
- log.Infof(ctx, "Job state: %v ...", j.CurrentState)
}
time.Sleep(30 * time.Second)
}
}
+// currentStateMessage indicates if the state is terminal, and provides a message to log, or an error.
+// Errors are always terminal.
+func currentStateMessage(currentState, jobID string) (bool, string, error) {
+ switch currentState {
+ // Add all Terminal Success stats here.
+ case "JOB_STATE_DONE", "JOB_STATE_CANCELLED", "JOB_STATE_DRAINED", "JOB_STATE_UPDATED":
+ var state string
+ switch currentState {
+ case "JOB_STATE_DONE":
+ state = "succeeded"
Review Comment:
```suggestion
state = "succeeded!"
```
Totally optional (and it would require a small test tweak too), but I liked
the exclamation point! It brought me some small joy/excitement haha
##########
sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go:
##########
@@ -157,3 +158,39 @@ func TestValidateWorkerSettings(t *testing.T) {
})
}
}
+
+func TestCurrentStateMessage(t *testing.T) {
+ tests := []struct {
+ state string
+ term bool
+ want string
+ wantErr error
+ }{
+ {state: "JOB_STATE_DONE", want: "Job JorbID-09876 succeeded", term: true},
Review Comment:
```suggestion
{state: "JOB_STATE_DONE", want: "Job JorbID-09876 succeeded!", term: true},
```
Putting this here in case you want to quick commit both 😉
Issue Time Tracking
-------------------
Worklog Id: (was: 772546)
Time Spent: 40m (was: 0.5h)
> Make Drain a terminal state.
> ----------------------------
>
> Key: BEAM-14487
> URL: https://issues.apache.org/jira/browse/BEAM-14487
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Robert Burke
> Priority: P2
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Until recently, there have been no unbounded PCollections in the Go SDK, and
> only Dataflow supports drain.
> Either way, drain is a terminal state, and should cause any monitoring
> program to halt, rather than query perpetually.
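
To illustrate the point in the issue description, here is a minimal, self-contained sketch of a monitoring loop that halts on any terminal state, including drain. It is not the code from the PR: `stateMessage` and `waitForTerminal` are hypothetical names, the "drained" and "updated" message wording is assumed, and the loop omits the sleep between polls and the Dataflow client call that the real `WaitForCompletion` uses.

```go
package main

import "fmt"

// stateMessage is a hypothetical stand-in for the helper under review.
// It reports whether the state is terminal and returns a message to log,
// or an error for a failed job. Errors are always terminal.
func stateMessage(state, jobID string) (bool, string, error) {
	switch state {
	case "JOB_STATE_DONE":
		return true, fmt.Sprintf("Job %s succeeded!", jobID), nil
	case "JOB_STATE_CANCELLED":
		return true, fmt.Sprintf("Job %s cancelled", jobID), nil
	case "JOB_STATE_DRAINED":
		// Assumed wording: drain is treated as a terminal success.
		return true, fmt.Sprintf("Job %s drained", jobID), nil
	case "JOB_STATE_UPDATED":
		// Assumed wording.
		return true, fmt.Sprintf("Job %s updated", jobID), nil
	case "JOB_STATE_FAILED":
		return true, "", fmt.Errorf("job %s failed", jobID)
	case "JOB_STATE_RUNNING":
		return false, "Job still running ...", nil
	default:
		return false, fmt.Sprintf("Job state: %v ...", state), nil
	}
}

// waitForTerminal polls next until it yields a terminal state.
// next is a hypothetical replacement for fetching the job's current state;
// a real monitor would also sleep between polls.
func waitForTerminal(jobID string, next func() string) error {
	for {
		terminal, msg, err := stateMessage(next(), jobID)
		if err != nil {
			return err
		}
		fmt.Println(msg)
		if terminal {
			return nil
		}
	}
}

func main() {
	// Simulated sequence of states for a job that is drained.
	states := []string{"JOB_STATE_RUNNING", "JOB_STATE_DRAINING", "JOB_STATE_DRAINED"}
	i := 0
	err := waitForTerminal("job-1", func() string {
		s := states[i]
		i++
		return s
	})
	fmt.Println("err:", err)
}
```

With drain listed among the terminal states, the loop returns after the third poll instead of querying forever.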