This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e4c99da [BEAM-11811] Disallow num_workers > max_num_workers for Go Dataflow runner
new 2035e1b Merge pull request #14955 from zhoufek/wsc_go
e4c99da is described below
commit e4c99da30b69768dbda7ba58b1f59de4fd13be58
Author: zhoufek <[email protected]>
AuthorDate: Mon Jun 7 12:27:04 2021 -0400
[BEAM-11811] Disallow num_workers > max_num_workers for Go Dataflow runner
---
.../pkg/beam/runners/dataflow/dataflowlib/job.go | 13 +++++++++
.../beam/runners/dataflow/dataflowlib/job_test.go | 33 ++++++++++++++++++++++
2 files changed, 46 insertions(+)
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
index 390082b..6249441 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
@@ -17,6 +17,7 @@ package dataflowlib
import (
"context"
+ "fmt"
"strings"
"time"
@@ -345,5 +346,17 @@ func validateWorkerSettings(ctx context.Context, opts *JobOptions) error {
opts.WorkerZone = opts.Zone
opts.Zone = ""
}
+
+ numWorkers := opts.NumWorkers
+ maxNumWorkers := opts.MaxNumWorkers
+ if numWorkers < 0 {
+ return fmt.Errorf("num_workers (%d) cannot be negative",
numWorkers)
+ }
+ if maxNumWorkers < 0 {
+ return fmt.Errorf("max_num_workers (%d) cannot be negative",
maxNumWorkers)
+ }
+ if numWorkers > 0 && maxNumWorkers > 0 && numWorkers > maxNumWorkers {
+ return fmt.Errorf("num_workers (%d) cannot exceed
max_num_workers (%d)", numWorkers, maxNumWorkers)
+ }
return nil
}
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go
index a760229..1bf178f 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go
@@ -76,6 +76,28 @@ func TestValidateWorkerSettings(t *testing.T) {
},
errMessage: "experiment worker_region and option Zone
are mutually exclusive",
},
+ {
+ name: "test_num_workers_cannot_be_negative",
+ jobOptions: JobOptions{
+ NumWorkers: -1,
+ },
+ errMessage: "num_workers (-1) cannot be negative",
+ },
+ {
+ name: "test_max_num_workers_cannot_be_negative",
+ jobOptions: JobOptions{
+ MaxNumWorkers: -1,
+ },
+ errMessage: "max_num_workers (-1) cannot be negative",
+ },
+ {
+ name: "test_num_workers_cannot_exceed_max_num_workers",
+ jobOptions: JobOptions{
+ NumWorkers: 43,
+ MaxNumWorkers: 42,
+ },
+ errMessage: "num_workers (43) cannot exceed
max_num_workers (42)",
+ },
}
for _, test := range testsWithErr {
@@ -110,6 +132,17 @@ func TestValidateWorkerSettings(t *testing.T) {
opts: JobOptions{WorkerRegion: "foo"},
expected: JobOptions{WorkerRegion: "foo"},
},
+ {
+ name: "test_num_workers_can_equal_max_num_workers",
+ opts: JobOptions{
+ NumWorkers: 42,
+ MaxNumWorkers: 42,
+ },
+ expected: JobOptions{
+ NumWorkers: 42,
+ MaxNumWorkers: 42,
+ },
+ },
}
for _, test := range tests {