[ 
https://issues.apache.org/jira/browse/BEAM-14509?focusedWorklogId=774674&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-774674
 ]

ASF GitHub Bot logged work on BEAM-14509:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/May/22 16:38
            Start Date: 25/May/22 16:38
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on code in PR #17752:
URL: https://github.com/apache/beam/pull/17752#discussion_r881850464


##########
sdks/go/pkg/beam/runners/dataflow/dataflow.go:
##########
@@ -51,26 +51,31 @@ import (
 // TODO(herohde) 5/16/2017: the Dataflow flags should match the other SDKs.
 
 var (
-       endpoint             = flag.String("dataflow_endpoint", "", "Dataflow 
endpoint (optional).")
-       stagingLocation      = flag.String("staging_location", "", "GCS staging 
location (required).")
-       image                = flag.String("worker_harness_container_image", 
"", "Worker harness container image (required).")
-       labels               = flag.String("labels", "", "JSON-formatted 
map[string]string of job labels (optional).")
-       serviceAccountEmail  = flag.String("service_account_email", "", 
"Service account email (optional).")
-       numWorkers           = flag.Int64("num_workers", 0, "Number of workers 
(optional).")
-       maxNumWorkers        = flag.Int64("max_num_workers", 0, "Maximum number 
of workers during scaling (optional).")
-       diskSizeGb           = flag.Int64("disk_size_gb", 0, "Size of root disk 
for VMs, in GB (optional).")
-       diskType             = flag.String("disk_type", "", "Type of root disk 
for VMs (optional).")
-       autoscalingAlgorithm = flag.String("autoscaling_algorithm", "", 
"Autoscaling mode to use (optional).")
-       zone                 = flag.String("zone", "", "GCP zone (optional)")
-       network              = flag.String("network", "", "GCP network 
(optional)")
-       subnetwork           = flag.String("subnetwork", "", "GCP subnetwork 
(optional)")
-       noUsePublicIPs       = flag.Bool("no_use_public_ips", false, "Workers 
must not use public IP addresses (optional)")
-       tempLocation         = flag.String("temp_location", "", "Temp location 
(optional)")
-       machineType          = flag.String("worker_machine_type", "", "GCE 
machine type (optional)")
-       minCPUPlatform       = flag.String("min_cpu_platform", "", "GCE minimum 
cpu platform (optional)")
-       workerJar            = flag.String("dataflow_worker_jar", "", "Dataflow 
worker jar (optional)")
-       workerRegion         = flag.String("worker_region", "", "Dataflow 
worker region (optional)")
-       workerZone           = flag.String("worker_zone", "", "Dataflow worker 
zone (optional)")
+       endpoint               = flag.String("dataflow_endpoint", "", "Dataflow 
endpoint (optional).")
+       stagingLocation        = flag.String("staging_location", "", "GCS 
staging location (required).")
+       image                  = flag.String("worker_harness_container_image", 
"", "Worker harness container image (required).")
+       labels                 = flag.String("labels", "", "JSON-formatted 
map[string]string of job labels (optional).")
+       serviceAccountEmail    = flag.String("service_account_email", "", 
"Service account email (optional).")
+       numWorkers             = flag.Int64("num_workers", 0, "Number of 
workers (optional).")
+       workerHarnessThreads   = flag.Int64("number_of_worker_harness_threads", 
0, "The number of threads per each worker harness process (optional).")
+       maxNumWorkers          = flag.Int64("max_num_workers", 0, "Maximum 
number of workers during scaling (optional).")
+       diskSizeGb             = flag.Int64("disk_size_gb", 0, "Size of root 
disk for VMs, in GB (optional).")
+       diskType               = flag.String("disk_type", "", "Type of root 
disk for VMs (optional).")
+       autoscalingAlgorithm   = flag.String("autoscaling_algorithm", "", 
"Autoscaling mode to use (optional).")
+       zone                   = flag.String("zone", "", "GCP zone (optional)")
+       kmsKey                 = flag.String("dataflow_kms_key", "", "The Cloud 
KMS key identifier used to encrypt data at rest (optional).")
+       network                = flag.String("network", "", "GCP network 
(optional)")
+       subnetwork             = flag.String("subnetwork", "", "GCP subnetwork 
(optional)")
+       noUsePublicIPs         = flag.Bool("no_use_public_ips", false, "Workers 
must not use public IP addresses (optional)")
+       tempLocation           = flag.String("temp_location", "", "Temp 
location (optional)")
+       machineType            = flag.String("worker_machine_type", "", "GCE 
machine type (optional)")
+       minCPUPlatform         = flag.String("min_cpu_platform", "", "GCE 
minimum cpu platform (optional)")
+       workerJar              = flag.String("dataflow_worker_jar", "", 
"Dataflow worker jar (optional)")
+       workerRegion           = flag.String("worker_region", "", "Dataflow 
worker region (optional)")
+       workerZone             = flag.String("worker_zone", "", "Dataflow 
worker zone (optional)")
+       dataflowServiceOptions = flag.String("dataflow_service_options", "", 
"Comma separated list of additional job modes and configurations (optional)")
+       flexRSGoal             = flag.String("flexrs_goal", "", "Which Flexible 
Resource Scheduling mode to run in (optional)")
+       enableHotKeyLogging    = flag.Bool("enable_hot_key_logging", false, 
"Specifies that when a hot key is detected in the pipeline, the literal, 
human-readable key is printed in the user's Cloud Logging project (optional).")

Review Comment:
   Filed https://issues.apache.org/jira/browse/BEAM-14512 about the issue below. We 
should comment this out for now and link to that JIRA. We can keep the rest of 
the plumbing, though, as we'll need it later.
   
   We can't support this yet because we don't yet support the "ToString" 
URN/Capability. It would be very similar to Drain, but without the user 
customization. Basically we add an exec Node that takes an element and uses 
fmt.Sprintf("%v") to turn it into a string. The element comes from a DataSource 
as usual, and is returned to the service with a DataSink, so it really is just 
a matter of understanding the URN and including it in the Capabilities list 
(statically, all the time even).
   
   
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L329
   
   Unfortunately, this one is annoying to test because there's no closed-form 
test runner for it. It's not hard to write a job that produces hot keys at least, 
and to verify that we're printing whatever the `String()` method provides (as 
handled by the `fmt` package).



##########
sdks/go/pkg/beam/runners/dataflow/dataflow.go:
##########
@@ -90,29 +95,34 @@ func init() {
 // New flags that are already put into pipeline options
 // should be added to this map.
 var flagFilter = map[string]bool{
-       "dataflow_endpoint":              true,
-       "staging_location":               true,
-       "worker_harness_container_image": true,
-       "labels":                         true,
-       "service_account_email":          true,
-       "num_workers":                    true,
-       "max_num_workers":                true,
-       "disk_size_gb":                   true,
-       "disk_type":                      true,
-       "autoscaling_algorithm":          true,
-       "zone":                           true,
-       "network":                        true,
-       "subnetwork":                     true,
-       "no_use_public_ips":              true,
-       "temp_location":                  true,
-       "worker_machine_type":            true,
-       "min_cpu_platform":               true,
-       "dataflow_worker_jar":            true,
-       "worker_region":                  true,
-       "worker_zone":                    true,
-       "teardown_policy":                true,
-       "cpu_profiling":                  true,
-       "session_recording":              true,
+       "dataflow_endpoint":                true,
+       "staging_location":                 true,
+       "worker_harness_container_image":   true,
+       "labels":                           true,
+       "service_account_email":            true,
+       "num_workers":                      true,
+       "max_num_workers":                  true,
+       "number_of_worker_harness_threads": true,

Review Comment:
   This list is largely to filter out things that already show up visibly in 
the UI when set. If they're not visible in the UI by default, we probably 
shouldn't filter them out.
   
   My recommendation here is *not* adding anything to start, and we can filter 
them if we notice they stutter in the UI. Ultimately, this helps users (and us) 
know the configuration of a job, and failing in an "it shows up twice" way is 
fine, but hiding the information could be painful.
   
   I'll also note that if you tried these things already and noticed the 
stutter, then this comment can be disregarded.



##########
sdks/go/pkg/beam/runners/dataflow/dataflow.go:
##########
@@ -242,6 +252,12 @@ func getJobOptions(ctx context.Context) 
(*dataflowlib.JobOptions, error) {
                }
        }
 
+       if *flexRSGoal != "" {
+               if *flexRSGoal != "FLEXRS_UNSPECIFIED" && *flexRSGoal != 
"FLEXRS_SPEED_OPTIMIZED" && *flexRSGoal != "FLEXRS_COST_OPTIMIZED" {
+                       return nil, errors.New("invalid flex resource 
scheduling goal. Use 
--flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)")
+               }

Review Comment:
   Nit: The switch syntax, while it inverts the case, is a bit clearer since it 
avoids repeating the `*flexRSGoal !=` several times.
   
   Also, print out the flag value we did get, which makes it easier for users 
to spot typos.
   
   ```suggestion
                switch *flexRSGoal {
                case "FLEXRS_UNSPECIFIED" , "FLEXRS_SPEED_OPTIMIZED", 
"FLEXRS_COST_OPTIMIZED":
                  // valid values
                default:
                        return nil, errors.Errorf("invalid flex resource 
scheduling goal. Got %q; Use 
--flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)",
 *flexRSGoal)
                }
   ```



##########
sdks/go/pkg/beam/runners/dataflow/dataflow.go:
##########
@@ -51,26 +51,31 @@ import (
 // TODO(herohde) 5/16/2017: the Dataflow flags should match the other SDKs.
 
 var (
-       endpoint             = flag.String("dataflow_endpoint", "", "Dataflow 
endpoint (optional).")
-       stagingLocation      = flag.String("staging_location", "", "GCS staging 
location (required).")
-       image                = flag.String("worker_harness_container_image", 
"", "Worker harness container image (required).")
-       labels               = flag.String("labels", "", "JSON-formatted 
map[string]string of job labels (optional).")
-       serviceAccountEmail  = flag.String("service_account_email", "", 
"Service account email (optional).")
-       numWorkers           = flag.Int64("num_workers", 0, "Number of workers 
(optional).")
-       maxNumWorkers        = flag.Int64("max_num_workers", 0, "Maximum number 
of workers during scaling (optional).")
-       diskSizeGb           = flag.Int64("disk_size_gb", 0, "Size of root disk 
for VMs, in GB (optional).")
-       diskType             = flag.String("disk_type", "", "Type of root disk 
for VMs (optional).")
-       autoscalingAlgorithm = flag.String("autoscaling_algorithm", "", 
"Autoscaling mode to use (optional).")
-       zone                 = flag.String("zone", "", "GCP zone (optional)")
-       network              = flag.String("network", "", "GCP network 
(optional)")
-       subnetwork           = flag.String("subnetwork", "", "GCP subnetwork 
(optional)")
-       noUsePublicIPs       = flag.Bool("no_use_public_ips", false, "Workers 
must not use public IP addresses (optional)")
-       tempLocation         = flag.String("temp_location", "", "Temp location 
(optional)")
-       machineType          = flag.String("worker_machine_type", "", "GCE 
machine type (optional)")
-       minCPUPlatform       = flag.String("min_cpu_platform", "", "GCE minimum 
cpu platform (optional)")
-       workerJar            = flag.String("dataflow_worker_jar", "", "Dataflow 
worker jar (optional)")
-       workerRegion         = flag.String("worker_region", "", "Dataflow 
worker region (optional)")
-       workerZone           = flag.String("worker_zone", "", "Dataflow worker 
zone (optional)")
+       endpoint               = flag.String("dataflow_endpoint", "", "Dataflow 
endpoint (optional).")
+       stagingLocation        = flag.String("staging_location", "", "GCS 
staging location (required).")
+       image                  = flag.String("worker_harness_container_image", 
"", "Worker harness container image (required).")

Review Comment:
   ```suggestion
        image                  = flag.String("worker_harness_container_image", 
"", "Worker harness container image (optional).")
   ```
   
   Definitely optional at this stage. I haven't used the flag in so long it 
doesn't show up in my command line history.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 774674)
    Time Spent: 1.5h  (was: 1h 20m)

> Add Dataflow flags to Go
> ------------------------
>
>                 Key: BEAM-14509
>                 URL: https://issues.apache.org/jira/browse/BEAM-14509
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-dataflow, sdk-go
>            Reporter: Danny McCormick
>            Assignee: Danny McCormick
>            Priority: P2
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to