damccorm commented on code in PR #24767: URL: https://github.com/apache/beam/pull/24767#discussion_r1055844810
########## sdks/go/pkg/beam/runners/dataflow/dataflow.go: ########## @@ -71,7 +72,6 @@ var ( tempLocation = flag.String("temp_location", "", "Temp location (optional)") machineType = flag.String("worker_machine_type", "", "GCE machine type (optional)") minCPUPlatform = flag.String("min_cpu_platform", "", "GCE minimum cpu platform (optional)") - workerJar = flag.String("dataflow_worker_jar", "", "Dataflow worker jar (optional)") Review Comment: This is technically breaking even if it previously no-oped (now it will fail) - could we call it out in the release notes as well? ########## sdks/go/pkg/beam/runners/dataflow/dataflow.go: ########## @@ -282,24 +285,51 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) { hooks.SerializeHooksToOptions() experiments := jobopts.GetExperiments() - // Always use runner v2, unless set already. - var v2set, portaSubmission bool + // Ensure that we enable the same set of experiments across all SDKs + // for runner v2. + var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool for _, e := range experiments { - if strings.Contains(e, "use_runner_v2") || strings.Contains(e, "use_unified_worker") { + if strings.Contains(e, "beam_fn_api") { + fnApiSet = true + } + if strings.Contains(e, "use_runner_v2") { v2set = true } + if strings.Contains(e, "use_unified_worker") { + uwSet = true + } if strings.Contains(e, "use_portable_job_submission") { portaSubmission = true } + if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") { + return nil, errors.New("disabling runner v2 is now longer supported as of 2.45.0+") Review Comment: Maybe add a little more information like: `Detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2. 
Disabling runner v2 is no longer supported as of Beam version 2.45.0+` (also, typo nit: `now` -> `no`) ########## sdks/go/pkg/beam/runners/dataflow/dataflow_test.go: ########## @@ -49,18 +49,91 @@ func TestGetJobOptions(t *testing.T) { *jobopts.Experiments = "use_runner_v2,use_portable_job_submission" *jobopts.JobName = "testJob" - opts, err := getJobOptions(context.Background()) + opts, err := getJobOptions(context.Background(), false) if err != nil { t.Fatalf("getJobOptions() returned error %q, want %q", err, "nil") } + if got, want := opts.Streaming, false; got != want { + t.Errorf("getJobOptions().Streaming = %t, want %t", got, want) + } + if got, want := opts.Name, "testJob"; got != want { + t.Errorf("getJobOptions().Name = %q, want %q", got, want) + } + if got, want := len(opts.Experiments), 5; got != want { + t.Errorf("len(getJobOptions().Experiments) = %q, want %q", got, want) + } else { + sort.Strings(opts.Experiments) + expectedExperiments := []string{"beam_fn_api", "min_cpu_platform=testPlatform", "use_portable_job_submission", "use_runner_v2", "use_unified_worker"} + for i := 0; i < 3; i++ { + if got, want := opts.Experiments[i], expectedExperiments[i]; got != want { + t.Errorf("getJobOptions().Experiments = %q, want %q", got, want) Review Comment: Same comment applies below ########## sdks/go/pkg/beam/runners/dataflow/dataflow_test.go: ########## @@ -49,18 +49,91 @@ func TestGetJobOptions(t *testing.T) { *jobopts.Experiments = "use_runner_v2,use_portable_job_submission" *jobopts.JobName = "testJob" - opts, err := getJobOptions(context.Background()) + opts, err := getJobOptions(context.Background(), false) if err != nil { t.Fatalf("getJobOptions() returned error %q, want %q", err, "nil") } + if got, want := opts.Streaming, false; got != want { + t.Errorf("getJobOptions().Streaming = %t, want %t", got, want) + } + if got, want := opts.Name, "testJob"; got != want { + t.Errorf("getJobOptions().Name = %q, want %q", got, want) + } + if got, want := 
len(opts.Experiments), 5; got != want { + t.Errorf("len(getJobOptions().Experiments) = %q, want %q", got, want) + } else { + sort.Strings(opts.Experiments) + expectedExperiments := []string{"beam_fn_api", "min_cpu_platform=testPlatform", "use_portable_job_submission", "use_runner_v2", "use_unified_worker"} + for i := 0; i < 3; i++ { + if got, want := opts.Experiments[i], expectedExperiments[i]; got != want { + t.Errorf("getJobOptions().Experiments = %q, want %q", got, want) Review Comment: ```suggestion t.Errorf("getJobOptions().Experiments[%d] = %q, want %q", i, got, want) ``` ########## sdks/go/pkg/beam/runners/dataflow/dataflow.go: ########## @@ -282,24 +285,51 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) { hooks.SerializeHooksToOptions() experiments := jobopts.GetExperiments() - // Always use runner v2, unless set already. - var v2set, portaSubmission bool + // Ensure that we enable the same set of experiments across all SDKs + // for runner v2. + var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool for _, e := range experiments { - if strings.Contains(e, "use_runner_v2") || strings.Contains(e, "use_unified_worker") { + if strings.Contains(e, "beam_fn_api") { + fnApiSet = true + } + if strings.Contains(e, "use_runner_v2") { v2set = true } + if strings.Contains(e, "use_unified_worker") { + uwSet = true + } if strings.Contains(e, "use_portable_job_submission") { portaSubmission = true } + if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") { + return nil, errors.New("disabling runner v2 is now longer supported as of 2.45.0+") + } + } + // Enable default experiments. + if !fnApiSet { + experiments = append(experiments, "beam_fn_api") } - // Enable by default unified worker, and portable job submission.
if !v2set { experiments = append(experiments, "use_unified_worker") } + if !uwSet { + experiments = append(experiments, "use_unified_worker") + } Review Comment: If `!v2set && !uwSet` we'll append this twice - could you clarify the logic here and only set once? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org