lukecwik commented on code in PR #10648:
URL: https://github.com/apache/beam/pull/10648#discussion_r938026539


##########
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py:
##########
@@ -320,24 +320,32 @@ def visit_transform(self, transform_node):
           for ix, side_input in enumerate(transform_node.side_inputs):
             access_pattern = side_input._side_input_data().access_pattern
             if access_pattern == common_urns.side_inputs.ITERABLE.urn:
-              # Add a map to ('', value) as Dataflow currently only handles
-              # keyed side inputs.
-              pipeline = side_input.pvalue.pipeline
-              new_side_input = _DataflowIterableSideInput(side_input)
-              new_side_input.pvalue = beam.pvalue.PCollection(
-                  pipeline,
-                  element_type=typehints.KV[
-                      bytes, side_input.pvalue.element_type],
-                  is_bounded=side_input.pvalue.is_bounded)
-              parent = transform_node.parent or pipeline._root_transform()
-              map_to_void_key = beam.pipeline.AppliedPTransform(
-                  pipeline,
-                  beam.Map(lambda x: (b'', x)),
-                  transform_node.full_label + '/MapToVoidKey%s' % ix,
-                  (side_input.pvalue,))
-              new_side_input.pvalue.producer = map_to_void_key
-              map_to_void_key.add_output(new_side_input.pvalue)
-              parent.add_part(map_to_void_key)
+              if use_unified_worker:
+                # Patch up the access pattern to appease Dataflow when using
+                # the UW and hardcode the output type to be Any since
+                # the Dataflow JSON and pipeline proto can differ in coders
+                # which leads to encoding/decoding issues within the runner.
+                side_input.pvalue.element_type = typehints.Any
+                new_side_input = _DataflowIterableSideInput(side_input)
+              else:
+                # Add a map to ('', value) as Dataflow currently only handles
+                # keyed side inputs when using the JRH.
+                pipeline = side_input.pvalue.pipeline
+                new_side_input = _DataflowIterableAsMultimapSideInput(
+                    side_input)

Review Comment:
   _DataflowIterableSideInput was poorly named: despite the name, it used 
the multimap access pattern URN.
   
   In this change, _DataflowIterableSideInput is renamed to 
_DataflowIterableAsMultimapSideInput, which preserves the existing behavior.
   
   Also, the JRH (the non-UW case) only supports the multimap access pattern.
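
   For readers unfamiliar with the trick, here is a minimal plain-Python 
sketch of how an iterable can be served through a multimap by keying every 
element with the same empty key, as the `beam.Map(lambda x: (b'', x))` step 
in the diff does. It does not use any Beam APIs; `VOID_KEY`, 
`map_to_void_key`, `as_multimap`, and `read_iterable_via_multimap` are 
hypothetical names chosen for illustration and are not the runner's actual 
implementation.

```python
from collections import defaultdict

# Assumption: a single empty-bytes key, mirroring the b'' used in the diff.
VOID_KEY = b''


def map_to_void_key(elements):
    """Pair every element with the same empty key."""
    return [(VOID_KEY, x) for x in elements]


def as_multimap(keyed_elements):
    """Group (key, value) pairs into a key -> list-of-values mapping."""
    multimap = defaultdict(list)
    for key, value in keyed_elements:
        multimap[key].append(value)
    return multimap


def read_iterable_via_multimap(multimap):
    """Recover the original iterable by looking up the single void key."""
    return list(multimap.get(VOID_KEY, []))


elements = [3, 1, 2]
side_input = as_multimap(map_to_void_key(elements))
recovered = read_iterable_via_multimap(side_input)
```

   The consumer still sees a plain iterable, but the data is stored and 
looked up under the multimap access pattern, which is why the old class name 
was misleading.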



