[ 
https://issues.apache.org/jira/browse/BEAM-8823?focusedWorklogId=662127&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-662127
 ]

ASF GitHub Bot logged work on BEAM-8823:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Oct/21 23:18
            Start Date: 07/Oct/21 23:18
    Worklog Time Spent: 10m 
      Work Description: y1chi commented on a change in pull request #15441:
URL: https://github.com/apache/beam/pull/15441#discussion_r723703721



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -471,9 +559,17 @@ def _collect_written_timers(
             timer_watermark_data[(transform_id, timer_family_id)] = min(
                 timer_watermark_data[(transform_id, timer_family_id)],
                 decoded_timer.hold_timestamp)
-        newly_set_timers[(transform_id, timer_family_id)] = ListBuffer(
-            coder_impl=timer_coder_impl)
-        newly_set_timers[(transform_id, timer_family_id)].append(out.get())
+          else:
+            # Timer was cleared, so we must skip setting it below.
+            timer_cleared = True
+            continue
+        if timer_cleared or (transform_id,

Review comment:
       And if a timer is cleared for a certain key and window, do we ignore all
the other set timers for that timer family in the bundle? Am I misunderstanding
the condition here?

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -471,9 +559,17 @@ def _collect_written_timers(
             timer_watermark_data[(transform_id, timer_family_id)] = min(
                 timer_watermark_data[(transform_id, timer_family_id)],
                 decoded_timer.hold_timestamp)
-        newly_set_timers[(transform_id, timer_family_id)] = ListBuffer(
-            coder_impl=timer_coder_impl)
-        newly_set_timers[(transform_id, timer_family_id)].append(out.get())
+          else:
+            # Timer was cleared, so we must skip setting it below.
+            timer_cleared = True
+            continue
+        if timer_cleared or (transform_id,

Review comment:
       After the loop starting at line 551, `timer_cleared` is set to True as
long as one (key, window) has a cleared timer. All other (key, window) timers
are then skipped and not appended to `newly_set_timers`, because `timer_cleared`
is True and the `continue` jumps to the next iteration of the loop starting at
line 537. Isn't that right?
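
To make the concern concrete, here is a minimal sketch. It is not the PR's code: `Timer`, `collect_with_shared_flag`, and `collect_per_key_window` are hypothetical names invented for illustration, and the real loop in `_collect_written_timers` may do more than the quoted hunk shows. The first function uses a single flag for the whole timer family, the second tracks clears per (key, window).

```python
from collections import namedtuple

# Hypothetical stand-in for a decoded timer; not Beam's actual class.
Timer = namedtuple('Timer', ['key', 'window', 'clear_bit', 'fire_timestamp'])


def collect_with_shared_flag(timers):
    """Uses one flag for the whole family: any clear drops every timer."""
    timer_cleared = False
    for t in timers:
        if t.clear_bit:
            timer_cleared = True
            continue
    if timer_cleared:
        return []
    return list(timers)


def collect_per_key_window(timers):
    """Tracks the last write per (key, window): a clear removes only its own timer."""
    latest = {}
    for t in timers:
        if t.clear_bit:
            latest.pop((t.key, t.window), None)
        else:
            latest[(t.key, t.window)] = t
    return list(latest.values())


# Illustrative input: key 'b' clears its timer, keys 'a' and 'c' set theirs.
timers = [
    Timer('a', 'w1', False, 10),
    Timer('b', 'w1', True, 0),
    Timer('c', 'w1', False, 30),
]
shared_result = collect_with_shared_flag(timers)
per_key_result = collect_per_key_window(timers)
```

With the shared flag, the set timers for keys 'a' and 'c' are lost along with the cleared one, which is the behavior the comment asks about. The per-(key, window) variant keeps them.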

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -584,100 +679,164 @@ def _add_residuals_and_channel_splits_to_deferred_inputs(
             channel_split.transform_id] = channel_split.last_primary_element
     return pcolls_with_delayed_apps, transforms_with_channel_splits
 
-  def _run_stage(self,
+  def _execute_bundle(self,
                  runner_execution_context,  # type: execution.FnApiRunnerExecutionContext
                  bundle_context_manager,  # type: execution.BundleContextManager
-                ):
-    # type: (...) -> beam_fn_api_pb2.InstructionResponse
-
-    """Run an individual stage.
+                 bundle_input: DataInput
+                ) -> beam_fn_api_pb2.InstructionResponse:
+    """Execute a bundle end-to-end.
 
     Args:
       runner_execution_context (execution.FnApiRunnerExecutionContext): An
         object containing execution information for the pipeline.
       bundle_context_manager (execution.BundleContextManager): A description of
         the stage to execute, and its context.
+      bundle_input: The set of buffers to input into this bundle
     """
-    data_input, data_output, expected_timer_output = (
-        bundle_context_manager.extract_bundle_inputs_and_outputs())
-    input_timers = {
-    }  # type: Mapping[Tuple[str, str], execution.PartitionableBuffer]
-
     worker_handler_manager = runner_execution_context.worker_handler_manager
-    _LOGGER.info('Running %s', bundle_context_manager.stage.name)
+
+    # TODO(pabloem): Should move this to be done once per stage
     worker_handler_manager.register_process_bundle_descriptor(
         bundle_context_manager.process_bundle_descriptor)
 
-    # We create the bundle manager here, as it can be reused for bundles of the
-    # same stage, but it may have to be created by-bundle later on.
+    # We create the bundle manager here, as it can be reused for bundles of
+    # the same stage, but it may have to be created by-bundle later on.
+    bundle_manager = self._get_bundle_manager(bundle_context_manager)
+
+    last_result, deferred_inputs, newly_set_timers, watermark_updates = (
+        self._run_bundle(
+            runner_execution_context,
+            bundle_context_manager,
+            bundle_input,
+            bundle_context_manager.stage_data_outputs,
+            bundle_context_manager.stage_timer_outputs,
+            bundle_manager))
+
+    for pc_name, watermark in watermark_updates.items():
+      runner_execution_context.watermark_manager.set_pcoll_watermark(
+          pc_name, watermark)
+
+    if deferred_inputs:
+      assert (runner_execution_context.watermark_manager.get_stage_node(
+          bundle_context_manager.stage.name).output_watermark()
+              < timestamp.MAX_TIMESTAMP), (
+          'wrong timestamp for %s. '
+          % runner_execution_context.watermark_manager.get_stage_node(
+          bundle_context_manager.stage.name))
+      runner_execution_context.queues.ready_inputs.enque(
+          (bundle_context_manager.stage.name, DataInput(deferred_inputs, {})))
+
+    self._enqueue_set_timers(
+        runner_execution_context,
+        bundle_context_manager,
+        newly_set_timers,
+        bundle_input)
+
+    # Store the required downstream side inputs into state so it is accessible
+    # for the worker when it runs bundles that consume this stage's output.
+    data_side_input = (
+        runner_execution_context.side_input_descriptors_by_stage.get(
+            bundle_context_manager.stage.name, {}))
+    runner_execution_context.commit_side_inputs_to_state(data_side_input)
+
+    buffers_to_clean = set()
+    known_consumers = set()
+    for _, buffer_id in bundle_context_manager.stage_data_outputs.items():
+      for (consuming_stage_name, consuming_transform) in \
+          runner_execution_context.buffer_id_to_consumer_pairs.get(buffer_id,
+                                                                   []):
+        buffer = runner_execution_context.pcoll_buffers.get(
+            buffer_id, ListBuffer(None))
+
+        if (buffer_id in runner_execution_context.pcoll_buffers and
+            buffer_id not in buffers_to_clean):
+          buffers_to_clean.add(buffer_id)
+        elif buffer and buffer_id in buffers_to_clean:
+          # If the buffer_id has already been added to buffers_to_clean, this
+          # means that the buffer is being consumed by two separate stages,
+          # so we create a copy of the buffer for every new stage.
+          runner_execution_context.pcoll_buffers[buffer_id] = buffer.copy()
+          buffer = runner_execution_context.pcoll_buffers[buffer_id]

Review comment:
       How does `runner_execution_context.pcoll_buffers[buffer_id] =
buffer.copy()` create a copy for every stage? Isn't it just overwriting the
original buffer with its own copy?
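
The question can be illustrated with plain Python lists standing in for buffers. This is a sketch under assumptions: the dictionary contents and the `first_consumer_view` / `second_consumer_view` names are hypothetical, and it does not use Beam's `ListBuffer`. Whether an earlier consumer actually holds a reference to the original buffer is not visible in the quoted hunk.

```python
# Plain lists stand in for buffers; the names here are hypothetical.
pcoll_buffers = {'pc1': [b'x', b'y']}

# Assume the first consuming stage captured a reference before the rebind.
first_consumer_view = pcoll_buffers['pc1']

# The pattern from the diff: rebind the dict entry to a copy of the buffer.
buffer = pcoll_buffers['pc1']
pcoll_buffers['pc1'] = buffer.copy()
second_consumer_view = pcoll_buffers['pc1']

# Mutating the new entry does not affect the object the first consumer holds.
second_consumer_view.append(b'z')
```

Under these assumptions, the assignment yields two distinct buffers only because an earlier consumer already held the original object. The dictionary itself still has a single entry for the buffer id, so without such a reference the original is simply replaced.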





Issue Time Tracking
-------------------

    Worklog Id:     (was: 662127)
    Time Spent: 7.5h  (was: 7h 20m)

> Make FnApiRunner work by executing ready elements instead of stages
> -------------------------------------------------------------------
>
>                 Key: BEAM-8823
>                 URL: https://issues.apache.org/jira/browse/BEAM-8823
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Priority: P3
>          Time Spent: 7.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
