[
https://issues.apache.org/jira/browse/BEAM-14509?focusedWorklogId=774674&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-774674
]
ASF GitHub Bot logged work on BEAM-14509:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 25/May/22 16:38
Start Date: 25/May/22 16:38
Worklog Time Spent: 10m
Work Description: lostluck commented on code in PR #17752:
URL: https://github.com/apache/beam/pull/17752#discussion_r881850464
##########
sdks/go/pkg/beam/runners/dataflow/dataflow.go:
##########
@@ -51,26 +51,31 @@ import (
// TODO(herohde) 5/16/2017: the Dataflow flags should match the other SDKs.
var (
- endpoint = flag.String("dataflow_endpoint", "", "Dataflow
endpoint (optional).")
- stagingLocation = flag.String("staging_location", "", "GCS staging
location (required).")
- image = flag.String("worker_harness_container_image",
"", "Worker harness container image (required).")
- labels = flag.String("labels", "", "JSON-formatted
map[string]string of job labels (optional).")
- serviceAccountEmail = flag.String("service_account_email", "",
"Service account email (optional).")
- numWorkers = flag.Int64("num_workers", 0, "Number of workers
(optional).")
- maxNumWorkers = flag.Int64("max_num_workers", 0, "Maximum number
of workers during scaling (optional).")
- diskSizeGb = flag.Int64("disk_size_gb", 0, "Size of root disk
for VMs, in GB (optional).")
- diskType = flag.String("disk_type", "", "Type of root disk
for VMs (optional).")
- autoscalingAlgorithm = flag.String("autoscaling_algorithm", "",
"Autoscaling mode to use (optional).")
- zone = flag.String("zone", "", "GCP zone (optional)")
- network = flag.String("network", "", "GCP network
(optional)")
- subnetwork = flag.String("subnetwork", "", "GCP subnetwork
(optional)")
- noUsePublicIPs = flag.Bool("no_use_public_ips", false, "Workers
must not use public IP addresses (optional)")
- tempLocation = flag.String("temp_location", "", "Temp location
(optional)")
- machineType = flag.String("worker_machine_type", "", "GCE
machine type (optional)")
- minCPUPlatform = flag.String("min_cpu_platform", "", "GCE minimum
cpu platform (optional)")
- workerJar = flag.String("dataflow_worker_jar", "", "Dataflow
worker jar (optional)")
- workerRegion = flag.String("worker_region", "", "Dataflow
worker region (optional)")
- workerZone = flag.String("worker_zone", "", "Dataflow worker
zone (optional)")
+ endpoint = flag.String("dataflow_endpoint", "", "Dataflow
endpoint (optional).")
+ stagingLocation = flag.String("staging_location", "", "GCS
staging location (required).")
+ image = flag.String("worker_harness_container_image",
"", "Worker harness container image (required).")
+ labels = flag.String("labels", "", "JSON-formatted
map[string]string of job labels (optional).")
+ serviceAccountEmail = flag.String("service_account_email", "",
"Service account email (optional).")
+ numWorkers = flag.Int64("num_workers", 0, "Number of
workers (optional).")
+ workerHarnessThreads = flag.Int64("number_of_worker_harness_threads",
0, "The number of threads per each worker harness process (optional).")
+ maxNumWorkers = flag.Int64("max_num_workers", 0, "Maximum
number of workers during scaling (optional).")
+ diskSizeGb = flag.Int64("disk_size_gb", 0, "Size of root
disk for VMs, in GB (optional).")
+ diskType = flag.String("disk_type", "", "Type of root
disk for VMs (optional).")
+ autoscalingAlgorithm = flag.String("autoscaling_algorithm", "",
"Autoscaling mode to use (optional).")
+ zone = flag.String("zone", "", "GCP zone (optional)")
+ kmsKey = flag.String("dataflow_kms_key", "", "The Cloud
KMS key identifier used to encrypt data at rest (optional).")
+ network = flag.String("network", "", "GCP network
(optional)")
+ subnetwork = flag.String("subnetwork", "", "GCP subnetwork
(optional)")
+ noUsePublicIPs = flag.Bool("no_use_public_ips", false, "Workers
must not use public IP addresses (optional)")
+ tempLocation = flag.String("temp_location", "", "Temp
location (optional)")
+ machineType = flag.String("worker_machine_type", "", "GCE
machine type (optional)")
+ minCPUPlatform = flag.String("min_cpu_platform", "", "GCE
minimum cpu platform (optional)")
+ workerJar = flag.String("dataflow_worker_jar", "",
"Dataflow worker jar (optional)")
+ workerRegion = flag.String("worker_region", "", "Dataflow
worker region (optional)")
+ workerZone = flag.String("worker_zone", "", "Dataflow
worker zone (optional)")
+ dataflowServiceOptions = flag.String("dataflow_service_options", "",
"Comma separated list of additional job modes and configurations (optional)")
+ flexRSGoal = flag.String("flexrs_goal", "", "Which Flexible
Resource Scheduling mode to run in (optional)")
+ enableHotKeyLogging = flag.Bool("enable_hot_key_logging", false,
"Specifies that when a hot key is detected in the pipeline, the literal,
human-readable key is printed in the user's Cloud Logging project (optional).")
Review Comment:
Filed https://issues.apache.org/jira/browse/BEAM-14512 about the issue described below. We
should comment this flag out for now and link to that JIRA. We can keep the rest of
the plumbing, though, as we'll need it later.
We can't support this yet because we don't support the "ToString"
URN/capability. It would be very similar to Drain, but without the user
customization. Basically, we'd add an exec node that takes an element and uses
fmt.Sprintf("%v", elem) to turn it into a string. The element comes from a DataSource
as usual, and is returned to the service with a DataSink, so it really is just
"understand the URN" and "include it in the Capabilities list" (statically, and
always).
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L329
Unfortunately, this one is annoying to test because there's no closed-form
test runner for it. At least it's not hard to write a job that produces hot keys
and to verify that we're printing whatever the `String()` method provides (as handled by the
`fmt` package).
##########
sdks/go/pkg/beam/runners/dataflow/dataflow.go:
##########
@@ -90,29 +95,34 @@ func init() {
// New flags that are already put into pipeline options
// should be added to this map.
var flagFilter = map[string]bool{
- "dataflow_endpoint": true,
- "staging_location": true,
- "worker_harness_container_image": true,
- "labels": true,
- "service_account_email": true,
- "num_workers": true,
- "max_num_workers": true,
- "disk_size_gb": true,
- "disk_type": true,
- "autoscaling_algorithm": true,
- "zone": true,
- "network": true,
- "subnetwork": true,
- "no_use_public_ips": true,
- "temp_location": true,
- "worker_machine_type": true,
- "min_cpu_platform": true,
- "dataflow_worker_jar": true,
- "worker_region": true,
- "worker_zone": true,
- "teardown_policy": true,
- "cpu_profiling": true,
- "session_recording": true,
+ "dataflow_endpoint": true,
+ "staging_location": true,
+ "worker_harness_container_image": true,
+ "labels": true,
+ "service_account_email": true,
+ "num_workers": true,
+ "max_num_workers": true,
+ "number_of_worker_harness_threads": true,
Review Comment:
This list is largely to filter out things that already show up visibly in
the UI when set. If a flag isn't visible in the UI by default, we probably shouldn't
filter it out.
My recommendation here is *not* to add anything to start; we can filter
them later if we notice they stutter in the UI. Ultimately, this helps users (and us)
know the configuration of a job, and failing in an "it shows up twice" way is fine,
but hiding the information could be painful.
I'll also note that if you tried these things already and noticed the
stutter, then this comment can be disregarded.
##########
sdks/go/pkg/beam/runners/dataflow/dataflow.go:
##########
@@ -242,6 +252,12 @@ func getJobOptions(ctx context.Context)
(*dataflowlib.JobOptions, error) {
}
}
+ if *flexRSGoal != "" {
+ if *flexRSGoal != "FLEXRS_UNSPECIFIED" && *flexRSGoal !=
"FLEXRS_SPEED_OPTIMIZED" && *flexRSGoal != "FLEXRS_COST_OPTIMIZED" {
+ return nil, errors.New("invalid flex resource
scheduling goal. Use
--flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)")
+ }
Review Comment:
Nit: The switch syntax, while it inverts the case, is a bit clearer since it
avoids repeating the `*flexRSGoal !=` several times.
Also, print out the flag value we did get, which makes it easier for users
to spot typos.
```suggestion
switch *flexRSGoal {
case "FLEXRS_UNSPECIFIED" , "FLEXRS_SPEED_OPTIMIZED",
"FLEXRS_COST_OPTIMIZED":
// valid values
default:
return nil, errors.Errorf("invalid flex resource
scheduling goal. Got %q; Use
--flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)",
*flexRSGoal)
}
```
##########
sdks/go/pkg/beam/runners/dataflow/dataflow.go:
##########
@@ -51,26 +51,31 @@ import (
// TODO(herohde) 5/16/2017: the Dataflow flags should match the other SDKs.
var (
- endpoint = flag.String("dataflow_endpoint", "", "Dataflow
endpoint (optional).")
- stagingLocation = flag.String("staging_location", "", "GCS staging
location (required).")
- image = flag.String("worker_harness_container_image",
"", "Worker harness container image (required).")
- labels = flag.String("labels", "", "JSON-formatted
map[string]string of job labels (optional).")
- serviceAccountEmail = flag.String("service_account_email", "",
"Service account email (optional).")
- numWorkers = flag.Int64("num_workers", 0, "Number of workers
(optional).")
- maxNumWorkers = flag.Int64("max_num_workers", 0, "Maximum number
of workers during scaling (optional).")
- diskSizeGb = flag.Int64("disk_size_gb", 0, "Size of root disk
for VMs, in GB (optional).")
- diskType = flag.String("disk_type", "", "Type of root disk
for VMs (optional).")
- autoscalingAlgorithm = flag.String("autoscaling_algorithm", "",
"Autoscaling mode to use (optional).")
- zone = flag.String("zone", "", "GCP zone (optional)")
- network = flag.String("network", "", "GCP network
(optional)")
- subnetwork = flag.String("subnetwork", "", "GCP subnetwork
(optional)")
- noUsePublicIPs = flag.Bool("no_use_public_ips", false, "Workers
must not use public IP addresses (optional)")
- tempLocation = flag.String("temp_location", "", "Temp location
(optional)")
- machineType = flag.String("worker_machine_type", "", "GCE
machine type (optional)")
- minCPUPlatform = flag.String("min_cpu_platform", "", "GCE minimum
cpu platform (optional)")
- workerJar = flag.String("dataflow_worker_jar", "", "Dataflow
worker jar (optional)")
- workerRegion = flag.String("worker_region", "", "Dataflow
worker region (optional)")
- workerZone = flag.String("worker_zone", "", "Dataflow worker
zone (optional)")
+ endpoint = flag.String("dataflow_endpoint", "", "Dataflow
endpoint (optional).")
+ stagingLocation = flag.String("staging_location", "", "GCS
staging location (required).")
+ image = flag.String("worker_harness_container_image",
"", "Worker harness container image (required).")
Review Comment:
```suggestion
image = flag.String("worker_harness_container_image",
"", "Worker harness container image (optional).")
```
Definitely optional at this stage. I haven't used the flag in so long it
doesn't show up in my command line history.
Issue Time Tracking
-------------------
Worklog Id: (was: 774674)
Time Spent: 1.5h (was: 1h 20m)
> Add Dataflow flags to Go
> ------------------------
>
> Key: BEAM-14509
> URL: https://issues.apache.org/jira/browse/BEAM-14509
> Project: Beam
> Issue Type: Improvement
> Components: runner-dataflow, sdk-go
> Reporter: Danny McCormick
> Assignee: Danny McCormick
> Priority: P2
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)