This is an automated email from the ASF dual-hosted git repository. jin pushed a commit to branch log-frame in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-ai.git
commit c42882e91f0be4288254bf219895862ce5f0215d Author: imbajin <[email protected]> AuthorDate: Mon Nov 11 18:33:50 2024 +0800 refact(client): use rich handle for log --- .../src/pyhugegraph/utils/logger.py | 234 +++++++++++++++++++++ 1 file changed, 234 insertions(+) diff --git a/hugegraph-python-client/src/pyhugegraph/utils/logger.py b/hugegraph-python-client/src/pyhugegraph/utils/logger.py new file mode 100644 index 0000000..5f4a53a --- /dev/null +++ b/hugegraph-python-client/src/pyhugegraph/utils/logger.py @@ -0,0 +1,234 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import atexit
import logging
import os
import sys
import time
from collections import Counter
from functools import lru_cache
from logging.handlers import RotatingFileHandler

__all__ = [
    "setup_logger",
    "get_log_level",
    "log_first_n",
    "log_every_n",
    "log_every_n_seconds",
]

# Environment variable that overrides the write-buffer size used for
# remote (URL-style) log destinations; see _get_log_stream_buffer_size().
LOG_BUFFER_SIZE_KEY: str = "LOG_BUFFER_SIZE"
DEFAULT_LOG_BUFFER_SIZE: int = 1024 * 1024  # 1MB


@lru_cache()  # avoid creating duplicate handlers on repeated setup_logger() calls
def setup_logger(
    output=None,
    level=logging.INFO,
    distributed_rank=0,
    *,
    name="core",
    enable_propagation: bool = False,
    configure_stdout: bool = True,
    max_bytes=200 * 1024 * 1024,  # 200MB
    backup_count=5,
):
    """
    Initialize a logger with a rich stdout handler and an optional rotating
    file handler, and return it.

    Args:
        output (str): a file name or a directory to save log. If None, will not
            save a log file. If it ends with ".txt" or ".log", assumed to be a
            file name. Otherwise, logs will be saved to `output/log.txt`.
        level (int): logging level for the logger and the stdout handler
            (the file handler always records at DEBUG).
        distributed_rank (int): rank of the current worker; only rank 0 logs
            to stdout, and non-zero ranks get a ".rank{N}" file suffix.
        name (str): the root module name of this logger.
        enable_propagation (bool): whether to propagate logs to the parent logger.
        configure_stdout (bool): whether to configure logging to stdout.
        max_bytes (int): maximum size of one log file before rotation.
        backup_count (int): number of rotated backup files to keep.

    Returns:
        logging.Logger: a logger
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.propagate = enable_propagation

    # Re-configure from scratch so repeated calls never stack duplicate handlers.
    if logger.hasHandlers():
        logger.handlers.clear()

    # stdout logging: master (rank 0) only
    if configure_stdout and distributed_rank == 0:
        # Imported lazily so the module stays importable without `rich`
        # when stdout logging is disabled.
        from rich.logging import RichHandler

        rich_handler = RichHandler(level)
        rich_handler.setFormatter(logging.Formatter("%(name)s: %(message)s"))
        logger.addHandler(rich_handler)

    # file logging: all workers
    if output is not None:
        if output.endswith(".txt") or output.endswith(".log"):
            filename = output
        else:
            filename = os.path.join(output, "log.txt")

        if distributed_rank > 0:
            filename = "{}.rank{}".format(filename, distributed_rank)

        # BUGFIX: os.path.dirname() returns "" for a bare file name
        # (e.g. output="log.txt"), and os.makedirs("") raises
        # FileNotFoundError — only create the directory when there is one.
        dirname = os.path.dirname(filename)
        if dirname:
            os.makedirs(dirname, exist_ok=True)
        fh = RotatingFileHandler(
            # _cached_log_stream(filename), maxBytes=max_bytes, backupCount=backup_count
            filename,
            maxBytes=max_bytes,
            backupCount=backup_count,
        )
        fh.setLevel(logging.DEBUG)
        plain_formatter = logging.Formatter(
            "[%(asctime)s] %(levelname)s [%(name)s:%(filename)s:%(lineno)d] %(message)s",
            datefmt="%m/%d/%y %H:%M:%S",
        )
        fh.setFormatter(plain_formatter)
        logger.addHandler(fh)
    return logger


# cache the opened file object, so that different calls to `setup_logger`
# with the same file name can safely write to the same file.
@lru_cache(maxsize=None)
def _cached_log_stream(filename):
    """Open `filename` for append once and reuse the stream across calls.

    The stream is registered with atexit so it is closed on interpreter exit.
    """
    # use a larger buffer if writing to cloud storage (see helper below)
    io = open(filename, "a", buffering=_get_log_stream_buffer_size(filename))
    atexit.register(io.close)
    return io


def _get_log_stream_buffer_size(filename: str) -> int:
    """Pick a write-buffer size for the log stream at `filename`."""
    if "://" not in filename:
        # Local file, no extra caching is necessary
        return -1
    # Remote file requires a larger cache to avoid many small writes.
    if LOG_BUFFER_SIZE_KEY in os.environ:
        return int(os.environ[LOG_BUFFER_SIZE_KEY])
    return DEFAULT_LOG_BUFFER_SIZE


"""
Below are some other convenient logging methods.
They are mainly adopted from
https://github.com/abseil/abseil-py/blob/master/absl/logging/__init__.py
"""


def _find_caller():
    """
    Returns:
        str: module name of the caller
        tuple: a hashable key to be used to identify different callers
    """
    frame = sys._getframe(2)
    while frame:
        code = frame.f_code
        # Skip frames that belong to this logger module itself.
        if os.path.join("utils", "logger.") not in code.co_filename:
            mod_name = frame.f_globals["__name__"]
            if mod_name == "__main__":
                mod_name = "easydeploy"
            return mod_name, (code.co_filename, frame.f_lineno, code.co_name)
        frame = frame.f_back
    # BUGFIX: the original implicitly returned None here, which would crash
    # callers on tuple unpacking; fall back to a stable dummy key instead.
    return "easydeploy", ("unknown", 0, "unknown")


_LOG_COUNTER = Counter()
_LOG_TIMER = {}


def log_first_n(lvl, msg, n=1, *, name=None, key="caller"):
    """
    Log only for the first n times.

    Args:
        lvl (int): the logging level
        msg (str): the message to log
        n (int): maximum number of times to emit this log
        name (str): name of the logger to use. Will use the caller's module by default.
        key (str or tuple[str]): the string(s) can be one of "caller" or
            "message", which defines how to identify duplicated logs.
            For example, if called with `n=1, key="caller"`, this function
            will only log the first call from the same caller, regardless of
            the message content.
            If called with `n=1, key="message"`, this function will log the
            same content only once, even if they are called from different places.
            If called with `n=1, key=("caller", "message")`, this function
            will not log only if the same caller has logged the same message before.
    """
    if isinstance(key, str):
        key = (key,)
    assert len(key) > 0

    caller_module, caller_key = _find_caller()
    hash_key = ()
    if "caller" in key:
        hash_key = hash_key + caller_key
    if "message" in key:
        hash_key = hash_key + (msg,)

    _LOG_COUNTER[hash_key] += 1
    if _LOG_COUNTER[hash_key] <= n:
        logging.getLogger(name or caller_module).log(lvl, msg)


def log_every_n(lvl, msg, n=1, *, name=None):
    """
    Log once per n times.

    Args:
        lvl (int): the logging level
        msg (str): the message to log
        n (int): emit the message once every n calls from the same caller
        name (str): name of the logger to use. Will use the caller's module by default.
    """
    caller_module, key = _find_caller()
    _LOG_COUNTER[key] += 1
    if n == 1 or _LOG_COUNTER[key] % n == 1:
        logging.getLogger(name or caller_module).log(lvl, msg)


def log_every_n_seconds(lvl, msg, n=1, *, name=None):
    """
    Log no more than once per n seconds.

    Args:
        lvl (int): the logging level
        msg (str): the message to log
        n (int): minimum number of seconds between two emissions
        name (str): name of the logger to use. Will use the caller's module by default.
    """
    caller_module, key = _find_caller()
    last_logged = _LOG_TIMER.get(key, None)
    current_time = time.time()
    if last_logged is None or current_time - last_logged >= n:
        logging.getLogger(name or caller_module).log(lvl, msg)
        _LOG_TIMER[key] = current_time


def get_log_level(level_name: str):
    """Resolve a level name (case-insensitive, e.g. "debug") to its int value.

    Raises:
        ValueError: if `level_name` is not a valid logging level name.
    """
    level = getattr(logging, level_name.upper(), None)
    if not isinstance(level, int):
        raise ValueError(f"Invalid log level: {level_name}")
    return level
