This is an automated email from the ASF dual-hosted git repository. abeizn pushed a commit to branch release-v1.0 in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit f4ec13cc5d27980b6384e01e87da1b09b6a0d360 Author: Keon Amini <[email protected]> AuthorDate: Sun Mar 24 23:11:32 2024 -0500 feat: Configurable logging for PyDevLake (#7206) Co-authored-by: Keon Amini <[email protected]> --- backend/python/pydevlake/pydevlake/api.py | 21 ++++++---- .../pydevlake/pydevlake/{logger.py => config.py} | 22 +++------- backend/python/pydevlake/pydevlake/ipc.py | 19 +++++---- backend/python/pydevlake/pydevlake/logger.py | 18 +++++++- backend/server/services/remote/bridge/cmd.go | 4 +- backend/server/services/remote/bridge/context.go | 49 +++++++++++----------- 6 files changed, 73 insertions(+), 60 deletions(-) diff --git a/backend/python/pydevlake/pydevlake/api.py b/backend/python/pydevlake/pydevlake/api.py index 10547cb8a..a50b71d94 100644 --- a/backend/python/pydevlake/pydevlake/api.py +++ b/backend/python/pydevlake/pydevlake/api.py @@ -23,8 +23,10 @@ import json import time import requests as req +import requests.models -from pydevlake.logger import logger +import pydevlake.logger +from pydevlake.logger import logger, DEBUG from pydevlake.model import Connection @@ -108,8 +110,8 @@ class APIBase: def base_url(self) -> Optional[str]: return None - def send(self, request: Request): - request = self._apply_hooks(request, self.request_hooks()) + def send(self, request: Request) -> Response: + request: Request = self._apply_hooks(request, self.request_hooks()) if request is ABORT: return ABORT @@ -117,7 +119,7 @@ class APIBase: if self.proxy: proxies['http'] = self.proxy proxies['https'] = self.proxy - res = self.session.get( + res: requests.models.Response = self.session.get( url=request.url, headers=request.headers, params=request.query_args, @@ -145,11 +147,14 @@ class APIBase: target = result return target - def get(self, *path_args, **query_args): - parts = [self.base_url, *path_args] if self.base_url else path_args - url = "/".join([str(a).strip('/') for a in parts]) + def get(self, *path_args, **query_args) -> Response: + parts: list[str] = [self.base_url, *path_args] if self.base_url else path_args + url: str = "/".join([str(a).strip('/') for a in parts]) req = Request(url, query_args) - return self.send(req) + resp = self.send(req) + if logger.isEnabledFor(DEBUG): # explicit check because logger call is potentially expensive + logger.debug(f'PyDevlake REST call to GET {resp.get_url_with_query_string()} responded with {resp.status} and {resp.json}') + return resp def request_hooks(self): if not hasattr(self, '_request_hooks'): diff --git a/backend/python/pydevlake/pydevlake/logger.py b/backend/python/pydevlake/pydevlake/config.py similarity index 64% copy from backend/python/pydevlake/pydevlake/logger.py copy to backend/python/pydevlake/pydevlake/config.py index 6122c285a..24a3c08f1 100644 --- a/backend/python/pydevlake/pydevlake/logger.py +++ b/backend/python/pydevlake/pydevlake/config.py @@ -13,22 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +from pydevlake.logger import log_levels, logger, INFO -import sys -import logging - -stdout_handler = logging.StreamHandler(sys.stdout) -stdout_handler.addFilter(lambda rec: rec.levelno < logging.ERROR) - - -stderr_handler = logging.StreamHandler(sys.stderr) -stderr_handler.addFilter(lambda rec: rec.levelno >= logging.ERROR) - -logging.basicConfig( - level=logging.DEBUG, - format='%(levelname)s: %(message)s', - handlers=[stdout_handler, stderr_handler] -) - -logger = logging.getLogger() +# sets the global config of pydevlake +def set_config(cfg: dict): + log_level = cfg.get("log_level") + logger.setLevel(log_levels.get(log_level, INFO)) diff --git a/backend/python/pydevlake/pydevlake/ipc.py b/backend/python/pydevlake/pydevlake/ipc.py index 963b66b2a..135d41cc1 100644 --- a/backend/python/pydevlake/pydevlake/ipc.py +++ b/backend/python/pydevlake/pydevlake/ipc.py @@ -27,6 +27,7 @@ from sqlalchemy.engine import Engine from pydevlake.context import Context from pydevlake.message import Message from pydevlake.model import SubtaskRun +from pydevlake.config import set_config def plugin_method(func): @@ -51,6 +52,9 @@ def plugin_method(func): @wraps(func) @SetParseFn(parse_arg) def wrapper(self, *args): + # first arg will always be arg - pluck it out + cfg = args[0] + set_config(cfg) ret = func(self, *args) if ret is not None: with open_send_channel() as send_ch: @@ -64,31 +68,32 @@ def plugin_method(func): return wrapper +# all remote-callable (@plugin_method) methods should have a _ placeholder first param (until/unless I figure out a better way) class PluginCommands: def __init__(self, plugin): self._plugin = plugin @plugin_method - def collect(self, ctx: dict, stream: str): + def collect(self, _, ctx: dict, stream: str): yield from self._plugin.collect(self._mk_context(ctx), stream) @plugin_method - def extract(self, ctx: dict, stream: str): + def extract(self, _, ctx: dict, stream: str): yield from self._plugin.extract(self._mk_context(ctx), stream) @plugin_method - def convert(self, ctx: dict, stream: str): + def convert(self, _, ctx: dict, stream: str): yield from self._plugin.convert(self._mk_context(ctx), stream) @plugin_method - def test_connection(self, connection: dict): + def test_connection(self, _, connection: dict): if "name" not in connection: connection["name"] = "Test connection" connection = self._plugin.connection_type(**connection) return self._plugin.test_connection(connection) @plugin_method - def make_pipeline(self, scope_config_pairs: list[tuple[dict, dict]], connection: dict): + def make_pipeline(self, _, scope_config_pairs: list[tuple[dict, dict]], connection: dict): connection = self._plugin.connection_type(**connection) scope_config_pairs = [ ( @@ -100,11 +105,11 @@ class PluginCommands: return self._plugin.make_pipeline(scope_config_pairs, connection) @plugin_method - def plugin_info(self): + def plugin_info(self, _): return self._plugin.plugin_info() @plugin_method - def remote_scopes(self, connection: dict, group_id: Optional[str] = None): + def remote_scopes(self, _, connection: dict, group_id: Optional[str] = None): c = self._plugin.connection_type(**connection) yield from self._plugin.make_remote_scopes(c, group_id) diff --git a/backend/python/pydevlake/pydevlake/logger.py b/backend/python/pydevlake/pydevlake/logger.py index 6122c285a..bb497754e 100644 --- a/backend/python/pydevlake/pydevlake/logger.py +++ b/backend/python/pydevlake/pydevlake/logger.py @@ -17,16 +17,30 @@ import sys import logging +CRITICAL = logging.CRITICAL +FATAL = logging.FATAL +ERROR = logging.ERROR +WARNING = logging.WARNING +INFO = logging.INFO +DEBUG = logging.DEBUG +NOTSET = logging.NOTSET + +# mappings from main Go server to Python logging levels +log_levels = { + 'debug': DEBUG, + 'info': INFO, + 'warn': WARNING, + 'error': ERROR, +} stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.addFilter(lambda rec: rec.levelno < logging.ERROR) - stderr_handler = logging.StreamHandler(sys.stderr) stderr_handler.addFilter(lambda rec: rec.levelno >= logging.ERROR) logging.basicConfig( - level=logging.DEBUG, + level=INFO, # default format='%(levelname)s: %(message)s', handlers=[stdout_handler, stderr_handler] ) diff --git a/backend/server/services/remote/bridge/cmd.go b/backend/server/services/remote/bridge/cmd.go index 25326ad44..1f47f9432 100644 --- a/backend/server/services/remote/bridge/cmd.go +++ b/backend/server/services/remote/bridge/cmd.go @@ -51,7 +51,7 @@ func NewCmdInvoker(execPath string) *CmdInvoker { } func (c *CmdInvoker) Call(methodName string, ctx plugin.ExecContext, args ...any) *CallResult { - serializedArgs, err := serialize(args...) + serializedArgs, err := serialize(append([]any{DefaultContext.GetRemoteConfig()}, args...)...) if err != nil { return &CallResult{ Err: err, @@ -92,7 +92,7 @@ func (c *CmdInvoker) Stream(methodName string, ctx plugin.ExecContext, args ...a outbound: nil, inbound: recvChannel, } - serializedArgs, err := serialize(args...) + serializedArgs, err := serialize(append([]any{DefaultContext.GetRemoteConfig()}, args...)...) if err != nil { recvChannel <- NewStreamResult(nil, err) return stream diff --git a/backend/server/services/remote/bridge/context.go b/backend/server/services/remote/bridge/context.go index 57dd78dae..e66d1e66e 100644 --- a/backend/server/services/remote/bridge/context.go +++ b/backend/server/services/remote/bridge/context.go @@ -26,7 +26,6 @@ import ( "github.com/apache/incubator-devlake/core/log" "github.com/apache/incubator-devlake/core/plugin" "github.com/apache/incubator-devlake/impls/logruslog" - "github.com/spf13/cast" "github.com/spf13/viper" ) @@ -40,18 +39,23 @@ type RemoteProgress struct { type RemoteContext interface { plugin.ExecContext - GetSettings() map[string]any + GetRemoteConfig() *RemoteConfig +} + +// RemoteConfig holds common configuration for all remote calls +type RemoteConfig struct { + LogLevel string `json:"log_level"` } type remoteContextImpl struct { - parent plugin.ExecContext - logger log.Logger - ctx context.Context - Settings map[string]any `json:"settings"` + parent plugin.ExecContext + logger log.Logger + ctx context.Context + remoteConfig *RemoteConfig } -func (r remoteContextImpl) GetSettings() map[string]any { - return r.Settings +func (r remoteContextImpl) GetRemoteConfig() *RemoteConfig { + return r.remoteConfig } func (r remoteContextImpl) GetConfigReader() config.ConfigReader { @@ -60,10 +64,9 @@ func (r remoteContextImpl) GetConfigReader() config.ConfigReader { func (r remoteContextImpl) ReplaceLogger(logger log.Logger) ctx.BasicRes { return &remoteContextImpl{ - parent: r.parent, - logger: logger, - ctx: r.ctx, - Settings: r.Settings, + parent: r.parent, + logger: logger, + ctx: r.ctx, } } @@ -72,28 +75,26 @@ func (r remoteContextImpl) NestedLogger(name string) ctx.BasicRes { } func NewRemoteContext(logger log.Logger, cfg *viper.Viper) RemoteContext { + remoteCfg := &RemoteConfig{ + LogLevel: cfg.GetString("LOGGING_LEVEL"), + } return &remoteContextImpl{ - logger: logger, - Settings: cfg.AllSettings(), - ctx: context.Background(), + logger: logger, + ctx: context.Background(), + remoteConfig: remoteCfg, } } func NewChildRemoteContext(ec plugin.ExecContext) RemoteContext { return &remoteContextImpl{ - parent: ec, - logger: ec.GetLogger(), - ctx: ec.GetContext(), - Settings: DefaultContext.GetSettings(), + parent: ec, + logger: ec.GetLogger(), + ctx: ec.GetContext(), } } func (r remoteContextImpl) GetConfig(name string) string { - val, ok := r.Settings[name] - if !ok { - return "" - } - return cast.ToString(val) + return config.GetConfig().GetString(name) } func (r remoteContextImpl) GetLogger() log.Logger {
