[ 
https://issues.apache.org/jira/browse/BEAM-5637?focusedWorklogId=155151&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155151
 ]

ASF GitHub Bot logged work on BEAM-5637:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Oct/18 22:06
            Start Date: 16/Oct/18 22:06
    Worklog Time Spent: 10m 
      Work Description: pabloem closed pull request #6680: [BEAM-5637] Python support for custom dataflow worker jar
URL: https://github.com/apache/beam/pull/6680
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/options/pipeline_options.py 
b/sdks/python/apache_beam/options/pipeline_options.py
index a0059dbb381..357c97ea6da 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -520,6 +520,12 @@ def _add_argparse_args(cls, parser):
         type=str,
         help='GCE minimum CPU platform. Default is determined by GCP.'
     )
+    parser.add_argument(
+        '--dataflow_worker_jar',
+        dest='dataflow_worker_jar',
+        type=str,
+        help='Dataflow worker jar.'
+    )
 
   def validate(self, validator):
     errors = []
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 1acd3488524..4143f2dbb1d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -381,6 +381,13 @@ def run_pipeline(self, pipeline):
     self.dataflow_client = apiclient.DataflowApplicationClient(
         pipeline._options)
 
+    dataflow_worker_jar = getattr(worker_options, 'dataflow_worker_jar', None)
+    if dataflow_worker_jar is not None:
+      experiments = ["use_staged_dataflow_worker_jar"]
+      if debug_options.experiments is not None:
+        experiments = list(set(experiments + debug_options.experiments))
+      debug_options.experiments = experiments
+
     # Create the job description and send a request to the service. The result
     # can be None if there is no need to send a request to the service (e.g.
     # template creation). If a request was sent and failed then the call will
diff --git a/sdks/python/apache_beam/runners/portability/stager.py 
b/sdks/python/apache_beam/runners/portability/stager.py
index ef7401ac6aa..cd7e24fce51 100644
--- a/sdks/python/apache_beam/runners/portability/stager.py
+++ b/sdks/python/apache_beam/runners/portability/stager.py
@@ -59,6 +59,7 @@
 from apache_beam.internal import pickler
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import WorkerOptions
 # TODO(angoenka): Remove reference to dataflow internal names
 from apache_beam.runners.dataflow.internal import names
 from apache_beam.utils import processes
@@ -123,8 +124,7 @@ def stage_job_resources(self,
 
         Returns:
           A list of file names (no paths) for the resources staged. All the
-          files
-          are assumed to be staged at staging_location.
+          files are assumed to be staged at staging_location.
 
         Raises:
           RuntimeError: If files specified are not found or error encountered
@@ -256,6 +256,14 @@ def stage_job_resources(self,
                 'The file "%s" cannot be found. Its location was specified by '
                 'the --sdk_location command-line option.' % sdk_path)
 
+    worker_options = options.view_as(WorkerOptions)
+    dataflow_worker_jar = getattr(worker_options, 'dataflow_worker_jar', None)
+    if dataflow_worker_jar is not None:
+      jar_staged_filename = 'dataflow-worker.jar'
+      staged_path = FileSystems.join(staging_location, jar_staged_filename)
+      self.stage_artifact(dataflow_worker_jar, staged_path)
+      resources.append(jar_staged_filename)
+
     # Delete all temp files created while staging job resources.
     shutil.rmtree(temp_dir)
     retrieval_token = self.commit_manifest()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 155151)
    Time Spent: 4.5h  (was: 4h 20m)

> Python support for custom dataflow worker jar
> ---------------------------------------------
>
>                 Key: BEAM-5637
>                 URL: https://issues.apache.org/jira/browse/BEAM-5637
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Henning Rohde
>            Assignee: Ruoyun Huang
>            Priority: Major
>          Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> One of the slightly subtle aspects is that we would need to ignore one of the 
> staged jars for portable Python jobs. That requires a change to the Python 
> boot code: 
> https://github.com/apache/beam/blob/66d7c865b7267f388ee60752891a9141fad43774/sdks/python/container/boot.go#L104



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to