[ 
https://issues.apache.org/jira/browse/BEAM-14487?focusedWorklogId=772546&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-772546
 ]

ASF GitHub Bot logged work on BEAM-14487:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/May/22 17:58
            Start Date: 19/May/22 17:58
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17710:
URL: https://github.com/apache/beam/pull/17710#discussion_r877366617


##########
sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go:
##########
@@ -240,29 +240,46 @@ func WaitForCompletion(ctx context.Context, client 
*df.Service, project, region,
                        return errors.Wrap(err, "failed to get job")
                }
 
-               switch j.CurrentState {
-               case "JOB_STATE_DONE":
-                       log.Info(ctx, "Job succeeded!")
-                       return nil
-
-               case "JOB_STATE_CANCELLED":
-                       log.Info(ctx, "Job cancelled")
+               terminal, msg, err := currentStateMessage(j.CurrentState, jobID)
+               if err != nil {
+                       return err
+               }
+               log.Infof(ctx, msg)
+               if terminal {
                        return nil
-
-               case "JOB_STATE_FAILED":
-                       return errors.Errorf("job %s failed", jobID)
-
-               case "JOB_STATE_RUNNING":
-                       log.Info(ctx, "Job still running ...")
-
-               default:
-                       log.Infof(ctx, "Job state: %v ...", j.CurrentState)
                }
 
                time.Sleep(30 * time.Second)
        }
 }
 
+// currentStateMessage indicates if the state is terminal, and provides a 
message to log, or an error.
+// Errors are always terminal.
+func currentStateMessage(currentState, jobID string) (bool, string, error) {
+       switch currentState {
+       // Add all Terminal Success stats here.
+       case "JOB_STATE_DONE", "JOB_STATE_CANCELLED", "JOB_STATE_DRAINED", 
"JOB_STATE_UPDATED":
+               var state string
+               switch currentState {
+               case "JOB_STATE_DONE":
+                       state = "succeeded"

Review Comment:
   ```suggestion
                        state = "succeeded!"
   ```
   
   Totally optional (and it would require a small test tweak too), but I liked 
the exclamation point! It brought me some small joy/excitement haha



##########
sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go:
##########
@@ -157,3 +158,39 @@ func TestValidateWorkerSettings(t *testing.T) {
                })
        }
 }
+
+func TestCurrentStateMessage(t *testing.T) {
+       tests := []struct {
+               state   string
+               term    bool
+               want    string
+               wantErr error
+       }{
+               {state: "JOB_STATE_DONE", want: "Job JorbID-09876 succeeded", 
term: true},

Review Comment:
   ```suggestion
                {state: "JOB_STATE_DONE", want: "Job JorbID-09876 succeeded!", 
term: true},
   ```
   
   Putting this here in case you want to quick commit both 😉 





Issue Time Tracking
-------------------

    Worklog Id:     (was: 772546)
    Time Spent: 40m  (was: 0.5h)

> Make Drain a terminal state.
> ----------------------------
>
>                 Key: BEAM-14487
>                 URL: https://issues.apache.org/jira/browse/BEAM-14487
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Robert Burke
>            Priority: P2
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Until recently, there have been no unbounded PCollections in the Go SDK, and 
> only Dataflow supports drain. 
> Either way, drain is a terminal state, and should cause any monitoring 
> program to halt, rather than query perpetually.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to