lukecwik commented on code in PR #24767:
URL: https://github.com/apache/beam/pull/24767#discussion_r1055880466


##########
sdks/go/pkg/beam/runners/dataflow/dataflow.go:
##########
@@ -282,24 +285,51 @@ func getJobOptions(ctx context.Context) 
(*dataflowlib.JobOptions, error) {
        hooks.SerializeHooksToOptions()
 
        experiments := jobopts.GetExperiments()
-       // Always use runner v2, unless set already.
-       var v2set, portaSubmission bool
+       // Ensure that we enable the same set of experiments across all SDKs
+       // for runner v2.
+       var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool
        for _, e := range experiments {
-               if strings.Contains(e, "use_runner_v2") || strings.Contains(e, 
"use_unified_worker") {
+               if strings.Contains(e, "beam_fn_api") {
+                       fnApiSet = true
+               }
+               if strings.Contains(e, "use_runner_v2") {
                        v2set = true
                }
+               if strings.Contains(e, "use_unified_worker") {
+                       uwSet = true
+               }
                if strings.Contains(e, "use_portable_job_submission") {
                        portaSubmission = true
                }
+               if strings.Contains(e, "disable_runner_v2") || 
strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, 
"disable_prime_runner_v2") {
+                       return nil, errors.New("disabling runner v2 is now 
longer supported as of 2.45.0+")

Review Comment:
   done



##########
sdks/go/pkg/beam/runners/dataflow/dataflow.go:
##########
@@ -71,7 +72,6 @@ var (
        tempLocation           = flag.String("temp_location", "", "Temp 
location (optional)")
        machineType            = flag.String("worker_machine_type", "", "GCE 
machine type (optional)")
        minCPUPlatform         = flag.String("min_cpu_platform", "", "GCE 
minimum cpu platform (optional)")
-       workerJar              = flag.String("dataflow_worker_jar", "", 
"Dataflow worker jar (optional)")

Review Comment:
   I'll add it back and mark it as deprecated; it's not worth breaking people over it.



##########
sdks/go/pkg/beam/runners/dataflow/dataflow.go:
##########
@@ -282,24 +285,51 @@ func getJobOptions(ctx context.Context) 
(*dataflowlib.JobOptions, error) {
        hooks.SerializeHooksToOptions()
 
        experiments := jobopts.GetExperiments()
-       // Always use runner v2, unless set already.
-       var v2set, portaSubmission bool
+       // Ensure that we enable the same set of experiments across all SDKs
+       // for runner v2.
+       var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool
        for _, e := range experiments {
-               if strings.Contains(e, "use_runner_v2") || strings.Contains(e, 
"use_unified_worker") {
+               if strings.Contains(e, "beam_fn_api") {
+                       fnApiSet = true
+               }
+               if strings.Contains(e, "use_runner_v2") {
                        v2set = true
                }
+               if strings.Contains(e, "use_unified_worker") {
+                       uwSet = true
+               }
                if strings.Contains(e, "use_portable_job_submission") {
                        portaSubmission = true
                }
+               if strings.Contains(e, "disable_runner_v2") || 
strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, 
"disable_prime_runner_v2") {
+                       return nil, errors.New("disabling runner v2 is now 
longer supported as of 2.45.0+")
+               }
+       }
+       // Enable default experiments.
+       if !fnApiSet {
+               experiments = append(experiments, "beam_fn_api")
        }
-       // Enable by default unified worker, and portable job submission.
        if !v2set {
                experiments = append(experiments, "use_unified_worker")
        }
+       if !uwSet {
+               experiments = append(experiments, "use_unified_worker")
+       }

Review Comment:
   fixed



##########
sdks/go/pkg/beam/runners/dataflow/dataflow_test.go:
##########
@@ -49,18 +49,91 @@ func TestGetJobOptions(t *testing.T) {
        *jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
        *jobopts.JobName = "testJob"
 
-       opts, err := getJobOptions(context.Background())
+       opts, err := getJobOptions(context.Background(), false)
        if err != nil {
                t.Fatalf("getJobOptions() returned error %q, want %q", err, 
"nil")
        }
+       if got, want := opts.Streaming, false; got != want {
+               t.Errorf("getJobOptions().Streaming = %t, want %t", got, want)
+       }
+       if got, want := opts.Name, "testJob"; got != want {
+               t.Errorf("getJobOptions().Name = %q, want %q", got, want)
+       }
+       if got, want := len(opts.Experiments), 5; got != want {
+               t.Errorf("len(getJobOptions().Experiments) = %q, want %q", got, 
want)
+       } else {
+               sort.Strings(opts.Experiments)
+               expectedExperiments := []string{"beam_fn_api", 
"min_cpu_platform=testPlatform", "use_portable_job_submission", 
"use_runner_v2", "use_unified_worker"}
+               for i := 0; i < 3; i++ {
+                       if got, want := opts.Experiments[i], 
expectedExperiments[i]; got != want {
+                               t.Errorf("getJobOptions().Experiments = %q, 
want %q", got, want)

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to