[ https://issues.apache.org/jira/browse/BEAM-8823?focusedWorklogId=750625&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-750625 ]
ASF GitHub Bot logged work on BEAM-8823:
----------------------------------------
Author: ASF GitHub Bot
Created on: 31/Mar/22 00:05
Start Date: 31/Mar/22 00:05
Worklog Time Spent: 10m
Work Description: y1chi commented on a change in pull request #16841:
URL: https://github.com/apache/beam/pull/16841#discussion_r839060269
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -340,6 +389,87 @@ def from_runner_api_parameter(window_coder_id, context):
context.coders[window_coder_id.decode('utf-8')])
+QUEUE_KEY_TYPE = TypeVar('QUEUE_KEY_TYPE')
+
+
+class _ProcessingQueueManager(object):
+ """Manages the queues for ProcessBundle inputs.
+ There are three queues:
+ - ready_inputs(_ProcessingQueueManager.KeyedQueue). This queue contains input
+ data that is ready to be processed. These are data such as timers past
+ their trigger time, and data to be processed.
+ The ready_inputs_queue contains tuples of (stage_name, inputs), where
+ inputs are dictionaries mapping PCollection name to data buffers.
+ - watermark_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue
+ contains input data that is not yet ready to be processed, and is blocked
+ on the watermark advancing. ((stage_name, watermark), inputs), where
+ the watermark is the watermark at which the inputs should be scheduled,
+ and inputs are dictionaries mapping PCollection name to data buffers.
+ - time_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue
+ contains input data that is not yet ready to be processed, and is blocked
+ on time advancing. ((stage_name, time), inputs), where
+ the time is the real time point at which the inputs should be scheduled,
+ and inputs are dictionaries mapping PCollection name to data buffers.
+ """
+ class KeyedQueue(Generic[QUEUE_KEY_TYPE]):
+ def __init__(self) -> None:
+ self._q: typing.Deque[Tuple[QUEUE_KEY_TYPE,
+ DataInput]] = collections.deque()
+ self._keyed_elements: MutableMapping[QUEUE_KEY_TYPE,
+ Tuple[QUEUE_KEY_TYPE,
Review comment:
aren't you passing the tuple from the self._q with deque()?
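For context on the question above, here is a minimal sketch (not the code from pull request #16841) of how a keyed queue can hold the same tuple object in both a deque and a per-key mapping, so that a later enqueue for an existing key merges into the entry already waiting in the deque. The class name `KeyedQueueSketch`, the `enqueue`/`dequeue` method names, and the use of a plain list of bytes in place of `DataInput` are all assumptions made for illustration.

```python
import collections
from typing import Deque, Dict, Generic, List, Tuple, TypeVar

K = TypeVar('K')


class KeyedQueueSketch(Generic[K]):
    """Hypothetical FIFO queue that merges entries sharing a key."""

    def __init__(self) -> None:
        # FIFO order of (key, data) entries.
        self._q: Deque[Tuple[K, List[bytes]]] = collections.deque()
        # Index from key to the very same tuple object stored in self._q.
        self._keyed_elements: Dict[K, Tuple[K, List[bytes]]] = {}

    def enqueue(self, key: K, data: List[bytes]) -> None:
        existing = self._keyed_elements.get(key)
        if existing is not None:
            # The tuple is shared with the deque, so extending its list
            # also updates the entry that is already queued.
            existing[1].extend(data)
            return
        entry = (key, list(data))
        self._q.append(entry)
        self._keyed_elements[key] = entry

    def dequeue(self) -> Tuple[K, List[bytes]]:
        entry = self._q.popleft()
        del self._keyed_elements[entry[0]]
        return entry

    def __len__(self) -> int:
        return len(self._q)


# Usage: two enqueues for 'stage_a' collapse into one queued entry.
queue: KeyedQueueSketch[str] = KeyedQueueSketch()
queue.enqueue('stage_a', [b'x'])
queue.enqueue('stage_b', [b'y'])
queue.enqueue('stage_a', [b'z'])
first = queue.dequeue()
```

In this sketch the mapping stores the tuple that was appended to the deque rather than a copy, which is one way to read the type annotation under review.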
Issue Time Tracking
-------------------
Worklog Id: (was: 750625)
Time Spent: 17h 10m (was: 17h)
> Make FnApiRunner work by executing ready elements instead of stages
> -------------------------------------------------------------------
>
> Key: BEAM-8823
> URL: https://issues.apache.org/jira/browse/BEAM-8823
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Pablo Estrada
> Priority: P3
> Time Spent: 17h 10m
> Remaining Estimate: 0h
>