[ 
https://issues.apache.org/jira/browse/BEAM-3885?focusedWorklogId=154221&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-154221
 ]

ASF GitHub Bot logged work on BEAM-3885:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Oct/18 09:00
            Start Date: 15/Oct/18 09:00
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #6581: [BEAM-3885] 
Translate impulse primitive for dataflow batch pipelines.
URL: https://github.com/apache/beam/pull/6581
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 868d264a9d9..728e9800a90 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -55,6 +55,7 @@
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PValueCache
+from apache_beam.transforms import window
 from apache_beam.transforms.display import DisplayData
 from apache_beam.typehints import typehints
 from apache_beam.utils import proto_utils
@@ -501,22 +502,28 @@ def _add_singleton_step(
   def run_Impulse(self, transform_node):
     standard_options = (
         
transform_node.outputs[None].pipeline._options.view_as(StandardOptions))
+    step = self._add_step(
+        TransformNames.READ, transform_node.full_label, transform_node)
     if standard_options.streaming:
-      step = self._add_step(
-          TransformNames.READ, transform_node.full_label, transform_node)
       step.add_property(PropertyNames.FORMAT, 'pubsub')
       step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION, '_starting_signal/')
-
-      step.encoding = self._get_encoded_output_coder(transform_node)
-      step.add_property(
-          PropertyNames.OUTPUT_INFO,
-          [{PropertyNames.USER_NAME: (
-              '%s.%s' % (
-                  transform_node.full_label, PropertyNames.OUT)),
-            PropertyNames.ENCODING: step.encoding,
-            PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
     else:
-      ValueError('Impulse source for batch pipelines has not been defined.')
+      step.add_property(PropertyNames.FORMAT, 'impulse')
+      encoded_impulse_element = coders.WindowedValueCoder(
+          coders.BytesCoder(),
+          coders.coders.GlobalWindowCoder()).get_impl().encode_nested(
+              window.GlobalWindows.windowed_value(''))
+      step.add_property(PropertyNames.IMPULSE_ELEMENT,
+                        
self.byte_array_to_json_string(encoded_impulse_element))
+
+    step.encoding = self._get_encoded_output_coder(transform_node)
+    step.add_property(
+        PropertyNames.OUTPUT_INFO,
+        [{PropertyNames.USER_NAME: (
+            '%s.%s' % (
+                transform_node.full_label, PropertyNames.OUT)),
+          PropertyNames.ENCODING: step.encoding,
+          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
 
   def run_Flatten(self, transform_node):
     step = self._add_step(TransformNames.FLATTEN,
@@ -775,9 +782,18 @@ def run_CombineValues(self, transform_node):
          PropertyNames.OUTPUT_NAME: PropertyNames.OUT})
     step.add_property(PropertyNames.OUTPUT_INFO, outputs)
 
-  def apply_Read(self, unused_transform, pbegin):
-    # Always consider Read to be a primitive for dataflow.
-    return beam.pvalue.PCollection(pbegin.pipeline)
+  def apply_Read(self, transform, pbegin):
+    if hasattr(transform.source, 'format'):
+      # Consider native Read to be a primitive for dataflow.
+      return beam.pvalue.PCollection(pbegin.pipeline)
+    else:
+      options = pbegin.pipeline.options.view_as(DebugOptions)
+      if options.experiments and 'beam_fn_api' in options.experiments:
+        # Expand according to FnAPI primitives.
+        return self.apply_PTransform(transform, pbegin)
+      else:
+        # Custom Read is also a primitive for non-FnAPI on dataflow.
+        return beam.pvalue.PCollection(pbegin.pipeline)
 
   def run_Read(self, transform_node):
     transform = transform_node.transform
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py 
b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 7066e7f7627..fd91d3d6e79 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -95,6 +95,7 @@ class PropertyNames(object):
   FILE_NAME_SUFFIX = 'filename_suffix'
   FORMAT = 'format'
   INPUTS = 'inputs'
+  IMPULSE_ELEMENT = 'impulse_element'
   NON_PARALLEL_INPUTS = 'non_parallel_inputs'
   NUM_SHARDS = 'num_shards'
   OUT = 'out'


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 154221)
    Time Spent: 50m  (was: 40m)

> Python SDK Read IO is expressed as Impulse -> ParDo
> ---------------------------------------------------
>
>                 Key: BEAM-3885
>                 URL: https://issues.apache.org/jira/browse/BEAM-3885
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Ben Sidhom
>            Assignee: Ahmet Altay
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Portable runners cannot understand SDK-specific Read transforms. The Python 
> SDK will need to rewrite Read as Impulse followed by a ParDo that actually 
> does the IO.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to