y1chi commented on a change in pull request #16841:
URL: https://github.com/apache/beam/pull/16841#discussion_r839058876



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -187,6 +222,9 @@ def __init__(self,
     self._windowing = windowing
     self._grouped_output = None  # type: Optional[List[List[bytes]]]
 
+  def copy(self) -> 'GroupingBuffer':
+    return self

Review comment:
       sure, consider adding a comment on it

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -340,6 +389,87 @@ def from_runner_api_parameter(window_coder_id, context):
         context.coders[window_coder_id.decode('utf-8')])
 
 
+QUEUE_KEY_TYPE = TypeVar('QUEUE_KEY_TYPE')
+
+
+class _ProcessingQueueManager(object):
+  """Manages the queues for ProcessBundle inputs.
+  There are three queues:
+   - ready_inputs(_ProcessingQueueManager.KeyedQueue). This queue contains input
+       data that is ready to be processed. These are data such as timers past
+       their trigger time, and data to be processed.
+       The ready_inputs_queue contains tuples of (stage_name, inputs), where
+       inputs are dictionaries mapping PCollection name to data buffers.
+   - watermark_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue
+       contains input data that is not yet ready to be processed, and is blocked
+       on the watermark advancing. ((stage_name, watermark), inputs), where
+       the watermark is the watermark at which the inputs should be scheduled,
+       and inputs are dictionaries mapping PCollection name to data buffers.
+   - time_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue

Review comment:
       sure
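
       For readers of this thread, the first two queues in the docstring above can be pictured with a toy model. This is only a sketch under assumptions: `ToyQueueManager`, `advance_watermark`, the integer watermarks, and the use of `collections.deque` are all hypothetical and are not taken from the PR. The third queue is left out because its description is cut off in the quoted hunk.

```python
import collections
from typing import Deque, Dict, List, Tuple

# Hypothetical stand-in: PCollection name -> data buffer contents.
Inputs = Dict[str, List[bytes]]


class ToyQueueManager:
  """Toy model of the ready and watermark-pending queues (not Beam code)."""
  def __init__(self) -> None:
    # Entries of (stage_name, inputs) that can be processed right away.
    self.ready_inputs: Deque[Tuple[str, Inputs]] = collections.deque()
    # Entries of ((stage_name, watermark), inputs) blocked on the watermark.
    self.watermark_pending_inputs: Deque[
        Tuple[Tuple[str, int], Inputs]] = collections.deque()

  def advance_watermark(self, stage_name: str, watermark: int) -> int:
    """Moves entries for stage_name whose target watermark was reached.

    Returns the number of entries moved to ready_inputs.
    """
    still_pending = collections.deque()
    released = 0
    for (name, wanted), inputs in self.watermark_pending_inputs:
      if name == stage_name and wanted <= watermark:
        self.ready_inputs.append((name, inputs))
        released += 1
      else:
        still_pending.append(((name, wanted), inputs))
    self.watermark_pending_inputs = still_pending
    return released
```

       In this toy version, an entry waiting on watermark 10 is released once the stage's watermark reaches 10 or more, while an entry waiting on 20 stays pending.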

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -96,6 +108,27 @@
 # (MultiMap / Iterable).
 DataSideInput = Dict[SideInputId, Tuple[bytes, SideInputAccessPattern]]
 
+DataOutput = Dict[str, BufferId]
+
+# A map of [Transform ID, Timer Family ID] to [Buffer ID, Time Domain for timer]
+# The time domain comes from beam_runner_api_pb2.TimeDomain. It may be
+# EVENT_TIME or PROCESSING_TIME.
+OutputTimers = MutableMapping[TimerFamilyId, Tuple[BufferId, Any]]
+
+# A map of [Transform ID, Timer Family ID] to [Buffer CONTENTS, Timestamp]
+OutputTimerData = MutableMapping[TimerFamilyId,
+                                 Tuple['PartitionableBuffer',
+                                       timestamp.Timestamp]]
+
+BundleProcessResult = Tuple[beam_fn_api_pb2.InstructionResponse,
+                            List[beam_fn_api_pb2.ProcessBundleSplitResponse]]
+
+
+# TODO(pabloem): Change the name to a more representative one
+class DataInput(NamedTuple):

Review comment:
       sure
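
       As an illustration of the `OutputTimers` shape described in the hunk above, here is a small sketch. The definitions `BufferId = bytes` and `TimerFamilyId = Tuple[str, str]` are assumptions made so the snippet stands alone, the string time domains stand in for the `beam_runner_api_pb2.TimeDomain` values, and `buffers_for_domain` is a hypothetical helper that is not part of the PR.

```python
from typing import Any, List, MutableMapping, Tuple

# Assumed stand-ins so the sketch is self-contained.
BufferId = bytes
TimerFamilyId = Tuple[str, str]  # (transform ID, timer family ID)

OutputTimers = MutableMapping[TimerFamilyId, Tuple[BufferId, Any]]


def buffers_for_domain(timers: OutputTimers, domain: Any) -> List[BufferId]:
  """Returns the sorted buffer IDs whose timers use the given time domain."""
  return sorted(buf for buf, d in timers.values() if d == domain)


# Placeholder strings are used here instead of the proto enum values.
example_timers: OutputTimers = {
    ('transform_a', 'family_1'): (b'timers:a:1', 'EVENT_TIME'),
    ('transform_a', 'family_2'): (b'timers:a:2', 'PROCESSING_TIME'),
    ('transform_b', 'family_1'): (b'timers:b:1', 'EVENT_TIME'),
}
event_time_buffers = buffers_for_domain(example_timers, 'EVENT_TIME')
```

       Keying on the (transform, timer family) pair keeps each timer family's buffer and time domain together in one lookup.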



