This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new c5cc990780 feat(core)!: implement an interceptor for the logging layer 
(#4961)
c5cc990780 is described below

commit c5cc990780b13cc328e01fd2924baa11bc7999dd
Author: Yingwen <[email protected]>
AuthorDate: Fri Aug 9 17:03:38 2024 +0800

    feat(core)!: implement an interceptor for the logging layer (#4961)
    
    * feat: init
    
    * feat: accessor log
    
    * feat: more logs
    
    * feat: compile
    
    * feat: use interceptor
    
    * feat: use ctx to log
    
    * feat: avoid formatting err first
    
    * style: fix clippy
    
    * chore: update comments
    
    * refactor: pass operation
    
    * feat: add context
    
    * feat: remove some apis from DefaultLoggingLayer
    
    * refactor: remove path
    
    * docs: fix example
    
    * test: fix haskell test
---
 bindings/haskell/test/BasicTest.hs |    2 +-
 core/src/layers/logging.rs         | 1216 ++++++++++++++----------------------
 core/src/layers/mod.rs             |    1 +
 core/src/raw/tests/utils.rs        |   68 +-
 4 files changed, 526 insertions(+), 761 deletions(-)

diff --git a/bindings/haskell/test/BasicTest.hs 
b/bindings/haskell/test/BasicTest.hs
index 1311f9a05f..6c817d046e 100644
--- a/bindings/haskell/test/BasicTest.hs
+++ b/bindings/haskell/test/BasicTest.hs
@@ -109,7 +109,7 @@ testLogger = do
   let logFn = LogAction $ logger state
   Right _ <- newOperator "memory" {ocLogAction = Just logFn}
   logStr <- readIORef state
-  T.take 77 logStr @?= "service=memory operation=metadata -> 
startedservice=memory operation=metadata"
+  T.take 78 logStr @?= "service=memory operation=metadata  -> 
startedservice=memory operation=metadata"
 
 -- helper function
 
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 40294204f6..1dfdba85ef 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -23,9 +23,7 @@ use std::sync::Arc;
 use bytes::Buf;
 use futures::FutureExt;
 use futures::TryFutureExt;
-use log::debug;
 use log::log;
-use log::trace;
 use log::Level;
 
 use crate::raw::*;
@@ -80,74 +78,73 @@ use crate::*;
 /// ```shell
 /// RUST_LOG="info,opendal::services=debug" ./app
 /// ```
-#[derive(Debug, Copy, Clone)]
-pub struct LoggingLayer {
-    error_level: Option<Level>,
-    failure_level: Option<Level>,
-    backtrace_output: bool,
+///
+/// # Logging Interceptor
+///
+/// You can implement your own logging interceptor to customize the logging 
behavior.
+///
+/// ```no_run
+/// use opendal::layers::LoggingInterceptor;
+/// use opendal::layers::LoggingLayer;
+/// use opendal::raw::Operation;
+/// use opendal::services;
+/// use opendal::Error;
+/// use opendal::Operator;
+/// use opendal::Scheme;
+///
+/// #[derive(Debug, Clone)]
+/// struct MyLoggingInterceptor;
+///
+/// impl LoggingInterceptor for MyLoggingInterceptor {
+///     fn log(
+///         &self,
+///         scheme: Scheme,
+///         operation: Operation,
+///         context: &str,
+///         message: &str,
+///         err: Option<&Error>,
+///     ) {
+///         // log something
+///     }
+/// }
+///
+/// let _ = Operator::new(services::Memory::default())
+///     .expect("must init")
+///     .layer(LoggingLayer::new(MyLoggingInterceptor))
+///     .finish();
+/// ```
+#[derive(Debug)]
+pub struct LoggingLayer<I = DefaultLoggingInterceptor> {
+    notify: Arc<I>,
 }
 
-impl Default for LoggingLayer {
-    fn default() -> Self {
+impl<I> Clone for LoggingLayer<I> {
+    fn clone(&self) -> Self {
         Self {
-            error_level: Some(Level::Warn),
-            failure_level: Some(Level::Error),
-            backtrace_output: false,
+            notify: self.notify.clone(),
         }
     }
 }
 
-impl LoggingLayer {
-    /// Setting the log level while expected error happened.
-    ///
-    /// For example: accessor returns NotFound.
-    ///
-    /// `None` means disable the log for error.
-    pub fn with_error_level(mut self, level: Option<&str>) -> Result<Self> {
-        if let Some(level_str) = level {
-            let level = level_str.parse().map_err(|_| {
-                Error::new(ErrorKind::ConfigInvalid, "invalid log level")
-                    .with_context("level", level_str)
-            })?;
-            self.error_level = Some(level);
-        } else {
-            self.error_level = None;
+impl Default for LoggingLayer {
+    fn default() -> Self {
+        Self {
+            notify: Arc::new(DefaultLoggingInterceptor),
         }
-        Ok(self)
     }
+}
 
-    /// Setting the log level while unexpected failure happened.
-    ///
-    /// For example: accessor returns Unexpected network error.
-    ///
-    /// `None` means disable the log for failure.
-    pub fn with_failure_level(mut self, level: Option<&str>) -> Result<Self> {
-        if let Some(level_str) = level {
-            let level = level_str.parse().map_err(|_| {
-                Error::new(ErrorKind::ConfigInvalid, "invalid log level")
-                    .with_context("level", level_str)
-            })?;
-            self.failure_level = Some(level);
-        } else {
-            self.failure_level = None;
+impl LoggingLayer {
+    /// Create the layer with specific logging interceptor.
+    pub fn new<I: LoggingInterceptor>(notify: I) -> LoggingLayer<I> {
+        LoggingLayer {
+            notify: Arc::new(notify),
         }
-        Ok(self)
-    }
-
-    /// Setting whether to output backtrace while unexpected failure happened.
-    ///
-    /// # Notes
-    ///
-    /// - When the error is an expected error, backtrace will not be output.
-    /// - backtrace output is disable by default.
-    pub fn with_backtrace_output(mut self, enable: bool) -> Self {
-        self.backtrace_output = enable;
-        self
     }
 }
 
-impl<A: Access> Layer<A> for LoggingLayer {
-    type LayeredAccess = LoggingAccessor<A>;
+impl<A: Access, I: LoggingInterceptor> Layer<A> for LoggingLayer<I> {
+    type LayeredAccess = LoggingAccessor<A, I>;
 
     fn layer(&self, inner: A) -> Self::LayeredAccess {
         let meta = inner.info();
@@ -156,400 +153,342 @@ impl<A: Access> Layer<A> for LoggingLayer {
 
             ctx: LoggingContext {
                 scheme: meta.scheme(),
-                error_level: self.error_level,
-                failure_level: self.failure_level,
-                backtrace_output: self.backtrace_output,
+                notify: self.notify.clone(),
             },
         }
     }
 }
 
-#[derive(Clone, Debug)]
-pub struct LoggingContext {
+#[derive(Debug)]
+pub struct LoggingContext<I> {
     scheme: Scheme,
-    error_level: Option<Level>,
-    failure_level: Option<Level>,
-    backtrace_output: bool,
+    notify: Arc<I>,
 }
 
-impl LoggingContext {
-    #[inline]
-    fn error_level(&self, err: &Error) -> Option<Level> {
-        if err.kind() == ErrorKind::Unexpected {
-            self.failure_level
-        } else {
-            self.error_level
+impl<I> Clone for LoggingContext<I> {
+    fn clone(&self) -> Self {
+        Self {
+            scheme: self.scheme,
+            notify: self.notify.clone(),
         }
     }
+}
 
-    /// Print error with backtrace if it's unexpected error.
-    #[inline]
-    fn error_print(&self, err: &Error) -> String {
-        // Don't print backtrace if it's not unexpected error.
-        if err.kind() != ErrorKind::Unexpected {
-            return format!("{err}");
+impl<I: LoggingInterceptor> LoggingContext<I> {
+    fn log(&self, operation: Operation, context: &str, message: &str, err: 
Option<&Error>) {
+        self.notify
+            .log(self.scheme, operation, context, message, err)
+    }
+
+    fn log_with_path(&self, operation: Operation, path: &str, message: &str, 
err: Option<&Error>) {
+        self.notify.log(
+            self.scheme,
+            operation,
+            &format!("path={path}"),
+            message,
+            err,
+        )
+    }
+}
+
+/// LoggingInterceptor is used to intercept the log.
+pub trait LoggingInterceptor: Debug + Send + Sync + 'static {
+    /// Everytime there is a log, this function will be called.
+    ///
+    /// # Inputs
+    ///
+    /// - scheme: The service generates the log.
+    /// - operation: The operation to log.
+    /// - context: Additional context of the log.
+    /// - message: The log message.
+    /// - err: The error to log.
+    ///
+    /// # Note
+    ///
+    /// Users should avoid calling resource-intensive operations such as I/O 
or network
+    /// functions here, especially anything that takes longer than 10ms. 
Otherwise, Opendal
+    /// could perform unexpectedly slow.
+    fn log(
+        &self,
+        scheme: Scheme,
+        operation: Operation,
+        context: &str,
+        message: &str,
+        err: Option<&Error>,
+    );
+}
+
+/// The DefaultLoggingInterceptor will log the message by the standard logging 
macro.
+#[derive(Debug)]
+pub struct DefaultLoggingInterceptor;
+
+impl LoggingInterceptor for DefaultLoggingInterceptor {
+    fn log(
+        &self,
+        scheme: Scheme,
+        operation: Operation,
+        context: &str,
+        message: &str,
+        err: Option<&Error>,
+    ) {
+        let Some(err) = err else {
+            let lvl = self.operation_level(operation);
+            log!(
+                target: LOGGING_TARGET,
+                lvl,
+                "service={} operation={} {} -> {}",
+                scheme,
+                operation,
+                context,
+                message,
+            );
+            return;
+        };
+
+        let lvl = self.error_level(err);
+        log!(
+            target: LOGGING_TARGET,
+            lvl,
+            "service={} operation={} {} -> {} {}",
+            scheme,
+            operation,
+            context,
+            message,
+            err,
+        );
+    }
+}
+
+impl DefaultLoggingInterceptor {
+    fn operation_level(&self, operation: Operation) -> Level {
+        match operation {
+            Operation::ReaderRead
+            | Operation::BlockingReaderRead
+            | Operation::WriterWrite
+            | Operation::BlockingWriterWrite => Level::Trace,
+            _ => Level::Debug,
         }
+    }
 
-        if self.backtrace_output {
-            format!("{err:?}")
+    #[inline]
+    fn error_level(&self, err: &Error) -> Level {
+        if err.kind() == ErrorKind::Unexpected {
+            Level::Error
         } else {
-            format!("{err}")
+            Level::Warn
         }
     }
 }
 
 #[derive(Clone, Debug)]
-pub struct LoggingAccessor<A: Access> {
+pub struct LoggingAccessor<A: Access, I: LoggingInterceptor> {
     inner: A,
 
-    ctx: LoggingContext,
+    ctx: LoggingContext<I>,
 }
 
 static LOGGING_TARGET: &str = "opendal::services";
 
-impl<A: Access> LayeredAccess for LoggingAccessor<A> {
+impl<A: Access, I: LoggingInterceptor> LayeredAccess for LoggingAccessor<A, I> 
{
     type Inner = A;
-    type Reader = LoggingReader<A::Reader>;
-    type BlockingReader = LoggingReader<A::BlockingReader>;
-    type Writer = LoggingWriter<A::Writer>;
-    type BlockingWriter = LoggingWriter<A::BlockingWriter>;
-    type Lister = LoggingLister<A::Lister>;
-    type BlockingLister = LoggingLister<A::BlockingLister>;
+    type Reader = LoggingReader<A::Reader, I>;
+    type BlockingReader = LoggingReader<A::BlockingReader, I>;
+    type Writer = LoggingWriter<A::Writer, I>;
+    type BlockingWriter = LoggingWriter<A::BlockingWriter, I>;
+    type Lister = LoggingLister<A::Lister, I>;
+    type BlockingLister = LoggingLister<A::BlockingLister, I>;
 
     fn inner(&self) -> &Self::Inner {
         &self.inner
     }
 
     fn metadata(&self) -> Arc<AccessorInfo> {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} -> started",
-            self.ctx.scheme,
-            Operation::Info
-        );
+        self.ctx.log(Operation::Info, "", "started", None);
         let result = self.inner.info();
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} -> finished: {:?}",
-            self.ctx.scheme,
+        self.ctx.log(
             Operation::Info,
-            result
+            "",
+            &format!("finished: {:?}", result),
+            None,
         );
 
         result
     }
 
     async fn create_dir(&self, path: &str, args: OpCreateDir) -> 
Result<RpCreateDir> {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} path={} -> started",
-            self.ctx.scheme,
-            Operation::CreateDir,
-            path
-        );
+        self.ctx
+            .log_with_path(Operation::CreateDir, path, "started", None);
 
         self.inner
             .create_dir(path, args)
             .await
             .map(|v| {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} -> finished",
-                    self.ctx.scheme,
-                    Operation::CreateDir,
-                    path
-                );
+                self.ctx
+                    .log_with_path(Operation::CreateDir, path, "finished", 
None);
                 v
             })
             .map_err(|err| {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} -> {}",
-                        self.ctx.scheme,
-                        Operation::CreateDir,
-                        path,
-                        self.ctx.error_print(&err)
-                    )
-                };
+                self.ctx
+                    .log_with_path(Operation::CreateDir, path, "", Some(&err));
                 err
             })
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} path={} -> started",
-            self.ctx.scheme,
-            Operation::Read,
-            path,
-        );
+        self.ctx
+            .log_with_path(Operation::Read, path, "started", None);
 
         self.inner
             .read(path, args)
             .await
             .map(|(rp, r)| {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} -> got reader",
-                    self.ctx.scheme,
-                    Operation::Read,
-                    path,
-                );
+                self.ctx
+                    .log_with_path(Operation::Read, path, "got reader", None);
                 (
                     rp,
                     LoggingReader::new(self.ctx.clone(), Operation::Read, 
path, r),
                 )
             })
             .map_err(|err| {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} -> {}",
-                        self.ctx.scheme,
-                        Operation::Read,
-                        path,
-                        self.ctx.error_print(&err)
-                    )
-                }
+                self.ctx
+                    .log_with_path(Operation::Read, path, "", Some(&err));
                 err
             })
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} path={} -> started",
-            self.ctx.scheme,
-            Operation::Write,
-            path
-        );
+        self.ctx
+            .log_with_path(Operation::Write, path, "started", None);
 
         self.inner
             .write(path, args)
             .await
             .map(|(rp, w)| {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} -> start writing",
-                    self.ctx.scheme,
-                    Operation::Write,
-                    path,
-                );
+                self.ctx
+                    .log_with_path(Operation::Write, path, "start writing", 
None);
                 let w = LoggingWriter::new(self.ctx.clone(), Operation::Write, 
path, w);
                 (rp, w)
             })
             .map_err(|err| {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} -> {}",
-                        self.ctx.scheme,
-                        Operation::Write,
-                        path,
-                        self.ctx.error_print(&err)
-                    )
-                };
+                self.ctx
+                    .log_with_path(Operation::Write, path, "", Some(&err));
                 err
             })
     }
 
     async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> 
{
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} from={} to={} -> started",
-            self.ctx.scheme,
+        self.ctx.log(
             Operation::Copy,
-            from,
-            to
+            &format!("from={from} to={to}"),
+            "started",
+            None,
         );
 
         self.inner
             .copy(from, to, args)
             .await
             .map(|v| {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} from={} to={} -> finished",
-                    self.ctx.scheme,
+                self.ctx.log(
                     Operation::Copy,
-                    from,
-                    to
+                    &format!("from={from} to={to}"),
+                    "finished",
+                    None,
                 );
                 v
             })
             .map_err(|err| {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} from={} to={} -> {}",
-                        self.ctx.scheme,
-                        Operation::Copy,
-                        from,
-                        to,
-                        self.ctx.error_print(&err),
-                    )
-                };
+                self.ctx.log(
+                    Operation::Copy,
+                    &format!("from={from} to={to}"),
+                    "",
+                    Some(&err),
+                );
                 err
             })
     }
 
     async fn rename(&self, from: &str, to: &str, args: OpRename) -> 
Result<RpRename> {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} from={} to={} -> started",
-            self.ctx.scheme,
+        self.ctx.log(
             Operation::Rename,
-            from,
-            to
+            &format!("from={from} to={to}"),
+            "started",
+            None,
         );
 
         self.inner
             .rename(from, to, args)
             .await
             .map(|v| {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} from={} to={} -> finished",
-                    self.ctx.scheme,
+                self.ctx.log(
                     Operation::Rename,
-                    from,
-                    to
+                    &format!("from={from} to={to}"),
+                    "finished",
+                    None,
                 );
                 v
             })
             .map_err(|err| {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} from={} to={} -> {}",
-                        self.ctx.scheme,
-                        Operation::Rename,
-                        from,
-                        to,
-                        self.ctx.error_print(&err)
-                    )
-                };
+                self.ctx.log(
+                    Operation::Rename,
+                    &format!("from={from} to={to}"),
+                    "",
+                    Some(&err),
+                );
                 err
             })
     }
 
     async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} path={} -> started",
-            self.ctx.scheme,
-            Operation::Stat,
-            path
-        );
+        self.ctx
+            .log_with_path(Operation::Stat, path, "started", None);
 
         self.inner
             .stat(path, args)
             .await
             .map(|v| {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} -> finished: {v:?}",
-                    self.ctx.scheme,
-                    Operation::Stat,
-                    path
-                );
+                self.ctx
+                    .log_with_path(Operation::Stat, path, "finished", None);
                 v
             })
             .map_err(|err| {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} -> {}",
-                        self.ctx.scheme,
-                        Operation::Stat,
-                        path,
-                        self.ctx.error_print(&err)
-                    );
-                };
+                self.ctx
+                    .log_with_path(Operation::Stat, path, "", Some(&err));
                 err
             })
     }
 
     async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} path={} -> started",
-            self.ctx.scheme,
-            Operation::Delete,
-            path
-        );
+        self.ctx
+            .log_with_path(Operation::Delete, path, "started", None);
 
         self.inner
             .delete(path, args.clone())
             .inspect(|v| match v {
                 Ok(_) => {
-                    debug!(
-                        target: LOGGING_TARGET,
-                        "service={} operation={} path={} -> finished",
-                        self.ctx.scheme,
-                        Operation::Delete,
-                        path
-                    );
+                    self.ctx
+                        .log_with_path(Operation::Delete, path, "finished", 
None);
                 }
                 Err(err) => {
-                    if let Some(lvl) = self.ctx.error_level(err) {
-                        log!(
-                            target: LOGGING_TARGET,
-                            lvl,
-                            "service={} operation={} path={} -> {}",
-                            self.ctx.scheme,
-                            Operation::Delete,
-                            path,
-                            self.ctx.error_print(err)
-                        );
-                    }
+                    self.ctx
+                        .log_with_path(Operation::Delete, path, "", Some(err));
                 }
             })
             .await
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Lister)> {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} path={} -> started",
-            self.ctx.scheme,
-            Operation::List,
-            path
-        );
-
         self.inner
             .list(path, args)
             .map(|v| match v {
                 Ok((rp, v)) => {
-                    debug!(
-                        target: LOGGING_TARGET,
-                        "service={} operation={} path={} -> start listing dir",
-                        self.ctx.scheme,
-                        Operation::List,
-                        path
-                    );
+                    self.ctx
+                        .log_with_path(Operation::List, path, "start listing 
dir", None);
                     let streamer = LoggingLister::new(self.ctx.clone(), path, 
Operation::List, v);
                     Ok((rp, streamer))
                 }
                 Err(err) => {
-                    if let Some(lvl) = self.ctx.error_level(&err) {
-                        log!(
-                            target: LOGGING_TARGET,
-                            lvl,
-                            "service={} operation={} path={} -> {}",
-                            self.ctx.scheme,
-                            Operation::List,
-                            path,
-                            self.ctx.error_print(&err)
-                        );
-                    }
+                    self.ctx
+                        .log_with_path(Operation::List, path, "", Some(&err));
                     Err(err)
                 }
             })
@@ -557,39 +496,20 @@ impl<A: Access> LayeredAccess for LoggingAccessor<A> {
     }
 
     async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} path={} -> started",
-            self.ctx.scheme,
-            Operation::Presign,
-            path
-        );
+        self.ctx
+            .log_with_path(Operation::Presign, path, "started", None);
 
         self.inner
             .presign(path, args)
             .await
             .map(|v| {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} -> finished: {v:?}",
-                    self.ctx.scheme,
-                    Operation::Presign,
-                    path
-                );
+                self.ctx
+                    .log_with_path(Operation::Presign, path, "finished", None);
                 v
             })
             .map_err(|err| {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} -> {}",
-                        self.ctx.scheme,
-                        Operation::Presign,
-                        path,
-                        self.ctx.error_print(&err)
-                    );
-                }
+                self.ctx
+                    .log_with_path(Operation::Presign, path, "", Some(&err));
                 err
             })
     }
@@ -597,352 +517,216 @@ impl<A: Access> LayeredAccess for LoggingAccessor<A> {
     async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
         let (op, count) = (args.operation()[0].1.operation(), 
args.operation().len());
 
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={}-{op} count={count} -> started",
-            self.ctx.scheme,
+        self.ctx.log(
             Operation::Batch,
+            &format!("op={op} count={count} -> started"),
+            "started",
+            None,
         );
 
         self.inner
             .batch(args)
             .map_ok(|v| {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={}-{op} count={count} -> finished: 
{}, succeed: {}, failed: {}",
-                    self.ctx.scheme,
+                self.ctx.log(
                     Operation::Batch,
-                    v.results().len(),
-                    v.results().iter().filter(|(_, v)|v.is_ok()).count(),
-                    v.results().iter().filter(|(_, v)|v.is_err()).count(),
+                    &format!("op={op} count={count}"),
+                    &format!(
+                        "finished: {}, succeed: {}, failed: {}",
+                        v.results().len(),
+                        v.results().iter().filter(|(_, v)| v.is_ok()).count(),
+                        v.results().iter().filter(|(_, v)| v.is_err()).count(),
+                    ),
+                    None,
                 );
                 v
             })
             .map_err(|err| {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={}-{op} count={count} -> {}",
-                        self.ctx.scheme,
-                        Operation::Batch,
-                        self.ctx.error_print(&err)
-                    );
-                }
+                self.ctx.log(
+                    Operation::Batch,
+                    &format!("op={op} count={count}"),
+                    "",
+                    Some(&err),
+                );
                 err
             })
             .await
     }
 
     fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> 
Result<RpCreateDir> {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} path={} -> started",
-            self.ctx.scheme,
-            Operation::BlockingCreateDir,
-            path
-        );
+        self.ctx
+            .log_with_path(Operation::BlockingCreateDir, path, "started", 
None);
 
         self.inner
             .blocking_create_dir(path, args)
             .map(|v| {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} -> finished",
-                    self.ctx.scheme,
-                    Operation::BlockingCreateDir,
-                    path
-                );
+                self.ctx
+                    .log_with_path(Operation::BlockingCreateDir, path, 
"finished", None);
                 v
             })
             .map_err(|err| {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} -> {}",
-                        self.ctx.scheme,
-                        Operation::BlockingCreateDir,
-                        path,
-                        self.ctx.error_print(&err)
-                    );
-                }
+                self.ctx
+                    .log_with_path(Operation::BlockingCreateDir, path, "", 
Some(&err));
                 err
             })
     }
 
     fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::BlockingReader)> {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} path={} -> started",
-            self.ctx.scheme,
-            Operation::BlockingRead,
-            path,
-        );
+        self.ctx
+            .log_with_path(Operation::BlockingRead, path, "started", None);
 
         self.inner
             .blocking_read(path, args.clone())
             .map(|(rp, r)| {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} -> got reader",
-                    self.ctx.scheme,
-                    Operation::BlockingRead,
-                    path,
-                );
+                self.ctx
+                    .log_with_path(Operation::BlockingRead, path, "got 
reader", None);
                 let r = LoggingReader::new(self.ctx.clone(), 
Operation::BlockingRead, path, r);
                 (rp, r)
             })
             .map_err(|err| {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} -> {}",
-                        self.ctx.scheme,
-                        Operation::BlockingRead,
-                        path,
-                        self.ctx.error_print(&err)
-                    );
-                }
+                self.ctx
+                    .log_with_path(Operation::BlockingRead, path, "", 
Some(&err));
                 err
             })
     }
 
     fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} path={} -> started",
-            self.ctx.scheme,
-            Operation::BlockingWrite,
-            path,
-        );
+        self.ctx
+            .log_with_path(Operation::BlockingWrite, path, "started", None);
 
         self.inner
             .blocking_write(path, args)
             .map(|(rp, w)| {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} -> start writing",
-                    self.ctx.scheme,
-                    Operation::BlockingWrite,
-                    path,
-                );
+                self.ctx
+                    .log_with_path(Operation::BlockingWrite, path, "start 
writing", None);
                 let w = LoggingWriter::new(self.ctx.clone(), 
Operation::BlockingWrite, path, w);
                 (rp, w)
             })
             .map_err(|err| {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} -> {}",
-                        self.ctx.scheme,
-                        Operation::BlockingWrite,
-                        path,
-                        self.ctx.error_print(&err)
-                    );
-                }
+                self.ctx
+                    .log_with_path(Operation::BlockingWrite, path, "", 
Some(&err));
                 err
             })
     }
 
     fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> 
Result<RpCopy> {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} from={} to={} -> started",
-            self.ctx.scheme,
+        self.ctx.log(
             Operation::BlockingCopy,
-            from,
-            to,
+            &format!("from={from} to={to}"),
+            "started",
+            None,
         );
 
         self.inner
             .blocking_copy(from, to, args)
             .map(|v| {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} from={} to={} -> finished",
-                    self.ctx.scheme,
+                self.ctx.log(
                     Operation::BlockingCopy,
-                    from,
-                    to,
+                    &format!("from={from} to={to}"),
+                    "finished",
+                    None,
                 );
                 v
             })
             .map_err(|err| {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} from={} to={} -> {}",
-                        self.ctx.scheme,
-                        Operation::BlockingCopy,
-                        from,
-                        to,
-                        self.ctx.error_print(&err)
-                    );
-                }
+                self.ctx.log(
+                    Operation::BlockingCopy,
+                    &format!("from={from} to={to}"),
+                    "",
+                    Some(&err),
+                );
                 err
             })
     }
 
     fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> 
Result<RpRename> {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} from={} to={} -> started",
-            self.ctx.scheme,
+        self.ctx.log(
             Operation::BlockingRename,
-            from,
-            to,
+            &format!("from={from} to={to}"),
+            "started",
+            None,
         );
 
         self.inner
             .blocking_rename(from, to, args)
             .map(|v| {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} from={} to={} -> finished",
-                    self.ctx.scheme,
+                self.ctx.log(
                     Operation::BlockingRename,
-                    from,
-                    to,
+                    &format!("from={from} to={to}"),
+                    "finished",
+                    None,
                 );
                 v
             })
             .map_err(|err| {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} from={} to={} -> {}",
-                        self.ctx.scheme,
-                        Operation::BlockingRename,
-                        from,
-                        to,
-                        self.ctx.error_print(&err)
-                    );
-                }
+                self.ctx.log(
+                    Operation::BlockingRename,
+                    &format!("from={from} to={to}"),
+                    "",
+                    Some(&err),
+                );
                 err
             })
     }
 
     fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} path={} -> started",
-            self.ctx.scheme,
-            Operation::BlockingStat,
-            path
-        );
+        self.ctx
+            .log_with_path(Operation::BlockingStat, path, "started", None);
 
         self.inner
             .blocking_stat(path, args)
             .map(|v| {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} -> finished: {v:?}",
-                    self.ctx.scheme,
-                    Operation::BlockingStat,
-                    path
-                );
+                self.ctx
+                    .log_with_path(Operation::BlockingStat, path, "finished", 
None);
                 v
             })
             .map_err(|err| {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} -> {}",
-                        self.ctx.scheme,
-                        Operation::BlockingStat,
-                        path,
-                        self.ctx.error_print(&err)
-                    );
-                }
+                self.ctx
+                    .log_with_path(Operation::BlockingStat, path, "", 
Some(&err));
                 err
             })
     }
 
     fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} path={} -> started",
-            self.ctx.scheme,
-            Operation::BlockingDelete,
-            path
-        );
+        self.ctx
+            .log_with_path(Operation::BlockingDelete, path, "started", None);
 
         self.inner
             .blocking_delete(path, args)
             .map(|v| {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} -> finished",
-                    self.ctx.scheme,
-                    Operation::BlockingDelete,
-                    path
-                );
+                self.ctx
+                    .log_with_path(Operation::BlockingDelete, path, 
"finished", None);
                 v
             })
             .map_err(|err| {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} -> {}",
-                        self.ctx.scheme,
-                        Operation::BlockingDelete,
-                        path,
-                        self.ctx.error_print(&err)
-                    );
-                }
+                self.ctx
+                    .log_with_path(Operation::BlockingDelete, path, "", 
Some(&err));
                 err
             })
     }
 
     fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::BlockingLister)> {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} path={} -> started",
-            self.ctx.scheme,
-            Operation::BlockingList,
-            path
-        );
+        self.ctx
+            .log_with_path(Operation::BlockingList, path, "started", None);
 
         self.inner
             .blocking_list(path, args)
             .map(|(rp, v)| {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} -> got dir",
-                    self.ctx.scheme,
-                    Operation::BlockingList,
-                    path
-                );
+                self.ctx
+                    .log_with_path(Operation::BlockingList, path, "got dir", 
None);
                 let li = LoggingLister::new(self.ctx.clone(), path, 
Operation::BlockingList, v);
                 (rp, li)
             })
             .map_err(|err| {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} -> {}",
-                        self.ctx.scheme,
-                        Operation::BlockingList,
-                        path,
-                        self.ctx.error_print(&err)
-                    );
-                }
+                self.ctx
+                    .log_with_path(Operation::BlockingList, path, "", 
Some(&err));
                 err
             })
     }
 }
 
 /// `LoggingReader` is a wrapper of `BytesReader`, with logging functionality.
-pub struct LoggingReader<R> {
-    ctx: LoggingContext,
+pub struct LoggingReader<R, I: LoggingInterceptor> {
+    ctx: LoggingContext<I>,
     path: String,
     op: Operation,
 
@@ -950,8 +734,8 @@ pub struct LoggingReader<R> {
     inner: R,
 }
 
-impl<R> LoggingReader<R> {
-    fn new(ctx: LoggingContext, op: Operation, path: &str, reader: R) -> Self {
+impl<R, I: LoggingInterceptor> LoggingReader<R, I> {
+    fn new(ctx: LoggingContext<I>, op: Operation, path: &str, reader: R) -> 
Self {
         Self {
             ctx,
             op,
@@ -963,93 +747,93 @@ impl<R> LoggingReader<R> {
     }
 }
 
-impl<R> Drop for LoggingReader<R> {
+impl<R, I: LoggingInterceptor> Drop for LoggingReader<R, I> {
     fn drop(&mut self) {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} path={} read={} -> data read finished",
-            self.ctx.scheme,
+        self.ctx.log(
             self.op,
-            self.path,
-            self.read.load(Ordering::Relaxed)
+            &format!(
+                "path={} read={}",
+                self.path,
+                self.read.load(Ordering::Relaxed)
+            ),
+            "data read finished",
+            None,
         );
     }
 }
 
-impl<R: oio::Read> oio::Read for LoggingReader<R> {
+impl<R: oio::Read, I: LoggingInterceptor> oio::Read for LoggingReader<R, I> {
     async fn read(&mut self) -> Result<Buffer> {
         match self.inner.read().await {
             Ok(bs) => {
                 self.read
                     .fetch_add(bs.remaining() as u64, Ordering::Relaxed);
-                trace!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} read={} -> read returns 
{}B",
-                    self.ctx.scheme,
+                self.ctx.log(
                     Operation::ReaderRead,
-                    self.path,
-                    self.read.load(Ordering::Relaxed),
-                    bs.remaining()
+                    &format!(
+                        "path={} read={}",
+                        self.path,
+                        self.read.load(Ordering::Relaxed),
+                    ),
+                    &format!("read returns {}B", bs.remaining()),
+                    None,
                 );
                 Ok(bs)
             }
             Err(err) => {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} read={} -> read 
failed: {}",
-                        self.ctx.scheme,
-                        Operation::ReaderRead,
+                self.ctx.log(
+                    Operation::ReaderRead,
+                    &format!(
+                        "path={} read={}",
                         self.path,
-                        self.read.load(Ordering::Relaxed),
-                        self.ctx.error_print(&err),
-                    )
-                }
+                        self.read.load(Ordering::Relaxed)
+                    ),
+                    "read failed:",
+                    Some(&err),
+                );
                 Err(err)
             }
         }
     }
 }
 
-impl<R: oio::BlockingRead> oio::BlockingRead for LoggingReader<R> {
+impl<R: oio::BlockingRead, I: LoggingInterceptor> oio::BlockingRead for 
LoggingReader<R, I> {
     fn read(&mut self) -> Result<Buffer> {
         match self.inner.read() {
             Ok(bs) => {
                 self.read
                     .fetch_add(bs.remaining() as u64, Ordering::Relaxed);
-                trace!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} read={} -> read returns 
{}B",
-                    self.ctx.scheme,
+                self.ctx.log(
                     Operation::BlockingReaderRead,
-                    self.path,
-                    self.read.load(Ordering::Relaxed),
-                    bs.remaining()
+                    &format!(
+                        "path={} read={}",
+                        self.path,
+                        self.read.load(Ordering::Relaxed),
+                    ),
+                    &format!("read returns {}B", bs.remaining()),
+                    None,
                 );
                 Ok(bs)
             }
             Err(err) => {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} read={} -> read 
failed: {}",
-                        self.ctx.scheme,
-                        Operation::BlockingReaderRead,
+                self.ctx.log(
+                    Operation::BlockingReaderRead,
+                    &format!(
+                        "path={} read={}",
                         self.path,
-                        self.read.load(Ordering::Relaxed),
-                        self.ctx.error_print(&err),
-                    );
-                }
+                        self.read.load(Ordering::Relaxed)
+                    ),
+                    "read failed:",
+                    Some(&err),
+                );
                 Err(err)
             }
         }
     }
 }
 
-pub struct LoggingWriter<W> {
-    ctx: LoggingContext,
+pub struct LoggingWriter<W, I> {
+    ctx: LoggingContext<I>,
     op: Operation,
     path: String,
 
@@ -1057,8 +841,8 @@ pub struct LoggingWriter<W> {
     inner: W,
 }
 
-impl<W> LoggingWriter<W> {
-    fn new(ctx: LoggingContext, op: Operation, path: &str, writer: W) -> Self {
+impl<W, I> LoggingWriter<W, I> {
+    fn new(ctx: LoggingContext<I>, op: Operation, path: &str, writer: W) -> 
Self {
         Self {
             ctx,
             op,
@@ -1070,35 +854,26 @@ impl<W> LoggingWriter<W> {
     }
 }
 
-impl<W: oio::Write> oio::Write for LoggingWriter<W> {
+impl<W: oio::Write, I: LoggingInterceptor> oio::Write for LoggingWriter<W, I> {
     async fn write(&mut self, bs: Buffer) -> Result<()> {
         let size = bs.len();
         match self.inner.write(bs).await {
             Ok(_) => {
-                trace!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} written={}B -> data write 
{}B",
-                    self.ctx.scheme,
+                self.ctx.log(
                     Operation::WriterWrite,
-                    self.path,
-                    self.written,
-                    size,
+                    &format!("path={} written={}B", self.path, self.written),
+                    &format!("data write {}B", size),
+                    None,
                 );
                 Ok(())
             }
             Err(err) => {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} written={}B -> data 
write failed: {}",
-                        self.ctx.scheme,
-                        Operation::WriterWrite,
-                        self.path,
-                        self.written,
-                        self.ctx.error_print(&err),
-                    )
-                }
+                self.ctx.log(
+                    Operation::WriterWrite,
+                    &format!("path={} written={}B", self.path, self.written),
+                    "data write failed:",
+                    Some(&err),
+                );
                 Err(err)
             }
         }
@@ -1107,29 +882,21 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
     async fn abort(&mut self) -> Result<()> {
         match self.inner.abort().await {
             Ok(_) => {
-                trace!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} written={}B -> abort 
writer",
-                    self.ctx.scheme,
+                self.ctx.log(
                     Operation::WriterAbort,
-                    self.path,
-                    self.written,
+                    &format!("path={} written={}B", self.path, self.written),
+                    "abort writer",
+                    None,
                 );
                 Ok(())
             }
             Err(err) => {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} written={}B -> abort 
writer failed: {}",
-                        self.ctx.scheme,
-                        Operation::WriterAbort,
-                        self.path,
-                        self.written,
-                        self.ctx.error_print(&err),
-                    )
-                }
+                self.ctx.log(
+                    Operation::WriterAbort,
+                    &format!("path={} written={}B", self.path, self.written),
+                    "abort writer failed:",
+                    Some(&err),
+                );
                 Err(err)
             }
         }
@@ -1138,63 +905,46 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
     async fn close(&mut self) -> Result<()> {
         match self.inner.close().await {
             Ok(_) => {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} written={}B -> data 
written finished",
-                    self.ctx.scheme,
+                self.ctx.log(
                     self.op,
-                    self.path,
-                    self.written
+                    &format!("path={} written={}B", self.path, self.written),
+                    "data written finished",
+                    None,
                 );
                 Ok(())
             }
             Err(err) => {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} written={}B -> data 
close failed: {}",
-                        self.ctx.scheme,
-                        Operation::WriterClose,
-                        self.path,
-                        self.written,
-                        self.ctx.error_print(&err),
-                    )
-                }
+                self.ctx.log(
+                    Operation::WriterClose,
+                    &format!("path={} written={}B", self.path, self.written),
+                    "data close failed:",
+                    Some(&err),
+                );
                 Err(err)
             }
         }
     }
 }
 
-impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
+impl<W: oio::BlockingWrite, I: LoggingInterceptor> oio::BlockingWrite for 
LoggingWriter<W, I> {
     fn write(&mut self, bs: Buffer) -> Result<()> {
         match self.inner.write(bs.clone()) {
             Ok(_) => {
-                trace!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} written={}B -> data write 
{}B",
-                    self.ctx.scheme,
+                self.ctx.log(
                     Operation::BlockingWriterWrite,
-                    self.path,
-                    self.written,
-                    bs.len(),
+                    &format!("path={} written={}B", self.path, self.written),
+                    &format!("data write {}B", bs.len()),
+                    None,
                 );
                 Ok(())
             }
             Err(err) => {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} written={}B -> data 
write failed: {}",
-                        self.ctx.scheme,
-                        Operation::BlockingWriterWrite,
-                        self.path,
-                        self.written,
-                        self.ctx.error_print(&err),
-                    )
-                }
+                self.ctx.log(
+                    Operation::BlockingWriterWrite,
+                    &format!("path={} written={}B", self.path, self.written),
+                    "data write failed:",
+                    Some(&err),
+                );
                 Err(err)
             }
         }
@@ -1203,37 +953,29 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for 
LoggingWriter<W> {
     fn close(&mut self) -> Result<()> {
         match self.inner.close() {
             Ok(_) => {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} written={}B -> data 
written finished",
-                    self.ctx.scheme,
+                self.ctx.log(
                     self.op,
-                    self.path,
-                    self.written
+                    &format!("path={} written={}B", self.path, self.written),
+                    "data written finished",
+                    None,
                 );
                 Ok(())
             }
             Err(err) => {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} written={}B -> data 
close failed: {}",
-                        self.ctx.scheme,
-                        Operation::BlockingWriterClose,
-                        self.path,
-                        self.written,
-                        self.ctx.error_print(&err),
-                    )
-                }
+                self.ctx.log(
+                    Operation::BlockingWriterClose,
+                    &format!("path={} written={}B", self.path, self.written),
+                    "data close failed:",
+                    Some(&err),
+                );
                 Err(err)
             }
         }
     }
 }
 
-pub struct LoggingLister<P> {
-    ctx: LoggingContext,
+pub struct LoggingLister<P, I: LoggingInterceptor> {
+    ctx: LoggingContext<I>,
     path: String,
     op: Operation,
 
@@ -1241,8 +983,8 @@ pub struct LoggingLister<P> {
     inner: P,
 }
 
-impl<P> LoggingLister<P> {
-    fn new(ctx: LoggingContext, path: &str, op: Operation, inner: P) -> Self {
+impl<P, I: LoggingInterceptor> LoggingLister<P, I> {
+    fn new(ctx: LoggingContext<I>, path: &str, op: Operation, inner: P) -> 
Self {
         Self {
             ctx,
             path: path.to_string(),
@@ -1253,65 +995,38 @@ impl<P> LoggingLister<P> {
     }
 }
 
-impl<P> Drop for LoggingLister<P> {
+impl<P, I: LoggingInterceptor> Drop for LoggingLister<P, I> {
     fn drop(&mut self) {
         if self.finished {
-            debug!(
-                target: LOGGING_TARGET,
-                "service={} operation={} path={} -> all entries read finished",
-                self.ctx.scheme,
-                self.op,
-                self.path
-            );
+            self.ctx
+                .log_with_path(self.op, &self.path, "all entries read 
finished", None);
         } else {
-            debug!(
-                target: LOGGING_TARGET,
-                "service={} operation={} path={} -> partial entries read 
finished",
-                self.ctx.scheme,
-                self.op,
-                self.path
-            );
+            self.ctx
+                .log_with_path(self.op, &self.path, "partial entries read 
finished", None);
         }
     }
 }
 
-impl<P: oio::List> oio::List for LoggingLister<P> {
+impl<P: oio::List, I: LoggingInterceptor> oio::List for LoggingLister<P, I> {
     async fn next(&mut self) -> Result<Option<oio::Entry>> {
         let res = self.inner.next().await;
 
         match &res {
             Ok(Some(de)) => {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} -> listed entry: {}",
-                    self.ctx.scheme,
+                self.ctx.log_with_path(
                     self.op,
-                    self.path,
-                    de.path(),
+                    &self.path,
+                    &format!("listed entry: {}", de.path()),
+                    None,
                 );
             }
             Ok(None) => {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} -> finished",
-                    self.ctx.scheme,
-                    self.op,
-                    self.path
-                );
+                self.ctx
+                    .log_with_path(self.op, &self.path, "finished", None);
                 self.finished = true;
             }
             Err(err) => {
-                if let Some(lvl) = self.ctx.error_level(err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} -> {}",
-                        self.ctx.scheme,
-                        self.op,
-                        self.path,
-                        self.ctx.error_print(err)
-                    )
-                }
+                self.ctx.log_with_path(self.op, &self.path, "", Some(err));
             }
         };
 
@@ -1319,43 +1034,26 @@ impl<P: oio::List> oio::List for LoggingLister<P> {
     }
 }
 
-impl<P: oio::BlockingList> oio::BlockingList for LoggingLister<P> {
+impl<P: oio::BlockingList, I: LoggingInterceptor> oio::BlockingList for 
LoggingLister<P, I> {
     fn next(&mut self) -> Result<Option<oio::Entry>> {
         let res = self.inner.next();
 
         match &res {
             Ok(Some(des)) => {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} -> listed entry: {}",
-                    self.ctx.scheme,
+                self.ctx.log_with_path(
                     self.op,
-                    self.path,
-                    des.path(),
+                    &self.path,
+                    &format!("listed entry: {}", des.path()),
+                    None,
                 );
             }
             Ok(None) => {
-                debug!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} -> finished",
-                    self.ctx.scheme,
-                    self.op,
-                    self.path
-                );
+                self.ctx
+                    .log_with_path(self.op, &self.path, "finished", None);
                 self.finished = true;
             }
             Err(err) => {
-                if let Some(lvl) = self.ctx.error_level(err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} -> {}",
-                        self.ctx.scheme,
-                        self.op,
-                        self.path,
-                        self.ctx.error_print(err)
-                    )
-                }
+                self.ctx.log_with_path(self.op, &self.path, "", Some(err));
             }
         };
 
diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs
index eff0bfb8b7..98372b2203 100644
--- a/core/src/layers/mod.rs
+++ b/core/src/layers/mod.rs
@@ -33,6 +33,7 @@ mod immutable_index;
 pub use immutable_index::ImmutableIndexLayer;
 
 mod logging;
+pub use logging::LoggingInterceptor;
 pub use logging::LoggingLayer;
 
 mod timeout;
diff --git a/core/src/raw/tests/utils.rs b/core/src/raw/tests/utils.rs
index c9eb875d20..1854576428 100644
--- a/core/src/raw/tests/utils.rs
+++ b/core/src/raw/tests/utils.rs
@@ -19,8 +19,11 @@ use std::collections::HashMap;
 use std::env;
 use std::str::FromStr;
 
+use log::{log, Level};
 use once_cell::sync::Lazy;
 
+use crate::layers::LoggingInterceptor;
+use crate::raw::Operation;
 use crate::*;
 
 /// TEST_RUNTIME is the runtime used for running tests.
@@ -73,7 +76,7 @@ pub fn init_test_service() -> Result<Option<Operator>> {
     let op = { op.layer(layers::ChaosLayer::new(0.1)) };
 
     let mut op = op
-        .layer(layers::LoggingLayer::default().with_backtrace_output(true))
+        .layer(layers::LoggingLayer::new(BacktraceLoggingInterceptor))
         .layer(layers::TimeoutLayer::new())
         .layer(layers::RetryLayer::new().with_max_times(4));
 
@@ -88,3 +91,66 @@ pub fn init_test_service() -> Result<Option<Operator>> {
 
     Ok(Some(op))
 }
+
+/// A logging interceptor that logs the backtrace.
+#[derive(Debug, Clone)]
+struct BacktraceLoggingInterceptor;
+
+impl LoggingInterceptor for BacktraceLoggingInterceptor {
+    fn log(
+        &self,
+        scheme: Scheme,
+        operation: raw::Operation,
+        context: &str,
+        message: &str,
+        err: Option<&Error>,
+    ) {
+        let Some(err) = err else {
+            let lvl = self.operation_level(operation);
+            log!(
+                target: "opendal::services",
+                lvl,
+                "service={} operation={} {} -> {}",
+                scheme,
+                operation,
+                context,
+                message,
+            );
+            return;
+        };
+
+        let err_msg = if err.kind() != ErrorKind::Unexpected {
+            format!("{err}")
+        } else {
+            format!("{err:?}")
+        };
+        let lvl = if err.kind() == ErrorKind::Unexpected {
+            Level::Error
+        } else {
+            Level::Warn
+        };
+
+        log!(
+            target: "opendal::services",
+            lvl,
+            "service={} operation={} {} -> {} {}",
+            scheme,
+            operation,
+            context,
+            message,
+            err_msg,
+        );
+    }
+}
+
+impl BacktraceLoggingInterceptor {
+    fn operation_level(&self, operation: Operation) -> Level {
+        match operation {
+            Operation::ReaderRead
+            | Operation::BlockingReaderRead
+            | Operation::WriterWrite
+            | Operation::BlockingWriterWrite => Level::Trace,
+            _ => Level::Debug,
+        }
+    }
+}

Reply via email to