y1chi commented on a change in pull request #16841:
URL: https://github.com/apache/beam/pull/16841#discussion_r834533026



##########
File path: sdks/python/apache_beam/examples/matrix_power.py
##########
@@ -0,0 +1,103 @@
+#

Review comment:
       Are the examples related to the change? if not, prefer splitting into 
different PR.

##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -187,6 +222,9 @@ def __init__(self,
     self._windowing = windowing
     self._grouped_output = None  # type: Optional[List[List[bytes]]]
 
+  def copy(self) -> 'GroupingBuffer':
+    return self

Review comment:
       This is confusing to me, is copy suppose to create a new object?

##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -602,6 +747,107 @@ def __init__(self,
         for id in self.pipeline_components.windowing_strategies.keys()
     }
 
+    self._stage_managers: Dict[str, BundleContextManager] = {}
+
+  def bundle_manager_for(
+      self,
+      stage: Stage,
+      num_workers: Optional[int] = None) -> 'BundleContextManager':
+    if stage.name not in self._stage_managers:
+      self._stage_managers[stage.name] = BundleContextManager(
+          self, stage, num_workers or self.num_workers)
+    return self._stage_managers[stage.name]
+
+  def _compute_pipeline_dictionaries(self) -> None:
+    for s in self.stages.values():
+      for t in s.transforms:
+        buffer_id = t.spec.payload
+        if t.spec.urn == bundle_processor.DATA_INPUT_URN:
+          self.input_transform_to_buffer_id[t.unique_name] = buffer_id
+          if t.spec.payload == translations.IMPULSE_BUFFER:
+            # Impulse data is not produced by any PTransform.
+            self.pcollection_to_producer_transform[
+                translations.IMPULSE_BUFFER] = None
+          else:
+            assert t.spec.payload != translations.IMPULSE_BUFFER
+            _, input_pcoll = split_buffer_id(buffer_id)
+            # Adding PCollections that may not have a producer.
+            # This is necessary, for example, for the case where we pass an
+            # empty list of PCollections into a Flatten transform.
+            if input_pcoll not in self.pcollection_to_producer_transform:
+              self.pcollection_to_producer_transform[input_pcoll] = None
+            if buffer_id not in self.buffer_id_to_consumer_pairs:
+              self.buffer_id_to_consumer_pairs[buffer_id] = set()
+            if (s.name, t.unique_name
+                ) not in self.buffer_id_to_consumer_pairs[buffer_id]:
+              self.buffer_id_to_consumer_pairs[buffer_id].add(
+                  (s.name, t.unique_name))
+        elif t.spec.urn == bundle_processor.DATA_OUTPUT_URN:
+          _, output_pcoll = split_buffer_id(buffer_id)
+          self.pcollection_to_producer_transform[output_pcoll] = t.unique_name
+        elif t.spec.urn in translations.PAR_DO_URNS:
+          pass
+
+  def setup(self) -> None:
+    """This sets up the pipeline to begin running.
+
+    1. This function enqueues all initial pipeline bundles to be executed.
+    2. It also updates payload fields on DATA_INPUT and DATA_OUTPUT operations
+      to the Data API endpoints that are live.
+    """
+    for stage in self.stages.values():
+      self._enqueue_stage_initial_inputs(stage)
+
+  def _enqueue_stage_initial_inputs(self, stage: Stage) -> None:
+    """Sets up IMPULSE inputs for a stage, and the data GRPC API endpoint."""
+    data_input = {}  # type: MutableMapping[str, PartitionableBuffer]
+    ready_to_schedule = True
+    for transform in stage.transforms:
+      if (transform.spec.urn in {bundle_processor.DATA_INPUT_URN,
+                                 bundle_processor.DATA_OUTPUT_URN}):
+        if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
+          coder_id = self.data_channel_coders[only_element(
+              transform.outputs.values())]
+          coder = self.pipeline_context.coders[self.safe_coders.get(
+              coder_id, coder_id)]
+          if transform.spec.payload == translations.IMPULSE_BUFFER:
+            data_input[transform.unique_name] = ListBuffer(coder.get_impl())
+            data_input[transform.unique_name].append(ENCODED_IMPULSE_VALUE)
+          else:
+            # If this is not an IMPULSE input, then it is not part of the
+            # initial inputs of a pipeline, and we'll ignore it.
+            data_input = {}
+        else:
+          assert transform.spec.urn == bundle_processor.DATA_OUTPUT_URN
+          coder_id = self.data_channel_coders[only_element(
+              transform.inputs.values())]
+        # For every DATA_INPUT or DATA_OUTPUT operation, we need to replace the
+        # payload with the GRPC configuration for the Data channel.
+        bundle_manager = self.bundle_manager_for(stage)
+        data_spec = beam_fn_api_pb2.RemoteGrpcPort(coder_id=coder_id)
+        data_api_service_descriptor = (
+            bundle_manager.data_api_service_descriptor())
+        if data_api_service_descriptor:
+          data_spec.api_service_descriptor.url = (
+              data_api_service_descriptor.url)
+        transform.spec.payload = data_spec.SerializeToString()
+      elif transform.spec.urn in translations.PAR_DO_URNS:
+        payload = proto_utils.parse_Bytes(
+            transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
+        if payload.side_inputs:

Review comment:
       Is this only for batch, I think in streaming we have no guarantee when 
side input is ready and may run a stage even if it is not?

##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1973,6 +2006,39 @@ def populate_data_channel_coders(stages, 
pipeline_context):
   return stages
 
 
+def add_impulse_to_dangling_transforms(stages, pipeline_context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+
+  """Populate coders for GRPC input and output ports."""
+  for stage in stages:
+    for transform in stage.transforms:
+      if len(transform.inputs
+             ) == 0 and transform.spec.urn != bundle_processor.DATA_INPUT_URN:
+        print('transform! : ' + transform.spec.urn)

Review comment:
       log debug instead?

##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -96,6 +108,27 @@
 # (MultiMap / Iterable).
 DataSideInput = Dict[SideInputId, Tuple[bytes, SideInputAccessPattern]]
 
+DataOutput = Dict[str, BufferId]
+
+# A map of [Transform ID, Timer Family ID] to [Buffer ID, Time Domain for 
timer]
+# The time domain comes from beam_runner_api_pb2.TimeDomain. It may be
+# EVENT_TIME or PROCESSING_TIME.
+OutputTimers = MutableMapping[TimerFamilyId, Tuple[BufferId, Any]]
+
+# A map of [Transform ID, Timer Family ID] to [Buffer CONTENTS, Timestamp]
+OutputTimerData = MutableMapping[TimerFamilyId,
+                                 Tuple['PartitionableBuffer',
+                                       timestamp.Timestamp]]
+
+BundleProcessResult = Tuple[beam_fn_api_pb2.InstructionResponse,
+                            List[beam_fn_api_pb2.ProcessBundleSplitResponse]]
+
+
+# TODO(pabloem): Change tha name to a more representative one
+class DataInput(NamedTuple):

Review comment:
       DataAndTimersInput?

##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -602,6 +747,107 @@ def __init__(self,
         for id in self.pipeline_components.windowing_strategies.keys()
     }
 
+    self._stage_managers: Dict[str, BundleContextManager] = {}
+
+  def bundle_manager_for(
+      self,
+      stage: Stage,
+      num_workers: Optional[int] = None) -> 'BundleContextManager':
+    if stage.name not in self._stage_managers:
+      self._stage_managers[stage.name] = BundleContextManager(
+          self, stage, num_workers or self.num_workers)
+    return self._stage_managers[stage.name]
+
+  def _compute_pipeline_dictionaries(self) -> None:
+    for s in self.stages.values():
+      for t in s.transforms:
+        buffer_id = t.spec.payload
+        if t.spec.urn == bundle_processor.DATA_INPUT_URN:
+          self.input_transform_to_buffer_id[t.unique_name] = buffer_id
+          if t.spec.payload == translations.IMPULSE_BUFFER:
+            # Impulse data is not produced by any PTransform.
+            self.pcollection_to_producer_transform[
+                translations.IMPULSE_BUFFER] = None
+          else:
+            assert t.spec.payload != translations.IMPULSE_BUFFER
+            _, input_pcoll = split_buffer_id(buffer_id)
+            # Adding PCollections that may not have a producer.
+            # This is necessary, for example, for the case where we pass an
+            # empty list of PCollections into a Flatten transform.
+            if input_pcoll not in self.pcollection_to_producer_transform:
+              self.pcollection_to_producer_transform[input_pcoll] = None
+            if buffer_id not in self.buffer_id_to_consumer_pairs:
+              self.buffer_id_to_consumer_pairs[buffer_id] = set()
+            if (s.name, t.unique_name
+                ) not in self.buffer_id_to_consumer_pairs[buffer_id]:
+              self.buffer_id_to_consumer_pairs[buffer_id].add(
+                  (s.name, t.unique_name))
+        elif t.spec.urn == bundle_processor.DATA_OUTPUT_URN:
+          _, output_pcoll = split_buffer_id(buffer_id)
+          self.pcollection_to_producer_transform[output_pcoll] = t.unique_name
+        elif t.spec.urn in translations.PAR_DO_URNS:
+          pass
+
+  def setup(self) -> None:
+    """This sets up the pipeline to begin running.
+
+    1. This function enqueues all initial pipeline bundles to be executed.
+    2. It also updates payload fields on DATA_INPUT and DATA_OUTPUT operations
+      to the Data API endpoints that are live.
+    """
+    for stage in self.stages.values():
+      self._enqueue_stage_initial_inputs(stage)
+
+  def _enqueue_stage_initial_inputs(self, stage: Stage) -> None:
+    """Sets up IMPULSE inputs for a stage, and the data GRPC API endpoint."""
+    data_input = {}  # type: MutableMapping[str, PartitionableBuffer]
+    ready_to_schedule = True
+    for transform in stage.transforms:
+      if (transform.spec.urn in {bundle_processor.DATA_INPUT_URN,
+                                 bundle_processor.DATA_OUTPUT_URN}):
+        if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
+          coder_id = self.data_channel_coders[only_element(
+              transform.outputs.values())]
+          coder = self.pipeline_context.coders[self.safe_coders.get(
+              coder_id, coder_id)]
+          if transform.spec.payload == translations.IMPULSE_BUFFER:
+            data_input[transform.unique_name] = ListBuffer(coder.get_impl())
+            data_input[transform.unique_name].append(ENCODED_IMPULSE_VALUE)
+          else:
+            # If this is not an IMPULSE input, then it is not part of the
+            # initial inputs of a pipeline, and we'll ignore it.
+            data_input = {}

Review comment:
       redundant?

##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -340,6 +389,87 @@ def from_runner_api_parameter(window_coder_id, context):
         context.coders[window_coder_id.decode('utf-8')])
 
 
+QUEUE_KEY_TYPE = TypeVar('QUEUE_KEY_TYPE')
+
+
+class _ProcessingQueueManager(object):
+  """Manages the queues for ProcessBundle inputs.
+  There are three queues:
+   - ready_inputs(_ProcessingQueueManager.KeyedQueue). This queue contains 
input
+       data that is ready to be processed. These are data such as timers past
+       their trigger time, and data to be processed.
+       The ready_inputs_queue contains tuples of (stage_name, inputs), where
+       inputs are dictionaries mapping PCollection name to data buffers.
+   - watermark_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue
+       contains input data that is not yet ready to be processed, and is 
blocked
+       on the watermark advancing. ((stage_name, watermark), inputs), where
+       the watermark is the watermark at which the inputs should be scheduled,
+       and inputs are dictionaries mapping PCollection name to data buffers.
+   - time_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue

Review comment:
       nit: walltime_pending_inputs to differentiate with watermark time?

##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -340,6 +389,87 @@ def from_runner_api_parameter(window_coder_id, context):
         context.coders[window_coder_id.decode('utf-8')])
 
 
+QUEUE_KEY_TYPE = TypeVar('QUEUE_KEY_TYPE')
+
+
+class _ProcessingQueueManager(object):
+  """Manages the queues for ProcessBundle inputs.
+  There are three queues:
+   - ready_inputs(_ProcessingQueueManager.KeyedQueue). This queue contains 
input
+       data that is ready to be processed. These are data such as timers past
+       their trigger time, and data to be processed.
+       The ready_inputs_queue contains tuples of (stage_name, inputs), where
+       inputs are dictionaries mapping PCollection name to data buffers.
+   - watermark_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue
+       contains input data that is not yet ready to be processed, and is 
blocked
+       on the watermark advancing. ((stage_name, watermark), inputs), where
+       the watermark is the watermark at which the inputs should be scheduled,
+       and inputs are dictionaries mapping PCollection name to data buffers.
+   - time_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue
+       contains input data that is not yet ready to be processed, and is 
blocked
+       on time advancing. ((stage_name, time), inputs), where
+       the time is the real time point at which the inputs should be scheduled,
+       and inputs are dictionaries mapping PCollection name to data buffers.
+  """
+  class KeyedQueue(Generic[QUEUE_KEY_TYPE]):
+    def __init__(self) -> None:
+      self._q: typing.Deque[Tuple[QUEUE_KEY_TYPE,
+                                  DataInput]] = collections.deque()
+      self._keyed_elements: MutableMapping[QUEUE_KEY_TYPE,
+                                           Tuple[QUEUE_KEY_TYPE,

Review comment:
       Seems that storing the key in value tuple is unnecessary?

##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -340,6 +389,87 @@ def from_runner_api_parameter(window_coder_id, context):
         context.coders[window_coder_id.decode('utf-8')])
 
 
+QUEUE_KEY_TYPE = TypeVar('QUEUE_KEY_TYPE')
+
+
+class _ProcessingQueueManager(object):
+  """Manages the queues for ProcessBundle inputs.
+  There are three queues:
+   - ready_inputs(_ProcessingQueueManager.KeyedQueue). This queue contains 
input
+       data that is ready to be processed. These are data such as timers past
+       their trigger time, and data to be processed.
+       The ready_inputs_queue contains tuples of (stage_name, inputs), where
+       inputs are dictionaries mapping PCollection name to data buffers.
+   - watermark_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue
+       contains input data that is not yet ready to be processed, and is 
blocked
+       on the watermark advancing. ((stage_name, watermark), inputs), where
+       the watermark is the watermark at which the inputs should be scheduled,
+       and inputs are dictionaries mapping PCollection name to data buffers.
+   - time_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue
+       contains input data that is not yet ready to be processed, and is 
blocked
+       on time advancing. ((stage_name, time), inputs), where
+       the time is the real time point at which the inputs should be scheduled,
+       and inputs are dictionaries mapping PCollection name to data buffers.
+  """
+  class KeyedQueue(Generic[QUEUE_KEY_TYPE]):
+    def __init__(self) -> None:
+      self._q: typing.Deque[Tuple[QUEUE_KEY_TYPE,
+                                  DataInput]] = collections.deque()
+      self._keyed_elements: MutableMapping[QUEUE_KEY_TYPE,
+                                           Tuple[QUEUE_KEY_TYPE,
+                                                 DataInput]] = {}
+
+    def enque(self, elm: Tuple[QUEUE_KEY_TYPE, DataInput]) -> None:
+      key = elm[0]
+      incoming_inputs: DataInput = elm[1]
+      if key in self._keyed_elements:
+        existing_inputs = self._keyed_elements[key][1]
+        for pcoll in incoming_inputs.data:

Review comment:
       can there be only timers in the incoming inputs? for example a stage 
with a looping timer?

##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -602,6 +747,107 @@ def __init__(self,
         for id in self.pipeline_components.windowing_strategies.keys()
     }
 
+    self._stage_managers: Dict[str, BundleContextManager] = {}
+
+  def bundle_manager_for(
+      self,
+      stage: Stage,
+      num_workers: Optional[int] = None) -> 'BundleContextManager':
+    if stage.name not in self._stage_managers:
+      self._stage_managers[stage.name] = BundleContextManager(
+          self, stage, num_workers or self.num_workers)
+    return self._stage_managers[stage.name]
+
+  def _compute_pipeline_dictionaries(self) -> None:
+    for s in self.stages.values():
+      for t in s.transforms:
+        buffer_id = t.spec.payload
+        if t.spec.urn == bundle_processor.DATA_INPUT_URN:
+          self.input_transform_to_buffer_id[t.unique_name] = buffer_id
+          if t.spec.payload == translations.IMPULSE_BUFFER:
+            # Impulse data is not produced by any PTransform.
+            self.pcollection_to_producer_transform[
+                translations.IMPULSE_BUFFER] = None
+          else:
+            assert t.spec.payload != translations.IMPULSE_BUFFER

Review comment:
       redundant?

##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -602,6 +747,107 @@ def __init__(self,
         for id in self.pipeline_components.windowing_strategies.keys()
     }
 
+    self._stage_managers: Dict[str, BundleContextManager] = {}
+
+  def bundle_manager_for(
+      self,
+      stage: Stage,
+      num_workers: Optional[int] = None) -> 'BundleContextManager':
+    if stage.name not in self._stage_managers:
+      self._stage_managers[stage.name] = BundleContextManager(
+          self, stage, num_workers or self.num_workers)
+    return self._stage_managers[stage.name]
+
+  def _compute_pipeline_dictionaries(self) -> None:
+    for s in self.stages.values():
+      for t in s.transforms:
+        buffer_id = t.spec.payload
+        if t.spec.urn == bundle_processor.DATA_INPUT_URN:
+          self.input_transform_to_buffer_id[t.unique_name] = buffer_id
+          if t.spec.payload == translations.IMPULSE_BUFFER:
+            # Impulse data is not produced by any PTransform.
+            self.pcollection_to_producer_transform[
+                translations.IMPULSE_BUFFER] = None
+          else:
+            assert t.spec.payload != translations.IMPULSE_BUFFER
+            _, input_pcoll = split_buffer_id(buffer_id)
+            # Adding PCollections that may not have a producer.
+            # This is necessary, for example, for the case where we pass an
+            # empty list of PCollections into a Flatten transform.
+            if input_pcoll not in self.pcollection_to_producer_transform:
+              self.pcollection_to_producer_transform[input_pcoll] = None
+            if buffer_id not in self.buffer_id_to_consumer_pairs:
+              self.buffer_id_to_consumer_pairs[buffer_id] = set()
+            if (s.name, t.unique_name
+                ) not in self.buffer_id_to_consumer_pairs[buffer_id]:
+              self.buffer_id_to_consumer_pairs[buffer_id].add(
+                  (s.name, t.unique_name))
+        elif t.spec.urn == bundle_processor.DATA_OUTPUT_URN:
+          _, output_pcoll = split_buffer_id(buffer_id)
+          self.pcollection_to_producer_transform[output_pcoll] = t.unique_name
+        elif t.spec.urn in translations.PAR_DO_URNS:
+          pass
+
+  def setup(self) -> None:
+    """This sets up the pipeline to begin running.
+
+    1. This function enqueues all initial pipeline bundles to be executed.
+    2. It also updates payload fields on DATA_INPUT and DATA_OUTPUT operations
+      to the Data API endpoints that are live.
+    """
+    for stage in self.stages.values():
+      self._enqueue_stage_initial_inputs(stage)
+
+  def _enqueue_stage_initial_inputs(self, stage: Stage) -> None:
+    """Sets up IMPULSE inputs for a stage, and the data GRPC API endpoint."""
+    data_input = {}  # type: MutableMapping[str, PartitionableBuffer]
+    ready_to_schedule = True
+    for transform in stage.transforms:
+      if (transform.spec.urn in {bundle_processor.DATA_INPUT_URN,
+                                 bundle_processor.DATA_OUTPUT_URN}):
+        if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
+          coder_id = self.data_channel_coders[only_element(
+              transform.outputs.values())]
+          coder = self.pipeline_context.coders[self.safe_coders.get(
+              coder_id, coder_id)]
+          if transform.spec.payload == translations.IMPULSE_BUFFER:
+            data_input[transform.unique_name] = ListBuffer(coder.get_impl())
+            data_input[transform.unique_name].append(ENCODED_IMPULSE_VALUE)
+          else:
+            # If this is not an IMPULSE input, then it is not part of the
+            # initial inputs of a pipeline, and we'll ignore it.
+            data_input = {}
+        else:
+          assert transform.spec.urn == bundle_processor.DATA_OUTPUT_URN

Review comment:
       redundant?

##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
##########
@@ -180,16 +195,14 @@ def test_pardo_side_input_dependencies(self):
                 ExpectingSideInputsFn(f'Do{k}'),
                 *[beam.pvalue.AsList(inputs[s]) for s in range(1, k)]))
 
-  @unittest.skip('BEAM-13040')
-  @retry(stop=stop_after_attempt(3))

Review comment:
       I remember adding this retry because fn_runner_test on grpc port is 
flaky on jenkins [BEAM-12603](https://issues.apache.org/jira/browse/BEAM-12603)

##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -340,6 +389,87 @@ def from_runner_api_parameter(window_coder_id, context):
         context.coders[window_coder_id.decode('utf-8')])
 
 
+QUEUE_KEY_TYPE = TypeVar('QUEUE_KEY_TYPE')
+
+
+class _ProcessingQueueManager(object):
+  """Manages the queues for ProcessBundle inputs.
+  There are three queues:
+   - ready_inputs(_ProcessingQueueManager.KeyedQueue). This queue contains 
input
+       data that is ready to be processed. These are data such as timers past
+       their trigger time, and data to be processed.
+       The ready_inputs_queue contains tuples of (stage_name, inputs), where
+       inputs are dictionaries mapping PCollection name to data buffers.
+   - watermark_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue
+       contains input data that is not yet ready to be processed, and is 
blocked
+       on the watermark advancing. ((stage_name, watermark), inputs), where
+       the watermark is the watermark at which the inputs should be scheduled,
+       and inputs are dictionaries mapping PCollection name to data buffers.
+   - time_pending_inputs(_ProcessingQueueManager.KeyedQueue). This queue
+       contains input data that is not yet ready to be processed, and is 
blocked
+       on time advancing. ((stage_name, time), inputs), where
+       the time is the real time point at which the inputs should be scheduled,
+       and inputs are dictionaries mapping PCollection name to data buffers.
+  """
+  class KeyedQueue(Generic[QUEUE_KEY_TYPE]):
+    def __init__(self) -> None:
+      self._q: typing.Deque[Tuple[QUEUE_KEY_TYPE,
+                                  DataInput]] = collections.deque()
+      self._keyed_elements: MutableMapping[QUEUE_KEY_TYPE,
+                                           Tuple[QUEUE_KEY_TYPE,
+                                                 DataInput]] = {}
+
+    def enque(self, elm: Tuple[QUEUE_KEY_TYPE, DataInput]) -> None:
+      key = elm[0]
+      incoming_inputs: DataInput = elm[1]
+      if key in self._keyed_elements:
+        existing_inputs = self._keyed_elements[key][1]
+        for pcoll in incoming_inputs.data:
+          if incoming_inputs.data[pcoll] and pcoll in existing_inputs.data:

Review comment:
       Instead of using DataInput as NamedTuple it seems that you could just 
implement the class with defaultdict that automatically construct 
PartitionBuffer to simplify the logic here and below.

##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -363,68 +364,164 @@ def run_stages(self,
             self.NUM_FUSED_STAGES_COUNTER,
             urn='internal:' + self.NUM_FUSED_STAGES_COUNTER)).update(
                 len(stages))
-    monitoring_infos_by_stage = {}
+    monitoring_infos_by_stage: MutableMapping[
+        str, Iterable['metrics_pb2.MonitoringInfo']] = {}
 
     runner_execution_context = execution.FnApiRunnerExecutionContext(
         stages,
         worker_handler_manager,
         stage_context.components,
         stage_context.safe_coders,
-        stage_context.data_channel_coders)
+        stage_context.data_channel_coders,
+        self._num_workers)
 
     try:
       with self.maybe_profile():
-        for stage in stages:
-          bundle_context_manager = execution.BundleContextManager(
-              runner_execution_context, stage, self._num_workers)
-
-          assert (
-              runner_execution_context.watermark_manager.get_stage_node(
-                  bundle_context_manager.stage.name
-              ).input_watermark() == timestamp.MAX_TIMESTAMP), (
-              'wrong watermark for %s. Expected %s, but got %s.' % (
-                  runner_execution_context.watermark_manager.get_stage_node(
-                      bundle_context_manager.stage.name),
-                  timestamp.MAX_TIMESTAMP,
-                  runner_execution_context.watermark_manager.get_stage_node(
-                      bundle_context_manager.stage.name
-                  ).input_watermark()
-              )
-          )
-
-          stage_results = self._run_stage(
-              runner_execution_context, bundle_context_manager)
-
-          assert (
-              runner_execution_context.watermark_manager.get_stage_node(
-                  bundle_context_manager.stage.name
-              ).input_watermark() == timestamp.MAX_TIMESTAMP), (
-              'wrong input watermark for %s. Expected %s, but got %s.' % (
-              runner_execution_context.watermark_manager.get_stage_node(
-                  bundle_context_manager.stage.name),
-              timestamp.MAX_TIMESTAMP,
-              runner_execution_context.watermark_manager.get_stage_node(
-                  bundle_context_manager.stage.name
-              ).output_watermark())
-          )
-
-          monitoring_infos_by_stage[stage.name] = (
-              list(stage_results.process_bundle.monitoring_infos))
-
-      monitoring_infos_by_stage[''] = list(
-          pipeline_metrics.to_runner_api_monitoring_infos('').values())
+        # Initialize Runner context:
+        # - Pipeline dictionaries, initial inputs and pipeline triggers
+        # - Replace Data API endpoints in protobufs.
+        runner_execution_context.setup()
+
+        bundle_counter = 0
+        # Start executing all ready bundles.
+        while len(runner_execution_context.queues.ready_inputs) > 0:

Review comment:
       can this break prematurely if the time_pending_inputs are not fully 
scheduled?

##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -363,68 +364,164 @@ def run_stages(self,
             self.NUM_FUSED_STAGES_COUNTER,
             urn='internal:' + self.NUM_FUSED_STAGES_COUNTER)).update(
                 len(stages))
-    monitoring_infos_by_stage = {}
+    monitoring_infos_by_stage: MutableMapping[
+        str, Iterable['metrics_pb2.MonitoringInfo']] = {}
 
     runner_execution_context = execution.FnApiRunnerExecutionContext(
         stages,
         worker_handler_manager,
         stage_context.components,
         stage_context.safe_coders,
-        stage_context.data_channel_coders)
+        stage_context.data_channel_coders,
+        self._num_workers)
 
     try:
       with self.maybe_profile():
-        for stage in stages:
-          bundle_context_manager = execution.BundleContextManager(
-              runner_execution_context, stage, self._num_workers)
-
-          assert (
-              runner_execution_context.watermark_manager.get_stage_node(
-                  bundle_context_manager.stage.name
-              ).input_watermark() == timestamp.MAX_TIMESTAMP), (
-              'wrong watermark for %s. Expected %s, but got %s.' % (
-                  runner_execution_context.watermark_manager.get_stage_node(
-                      bundle_context_manager.stage.name),
-                  timestamp.MAX_TIMESTAMP,
-                  runner_execution_context.watermark_manager.get_stage_node(
-                      bundle_context_manager.stage.name
-                  ).input_watermark()
-              )
-          )
-
-          stage_results = self._run_stage(
-              runner_execution_context, bundle_context_manager)
-
-          assert (
-              runner_execution_context.watermark_manager.get_stage_node(
-                  bundle_context_manager.stage.name
-              ).input_watermark() == timestamp.MAX_TIMESTAMP), (
-              'wrong input watermark for %s. Expected %s, but got %s.' % (
-              runner_execution_context.watermark_manager.get_stage_node(
-                  bundle_context_manager.stage.name),
-              timestamp.MAX_TIMESTAMP,
-              runner_execution_context.watermark_manager.get_stage_node(
-                  bundle_context_manager.stage.name
-              ).output_watermark())
-          )
-
-          monitoring_infos_by_stage[stage.name] = (
-              list(stage_results.process_bundle.monitoring_infos))
-
-      monitoring_infos_by_stage[''] = list(
-          pipeline_metrics.to_runner_api_monitoring_infos('').values())
+        # Initialize Runner context:
+        # - Pipeline dictionaries, initial inputs and pipeline triggers
+        # - Replace Data API endpoints in protobufs.
+        runner_execution_context.setup()
+
+        bundle_counter = 0
+        # Start executing all ready bundles.
+        while len(runner_execution_context.queues.ready_inputs) > 0:
+          _LOGGER.debug(
+              "Remaining ready bundles: %s\n"
+              "\tWatermark pending bunbles: %s\n"
+              "\tTime pending bunbles: %s",
+              len(runner_execution_context.queues.ready_inputs),
+              len(runner_execution_context.queues.watermark_pending_inputs),
+              len(runner_execution_context.queues.time_pending_inputs))
+          consuming_stage_name, bundle_input = (
+              runner_execution_context.queues.ready_inputs.deque())
+          stage = runner_execution_context.stages[consuming_stage_name]
+          bundle_context_manager = runner_execution_context.bundle_manager_for(
+              stage)
+          _BUNDLE_LOGGER.debug(
+              'Running bundle for stage %s\n\tExpected outputs: %s timers: %s',
+              bundle_context_manager.stage.name,
+              bundle_context_manager.stage_data_outputs,
+              bundle_context_manager.stage_timer_outputs)
+          assert consuming_stage_name == bundle_context_manager.stage.name
+
+          bundle_counter += 1
+          bundle_results = self._execute_bundle(
+              runner_execution_context, bundle_context_manager, bundle_input)
+
+          if consuming_stage_name in monitoring_infos_by_stage:
+            monitoring_infos_by_stage[
+                consuming_stage_name] = consolidate_monitoring_infos(
+                    itertools.chain(
+                        bundle_results.process_bundle.monitoring_infos,
+                        monitoring_infos_by_stage[consuming_stage_name]))
+          else:
+            assert isinstance(
+                bundle_results.process_bundle.monitoring_infos, Iterable)
+            monitoring_infos_by_stage[consuming_stage_name] = \
+              bundle_results.process_bundle.monitoring_infos
+
+          if '' not in monitoring_infos_by_stage:

Review comment:
       what is this used for?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to