tvalentyn commented on code in PR #26220:
URL: https://github.com/apache/beam/pull/26220#discussion_r1166061567
##########
sdks/python/apache_beam/runners/worker/sdk_worker_main.py:
##########
@@ -175,20 +175,32 @@ def create_harness(environment, dry_run=False):
def main(unused_argv):
"""Main entry point for SDK Fn Harness."""
- fn_log_handler, sdk_harness, sdk_pipeline_options =
create_harness(os.environ)
- experiments = sdk_pipeline_options.view_as(DebugOptions).experiments or []
+ (fn_log_handler, sdk_harness,
+ sdk_pipeline_options) = create_harness(os.environ)
dataflow_service_options = (
sdk_pipeline_options.view_as(GoogleCloudOptions).dataflow_service_options
or [])
- if (_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or (
- _ENABLE_GOOGLE_CLOUD_PROFILER in dataflow_service_options):
+ experiments = (sdk_pipeline_options.view_as(DebugOptions).experiments or [])
Review Comment:
I think it would be sufficient to only parse Dataflow service options, since
enable_google_cloud_profiler is advertised as a service option.
##########
sdks/python/apache_beam/runners/worker/sdk_worker_main.py:
##########
@@ -175,20 +175,32 @@ def create_harness(environment, dry_run=False):
def main(unused_argv):
"""Main entry point for SDK Fn Harness."""
- fn_log_handler, sdk_harness, sdk_pipeline_options =
create_harness(os.environ)
- experiments = sdk_pipeline_options.view_as(DebugOptions).experiments or []
+ (fn_log_handler, sdk_harness,
+ sdk_pipeline_options) = create_harness(os.environ)
dataflow_service_options = (
sdk_pipeline_options.view_as(GoogleCloudOptions).dataflow_service_options
or [])
- if (_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or (
- _ENABLE_GOOGLE_CLOUD_PROFILER in dataflow_service_options):
+ experiments = (sdk_pipeline_options.view_as(DebugOptions).experiments or [])
+ exp = None
+ if ((_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or
+ (_ENABLE_GOOGLE_CLOUD_PROFILER in dataflow_service_options)):
+ exp = True
+ for experiment in experiments:
+ if experiment.startswith(_ENABLE_GOOGLE_CLOUD_PROFILER + '='):
+ exp = experiment.split('=', 1)[1]
+
+ if exp:
Review Comment:
Try to use more mnemonic names (this will be taken care of if you go with the
suggestion above).
##########
sdks/python/apache_beam/runners/worker/sdk_worker_main.py:
##########
@@ -175,20 +175,32 @@ def create_harness(environment, dry_run=False):
def main(unused_argv):
"""Main entry point for SDK Fn Harness."""
- fn_log_handler, sdk_harness, sdk_pipeline_options =
create_harness(os.environ)
- experiments = sdk_pipeline_options.view_as(DebugOptions).experiments or []
+ (fn_log_handler, sdk_harness,
+ sdk_pipeline_options) = create_harness(os.environ)
dataflow_service_options = (
sdk_pipeline_options.view_as(GoogleCloudOptions).dataflow_service_options
or [])
- if (_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or (
- _ENABLE_GOOGLE_CLOUD_PROFILER in dataflow_service_options):
+ experiments = (sdk_pipeline_options.view_as(DebugOptions).experiments or [])
+ exp = None
+ if ((_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or
+ (_ENABLE_GOOGLE_CLOUD_PROFILER in dataflow_service_options)):
+ exp = True
+ for experiment in experiments:
+ if experiment.startswith(_ENABLE_GOOGLE_CLOUD_PROFILER + '='):
+ exp = experiment.split('=', 1)[1]
Review Comment:
check out
https://github.com/apache/beam/blob/5a9ab685d9bb7a1df901f535cfb7b5c106fbf927/sdks/python/apache_beam/options/pipeline_options.py#L1143
.
We can add this helper to GoogleCloudOptions to parse the
`dataflow_service_options`.
Then, we can write:
```python
google_cloud_profiler_service_name = sdk_pipeline_options.view_as(
    GoogleCloudOptions).lookup_dataflow_service_option(
        _ENABLE_GOOGLE_CLOUD_PROFILER, default=os.environ["JOB_NAME"])
```
##########
sdks/python/apache_beam/runners/worker/sdk_worker_main.py:
##########
@@ -175,20 +175,32 @@ def create_harness(environment, dry_run=False):
def main(unused_argv):
"""Main entry point for SDK Fn Harness."""
- fn_log_handler, sdk_harness, sdk_pipeline_options =
create_harness(os.environ)
- experiments = sdk_pipeline_options.view_as(DebugOptions).experiments or []
+ (fn_log_handler, sdk_harness,
+ sdk_pipeline_options) = create_harness(os.environ)
dataflow_service_options = (
sdk_pipeline_options.view_as(GoogleCloudOptions).dataflow_service_options
or [])
- if (_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or (
- _ENABLE_GOOGLE_CLOUD_PROFILER in dataflow_service_options):
+ experiments = (sdk_pipeline_options.view_as(DebugOptions).experiments or [])
+ exp = None
+ if ((_ENABLE_GOOGLE_CLOUD_PROFILER in experiments) or
+ (_ENABLE_GOOGLE_CLOUD_PROFILER in dataflow_service_options)):
+ exp = True
+ for experiment in experiments:
+ if experiment.startswith(_ENABLE_GOOGLE_CLOUD_PROFILER + '='):
+ exp = experiment.split('=', 1)[1]
+
+ if exp:
+ if not isinstance(exp, bool):
Review Comment:
This logic is hard to reason about. would be simplified if you go with
suggestion above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]