damccorm commented on code in PR #37112:
URL: https://github.com/apache/beam/pull/37112#discussion_r2729419081
##########
sdks/python/apache_beam/utils/multi_process_shared.py:
##########
@@ -200,9 +226,99 @@ def __call__(self, *args, **kwargs):
def __getattr__(self, name):
return getattr(self._proxyObject, name)
+ def __setstate__(self, state):
+ self.__dict__.update(state)
+
+ def __getstate__(self):
+ return self.__dict__
+
def get_auto_proxy_object(self):
return self._proxyObject
+ def unsafe_hard_delete(self):
+ try:
+ self._proxyObject.unsafe_hard_delete()
+ except (EOFError, ConnectionResetError, BrokenPipeError):
+ pass
+ except Exception as e:
+ logging.warning(
+ "Exception %s when trying to hard delete shared object proxy", e)
+
+
+def _run_server_process(address_file, tag, constructor, authkey):
+ """
+ Runs in a separate process.
+ Includes a 'Suicide Pact' monitor: If parent dies, I die.
+ """
+ parent_pid = os.getppid()
+
+ def cleanup_files():
+ logging.info("Server process exiting. Deleting files for %s", tag)
+ try:
+ if os.path.exists(address_file):
+ os.remove(address_file)
+ if os.path.exists(address_file + ".error"):
+ os.remove(address_file + ".error")
+ except Exception:
+ pass
+
+ def handle_unsafe_hard_delete():
+ cleanup_files()
+ os._exit(0)
+
+ def _monitor_parent():
+ """Checks if parent is alive every second."""
+ while True:
+ try:
+ os.kill(parent_pid, 0)
Review Comment:
Got it, thanks
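(For context on the monitor loop above: the liveness probe it relies on is `os.kill(pid, 0)`, which sends no signal but checks that the process exists. A minimal standalone sketch — the helper name here is illustrative, not the PR's code:)

```python
import os


def parent_is_alive(parent_pid: int) -> bool:
  # Signal 0 performs error checking only: nothing is delivered,
  # but existence and permissions are verified.
  try:
    os.kill(parent_pid, 0)
    return True
  except ProcessLookupError:
    # No such process: the parent is gone.
    return False
  except PermissionError:
    # Process exists but we may not signal it; still alive.
    return True


print(parent_is_alive(os.getpid()))  # -> True (we are our own proof of life)
```

A monitor thread would call this in a loop and `os._exit` when it returns False, which is presumably what the truncated `_monitor_parent` body does.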
##########
sdks/python/apache_beam/utils/multi_process_shared.py:
##########
@@ -200,9 +226,99 @@ def __call__(self, *args, **kwargs):
def __getattr__(self, name):
return getattr(self._proxyObject, name)
+ def __setstate__(self, state):
+ self.__dict__.update(state)
+
+ def __getstate__(self):
+ return self.__dict__
Review Comment:
Is there any more info in the stack trace? I don't really understand why
this would recurse like this, and it is a bit concerning (e.g. could this
happen for other methods?)
I guess instead of pickling the underlying proxied element, we're now
pickling the proxy; that's probably better if anything, so I'm not really
worried about the change, but it is definitely surprising.
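(For reference, the recursion is reproducible in a minimal sketch — class names here are illustrative, not Beam's. `pickle` creates the instance without running `__init__`, then looks up `__setstate__` with `getattr`; since `_proxyObject` isn't set yet, `__getattr__` fires and recurses on itself. Defining `__getstate__`/`__setstate__` directly on the class short-circuits that lookup:)

```python
import pickle


class BadProxy:
  def __init__(self, obj):
    self._proxyObject = obj

  def __getattr__(self, name):
    # Only called when normal lookup fails. During unpickling the
    # instance exists but __init__ never ran, so _proxyObject is
    # missing and this delegation recurses forever.
    return getattr(self._proxyObject, name)


class GoodProxy(BadProxy):
  # Concrete methods are found on the class, so __getattr__ never fires.
  def __getstate__(self):
    return self.__dict__

  def __setstate__(self, state):
    self.__dict__.update(state)


try:
  pickle.loads(pickle.dumps(BadProxy("payload")))
except RecursionError:
  print("BadProxy: RecursionError during unpickling")

restored = pickle.loads(pickle.dumps(GoodProxy("payload")))
print(restored._proxyObject)  # -> payload
```

Note the failure is only on the `getattr`-based lookups pickle itself performs (`__setstate__`), which is why ordinary method calls on a fully constructed proxy never hit it.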
##########
sdks/python/apache_beam/utils/multi_process_shared.py:
##########
@@ -200,9 +226,99 @@ def __call__(self, *args, **kwargs):
def __getattr__(self, name):
return getattr(self._proxyObject, name)
+ def __setstate__(self, state):
+ self.__dict__.update(state)
+
+ def __getstate__(self):
+ return self.__dict__
Review Comment:
Pickling the proxy is just a quite odd pattern IMO, and I worry we'll run
into unexpected issues from doing that.
##########
sdks/python/apache_beam/utils/multi_process_shared.py:
##########
@@ -200,9 +226,99 @@ def __call__(self, *args, **kwargs):
def __getattr__(self, name):
return getattr(self._proxyObject, name)
+ def __setstate__(self, state):
+ self.__dict__.update(state)
+
+ def __getstate__(self):
+ return self.__dict__
Review Comment:
I'm actually also wondering if:
```
proxy_instance = instance.make_proxy()
```
is the right pattern. That's creating a proxy object in the
multiprocess_shared instance, then pickling that object and sending it back to
the caller. Would it be better to send back the tag, and do something like:
```
proxy_instance_tag = instance.make_proxy() # returns tag instead of object
proxy_instance = multi_process_shared.MultiProcessShared(Constructor,
proxy_instance_tag)
```
At that point, the 2nd process has created the proxy_instance already, but
the caller can now talk directly to the proxy_instance without us needing to
pickle the proxy at all.
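(A single-process sketch of that tag-handle idea — `Registry`, `register`, and `attach` are illustrative names, not Beam's API, and a real version would live in the server process behind a multiprocessing manager. The point is that only the tag string crosses the pickle boundary:)

```python
import threading


class Registry:
  def __init__(self):
    self._lock = threading.Lock()
    self._objects = {}

  def register(self, tag, constructor):
    # Server side: create (or reuse) the object, hand back only the tag.
    with self._lock:
      if tag not in self._objects:
        self._objects[tag] = constructor()
    return tag  # cheap and safe to pickle: just a string

  def attach(self, tag):
    # Caller side: resolve the tag back to the shared object.
    with self._lock:
      return self._objects[tag]


registry = Registry()
tag = registry.register("my-model", lambda: {"weights": [1, 2, 3]})
obj = registry.attach(tag)
print(obj["weights"])  # -> [1, 2, 3]
```

With this shape the caller reconstructs its own handle from the tag, so no live proxy object is ever serialized.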
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]