tvalentyn commented on code in PR #32571:
URL: https://github.com/apache/beam/pull/32571#discussion_r1777750310


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2594,11 +2594,23 @@ def __getattribute__(self, name):
   def process(self, *args, **kwargs):
     if self._pool is None:
       self._pool = concurrent.futures.ThreadPoolExecutor(10)
+
+    # Import here to avoid circular dependency
+    from apache_beam.runners.worker.statesampler import get_current_tracker, set_current_tracker
+
+    # State sampler/tracker is stored as a thread local variable, and is used
+    # when incrementing counter metrics.
+    dispatching_thread_state_sampler = get_current_tracker()
+
+    def wrapped_process():
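   For context on the thread-local point in the diff comments: below is a minimal sketch of capturing the dispatching thread's tracker and installing it on the worker thread inside `wrapped_process`. It assumes the tracker lives in a plain `threading.local`; the `get_current_tracker`/`set_current_tracker` defined here are hypothetical stand-ins, not Beam's actual `statesampler` implementation.

```python
import concurrent.futures
import threading

# Hypothetical stand-ins for statesampler.get_current_tracker /
# set_current_tracker: a tracker stored in a thread-local variable.
_state = threading.local()


def get_current_tracker():
  return getattr(_state, 'tracker', None)


def set_current_tracker(tracker):
  _state.tracker = tracker


def run_with_parent_tracker(pool, fn, *args):
  # Capture the tracker while still on the dispatching thread...
  parent_tracker = get_current_tracker()

  def wrapped_process():
    # ...and install it on the worker thread before calling fn, restoring
    # the worker's previous value afterwards because pool threads are reused.
    previous = get_current_tracker()
    set_current_tracker(parent_tracker)
    try:
      return fn(*args)
    finally:
      set_current_tracker(previous)

  return pool.submit(wrapped_process).result()


set_current_tracker('main-tracker')
with concurrent.futures.ThreadPoolExecutor(2) as pool:
  # Without propagation the worker thread sees no tracker at all.
  seen_plain = pool.submit(get_current_tracker).result()
  seen_wrapped = run_with_parent_tracker(pool, get_current_tracker)
print(seen_plain, seen_wrapped)  # None main-tracker
```

   This only covers state that can be handed over by reference within one process; it does not help once work moves to another process.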

Review Comment:
   Have you considered an approach where `wrapped_process` returns a tuple like the following?
   
        (_fn_process_output_list, modifications_to_global_state_that_can_be_subsequently_reapplied_onto_the_parent_thread)
   
   There are open questions about how to capture modifications of global state, and about exactly which modifications we would be watching for.
   
   One possibility that comes to mind is temporarily using stubs for objects/methods. For example, `unittest.mock` often employs this technique to mock out portions of the codebase that are not under test. In our scenario, we could set stubs for elements of the global state, and the stubs would collect the necessary updates instead of applying them, so that we can reapply them on the main thread or process after the subprocess call finishes.
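   A minimal sketch of the stub idea combined with a tuple-returning `wrapped_process`, assuming user code reaches global state only through a module-level `metrics.inc_counter` (a hypothetical name, not a Beam API). `unittest.mock.patch.object` swaps in a recording stub while the user function runs, and the parent thread reapplies the recorded updates afterwards. One caveat: `mock.patch` replaces the attribute for every thread, not just the worker, so a real implementation would likely need a thread-aware stub.

```python
import concurrent.futures
import types
from unittest import mock

# Hypothetical global state: a counter table mutated via inc_counter.
metrics = types.SimpleNamespace(counters={})


def inc_counter(name, value=1):
  metrics.counters[name] = metrics.counters.get(name, 0) + value


metrics.inc_counter = inc_counter


def user_process(element):
  # User code touches global state only through metrics.inc_counter.
  metrics.inc_counter('elements')
  return [element * 2]


def wrapped_process(element):
  recorded = []  # (name, value) pairs captured instead of applied

  def record(name, value=1):
    recorded.append((name, value))

  # Temporarily replace the global-state mutator with the recording stub.
  with mock.patch.object(metrics, 'inc_counter', record):
    outputs = user_process(element)
  return outputs, recorded


with concurrent.futures.ThreadPoolExecutor(2) as pool:
  outputs, updates = pool.submit(wrapped_process, 21).result()

# Back on the parent thread: reapply the recorded modifications.
for name, value in updates:
  metrics.inc_counter(name, value)

print(outputs, metrics.counters)  # [42] {'elements': 1}
```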
   
   In this scenario, we might also be able to pass the information across the process boundary, not just the thread boundary.
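   To illustrate why the recorded updates could cross a process boundary: if the stubs record plain `(method_name, args, kwargs)` tuples, the whole result stays picklable. The sketch uses a `pickle` round-trip as a stand-in for an actual subprocess; `RecordingStub`, `Counters` and `replay` are hypothetical names for illustration only.

```python
import pickle


class RecordingStub:
  """Records method calls as plain tuples instead of executing them."""

  def __init__(self):
    self.calls = []

  def __getattr__(self, name):
    # Invoked only for attributes not found normally, i.e. stubbed methods.
    def record(*args, **kwargs):
      self.calls.append((name, args, kwargs))
    return record


def replay(calls, target):
  # Reapply each recorded call onto the real object.
  for name, args, kwargs in calls:
    getattr(target, name)(*args, **kwargs)


class Counters:
  # Hypothetical stand-in for the real global state on the parent side.
  def __init__(self):
    self.values = {}

  def inc(self, name, value=1):
    self.values[name] = self.values.get(name, 0) + value


# "Child" side: user code updates the stub, not the real state.
stub = RecordingStub()
stub.inc('elements')
stub.inc('bytes', value=128)
payload = pickle.dumps(([2, 4], stub.calls))  # what would cross the boundary

# "Parent" side: unpickle the (outputs, modifications) tuple and reapply.
outputs, calls = pickle.loads(payload)
counters = Counters()
replay(calls, counters)
print(outputs, counters.values)  # [2, 4] {'elements': 1, 'bytes': 128}
```

   Only the recorded call list is pickled, not the stub itself, so the `__getattr__` hook never interferes with serialization.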
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
