shunping commented on code in PR #38620:
URL: https://github.com/apache/beam/pull/38620#discussion_r3305268249
##########
sdks/python/apache_beam/utils/subprocess_server.py:
##########
@@ -108,14 +108,22 @@ def purge(self, owner):
for value in to_delete:
self._destructor(value)
- def get(self, *key):
+ def get(self, *key, owner=None):
if not self._live_owners:
raise RuntimeError("At least one owner must be registered.")
with self._lock:
if key not in self._cache:
self._cache[key] = _SharedCacheEntry(self._constructor(*key), set())
- for owner in self._live_owners:
+ if owner is not None:
+ if owner not in self._live_owners:
+ raise RuntimeError("The requesting owner must be registered.")
self._cache[key].owners.add(owner)
+ for live_owner, is_context in self._live_owners.items():
+ if is_context:
+ self._cache[key].owners.add(live_owner)
+ else:
+ for live_owner in self._live_owners:
+ self._cache[key].owners.add(live_owner)
return self._cache[key].obj
Review Comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]