robertwb commented on a change in pull request #11229: [BEAM-9608] Increasing scope of context managers for FnApiRunner
URL: https://github.com/apache/beam/pull/11229#discussion_r399556279
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
 ##########
 @@ -245,37 +254,106 @@ class FnApiRunnerExecutionContext(object):
        ``beam.PCollection``.
  """
   def __init__(self,
-      worker_handler_factory,  # type: Callable[[Optional[str], int], List[WorkerHandler]]
+      worker_handler_manager,  # type: worker_handlers.WorkerHandlerManager
       pipeline_components,  # type: beam_runner_api_pb2.Components
       safe_coders,
       data_channel_coders,
                ):
     """
-    :param worker_handler_factory: A ``callable`` that takes in an environment
+    :param worker_handler_manager: A ``callable`` that takes in an environment
         id and a number of workers, and returns a list of ``WorkerHandler``s.
     :param pipeline_components:  (beam_runner_api_pb2.Components): TODO
     :param safe_coders:
     :param data_channel_coders:
     """
     self.pcoll_buffers = {}  # type: MutableMapping[bytes, PartitionableBuffer]
-    self.worker_handler_factory = worker_handler_factory
+    self.worker_handler_manager = worker_handler_manager
     self.pipeline_components = pipeline_components
     self.safe_coders = safe_coders
     self.data_channel_coders = data_channel_coders
 
+    self.pipeline_context = pipeline_context.PipelineContext(
+        self.pipeline_components,
+        iterable_state_write=self._iterable_state_write)
+    self._last_uid = -1
+
+  def next_uid(self):
+    self._last_uid += 1
+    return str(self._last_uid)
+
+  def _iterable_state_write(self, values, element_coder_impl):
+    # type: (...) -> bytes
+    token = unique_name(None, 'iter').encode('ascii')
+    out = create_OutputStream()
+    for element in values:
+      element_coder_impl.encode_to_stream(element, out, True)
+    self.worker_handler_manager.state_servicer.append_raw(
+        beam_fn_api_pb2.StateKey(
+            runner=beam_fn_api_pb2.StateKey.Runner(key=token)),
+        out.get())
+    return token
+
 
 class BundleContextManager(object):
 
   def __init__(self,
-      execution_context, # type: FnApiRunnerExecutionContext
-      process_bundle_descriptor,  # type: beam_fn_api_pb2.ProcessBundleDescriptor
-      worker_handler,  # type: fn_runner.WorkerHandler
-      p_context,  # type: pipeline_context.PipelineContext
-               ):
+               execution_context, # type: FnApiRunnerExecutionContext
+               stage,  # type: translations.Stage
+               num_workers,  # type: int
+              ):
     self.execution_context = execution_context
-    self.process_bundle_descriptor = process_bundle_descriptor
-    self.worker_handler = worker_handler
-    self.pipeline_context = p_context
+    self.stage = stage
+    self.bundle_uid = self.execution_context.next_uid()
+    self.num_workers = num_workers
+
+    # Properties that are lazily initialized
+    self._process_bundle_descriptor = None
+    self._worker_handlers = None
+
+  @property
+  def worker_handlers(self):
+    if self._worker_handlers is None:
+      self._worker_handlers = self.execution_context.worker_handler_manager\
 
 Review comment:
   Prefer ()'s to backslashes for line breaks.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to