[ https://issues.apache.org/jira/browse/BEAM-4549?focusedWorklogId=112026&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112026 ]

ASF GitHub Bot logged work on BEAM-4549:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Jun/18 19:55
            Start Date: 14/Jun/18 19:55
    Worklog Time Spent: 10m 
      Work Description: jbonofre closed pull request #5636: [BEAM-4549] (CP #5623) Use per-pipeline unique ids for side inputs in DataflowRunner
URL: https://github.com/apache/beam/pull/5636
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index ec25ce01ad1..ca9e892fc2d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -423,12 +423,13 @@ def _get_encoded_output_coder(self, transform_node, window_value=True):
     return self._get_typehint_based_encoding(
         element_type, window_coder=window_coder)
 
-  def _add_step(self, step_kind, step_label, transform_node, side_tags=()):
+  def _add_step(self, step_kind, step_label, transform_node, side_tags=(),
+                step_name=None):
     """Creates a Step object and adds it to the cache."""
     # Import here to avoid adding the dependency for local running scenarios.
     # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.runners.dataflow.internal import apiclient
-    step = apiclient.Step(step_kind, self._get_unique_step_name())
+    step = apiclient.Step(step_kind, step_name or self._get_unique_step_name())
     self.job.proto.steps.append(step.proto)
     step.add_property(PropertyNames.USER_NAME, step_label)
     # Cache the node/step association for the main output of the transform node.
@@ -581,6 +582,8 @@ def run_ParDo(self, transform_node):
     input_tag = transform_node.inputs[0].tag
     input_step = self._cache.get_pvalue(transform_node.inputs[0])
 
+    pardo_step_name = self._get_unique_step_name()
+
     # Attach side inputs.
     si_dict = {}
     # We must call self._cache.get_pvalue exactly once due to refcounting.
@@ -590,7 +593,7 @@ def run_ParDo(self, transform_node):
     for ix, side_pval in enumerate(transform_node.side_inputs):
       assert isinstance(side_pval, AsSideInput)
       step_name = 'SideInput-' + self._get_unique_step_name()
-      si_label = 'side%d' % ix
+      si_label = 'side-%s-%d' % (pardo_step_name, ix)
       pcollection_label = '%s.%s' % (
           side_pval.pvalue.producer.full_label.split('/')[-1],
           side_pval.pvalue.tag if side_pval.pvalue.tag else 'out')
@@ -621,7 +624,8 @@ def run_ParDo(self, transform_node):
             '/{}'.format(transform_name)
             if transform_node.side_inputs else ''),
         transform_node,
-        transform_node.transform.output_tags)
+        transform_node.transform.output_tags,
+        step_name=pardo_step_name)
     # Import here to avoid adding the dependency for local running scenarios.
     # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.runners.dataflow.internal import apiclient
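
The effect of the label change above can be illustrated with a minimal, hypothetical sketch (the helper names below are stand-ins, not the actual DataflowRunner code): the old `'side%d' % ix` labels restart at `side0` for every ParDo, so two steps that each attach side inputs produce colliding labels, while prefixing with a per-ParDo unique step name keeps every label unique across the pipeline.

```python
import itertools

_counter = itertools.count()


def _get_unique_step_name():
    # Stand-in for DataflowRunner._get_unique_step_name().
    return 's%d' % next(_counter)


def old_labels(num_side_inputs):
    # Pre-fix scheme: labels restart at side0 for each ParDo step.
    return ['side%d' % ix for ix in range(num_side_inputs)]


def new_labels(num_side_inputs):
    # Post-fix scheme: labels carry a per-ParDo unique step name.
    pardo_step_name = _get_unique_step_name()
    return ['side-%s-%d' % (pardo_step_name, ix)
            for ix in range(num_side_inputs)]


# Two ParDos with two side inputs each: the old scheme yields duplicates,
# the new scheme keeps all four labels distinct.
old = old_labels(2) + old_labels(2)
new = new_labels(2) + new_labels(2)
assert len(set(old)) < len(old)   # e.g. ['side0', 'side1', 'side0', 'side1']
assert len(set(new)) == len(new)  # e.g. ['side-s0-0', 'side-s0-1', 'side-s1-0', 'side-s1-1']
```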


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 112026)
    Time Spent: 1h 40m  (was: 1.5h)

> Streaming pipelines with multiple side inputs fail on DataflowRunner
> --------------------------------------------------------------------
>
>                 Key: BEAM-4549
>                 URL: https://issues.apache.org/jira/browse/BEAM-4549
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Charles Chen
>            Assignee: Charles Chen
>            Priority: Blocker
>             Fix For: 2.5.0
>
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Streaming pipelines with multiple side inputs currently fail on DataflowRunner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
