damccorm commented on code in PR #24767:
URL: https://github.com/apache/beam/pull/24767#discussion_r1055844810


##########
sdks/go/pkg/beam/runners/dataflow/dataflow.go:
##########
@@ -71,7 +72,6 @@ var (
        tempLocation           = flag.String("temp_location", "", "Temp 
location (optional)")
        machineType            = flag.String("worker_machine_type", "", "GCE 
machine type (optional)")
        minCPUPlatform         = flag.String("min_cpu_platform", "", "GCE 
minimum cpu platform (optional)")
-       workerJar              = flag.String("dataflow_worker_jar", "", 
"Dataflow worker jar (optional)")

Review Comment:
   This is technically a breaking change: the flag was previously a no-op, but 
passing it will now fail. Could we call it out in the release notes as well?



##########
sdks/go/pkg/beam/runners/dataflow/dataflow.go:
##########
@@ -282,24 +285,51 @@ func getJobOptions(ctx context.Context) 
(*dataflowlib.JobOptions, error) {
        hooks.SerializeHooksToOptions()
 
        experiments := jobopts.GetExperiments()
-       // Always use runner v2, unless set already.
-       var v2set, portaSubmission bool
+       // Ensure that we enable the same set of experiments across all SDKs
+       // for runner v2.
+       var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool
        for _, e := range experiments {
-               if strings.Contains(e, "use_runner_v2") || strings.Contains(e, 
"use_unified_worker") {
+               if strings.Contains(e, "beam_fn_api") {
+                       fnApiSet = true
+               }
+               if strings.Contains(e, "use_runner_v2") {
                        v2set = true
                }
+               if strings.Contains(e, "use_unified_worker") {
+                       uwSet = true
+               }
                if strings.Contains(e, "use_portable_job_submission") {
                        portaSubmission = true
                }
+               if strings.Contains(e, "disable_runner_v2") || 
strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, 
"disable_prime_runner_v2") {
+                       return nil, errors.New("disabling runner v2 is now 
longer supported as of 2.45.0+")

Review Comment:
   Maybe add a little more information like: `Detected one of the following 
experiments: disable_runner_v2 | disable_runner_v2_until_2023 | 
disable_prime_runner_v2. Disabling runner v2 is no longer supported as of Beam 
version 2.45.0+`
   
   (also, typo nit: `now` -> `no`)



##########
sdks/go/pkg/beam/runners/dataflow/dataflow_test.go:
##########
@@ -49,18 +49,91 @@ func TestGetJobOptions(t *testing.T) {
        *jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
        *jobopts.JobName = "testJob"
 
-       opts, err := getJobOptions(context.Background())
+       opts, err := getJobOptions(context.Background(), false)
        if err != nil {
                t.Fatalf("getJobOptions() returned error %q, want %q", err, 
"nil")
        }
+       if got, want := opts.Streaming, false; got != want {
+               t.Errorf("getJobOptions().Streaming = %t, want %t", got, want)
+       }
+       if got, want := opts.Name, "testJob"; got != want {
+               t.Errorf("getJobOptions().Name = %q, want %q", got, want)
+       }
+       if got, want := len(opts.Experiments), 5; got != want {
+               t.Errorf("len(getJobOptions().Experiments) = %q, want %q", got, 
want)
+       } else {
+               sort.Strings(opts.Experiments)
+               expectedExperiments := []string{"beam_fn_api", 
"min_cpu_platform=testPlatform", "use_portable_job_submission", 
"use_runner_v2", "use_unified_worker"}
+               for i := 0; i < 3; i++ {
+                       if got, want := opts.Experiments[i], 
expectedExperiments[i]; got != want {
+                               t.Errorf("getJobOptions().Experiments = %q, 
want %q", got, want)

Review Comment:
   Same comment applies below



##########
sdks/go/pkg/beam/runners/dataflow/dataflow_test.go:
##########
@@ -49,18 +49,91 @@ func TestGetJobOptions(t *testing.T) {
        *jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
        *jobopts.JobName = "testJob"
 
-       opts, err := getJobOptions(context.Background())
+       opts, err := getJobOptions(context.Background(), false)
        if err != nil {
                t.Fatalf("getJobOptions() returned error %q, want %q", err, 
"nil")
        }
+       if got, want := opts.Streaming, false; got != want {
+               t.Errorf("getJobOptions().Streaming = %t, want %t", got, want)
+       }
+       if got, want := opts.Name, "testJob"; got != want {
+               t.Errorf("getJobOptions().Name = %q, want %q", got, want)
+       }
+       if got, want := len(opts.Experiments), 5; got != want {
+               t.Errorf("len(getJobOptions().Experiments) = %q, want %q", got, 
want)
+       } else {
+               sort.Strings(opts.Experiments)
+               expectedExperiments := []string{"beam_fn_api", 
"min_cpu_platform=testPlatform", "use_portable_job_submission", 
"use_runner_v2", "use_unified_worker"}
+               for i := 0; i < 3; i++ {
+                       if got, want := opts.Experiments[i], 
expectedExperiments[i]; got != want {
+                               t.Errorf("getJobOptions().Experiments = %q, 
want %q", got, want)

Review Comment:
   ```suggestion
                                t.Errorf("getJobOptions().Experiments[%d] = %q, want %q", i, got, want)
   ```



##########
sdks/go/pkg/beam/runners/dataflow/dataflow.go:
##########
@@ -282,24 +285,51 @@ func getJobOptions(ctx context.Context) 
(*dataflowlib.JobOptions, error) {
        hooks.SerializeHooksToOptions()
 
        experiments := jobopts.GetExperiments()
-       // Always use runner v2, unless set already.
-       var v2set, portaSubmission bool
+       // Ensure that we enable the same set of experiments across all SDKs
+       // for runner v2.
+       var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool
        for _, e := range experiments {
-               if strings.Contains(e, "use_runner_v2") || strings.Contains(e, 
"use_unified_worker") {
+               if strings.Contains(e, "beam_fn_api") {
+                       fnApiSet = true
+               }
+               if strings.Contains(e, "use_runner_v2") {
                        v2set = true
                }
+               if strings.Contains(e, "use_unified_worker") {
+                       uwSet = true
+               }
                if strings.Contains(e, "use_portable_job_submission") {
                        portaSubmission = true
                }
+               if strings.Contains(e, "disable_runner_v2") || 
strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, 
"disable_prime_runner_v2") {
+                       return nil, errors.New("disabling runner v2 is now 
longer supported as of 2.45.0+")
+               }
+       }
+       // Enable default experiments.
+       if !fnApiSet {
+               experiments = append(experiments, "beam_fn_api")
        }
-       // Enable by default unified worker, and portable job submission.
        if !v2set {
                experiments = append(experiments, "use_unified_worker")
        }
+       if !uwSet {
+               experiments = append(experiments, "use_unified_worker")
+       }

Review Comment:
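   If both `!v2set` and `!uwSet` are true, we'll append `use_unified_worker` 
twice. Note also that the `!v2set` branch appends `use_unified_worker` rather 
than `use_runner_v2`. Could you clarify the logic here so that each experiment 
is set only once?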



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to