[ 
https://issues.apache.org/jira/browse/BEAM-5443?focusedWorklogId=149676&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149676
 ]

ASF GitHub Bot logged work on BEAM-5443:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Sep/18 21:53
            Start Date: 29/Sep/18 21:53
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #6512: [BEAM-5443] Pipeline 
option defaults for portable runner.
URL: https://github.com/apache/beam/pull/6512
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a forked repository, GitHub does not
show its diff once it has been merged, so the diff is reproduced below:

diff --git a/sdks/python/apache_beam/pipeline.py 
b/sdks/python/apache_beam/pipeline.py
index 5a4c1dc9228..68e313909a3 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -60,6 +60,7 @@
 from apache_beam import pvalue
 from apache_beam.internal import pickler
 from apache_beam.io.filesystems import FileSystems
+from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
 from apache_beam.options.pipeline_options import StandardOptions
@@ -152,6 +153,14 @@ def __init__(self, runner=None, options=None, argv=None):
       raise ValueError(
           'Pipeline has validations errors: \n' + '\n'.join(errors))
 
+    # set default experiments for portable runner
+    # (needs to occur prior to pipeline construction)
+    if self._options.view_as(StandardOptions).runner == 'PortableRunner':
+      experiments = (self._options.view_as(DebugOptions).experiments or [])
+      if not 'beam_fn_api' in experiments:
+        experiments.append('beam_fn_api')
+        self._options.view_as(DebugOptions).experiments = experiments
+
     # Default runner to be used.
     self.runner = runner
     # Stack of transforms generated by nested apply() calls. The stack will
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py 
b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index 5aba3f16076..09261c9030b 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -25,7 +25,6 @@
 
 import apache_beam as beam
 from apache_beam.options.pipeline_options import DebugOptions
-from apache_beam.options.pipeline_options import SetupOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.runners.portability import portable_runner
 from apache_beam.runners.portability import portable_runner_test
@@ -66,7 +65,6 @@ def get_runner(cls):
     def create_options(self):
       options = super(FlinkRunnerTest, self).create_options()
       options.view_as(DebugOptions).experiments = ['beam_fn_api']
-      options.view_as(SetupOptions).sdk_location = 'container'
       if streaming:
         options.view_as(StandardOptions).streaming = True
       return options
@@ -80,6 +78,9 @@ def test_read(self):
     def test_no_subtransform_composite(self):
       raise unittest.SkipTest("BEAM-4781")
 
+    def test_pardo_state_only(self):
+      raise unittest.SkipTest("BEAM-2918 - User state not yet supported.")
+
     # Inherits all other tests.
 
   # Run the tests.
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py 
b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 98f073387b5..a1a4c655a4d 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -243,11 +243,6 @@ def cross_product(elem, sides):
           equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))
 
   def test_pardo_state_only(self):
-    p = self.create_pipeline()
-    if not isinstance(p.runner, fn_api_runner.FnApiRunner):
-      # test is inherited by Flink PVR, which does not support the feature yet
-      self.skipTest('User state not supported.')
-
     index_state_spec = userstate.CombiningValueStateSpec(
         'index', beam.coders.VarIntCoder(), sum)
 
@@ -265,7 +260,7 @@ def process(self, kv, 
index=beam.DoFn.StateParam(index_state_spec)):
                 ('B', 'b', 2),
                 ('B', 'b', 3)]
 
-    with p:
+    with self.create_pipeline() as p:
       assert_that(p | beam.Create(inputs) | beam.ParDo(AddIndex()),
                   equal_to(expected))
 
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py 
b/sdks/python/apache_beam/runners/portability/portable_runner.py
index ec537266324..16643c54207 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -28,6 +28,7 @@
 from apache_beam import metrics
 from apache_beam.internal import pickler
 from apache_beam.options.pipeline_options import PortableOptions
+from apache_beam.options.pipeline_options import SetupOptions
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_job_api_pb2
 from apache_beam.portability.api import beam_job_api_pb2_grpc
@@ -104,6 +105,12 @@ def _create_environment(options):
   def run_pipeline(self, pipeline):
     portable_options = pipeline.options.view_as(PortableOptions)
     job_endpoint = portable_options.job_endpoint
+
+    # TODO: https://issues.apache.org/jira/browse/BEAM-5525
+    # portable runner specific default
+    if pipeline.options.view_as(SetupOptions).sdk_location == 'default':
+      pipeline.options.view_as(SetupOptions).sdk_location = 'container'
+
     if not job_endpoint:
       docker = DockerizedJobServer()
       job_endpoint = docker.start()
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index dd9c8f1fd2a..b99f153697f 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -245,9 +245,7 @@ task portableWordCount(dependsOn: 'installGcpTest') {
     def options = [
             "--input=/etc/profile",
             "--output=/tmp/py-wordcount-direct",
-            "--experiments=beam_fn_api",
             "--runner=PortableRunner",
-            "--sdk_location=container",
     ]
     if (project.hasProperty("streaming"))
       options += ["--streaming"]


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 149676)
    Time Spent: 3h 20m  (was: 3h 10m)

> Simplify Python pipeline options for portable runner
> ----------------------------------------------------
>
>                 Key: BEAM-5443
>                 URL: https://issues.apache.org/jira/browse/BEAM-5443
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-harness
>    Affects Versions: 2.7.0
>            Reporter: Thomas Weise
>            Assignee: Thomas Weise
>            Priority: Major
>              Labels: portability
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> Currently, the user needs to specify several extra pipeline options to run a 
> Python pipeline with the portable runner. It would be nice to remove the need 
> to explicitly provide these options when they can be inferred / have defaults.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to