tvalentyn commented on code in PR #32571:
URL: https://github.com/apache/beam/pull/32571#discussion_r1777750310
##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2594,11 +2594,23 @@ def __getattribute__(self, name):
   def process(self, *args, **kwargs):
     if self._pool is None:
       self._pool = concurrent.futures.ThreadPoolExecutor(10)
+
+    # Import here to avoid circular dependency
+    from apache_beam.runners.worker.statesampler import get_current_tracker, set_current_tracker
+
+    # State sampler/tracker is stored as a thread local variable, and is used
+    # when incrementing counter metrics.
+    dispatching_thread_state_sampler = get_current_tracker()
+
+    def wrapped_process():
Review Comment:
Have you considered having wrapped_process return a tuple:
(_fn_process_output_list,
modifications_to_global_state_that_can_be_subsequently_reapplied_onto_the_parent_thread)?
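As a rough sketch of that tuple-returning shape, assuming the function run on the worker thread can be handed a recorder callback explicitly (GLOBAL_COUNTERS, apply_update, user_process and process_all are hypothetical, and this wrapped_process is a stand-in rather than the one in the diff), the worker returns its outputs together with the deferred updates, and only the dispatching thread mutates global state:

```python
import concurrent.futures

# Hypothetical global state that should only be mutated on the dispatching
# thread (a stand-in for, e.g., metric counters; not a Beam API).
GLOBAL_COUNTERS = {}


def apply_update(name, delta):
    """Applies one counter update to the global state."""
    GLOBAL_COUNTERS[name] = GLOBAL_COUNTERS.get(name, 0) + delta


def user_process(element, record_update):
    """Hypothetical DoFn-like body: emits outputs and requests an update."""
    record_update('elements_seen', 1)
    return [element * 2]


def wrapped_process(element):
    """Runs on a worker thread and returns (outputs, deferred_updates)."""
    pending = []
    outputs = user_process(
        element, lambda name, delta: pending.append((name, delta)))
    return outputs, pending


def process_all(elements):
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        # pool.map yields results in input order, consumed on this thread.
        for outputs, updates in pool.map(wrapped_process, elements):
            # Reapply the recorded modifications on the parent thread.
            for name, delta in updates:
                apply_update(name, delta)
            results.extend(outputs)
    return results
```

Calling process_all([1, 2, 3]) returns [2, 4, 6] and leaves GLOBAL_COUNTERS at {'elements_seen': 3}. The open part is the one raised next: real user code will not accept an explicit recorder, so the updates have to be captured some other way.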
There are open questions about how to capture modifications to global state,
and which exact modifications we would be watching for.
One possibility that comes to mind is temporarily using stubs for
objects/methods. For example, unittest.mock often employs this technique to
stub out portions of the codebase that are not under test. In our scenario, we
could install stubs for elements of the global state; instead of applying
updates, the stubs would collect them, and we could then reapply them on the
main thread or process after the subprocess call finishes.
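A minimal sketch of the stub idea using unittest.mock.patch.object, assuming the global state is reachable as an attribute that can be patched (the _Metrics class, METRICS, user_fn, run_recorded and process_all are hypothetical stand-ins, not Beam APIs). One caveat the sketch has to work around: patch.object replaces the attribute for every thread, not just the worker, so it runs one task at a time and replays the recorded calls only after all tasks finish. A real implementation would likely need a thread-aware stub.

```python
import concurrent.futures
from unittest import mock


class _Metrics:
    """Hypothetical holder of global state (a stand-in, not a Beam API)."""

    def __init__(self):
        self.counters = {}

    def inc(self, name, delta=1):
        self.counters[name] = self.counters.get(name, 0) + delta


METRICS = _Metrics()


def user_fn(element):
    # User code mutates global state directly and knows nothing about stubs.
    METRICS.inc('seen')
    return element + 1


def run_recorded(element):
    """Runs user_fn with METRICS.inc stubbed; returns (output, recorded calls)."""
    # The MagicMock records each call instead of touching METRICS.counters.
    with mock.patch.object(METRICS, 'inc') as stub:
        output = user_fn(element)
    return output, list(stub.call_args_list)


def process_all(elements):
    # One worker, and replay only after every task has finished: patch.object
    # swaps the attribute for all threads, so nothing else may call
    # METRICS.inc while a stub is installed.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        results = list(pool.map(run_recorded, elements))
    outputs = []
    for output, calls in results:
        for recorded in calls:
            # Reapply each recorded call against the real method.
            METRICS.inc(*recorded.args, **recorded.kwargs)
        outputs.append(output)
    return outputs
```

Here process_all([1, 2, 3]) returns [2, 3, 4], and METRICS.counters ends up as {'seen': 3}, with every real write made on the calling thread.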
With this approach, we might also be able to pass the information across the
process boundary, not just the thread boundary.
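To illustrate why returned updates also help across a process boundary: a child process that mutates module-level state only changes its own copy, whereas updates returned as plain, picklable data can be reapplied by the parent. A minimal sketch, assuming a POSIX platform where the 'fork' start method is available (COUNTERS, inc_counter, wrapped_process and process_all are hypothetical, not Beam code):

```python
import concurrent.futures
import multiprocessing

# Hypothetical global state in the parent process (not a Beam API).
COUNTERS = {}


def inc_counter(name, delta=1):
    COUNTERS[name] = COUNTERS.get(name, 0) + delta


def wrapped_process(element):
    """Runs in a child process and returns (outputs, recorded_updates)."""
    # This only changes the child's copy of COUNTERS; the parent never sees it.
    inc_counter('lost_in_child')
    # Record the update as plain, picklable data instead.
    updates = [('seen', 1)]
    return [element * 10], updates


def process_all(elements):
    # 'fork' keeps the sketch self-contained on POSIX; other start methods
    # need the usual `if __name__ == '__main__':` guard.
    ctx = multiprocessing.get_context('fork')
    outputs = []
    with concurrent.futures.ProcessPoolExecutor(
            max_workers=2, mp_context=ctx) as pool:
        for outs, updates in pool.map(wrapped_process, elements):
            # Reapply the recorded updates in the parent process.
            for name, delta in updates:
                inc_counter(name, delta)
            outputs.extend(outs)
    return outputs
```

process_all([1, 2, 3]) returns [10, 20, 30] and leaves the parent's COUNTERS as {'seen': 3}; the 'lost_in_child' increment never reaches the parent.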
--
This is an automated message from the Apache Git Service.