lukecwik commented on code in PR #23391:
URL: https://github.com/apache/beam/pull/23391#discussion_r981883845


##########
sdks/python/apache_beam/runners/worker/statecache.py:
##########
@@ -69,24 +87,100 @@ def get_referents_for_cache(self):
     raise NotImplementedError()
 
 
-def get_referents_for_cache(*objs):
+def _safe_isinstance(obj, type):
+  # type: (Any, Union[type, Tuple[type, ...]]) -> bool
+
+  """
+  Return whether an object is an instance of a class or of a subclass thereof.
+  See `isinstance()` for more information.
+
+  Returns false on `isinstance()` failure. For example applying `isinstance()`
+  on `weakref.proxy` objects attempts to dereference the proxy objects, which
+  may yield an exception. See https://github.com/apache/beam/issues/23389 for
+  additional details.
+  """
+  try:
+    return isinstance(obj, type)
+  except Exception:
+    return False
+
+
+def _size_func(obj):
+  # type: (Any) -> int
+
+  """
+  Returns the size of the object or a default size if an error occurred during
+  sizing.
+  """
+  try:
+    return sys.getsizeof(obj)
+  except Exception as e:
+    current_time = time.time()
+    # Limit outputting this log so we don't spam the logs on these
+    # occurrences.
+    if _size_func.last_log_time + 300 < current_time:  # type: ignore
+      _LOGGER.warning(
+          'Failed to size %s of type %s. Note that this may '
+          'impact cache sizing such that the cache is over '
+          'utilized which may lead to out of memory errors.',
+          obj,
+          type(obj),
+          exc_info=e)
+      _size_func.last_log_time = current_time  # type: ignore
+    # Use an arbitrary default size that would account for some of the object
+    # overhead.
+    return _DEFAULT_WEIGHT
+
+
+_size_func.last_log_time = 0  # type: ignore
+
+
+def _get_referents_func(*objs):
   # type: (List[Any]) -> List[Any]
 
   """Returns the list of objects accounted during cache measurement.
 
-  Users can inherit CacheAware to override which referrents should be
+  Users can inherit CacheAware to override which referents should be
   used when measuring the deep size of the object. The default is to
   use gc.get_referents(*objs).
   """
   rval = []
   for obj in objs:
-    if isinstance(obj, CacheAware):
-      rval.extend(obj.get_referents_for_cache())
+    if _safe_isinstance(obj, CacheAware):
+      rval.extend(obj.get_referents_for_cache())  # type: ignore
     else:
       rval.extend(gc.get_referents(obj))
   return rval
 
 
+def _filter_func(o):
+  # type: (Any) -> bool
+
+  """
+  Filter out specific types from being measured.
+
+  Note that we do not want to measure the cost of weak references as they will
+  only stay in scope as long as other code references them and will effectively
+  be garbage collected as soon as there isn't a strong reference anymore.
+
+  Note that we cannot use the default filter function due to isinstance raising
+  an error on weakref.proxy types. See
+  https://github.com/liran-funaro/objsize/issues/6 for additional details.

Review Comment:
   It does for most users who rely on the defaults, but I made a few 
adaptations to that solution to close some gaps:
   1) catch all exceptions in _safe_isinstance
   2) sys.getsizeof can still raise an error for certain types (I found this 
with certain Pandas types within Google)
   3) not measure weak references, since the only way they survive in the 
cache is if the rest of the system already holds a strong reference to the 
object, which makes the cache's _copy_ effectively free.
   
   Note that I tested this change within Google and the tests passed, so we 
have pretty good evidence that this update won't cause pipelines to fail.
   
   Upgrading objsize to `0.6.0` won't provide any benefit since we override 
all the methods that `get_deep_size` relies on.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to