[ 
https://issues.apache.org/jira/browse/BEAM-5617?focusedWorklogId=160772&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-160772
 ]

ASF GitHub Bot logged work on BEAM-5617:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Oct/18 18:53
            Start Date: 30/Oct/18 18:53
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #6844: [BEAM-5617] Use bytes 
consistently for pcollection ids.
URL: https://github.com/apache/beam/pull/6844
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a fork's pull request once it is merged, the diff is
reproduced below for the sake of provenance:

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 67414088c8e..832cba95180 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -62,7 +62,7 @@
     beam.coders.coders.GlobalWindowCoder()).get_impl().encode_nested(
         beam.transforms.window.GlobalWindows.windowed_value(b''))
 
-IMPULSE_BUFFER_PREFIX = b'impulse:'
+IMPULSE_BUFFER = b'impulse'
 
 
 class BeamFnControlServicer(beam_fn_api_pb2_grpc.BeamFnControlServicer):
@@ -458,14 +458,12 @@ def impulse_to_input(stages):
         for transform in list(stage.transforms):
           if transform.spec.urn == common_urns.primitives.IMPULSE.urn:
             stage.transforms.remove(transform)
-            impulse_pc = only_element(transform.outputs.values())
             stage.transforms.append(
                 beam_runner_api_pb2.PTransform(
                     unique_name=transform.unique_name,
                     spec=beam_runner_api_pb2.FunctionSpec(
                         urn=bundle_processor.DATA_INPUT_URN,
-                        payload=IMPULSE_BUFFER_PREFIX +
-                        impulse_pc.encode('utf-8')),
+                        payload=IMPULSE_BUFFER),
                     outputs=transform.outputs))
 
         yield stage
@@ -618,7 +616,7 @@ def expand_gbk(stages):
                pipeline_components.pcollections[pcoll_id], pipeline_components)
 
           # This is used later to correlate the read and write.
-          param = str("group:%s" % stage.name).encode('utf-8')
+          grouping_buffer = create_buffer_id(stage.name, kind='group')
           if stage.name not in pipeline_components.transforms:
             pipeline_components.transforms[stage.name].CopyFrom(transform)
           gbk_write = Stage(
@@ -628,7 +626,7 @@ def expand_gbk(stages):
                   inputs=transform.inputs,
                   spec=beam_runner_api_pb2.FunctionSpec(
                       urn=bundle_processor.DATA_OUTPUT_URN,
-                      payload=param))],
+                      payload=grouping_buffer))],
               downstream_side_inputs=frozenset(),
               must_follow=stage.must_follow)
           yield gbk_write
@@ -640,7 +638,7 @@ def expand_gbk(stages):
                   outputs=transform.outputs,
                   spec=beam_runner_api_pb2.FunctionSpec(
                       urn=bundle_processor.DATA_INPUT_URN,
-                      payload=param))],
+                      payload=grouping_buffer))],
               downstream_side_inputs=stage.downstream_side_inputs,
               must_follow=union(frozenset([gbk_write]), stage.must_follow))
         else:
@@ -660,7 +658,7 @@ def sink_flattens(stages):
         transform = stage.transforms[0]
         if transform.spec.urn == common_urns.primitives.FLATTEN.urn:
           # This is used later to correlate the read and writes.
-          param = str("materialize:%s" % transform.unique_name).encode('utf-8')
+          buffer_id = create_buffer_id(transform.unique_name)
           output_pcoll_id, = list(transform.outputs.values())
           output_coder_id = pcollections[output_pcoll_id].coder_id
           flatten_writes = []
@@ -696,7 +694,7 @@ def sink_flattens(stages):
                     inputs={local_in: transcoded_pcollection},
                     spec=beam_runner_api_pb2.FunctionSpec(
                         urn=bundle_processor.DATA_OUTPUT_URN,
-                        payload=param))],
+                        payload=buffer_id))],
                 downstream_side_inputs=frozenset(),
                 must_follow=stage.must_follow)
             flatten_writes.append(flatten_write)
@@ -709,7 +707,7 @@ def sink_flattens(stages):
                   outputs=transform.outputs,
                   spec=beam_runner_api_pb2.FunctionSpec(
                       urn=bundle_processor.DATA_INPUT_URN,
-                      payload=param))],
+                      payload=buffer_id))],
               downstream_side_inputs=stage.downstream_side_inputs,
               must_follow=union(frozenset(flatten_writes), stage.must_follow))
 
@@ -808,7 +806,6 @@ def fuse(producer, consumer):
 
       # Now try to fuse away all pcollections.
       for pcoll, producer in producers_by_pcoll.items():
-        pcoll_as_param = str("materialize:%s" % pcoll).encode('utf-8')
         write_pcoll = None
         for consumer in consumers_by_pcoll[pcoll]:
           producer = replacement(producer)
@@ -820,6 +817,7 @@ def fuse(producer, consumer):
             fuse(producer, consumer)
           else:
             # If we can't fuse, do a read + write.
+            buffer_id = create_buffer_id(pcoll)
             if write_pcoll is None:
               write_pcoll = Stage(
                   pcoll + '/Write',
@@ -828,7 +826,7 @@ def fuse(producer, consumer):
                       inputs={'in': pcoll},
                       spec=beam_runner_api_pb2.FunctionSpec(
                           urn=bundle_processor.DATA_OUTPUT_URN,
-                          payload=pcoll_as_param))])
+                          payload=buffer_id))])
               fuse(producer, write_pcoll)
             if consumer.has_as_main_input(pcoll):
               read_pcoll = Stage(
@@ -838,7 +836,7 @@ def fuse(producer, consumer):
                       outputs={'out': pcoll},
                       spec=beam_runner_api_pb2.FunctionSpec(
                           urn=bundle_processor.DATA_INPUT_URN,
-                          payload=pcoll_as_param))],
+                          payload=buffer_id))],
                   must_follow=frozenset([write_pcoll]))
               fuse(read_pcoll, consumer)
             else:
@@ -922,16 +920,16 @@ def inject_timer_pcollections(stages):
                       outputs={'out': timer_read_pcoll},
                       spec=beam_runner_api_pb2.FunctionSpec(
                           urn=bundle_processor.DATA_INPUT_URN,
-                          payload=('timers:%s' % timer_read_pcoll).encode(
-                              'utf-8'))))
+                          payload=create_buffer_id(
+                              timer_read_pcoll, kind='timers'))))
               stage.transforms.append(
                   beam_runner_api_pb2.PTransform(
                       unique_name=timer_write_pcoll + '/Write',
                       inputs={'in': timer_write_pcoll},
                       spec=beam_runner_api_pb2.FunctionSpec(
                           urn=bundle_processor.DATA_OUTPUT_URN,
-                          payload=('timers:%s' % timer_write_pcoll).encode(
-                              'utf-8'))))
+                          payload=create_buffer_id(
+                              timer_write_pcoll, kind='timers'))))
               assert tag not in transform.inputs
               transform.inputs[tag] = timer_read_pcoll
               assert tag not in transform.outputs
@@ -1044,7 +1042,7 @@ def extract_endpoints(stage):
           pcoll_id = transform.spec.payload
           if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
             target = transform.unique_name, only_element(transform.outputs)
-            if pcoll_id.startswith(IMPULSE_BUFFER_PREFIX):
+            if pcoll_id == IMPULSE_BUFFER:
               data_input[target] = [ENCODED_IMPULSE_VALUE]
             else:
               data_input[target] = pcoll_buffers[pcoll_id]
@@ -1067,7 +1065,7 @@ def extract_endpoints(stage):
               transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
           for tag, si in payload.side_inputs.items():
             data_side_input[transform.unique_name, tag] = (
-                'materialize:' + transform.inputs[tag],
+                create_buffer_id(transform.inputs[tag]),
                 beam.pvalue.SideInputData.from_runner_api(si, context))
       return data_input, data_side_input, data_output
 
@@ -1090,12 +1088,12 @@ def extract_endpoints(stage):
           controller.state_api_service_descriptor().url)
 
     # Store the required side inputs into state.
-    for (transform_id, tag), (pcoll_id, si) in data_side_input.items():
-      actual_pcoll_id = pcoll_id[len(b"materialize:"):]
+    for (transform_id, tag), (buffer_id, si) in data_side_input.items():
+      _, pcoll_id = split_buffer_id(buffer_id)
       value_coder = context.coders[safe_coders[
-          pipeline_components.pcollections[actual_pcoll_id].coder_id]]
+          pipeline_components.pcollections[pcoll_id].coder_id]]
       elements_by_window = _WindowGroupingBuffer(si, value_coder)
-      for element_data in pcoll_buffers[pcoll_id]:
+      for element_data in pcoll_buffers[buffer_id]:
         elements_by_window.append(element_data)
       for key, window, elements_data in elements_by_window.encoded_items():
         state_key = beam_fn_api_pb2.StateKey(
@@ -1106,16 +1104,16 @@ def extract_endpoints(stage):
                 key=key))
         controller.state_handler.blocking_append(state_key, elements_data)
 
-    def get_buffer(pcoll_id):
-      if (pcoll_id.startswith(b'materialize:')
-          or pcoll_id.startswith(b'timers:')):
-        if pcoll_id not in pcoll_buffers:
+    def get_buffer(buffer_id):
+      kind, name = split_buffer_id(buffer_id)
+      if kind in ('materialize', 'timers'):
+        if buffer_id not in pcoll_buffers:
           # Just store the data chunks for replay.
-          pcoll_buffers[pcoll_id] = list()
-      elif pcoll_id.startswith(b'group:'):
+          pcoll_buffers[buffer_id] = list()
+      elif kind == 'group':
         # This is a grouping write, create a grouping buffer if needed.
-        if pcoll_id not in pcoll_buffers:
-          original_gbk_transform = pcoll_id.split(b':', 1)[1]
+        if buffer_id not in pcoll_buffers:
+          original_gbk_transform = name
           transform_proto = pipeline_components.transforms[
               original_gbk_transform]
           input_pcoll = only_element(list(transform_proto.inputs.values()))
@@ -1127,13 +1125,13 @@ def get_buffer(pcoll_id):
           windowing_strategy = context.windowing_strategies[
               pipeline_components
               .pcollections[output_pcoll].windowing_strategy_id]
-          pcoll_buffers[pcoll_id] = _GroupingBuffer(
+          pcoll_buffers[buffer_id] = _GroupingBuffer(
               pre_gbk_coder, post_gbk_coder, windowing_strategy)
       else:
         # These should be the only two identifiers we produce for now,
         # but special side input writes may go here.
-        raise NotImplementedError(pcoll_id)
-      return pcoll_buffers[pcoll_id]
+        raise NotImplementedError(buffer_id)
+      return pcoll_buffers[buffer_id]
 
     for k in range(self._bundle_repeat):
       try:
@@ -1153,7 +1151,8 @@ def get_buffer(pcoll_id):
       for transform_id, timer_writes in stage.timer_pcollections:
         windowed_timer_coder_impl = context.coders[
             pipeline_components.pcollections[timer_writes].coder_id].get_impl()
-        written_timers = get_buffer(b'timers:' + timer_writes.encode('utf-8'))
+        written_timers = get_buffer(
+            create_buffer_id(timer_writes, kind='timers'))
         if written_timers:
           # Keep only the "last" timer set per key and window.
           timers_by_key_and_window = {}
@@ -1636,3 +1635,11 @@ def unique_name(existing, prefix):
         return prefix_counter
   else:
     return prefix
+
+
+def create_buffer_id(name, kind='materialize'):
+  return ('%s:%s' % (kind, name)).encode('utf-8')
+
+
+def split_buffer_id(buffer_id):
+  return buffer_id.decode('utf-8').split(':', 1)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 160772)
    Time Spent: 1.5h  (was: 1h 20m)

> Side inputs don't work on Python 3 
> -----------------------------------
>
>                 Key: BEAM-5617
>                 URL: https://issues.apache.org/jira/browse/BEAM-5617
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Valentyn Tymofieiev
>            Assignee: Robert Bradshaw
>            Priority: Major
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> ======================================================================
> ERROR: test_iterable_side_input (apache_beam.transforms.sideinputs_test.SideInputsTest)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "/usr/local/google/home/valentyn/projects/beam/clean_head/beam/sdks/python/apache_beam/runners/common.py", line 677, in process
>     self.do_fn_invoker.invoke_process(windowed_value)
>   File "/usr/local/google/home/valentyn/projects/beam/clean_head/beam/sdks/python/apache_beam/runners/common.py", line 414, in invoke_process
>     windowed_value, self.process_method(windowed_value.value))
>   File "/usr/local/google/home/valentyn/projects/beam/clean_head/beam/sdks/python/apache_beam/transforms/core.py", line 1068, in <lambda>
>     wrapper = lambda x: [fn(x)]
>   File "/usr/local/google/home/valentyn/projects/beam/clean_head/beam/sdks/python/apache_beam/testing/util.py", line 119, in _equal
>     'Failed assert: %r == %r' % (sorted_expected, sorted_actual))
> apache_beam.testing.util.BeamAssertException: Failed assert: [3, 4, 6, 8] == []



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to