lukecwik commented on a change in pull request #12934:
URL: https://github.com/apache/beam/pull/12934#discussion_r494680493



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -894,7 +897,10 @@ def process_bundle(self,
       finalize_request = beam_fn_api_pb2.InstructionRequest(
           finalize_bundle=beam_fn_api_pb2.FinalizeBundleRequest(
               instruction_id=process_bundle_id))
-      self._worker_handler.control_conn.push(finalize_request)
+      finalize_response = self._worker_handler.control_conn.push(
+          finalize_request).get()

Review comment:
       This .get() was necessary to prevent a race condition where the pipeline
could shut down before finalization happened.
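
To illustrate the race, here is a minimal standalone sketch. It does not use the Beam API: `push`, `finalize_bundle`, and `run_and_finalize` are hypothetical stand-ins, and `concurrent.futures.Future.result()` plays the role that `.get()` plays on the future returned by `control_conn.push()`. The assumption is only that the push returns immediately while the work completes later on another thread.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Set by the worker once finalization has actually run.
finalized = threading.Event()


def finalize_bundle():
    # Hypothetical stand-in for the worker handling a FinalizeBundleRequest.
    time.sleep(0.05)  # simulate the work taking some time
    finalized.set()
    return "finalized"


def push(executor):
    # Hypothetical stand-in for control_conn.push(): returns a future at once,
    # before the request has been processed.
    return executor.submit(finalize_bundle)


def run_and_finalize():
    executor = ThreadPoolExecutor(max_workers=1)
    future = push(executor)
    # Without this blocking call, the caller could proceed to tear things down
    # while finalize_bundle() is still pending. Waiting on the future is the
    # analogue of calling .get() on the pushed request.
    response = future.result(timeout=5)
    executor.shutdown()
    return response


response = run_and_finalize()
```

Dropping the `future.result(...)` line reproduces the shape of the original bug: the caller returns while finalization may still be in flight.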

##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -286,22 +294,9 @@ def _request_process_bundle_progress(self, request):
 
   def _request_process_bundle_action(self, request):
     # type: (beam_fn_api_pb2.InstructionRequest) -> None
-
     def task():
-      instruction_id = getattr(

Review comment:
       This logic is now handled by the BundleProcessorCache.



