[
https://issues.apache.org/jira/browse/BEAM-5190?focusedWorklogId=137603&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137603
]
ASF GitHub Bot logged work on BEAM-5190:
----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Aug/18 22:20
Start Date: 23/Aug/18 22:20
Worklog Time Spent: 10m
Work Description: pabloem closed pull request #6267: [BEAM-5190] Fixing
Python SDK thread count for Portable Runners
URL: https://github.com/apache/beam/pull/6267
This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request on merge, the diff is
reproduced below for the sake of provenance:
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 19ab147644a..19b54dadd36 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -32,6 +32,8 @@
from google.protobuf import text_format
from apache_beam.internal import pickler
+from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.portability.api import endpoints_pb2
from apache_beam.runners.dataflow.internal import names
from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler
@@ -110,9 +112,10 @@ def main(unused_argv):
thread.start()
if 'PIPELINE_OPTIONS' in os.environ:
- sdk_pipeline_options = json.loads(os.environ['PIPELINE_OPTIONS'])
+ sdk_pipeline_options = _parse_pipeline_options(
+ os.environ['PIPELINE_OPTIONS'])
else:
- sdk_pipeline_options = {}
+ sdk_pipeline_options = PipelineOptions.from_dictionary({})
if 'SEMI_PERSISTENT_DIRECTORY' in os.environ:
semi_persistent_directory = os.environ['SEMI_PERSISTENT_DIRECTORY']
@@ -130,7 +133,7 @@ def main(unused_argv):
try:
logging.info('Python sdk harness started with pipeline_options: %s',
- sdk_pipeline_options)
+ sdk_pipeline_options.get_all_options(drop_default=True))
service_descriptor = endpoints_pb2.ApiServiceDescriptor()
text_format.Merge(os.environ['CONTROL_API_SERVICE_DESCRIPTOR'],
service_descriptor)
@@ -148,6 +151,21 @@ def main(unused_argv):
fn_log_handler.close()
+def _parse_pipeline_options(options_json):
+ options = json.loads(options_json)
+ # Check the options field first for backward compatibility.
+ if 'options' in options:
+ return PipelineOptions.from_dictionary(options.get('options'))
+ else:
+ # Remove extra urn part from the key.
+ portable_option_regex = r'^beam:option:(?P<key>.*):v1$'
+ return PipelineOptions.from_dictionary({
+ re.match(portable_option_regex, k).group('key')
+ if re.match(portable_option_regex, k) else k: v
+ for k, v in options.iteritems()
+ })
+
+
def _get_worker_count(pipeline_options):
"""Extract worker count from the pipeline_options.
@@ -163,11 +181,7 @@ def _get_worker_count(pipeline_options):
Returns:
an int containing the worker_threads to use. Default is 1
"""
- pipeline_options = pipeline_options.get(
- 'options') if 'options' in pipeline_options else {}
- experiments = pipeline_options.get(
- 'experiments'
- ) if pipeline_options and 'experiments' in pipeline_options else []
+ experiments = pipeline_options.view_as(DebugOptions).experiments
experiments = experiments if experiments else []
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
index fdb1551705e..6b5972e4ac4 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
@@ -24,11 +24,25 @@
import logging
import unittest
+from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.worker import sdk_worker_main
class SdkWorkerMainTest(unittest.TestCase):
+ # Used for testing newly added flags.
+ class MockOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument('--eam:option:m_option:v', help='mock option')
+ parser.add_argument('--eam:option:m_option:v1', help='mock option')
+ parser.add_argument('--beam:option:m_option:v', help='mock option')
+ parser.add_argument('--m_flag', action='store_true', help='mock flag')
+ parser.add_argument('--m_option', help='mock option')
+ parser.add_argument(
+ '--m_m_option', action='append', help='mock multi option')
+
def test_status_server(self):
# Wrapping the method to see if it appears in threadump
@@ -42,29 +56,75 @@ def wrapped_method_for_test():
def test_work_count_default_value(self):
self._check_worker_count('{}', 12)
+ def test_parse_pipeine_options(self):
+ expected_options = PipelineOptions()
+ expected_options.view_as(
+ SdkWorkerMainTest.MockOptions).m_m_option = [
+ 'worker_threads=1', 'beam_fn_api'
+ ]
+ expected_options.view_as(
+ SdkWorkerMainTest.MockOptions).m_option = '/tmp/requirements.txt'
+ self.assertEqual(
+ {'m_m_option': ['worker_threads=1']},
+ sdk_worker_main._parse_pipeline_options(
+ '{"options": {"m_m_option":["worker_threads=1"]}}')
+ .get_all_options(drop_default=True))
+ self.assertEqual(
+ expected_options.get_all_options(),
+ sdk_worker_main._parse_pipeline_options(
+ '{"options": {' +
+ '"m_option": "/tmp/requirements.txt", ' +
+ '"m_m_option":["worker_threads=1", "beam_fn_api"]' +
+ '}}').get_all_options())
+ self.assertEqual(
+ {'m_m_option': ['worker_threads=1']},
+ sdk_worker_main._parse_pipeline_options(
+ '{"beam:option:m_m_option:v1":["worker_threads=1"]}')
+ .get_all_options(drop_default=True))
+ self.assertEqual(
+ expected_options.get_all_options(),
+ sdk_worker_main._parse_pipeline_options(
+ '{"beam:option:m_option:v1": "/tmp/requirements.txt", ' +
+ '"beam:option:m_m_option:v1":["worker_threads=1", ' +
+ '"beam_fn_api"]}').get_all_options())
+ self.assertEqual(
+ {'beam:option:m_option:v': 'mock_val'},
+ sdk_worker_main._parse_pipeline_options(
+ '{"options": {"beam:option:m_option:v":"mock_val"}}')
+ .get_all_options(drop_default=True))
+ self.assertEqual(
+ {'eam:option:m_option:v1': 'mock_val'},
+ sdk_worker_main._parse_pipeline_options(
+ '{"options": {"eam:option:m_option:v1":"mock_val"}}')
+ .get_all_options(drop_default=True))
+ self.assertEqual(
+ {'eam:option:m_option:v': 'mock_val'},
+ sdk_worker_main._parse_pipeline_options(
+ '{"options": {"eam:option:m_option:v":"mock_val"}}')
+ .get_all_options(drop_default=True))
+
def test_work_count_custom_value(self):
- self._check_worker_count(
- '{"options": {"experiments":["worker_threads=1"]}}', 1)
- self._check_worker_count(
- '{"options": {"experiments":["worker_threads=4"]}}', 4)
- self._check_worker_count(
- '{"options": {"experiments":["worker_threads=12"]}}', 12)
+ self._check_worker_count('{"experiments":["worker_threads=1"]}', 1)
+ self._check_worker_count('{"experiments":["worker_threads=4"]}', 4)
+ self._check_worker_count('{"experiments":["worker_threads=12"]}', 12)
def test_work_count_wrong_format(self):
self._check_worker_count(
- '{"options": {"experiments":["worker_threads="]}}', exception=True)
+ '{"experiments":["worker_threads="]}', exception=True)
self._check_worker_count(
- '{"options": {"experiments":["worker_threads=a"]}}', exception=True)
+ '{"experiments":["worker_threads=a"]}', exception=True)
self._check_worker_count(
- '{"options": {"experiments":["worker_threads=1a"]}}', exception=True)
+ '{"experiments":["worker_threads=1a"]}', exception=True)
def _check_worker_count(self, pipeline_options, expected=0, exception=False):
if exception:
- self.assertRaises(Exception, sdk_worker_main._get_worker_count,
- json.loads(pipeline_options))
+ self.assertRaises(
+ Exception, sdk_worker_main._get_worker_count,
+ PipelineOptions.from_dictionary(json.loads(pipeline_options)))
else:
self.assertEquals(
- sdk_worker_main._get_worker_count(json.loads(pipeline_options)),
+ sdk_worker_main._get_worker_count(
+ PipelineOptions.from_dictionary(json.loads(pipeline_options))),
expected)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 137603)
Time Spent: 1h (was: 50m)
> Python pipeline options are not picked correctly by PortableRunner
> ------------------------------------------------------------------
>
> Key: BEAM-5190
> URL: https://issues.apache.org/jira/browse/BEAM-5190
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-harness
> Reporter: Ankur Goenka
> Assignee: Ankur Goenka
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> The Python SDK worker deserializes the pipeline options into a dictionary
> instead of a PipelineOptions object.
> Sample log:
> [grpc-default-executor-2] INFO sdk_worker_main.main - Python sdk harness
> started with pipeline_options: {u'beam:option:flink_master:v1': u'[auto]',
> u'beam:option:streaming:v1': False, u'beam:option:experiments:v1':
> [u'beam_fn_api', u'worker_threads=50'], u'beam:option:dry_run:v1': False,
> u'beam:option:runner:v1': None, u'beam:option:profile_memory:v1': False,
> u'beam:option:runtime_type_check:v1': False, u'beam:option:region:v1':
> u'us-central1', u'beam:option:options_id:v1': 1, u'beam:option:no_auth:v1':
> False, u'beam:option:dataflow_endpoint:v1':
> u'https://dataflow.googleapis.com', u'beam:option:sdk_location:v1':
> u'/usr/local/google/home/goenka/d/work/beam/beam/sdks/python/dist/apache-beam-2.7.0.dev0.tar.gz',
> u'beam:option:direct_runner_use_stacked_bundle:v1': True,
> u'beam:option:save_main_session:v1': True,
> u'beam:option:type_check_strictness:v1': u'DEFAULT_TO_ANY',
> u'beam:option:profile_cpu:v1': False, u'beam:option:job_endpoint:v1':
> u'localhost:8099', u'beam:option:job_name:v1':
> u'BeamApp-goenka-0822071645-48ae1008', u'beam:option:temp_location:v1':
> u'gs://clouddfe-goenka/tmp/', u'beam:option:app_name:v1': None,
> u'beam:option:project:v1': u'google.com:clouddfe',
> u'beam:option:pipeline_type_check:v1': True,
> u'beam:option:staging_location:v1': u'gs://clouddfe-goenka/tmp/staging'}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)