[ 
https://issues.apache.org/jira/browse/BEAM-5442?focusedWorklogId=191309&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-191309
 ]

ASF GitHub Bot logged work on BEAM-5442:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Jan/19 23:54
            Start Date: 28/Jan/19 23:54
    Worklog Time Spent: 10m 
      Work Description: tweise commented on pull request #7597: [BEAM-5442] 
Retrieve valid runner options from JobService
URL: https://github.com/apache/beam/pull/7597#discussion_r251642819
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/portable_runner.py
 ##########
 @@ -189,20 +189,60 @@ def run_pipeline(self, pipeline, options):
       proto_pipeline = fn_api_runner_transforms.with_stages(
           proto_pipeline, stages)
 
-    # TODO: Define URNs for options.
-    # convert int values: https://issues.apache.org/jira/browse/BEAM-5509
-    p_options = {'beam:option:' + k + ':v1': (str(v) if type(v) == int else v)
-                 for k, v in options.get_all_options().items()
-                 if v is not None}
-
     if not job_service:
       channel = grpc.insecure_channel(job_endpoint)
       grpc.channel_ready_future(channel).result()
       job_service = beam_job_api_pb2_grpc.JobServiceStub(channel)
     else:
       channel = None
 
+    # fetch runner options from job service
+    def send_options_request(max_retries=5):
+      num_retries = 0
+      while True:
+        try:
+          # This reports channel is READY but connections may fail
+          # Seems to be only an issue on Mac with port forwardings
+          if channel:
+            grpc.channel_ready_future(channel).result()
+          return job_service.DescribePipelineOptions(
+              beam_job_api_pb2.DescribePipelineOptionsRequest())
+        except grpc._channel._Rendezvous as e:
+          num_retries += 1
+          if num_retries > max_retries:
+            raise e
+
+    options_response = send_options_request()
+
+    def add_runner_options(parser):
+      for option in options_response.options:
+        try:
+          # no default values - we don't want runner options
+          # added unless they were specified by the user
+          # TODO: types
+          action = 'store'
+          if option.type == 'Boolean':
+            action = 'store_true'
+          parser.add_argument("--%s" % option.name,
+                              action=action,
+                              help=option.description
+                             )
+        except Exception as e:
+          # ignore runner options that are already present
+          # only in this case is duplicate not treated as error
+          if 'conflicting option string' not in str(e):
+            raise
 
 Review comment:
   IMO, from user perspective, these options should work like any other option 
that for example the SDK provides. Note that "runner options" can belong to 
multiple groups also. We discussed last week how collision in the global key 
namespace could be handled by using the option group as explicit qualifier. 
When there is ambiguity, the user could resolve it by specifying a namespace 
prefix. For example, --MyOptions.foo and --FlinkOptions.foo  could coexist. 
This would be separate work, but I can create a JIRA for it. The implementation 
as of this PR does not represent any change compared to current behavior WRT 
uniqueness and precedence of option keys. 
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 191309)
    Time Spent: 23h  (was: 22h 50m)

> PortableRunner swallows custom options for Runner
> -------------------------------------------------
>
>                 Key: BEAM-5442
>                 URL: https://issues.apache.org/jira/browse/BEAM-5442
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core, sdk-py-core
>            Reporter: Maximilian Michels
>            Assignee: Thomas Weise
>            Priority: Major
>              Labels: portability, portability-flink
>          Time Spent: 23h
>  Remaining Estimate: 0h
>
> The PortableRunner doesn't pass custom PipelineOptions to the executing 
> Runner.
> Example: {{--parallelism=4}} won't be forwarded to the FlinkRunner.
> (The option is just removed during proto translation without any warning)
> We should allow some form of customization through the options, even for the 
> PortableRunner. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to