tvalentyn commented on code in PR #35656: URL: https://github.com/apache/beam/pull/35656#discussion_r2249189733
########## sdks/python/apache_beam/internal/code_object_pickler.py: ########## @@ -15,7 +15,391 @@ # limitations under the License. # +import collections +import hashlib +import inspect +import re +import sys +import types + def get_normalized_path(path): """Returns a normalized path. This function is intended to be overridden.""" return path + + +def _get_code_path(callable): + """Returns the stable reference to the code object. + + Args: + callable: The callable object to search for. + + Returns: + The stable reference to the code object. + """ + code_path = _extend_path( + callable.__module__, + _search( + callable, + sys.modules[callable.__module__], + callable.__qualname__.split('.'), + ), + ) + return code_path + + +def _extend_path(prefix, suffix): + """Extends the path to the code object. + + Args: + prefix: The prefix of the path. + suffix: The rest of the path. + + Returns: + The extended path. + """ + if suffix is None: + return None + if not suffix: Review Comment: can prefix be None or empty ? ########## sdks/python/apache_beam/internal/code_object_pickler.py: ########## @@ -15,7 +15,391 @@ # limitations under the License. # +import collections +import hashlib +import inspect +import re +import sys +import types + def get_normalized_path(path): """Returns a normalized path. This function is intended to be overridden.""" return path + + +def _get_code_path(callable): + """Returns the stable reference to the code object. + + Args: + callable: The callable object to search for. + + Returns: + The stable reference to the code object. + """ + code_path = _extend_path( + callable.__module__, + _search( + callable, + sys.modules[callable.__module__], + callable.__qualname__.split('.'), + ), + ) + return code_path + + +def _extend_path(prefix, suffix): + """Extends the path to the code object. + + Args: + prefix: The prefix of the path. + suffix: The rest of the path. + + Returns: + The extended path. 
+ """ + if suffix is None: + return None + if not suffix: + return prefix + return prefix + '.' + suffix + + +def _search(callable, node, qual_name_parts): + """Searches an object to create a stable reference code path. + + Recursively searches the tree of objects starting from node and looking for + callable and returns a string to uniquely identify the path from node to the + callable. + + Args: + callable: The callable object to search for. + node: The object to search within. + qual_name_parts: A list of strings representing the qualified name. + + Returns: + The stable reference to the code object, or None if not found. + """ + if node is None: + return None + if not qual_name_parts: + if hasattr(node, '__code__') and node.__code__ == callable.__code__: + return '__code__' + else: + return None + if inspect.ismodule(node) or inspect.isclass(node): + return _search_module_or_class(callable, node, qual_name_parts) + elif inspect.isfunction(node): + return _search_function(callable, node, qual_name_parts) + elif inspect.iscode(node): + return _search_code(callable, node, qual_name_parts) + + +def _search_module_or_class(callable, node, qual_name_parts): + """Searches a module or class to create a stable reference code path. + + Args: + callable: The callable object to search for. + node: The module or class to search within. + qual_name_parts: The list of qual name parts. + + Returns: + The stable reference to the code object, or None if not found. + """ + # Functions/methods have a name that is unique within a given module or class + # so the traversal can directly lookup function object identified by the name. + # Lambdas don't have a name so we need to search all the attributes of the + # node. 
+ first_part = qual_name_parts[0] + rest = qual_name_parts[1:] + if first_part == '<lambda>': + for name in dir(node): + value = getattr(node, name) + if (isinstance(value, type(callable)) and + value.__code__ == callable.__code__): + return name + '.__code__' + elif (isinstance(value, types.FunctionType) and + value.__defaults__ is not None): + # Python functions can have other functions as default parameters which + # might contain the code object so we have to search them. + for i, default_param_value in enumerate(value.__defaults__): + path = _search(callable, default_param_value, rest) + if path is not None: + return _extend_path(name, _extend_path(f'__defaults__[{i}]', path)) + else: + return _extend_path( + first_part, _search(callable, getattr(node, first_part), rest)) + + +def _search_function(callable, node, qual_name_parts): + """Searches a function to create a stable reference code path. + + Args: + callable: The callable object to search for. + node: The function to search within. + qual_name_parts: The list of qual name parts. + + Returns: + The stable reference to the code object, or None if not found. + """ + first_part = qual_name_parts[0] + if node.__code__ == callable.__code__: + if len(qual_name_parts) > 1: + raise ValueError('Qual name parts too long') + return '__code__' + # If first part is '<locals>' then the code object is in a local variable + # so we should add __code__ to the path to indicate that we are entering + # the code object of the function. + if first_part == '<locals>': + return _extend_path( + '__code__', _search(callable, node.__code__, qual_name_parts)) + + +def _search_code(callable, node, qual_name_parts): + """Searches a code object to create a stable reference code path. + + Args: + callable: The callable to search for. + node: The code object to search within. + qual_name_parts: The list of qual name parts. + + Returns: + The stable reference to the code object, or None if not found. 
+ + Raises: + ValueError: If the qual name parts are too long. + """ + first_part = qual_name_parts[0] + rest = qual_name_parts[1:] + if node == callable.__code__: + if len(qual_name_parts) > 1: + raise ValueError('Qual name parts too long') + return '' + elif first_part == '<locals>': + code_objects_by_name = collections.defaultdict(list) + for co_const in node.co_consts: + if inspect.iscode(co_const): + code_objects_by_name[co_const.co_name].append(co_const) + num_lambdas = len(code_objects_by_name.get('<lambda>', [])) + # If there is only one lambda, we can use the default path + # 'co_consts[<lambda>]'. This is the most common case and it is + # faster than calculating the signature and the hash. + if num_lambdas == 1: + path = _search(callable, code_objects_by_name['<lambda>'][0], rest) + if path is not None: + return _extend_path('co_consts[<lambda>]', path) + else: + return _search_lambda(callable, code_objects_by_name, rest) + elif node.co_name == first_part: + return _search(callable, node, rest) + + +def _search_lambda(callable, code_objects_by_name, qual_name_parts): + """Searches a lambda to create a stable reference code path. + + Args: + callable: The callable to search for. + code_objects_by_name: The code objects to search within, keyed by name. + qual_name_parts: The rest of the qual_name_parts. + + Returns: + The stable reference to the code object, or None if not found. + """ + # There are multiple lambdas in the code object, so we need to calculate + # the signature and the hash to identify the correct lambda. + lambda_code_objects_by_name = collections.defaultdict(list) + name = qual_name_parts[0] + code_objects = code_objects_by_name[name] + if name == '<lambda>': + for code_object in code_objects: + lambda_name = f'<lambda>, {_signature(code_object)}' + lambda_code_objects_by_name[lambda_name].append(code_object) + # Check if there are any lambdas with the same signature. 
+ # If there are, we need to calculate the hash to identify the correct + # lambda. + for lambda_name, lambda_objects in lambda_code_objects_by_name.items(): + if len(lambda_objects) > 1: + for lambda_object in lambda_objects: + path = _search(callable, lambda_object, qual_name_parts) + if path is not None: + return _extend_path( + f'co_consts[{lambda_name},' + f' {_create_bytecode_hash(lambda_object)}]', + path, + ) + else: + # If there is only one lambda with this signature, we can + # use the signature to identify the correct lambda. + path = _search(callable, code_objects[0], qual_name_parts) + if path is not None: + return _extend_path(f'co_consts[{lambda_name}]', path) + else: + # For non lambda objects, we can use the name to identify the object. + path = _search(callable, code_objects[0], qual_name_parts) + if path is not None: + return _extend_path(f'co_consts[{name}]', path) + + +# Matches a path like: co_consts[my_function] +_SINGLE_NAME_PATTERN = re.compile(r'co_consts\[([a-zA-Z0-9\<\>_-]+)]') +# Matches a path like: co_consts[<lambda>, ('x',)] +_LAMBDA_WITH_ARGS_PATTERN = re.compile( + r"co_consts\[(<[^>]+>),\s*(\('[^']*'\s*,\s*\))\]") +# Matches a path like: co_consts[<lambda>, ('x',), 1234567890] +_LAMBDA_WITH_HASH_PATTERN = re.compile( + r"co_consts\[(<[^>]+>),\s*(\('[^']*'\s*,\s*\)),\s*(.+)\]") +# Matches a path like: __defaults__[0] +_DEFAULT_PATTERN = re.compile(r'(__defaults__)\[(\d+)\]') +# Matches an argument like: 'x' Review Comment: is matching an empty string ok in this case? (`''`) ########## sdks/python/apache_beam/internal/code_object_pickler.py: ########## @@ -15,7 +15,391 @@ # limitations under the License. # +import collections +import hashlib +import inspect +import re +import sys +import types + def get_normalized_path(path): """Returns a normalized path. 
This function is intended to be overridden.""" return path + + +def _get_code_path(callable): Review Comment: The method is seemingly unused in the Beam codebase - would you mind adding a comment to prevent someone from thinking this is dead code? ########## sdks/python/apache_beam/internal/code_object_pickler.py: ########## @@ -15,7 +15,391 @@ # limitations under the License. # +import collections +import hashlib +import inspect +import re +import sys +import types + def get_normalized_path(path): """Returns a normalized path. This function is intended to be overridden.""" Review Comment: Could you please add some top-level comments re: what a normalized path is, and what a stable reference is? Thanks! ########## sdks/python/apache_beam/internal/code_object_pickler.py: ########## @@ -15,7 +15,391 @@ # limitations under the License. # +import collections +import hashlib +import inspect +import re +import sys +import types + def get_normalized_path(path): """Returns a normalized path. This function is intended to be overridden.""" return path + + +def _get_code_path(callable): + """Returns the stable reference to the code object. + + Args: + callable: The callable object to search for. + + Returns: + The stable reference to the code object. + """ + code_path = _extend_path( + callable.__module__, + _search( + callable, + sys.modules[callable.__module__], + callable.__qualname__.split('.'), + ), + ) + return code_path + + +def _extend_path(prefix, suffix): + """Extends the path to the code object. + + Args: + prefix: The prefix of the path. + suffix: The rest of the path. + + Returns: + The extended path. + """ + if suffix is None: + return None + if not suffix: + return prefix + return prefix + '.' + suffix + + +def _search(callable, node, qual_name_parts): Review Comment: Would _find_path be a better name? -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org