gemini-code-assist[bot] commented on code in PR #37112:
URL: https://github.com/apache/beam/pull/37112#discussion_r2687633603
##########
sdks/python/apache_beam/utils/multi_process_shared_test.py:
##########
@@ -242,6 +281,185 @@ def test_release_always_proxy(self):
with self.assertRaisesRegex(Exception, 'released'):
counter1.get()
+ def test_proxy_on_proxy(self):
+ shared1 = multi_process_shared.MultiProcessShared(
+ SimpleClass, tag='proxy_on_proxy_main', always_proxy=True)
+ instance = shared1.acquire()
+ proxy_instance = instance.make_proxy()
+ self.assertEqual(proxy_instance.increment(), 1)
+
+
+class MultiProcessSharedSpawnProcessTest(unittest.TestCase):
+ def setUp(self):
+ tempdir = tempfile.gettempdir()
+ for tag in ['basic',
+ 'proxy_on_proxy',
+ 'proxy_on_proxy_main',
+ 'main',
+ 'to_delete',
+ 'mix1',
+ 'mix2'
+ 'test_process_exit']:
Review Comment:

There's a missing comma after `'mix2'`. In Python, adjacent string literals
are automatically concatenated, so this produces the single string
`'mix2test_process_exit'` instead of two separate tags. As a result, the file
cleanup logic in `setUp` never runs for the `'test_process_exit'` tag.
```suggestion
'mix2',
'test_process_exit']:
```
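
For reference, a standalone snippet (not part of the PR) demonstrating the concatenation behavior:

```python
# Adjacent string literals are joined at compile time when a comma is missing.
tags = ['mix1', 'mix2' 'test_process_exit']
print(tags)       # ['mix1', 'mix2test_process_exit']
print(len(tags))  # 2, not 3 -- the last two literals collapsed into one tag
```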
##########
sdks/python/apache_beam/utils/multi_process_shared.py:
##########
@@ -315,25 +437,102 @@ def unsafe_hard_delete(self):
to this object exist, or (b) you are ok with all existing references to
this object throwing strange errors when dereferenced.
"""
- self._get_manager().unsafe_hard_delete_singleton(self._tag)
+ try:
+ self._get_manager().unsafe_hard_delete_singleton(self._tag)
+ except (EOFError, ConnectionResetError, BrokenPipeError):
+ pass
+ except Exception as e:
+ logging.warning(
+ "Exception %s when trying to hard delete shared object %s",
+ e,
+ self._tag)
def _create_server(self, address_file):
- # We need to be able to authenticate with both the manager and the process.
- self._serving_manager = _SingletonRegistrar(
- address=('localhost', 0), authkey=AUTH_KEY)
- multiprocessing.current_process().authkey = AUTH_KEY
- # Initialize eagerly to avoid acting as the server if there are issues.
- # Note, however, that _create_server itself is called lazily.
- _process_level_singleton_manager.register_singleton(
- self._constructor, self._tag, initialize_eagerly=True)
- self._server = self._serving_manager.get_server()
- logging.info(
- 'Starting proxy server at %s for shared %s',
- self._server.address,
- self._tag)
- with open(address_file + '.tmp', 'w') as fout:
- fout.write('%s:%d' % self._server.address)
- os.rename(address_file + '.tmp', address_file)
- t = threading.Thread(target=self._server.serve_forever, daemon=True)
- t.start()
- logging.info('Done starting server')
+ if self._spawn_process:
+ error_file = address_file + ".error"
+
+ if os.path.exists(error_file):
+ try:
+ os.remove(error_file)
+ except OSError:
+ pass
+
+ ctx = multiprocessing.get_context('spawn')
+ p = ctx.Process(
+ target=_run_server_process,
+ args=(address_file, self._tag, self._constructor, AUTH_KEY),
+ daemon=False # Must be False for nested proxies
+ )
+ p.start()
+ logging.info("Parent: Waiting for %s to write address file...",
self._tag)
+
+ def cleanup_process():
+ if p.is_alive():
+ logging.info(
+ "Parent: Terminating server process %s for %s", p.pid, self._tag)
+ p.terminate()
+ p.join()
+ try:
+ if os.path.exists(address_file):
+ os.remove(address_file)
+ if os.path.exists(error_file):
+ os.remove(error_file)
+ except Exception:
+ pass
Review Comment:

Similar to another comment, silently ignoring all exceptions here can hide
problems during file cleanup in the `atexit` handler. Logging a warning would
provide better visibility into potential issues.
```suggestion
except Exception as e:
logging.warning(
'Failed to cleanup files for tag %s in atexit handler: %s',
self._tag,
e)
```
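
For illustration only, a minimal sketch (not the PR's code; `cleanup` and the file paths below are placeholders) of an `atexit`-registered cleanup that logs failures instead of swallowing them:

```python
import atexit
import logging
import os


def cleanup(paths):
  # Hypothetical cleanup: log failures so permission or race issues during
  # interpreter shutdown remain visible instead of being silently dropped.
  for path in paths:
    try:
      if os.path.exists(path):
        os.remove(path)
    except Exception as e:
      logging.warning('Failed to remove %s during atexit cleanup: %s', path, e)


atexit.register(cleanup, ['/tmp/example.address', '/tmp/example.address.error'])
```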
##########
sdks/python/apache_beam/utils/multi_process_shared.py:
##########
@@ -200,9 +226,99 @@ def __call__(self, *args, **kwargs):
def __getattr__(self, name):
return getattr(self._proxyObject, name)
+ def __setstate__(self, state):
+ self.__dict__.update(state)
+
+ def __getstate__(self):
+ return self.__dict__
+
def get_auto_proxy_object(self):
return self._proxyObject
+ def unsafe_hard_delete(self):
+ try:
+ self._proxyObject.unsafe_hard_delete()
+ except (EOFError, ConnectionResetError, BrokenPipeError):
+ pass
+ except Exception as e:
+ logging.warning(
+ "Exception %s when trying to hard delete shared object proxy", e)
+
+
+def _run_server_process(address_file, tag, constructor, authkey):
+ """
+ Runs in a separate process.
+ Includes a 'Suicide Pact' monitor: If parent dies, I die.
+ """
+ parent_pid = os.getppid()
+
+ def cleanup_files():
+ logging.info("Server process exiting. Deleting files for %s", tag)
+ try:
+ if os.path.exists(address_file):
+ os.remove(address_file)
+ if os.path.exists(address_file + ".error"):
+ os.remove(address_file + ".error")
+ except Exception:
+ pass
Review Comment:

Silently ignoring all exceptions with a bare `except Exception: pass` can
hide underlying issues during file cleanup, such as permission errors. It would
be more robust to log these exceptions as warnings to aid in debugging.
```suggestion
except Exception as e:
logging.warning('Failed to cleanup files for tag %s: %s', tag, e)
```
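
Separately, the 'Suicide Pact' monitor referenced in the docstring above is not shown in this hunk. As a hedged illustration (an assumption, not necessarily what the PR implements), such a monitor is commonly a daemon thread that polls the parent PID and exits when it changes:

```python
import os
import threading
import time


def watch_parent(parent_pid, poll_seconds=1.0):
  # Hypothetical monitor: on Linux an orphaned child is reparented, so
  # os.getppid() stops matching the captured parent_pid once the parent dies.
  def _loop():
    while True:
      if os.getppid() != parent_pid:
        os._exit(1)
      time.sleep(poll_seconds)

  threading.Thread(target=_loop, daemon=True).start()
```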
##########
sdks/python/apache_beam/utils/multi_process_shared.py:
##########
@@ -160,7 +185,8 @@ def release_singleton(self, tag, obj):
return self.entries[tag].release(obj)
def unsafe_hard_delete_singleton(self, tag):
- return self.entries[tag].unsafe_hard_delete()
+ self.entries[tag].unsafe_hard_delete()
+ self._hard_delete_callback()
Review Comment:

The call to `self.entries[tag].unsafe_hard_delete()` on the preceding line
will trigger `_SingletonEntry.unsafe_hard_delete()`. In the `spawn_process`
case, this entry's `_hard_delete_callback` is `handle_unsafe_hard_delete`,
which calls `os._exit(0)`. Therefore, the process will exit, and this call to
`self._hard_delete_callback()` will never be reached. This line appears to be
redundant and can be removed.
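
To make the control-flow argument concrete, here is a standalone sketch (not Beam code; the helper name is illustrative) showing that nothing after `os._exit` ever executes:

```python
import os


def handle_hard_delete_like_callback():
  # os._exit terminates the interpreter immediately, bypassing finally blocks
  # and atexit handlers, so the caller never resumes.
  os._exit(0)


handle_hard_delete_like_callback()
print('never reached')  # the process has already exited by this point
```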