[
https://issues.apache.org/jira/browse/BEAM-4549?focusedWorklogId=112227&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112227
]
ASF GitHub Bot logged work on BEAM-4549:
----------------------------------------
Author: ASF GitHub Bot
Created on: 15/Jun/18 08:16
Start Date: 15/Jun/18 08:16
Worklog Time Spent: 10m
Work Description: charlesccychen closed pull request #5655: [BEAM-4549]
Fix side inputs for streaming pipelines on Dataflow
URL: https://github.com/apache/beam/pull/5655
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/python/apache_beam/pipeline.py
b/sdks/python/apache_beam/pipeline.py
index 482afa680fc..1a2c9de12fd 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -50,6 +50,7 @@
import collections
import logging
import os
+import re
import shutil
import tempfile
from builtins import object
@@ -849,7 +850,8 @@ def is_side_input(tag):
for tag, id in proto.inputs.items()
if not is_side_input(tag)]
# Ordering is important here.
- indexed_side_inputs = [(int(tag[4:]), context.pcollections.get_by_id(id))
+ indexed_side_inputs = [(int(re.match('side([0-9]+)(-.*)?$', tag).group(1)),
+ context.pcollections.get_by_id(id))
for tag, id in proto.inputs.items()
if is_side_input(tag)]
side_inputs = [si for _, si in sorted(indexed_side_inputs)]
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index ec25ce01ad1..81ccd70f964 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -49,6 +49,7 @@
from apache_beam.runners.runner import PValueCache
from apache_beam.transforms.display import DisplayData
from apache_beam.typehints import typehints
+from apache_beam.utils import proto_utils
from apache_beam.utils.plugin import BeamPlugin
__all__ = ['DataflowRunner']
@@ -312,15 +313,16 @@ def run_pipeline(self, pipeline):
pipeline.visit(self.side_input_visitor())
# Snapshot the pipeline in a portable proto before mutating it
- proto_pipeline, self.proto_context = pipeline.to_runner_api(
+ self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
return_context=True)
# TODO(BEAM-2717): Remove once Coders are already in proto.
- for pcoll in proto_pipeline.components.pcollections.values():
+ for pcoll in self.proto_pipeline.components.pcollections.values():
if pcoll.coder_id not in self.proto_context.coders:
coder = coders.registry.get_coder(pickler.loads(pcoll.coder_id))
pcoll.coder_id = self.proto_context.coders.get_id(coder)
- self.proto_context.coders.populate_map(proto_pipeline.components.coders)
+ self.proto_context.coders.populate_map(
+ self.proto_pipeline.components.coders)
# Performing configured PTransform overrides.
pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES)
@@ -332,7 +334,7 @@ def run_pipeline(self, pipeline):
plugins = list(set(plugins + setup_options.beam_plugins))
setup_options.beam_plugins = plugins
- self.job = apiclient.Job(pipeline._options, proto_pipeline)
+ self.job = apiclient.Job(pipeline._options, self.proto_pipeline)
# Dataflow runner requires a KV type for GBK inputs, hence we enforce that
# here.
@@ -587,10 +589,15 @@ def run_ParDo(self, transform_node):
si_labels = {}
full_label_counts = defaultdict(int)
lookup_label = lambda side_pval: si_labels[side_pval]
+ named_inputs = transform_node.named_inputs()
+ label_renames = {}
for ix, side_pval in enumerate(transform_node.side_inputs):
assert isinstance(side_pval, AsSideInput)
step_name = 'SideInput-' + self._get_unique_step_name()
- si_label = 'side%d' % ix
+ si_label = 'side%d-%s' % (ix, transform_node.full_label)
+ old_label = 'side%d' % ix
+ label_renames[old_label] = si_label
+ assert old_label in named_inputs
pcollection_label = '%s.%s' % (
side_pval.pvalue.producer.full_label.split('/')[-1],
side_pval.pvalue.tag if side_pval.pvalue.tag else 'out')
@@ -626,9 +633,30 @@ def run_ParDo(self, transform_node):
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.runners.dataflow.internal import apiclient
transform_proto = self.proto_context.transforms.get_proto(transform_node)
+ transform_id = self.proto_context.transforms.get_id(transform_node)
if (apiclient._use_fnapi(transform_node.inputs[0].pipeline._options)
and transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
- serialized_data = self.proto_context.transforms.get_id(transform_node)
+ # Patch side input ids to be unique across a given pipeline.
+ if (label_renames and
+ transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
+ # Patch PTransform proto.
+ for old, new in label_renames.iteritems():
+ transform_proto.inputs[new] = transform_proto.inputs[old]
+ del transform_proto.inputs[old]
+
+ # Patch ParDo proto.
+ proto_type, _ = beam.PTransform._known_urns[transform_proto.spec.urn]
+ proto = proto_utils.parse_Bytes(transform_proto.spec.payload,
+ proto_type)
+ for old, new in label_renames.iteritems():
+ proto.side_inputs[new].CopyFrom(proto.side_inputs[old])
+ del proto.side_inputs[old]
+ transform_proto.spec.payload = proto.SerializeToString()
+ # We need to update the pipeline proto.
+ del self.proto_pipeline.components.transforms[transform_id]
+ (self.proto_pipeline.components.transforms[transform_id]
+ .CopyFrom(transform_proto))
+ serialized_data = transform_id
else:
serialized_data = pickler.dumps(
self._pardo_fn_data(transform_node, lookup_label))
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py
b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index b94ab8a4f0d..7727c87c4c7 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -25,6 +25,7 @@
import collections
import json
import logging
+import re
import apache_beam as beam
from apache_beam.coders import WindowedValueCoder
@@ -499,7 +500,9 @@ def _create_pardo_operation(
tagged_side_inputs = [
(tag, beam.pvalue.SideInputData.from_runner_api(si, factory.context))
for tag, si in side_inputs_proto.items()]
- tagged_side_inputs.sort(key=lambda tag_si: int(tag_si[0][4:]))
+ tagged_side_inputs.sort(
+ key=lambda tag_si: int(re.match('side([0-9]+)(-.*)?$',
+ tag_si[0]).group(1)))
side_input_maps = [
StateBackedSideInputMap(
factory.state_handler,
diff --git a/sdks/python/apache_beam/transforms/core.py
b/sdks/python/apache_beam/transforms/core.py
index f02609ad2c9..94d5bb9c960 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -22,6 +22,7 @@
import copy
import inspect
import random
+import re
import types
from six import string_types
@@ -891,8 +892,9 @@ def from_runner_api_parameter(pardo_payload, context):
# This is an ordered list stored as a dict (see the comments in
# to_runner_api_parameter above).
indexed_side_inputs = [
- (int(ix[4:]), pvalue.AsSideInput.from_runner_api(si, context))
- for ix, si in pardo_payload.side_inputs.items()]
+ (int(re.match('side([0-9]+)(-.*)?$', tag).group(1)),
+ pvalue.AsSideInput.from_runner_api(si, context))
+ for tag, si in pardo_payload.side_inputs.items()]
result.side_inputs = [si for _, si in sorted(indexed_side_inputs)]
return result
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 112227)
Time Spent: 3h (was: 2h 50m)
> Streaming pipelines with multiple side inputs fail on DataflowRunner
> --------------------------------------------------------------------
>
> Key: BEAM-4549
> URL: https://issues.apache.org/jira/browse/BEAM-4549
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Charles Chen
> Assignee: Charles Chen
> Priority: Blocker
> Fix For: 2.5.0
>
> Time Spent: 3h
> Remaining Estimate: 0h
>
> Streaming pipelines with multiple side inputs currently fail on
> DataflowRunner.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)