y1chi commented on a change in pull request #16841:
URL: https://github.com/apache/beam/pull/16841#discussion_r839060269



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -340,6 +389,87 @@ def from_runner_api_parameter(window_coder_id, context):
         context.coders[window_coder_id.decode('utf-8')])
 
 
+QUEUE_KEY_TYPE = TypeVar('QUEUE_KEY_TYPE')
+
+
+class _ProcessingQueueManager(object):
+  """Manages the queues for ProcessBundle inputs.
+  There are three queues:
+   - ready_inputs(_ProcessingQueueManager.KeyedQueue). This queue contains input
+       data that is ready to be processed. These are data such as timers past
+       their trigger time, and data to be processed.
+       The ready_inputs_queue contains tuples of (stage_name, inputs), where
+       inputs are dictionaries mapping PCollection name to data buffers.
+   - watermark_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue
+       contains input data that is not yet ready to be processed, and is blocked
+       on the watermark advancing. ((stage_name, watermark), inputs), where
+       the watermark is the watermark at which the inputs should be scheduled,
+       and inputs are dictionaries mapping PCollection name to data buffers.
+   - time_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue
+       contains input data that is not yet ready to be processed, and is blocked
+       on time advancing. ((stage_name, time), inputs), where
+       the time is the real time point at which the inputs should be scheduled,
+       and inputs are dictionaries mapping PCollection name to data buffers.
+  """
+  class KeyedQueue(Generic[QUEUE_KEY_TYPE]):
+    def __init__(self) -> None:
+      self._q: typing.Deque[Tuple[QUEUE_KEY_TYPE,
+                                  DataInput]] = collections.deque()
+      self._keyed_elements: MutableMapping[QUEUE_KEY_TYPE,
+                                           Tuple[QUEUE_KEY_TYPE,

Review comment:
       aren't you passing the tuple from the self._q with deque()?
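
If the question is whether the mapping stores the very same tuple object that sits in the deque, a minimal sketch of that pattern looks like the following. This is not the PR's implementation: the class name `KeyedQueueSketch`, the type variables `K` and `V`, and the duplicate-key behavior are all assumptions made for illustration.

```python
import collections
from typing import Deque, Dict, Generic, Tuple, TypeVar

K = TypeVar('K')  # hypothetical key type, e.g. (stage_name, watermark)
V = TypeVar('V')  # hypothetical payload type, e.g. the input buffers


class KeyedQueueSketch(Generic[K, V]):
  """Illustrative FIFO queue with lookup by key (not the PR's code)."""
  def __init__(self) -> None:
    # FIFO order of (key, payload) tuples.
    self._q: Deque[Tuple[K, V]] = collections.deque()
    # Maps key -> the same tuple object that is stored in self._q.
    self._keyed_elements: Dict[K, Tuple[K, V]] = {}

  def enque(self, elm: Tuple[K, V]) -> None:
    key = elm[0]
    if key in self._keyed_elements:
      # Assumption: duplicate keys are rejected rather than merged.
      raise KeyError('duplicate key: %r' % (key, ))
    self._q.append(elm)
    self._keyed_elements[key] = elm

  def deque(self) -> Tuple[K, V]:
    # Pop the oldest tuple and drop its index entry.
    elm = self._q.popleft()
    del self._keyed_elements[elm[0]]
    return elm

  def __len__(self) -> int:
    return len(self._q)


q = KeyedQueueSketch()
item = (('stage_a', 5), {'pcoll': b'data'})
q.enque(item)
# The dict entry and the deque entry are the same object, not copies.
assert q._keyed_elements[('stage_a', 5)] is q._q[0]
```

In this sketch the value type of the mapping repeats the key only because it holds the whole queued tuple; mapping the key to the payload alone would work equally well if the tuple identity is never needed.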




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
