This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch try-use-log-kv
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 5844f616faa13d2e90593a2c6a4f2b8997e87cd3
Author: Xuanwo <[email protected]>
AuthorDate: Fri Aug 9 19:19:15 2024 +0800

    refactor(core): Use kv based context to avoid allocations
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/logging.rs  | 1021 ++++++++++++++++++++++++++++---------------
 core/src/raw/operation.rs   |   17 +
 core/src/raw/tests/utils.rs |   68 +--
 3 files changed, 675 insertions(+), 431 deletions(-)

diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 1dfdba85ef..277c9dfa0c 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -16,11 +16,8 @@
 // under the License.
 
 use std::fmt::Debug;
-use std::sync::atomic::AtomicU64;
-use std::sync::atomic::Ordering;
 use std::sync::Arc;
 
-use bytes::Buf;
 use futures::FutureExt;
 use futures::TryFutureExt;
 use log::log;
@@ -36,18 +33,12 @@ use crate::*;
 /// - OpenDAL will log in structural way.
 /// - Every operation will start with a `started` log entry.
 /// - Every operation will finish with the following status:
-///   - `finished`: the operation is successful.
-///   - `errored`: the operation returns an expected error like `NotFound`.
+///   - `succeeded`: the operation is successful, but might have more to take.
+///   - `finished`: the whole operation is finished.
 ///   - `failed`: the operation returns an unexpected error.
 /// - The default log level while expected error happened is `Warn`.
 /// - The default log level while unexpected failure happened is `Error`.
 ///
-/// # Todo
-///
-/// We should migrate to log's kv api after it's ready.
-///
-/// Tracking issue: <https://github.com/rust-lang/log/issues/328>
-///
 /// # Examples
 ///
 /// ```no_run
@@ -86,7 +77,7 @@ use crate::*;
 /// ```no_run
 /// use opendal::layers::LoggingInterceptor;
 /// use opendal::layers::LoggingLayer;
-/// use opendal::raw::Operation;
+/// use opendal::raw;
 /// use opendal::services;
 /// use opendal::Error;
 /// use opendal::Operator;
@@ -98,9 +89,9 @@ use crate::*;
 /// impl LoggingInterceptor for MyLoggingInterceptor {
 ///     fn log(
 ///         &self,
-///         scheme: Scheme,
-///         operation: Operation,
-///         context: &str,
+///         info: &raw::AccessorInfo,
+///         operation: raw::Operation,
+///         context: &[(&str, &str)],
 ///         message: &str,
 ///         err: Option<&Error>,
 ///     ) {
@@ -115,31 +106,21 @@ use crate::*;
 /// ```
 #[derive(Debug)]
 pub struct LoggingLayer<I = DefaultLoggingInterceptor> {
-    notify: Arc<I>,
-}
-
-impl<I> Clone for LoggingLayer<I> {
-    fn clone(&self) -> Self {
-        Self {
-            notify: self.notify.clone(),
-        }
-    }
+    logger: I,
 }
 
 impl Default for LoggingLayer {
     fn default() -> Self {
         Self {
-            notify: Arc::new(DefaultLoggingInterceptor),
+            logger: DefaultLoggingInterceptor,
         }
     }
 }
 
 impl LoggingLayer {
     /// Create the layer with specific logging interceptor.
-    pub fn new<I: LoggingInterceptor>(notify: I) -> LoggingLayer<I> {
-        LoggingLayer {
-            notify: Arc::new(notify),
-        }
+    pub fn new<I: LoggingInterceptor>(logger: I) -> LoggingLayer<I> {
+        LoggingLayer { logger }
     }
 }
 
@@ -147,59 +128,25 @@ impl<A: Access, I: LoggingInterceptor> Layer<A> for 
LoggingLayer<I> {
     type LayeredAccess = LoggingAccessor<A, I>;
 
     fn layer(&self, inner: A) -> Self::LayeredAccess {
-        let meta = inner.info();
+        let info = inner.info();
         LoggingAccessor {
             inner,
 
-            ctx: LoggingContext {
-                scheme: meta.scheme(),
-                notify: self.notify.clone(),
-            },
-        }
-    }
-}
-
-#[derive(Debug)]
-pub struct LoggingContext<I> {
-    scheme: Scheme,
-    notify: Arc<I>,
-}
-
-impl<I> Clone for LoggingContext<I> {
-    fn clone(&self) -> Self {
-        Self {
-            scheme: self.scheme,
-            notify: self.notify.clone(),
+            info,
+            logger: self.logger.clone(),
         }
     }
 }
 
-impl<I: LoggingInterceptor> LoggingContext<I> {
-    fn log(&self, operation: Operation, context: &str, message: &str, err: 
Option<&Error>) {
-        self.notify
-            .log(self.scheme, operation, context, message, err)
-    }
-
-    fn log_with_path(&self, operation: Operation, path: &str, message: &str, 
err: Option<&Error>) {
-        self.notify.log(
-            self.scheme,
-            operation,
-            &format!("path={path}"),
-            message,
-            err,
-        )
-    }
-}
-
 /// LoggingInterceptor is used to intercept the log.
-pub trait LoggingInterceptor: Debug + Send + Sync + 'static {
+pub trait LoggingInterceptor: Debug + Clone + Send + Sync + Unpin + 'static {
     /// Everytime there is a log, this function will be called.
     ///
     /// # Inputs
     ///
-    /// - scheme: The service generates the log.
+    /// - info: The service's access info.
     /// - operation: The operation to log.
-    /// - context: Additional context of the log.
+    /// - context: Additional context of the log like path, etc.
     /// - message: The log message.
     /// - err: The error to log.
     ///
@@ -210,81 +157,97 @@ pub trait LoggingInterceptor: Debug + Send + Sync + 
'static {
     /// could perform unexpectedly slow.
     fn log(
         &self,
-        scheme: Scheme,
+        info: &AccessorInfo,
         operation: Operation,
-        context: &str,
+        context: &[(&str, &str)],
         message: &str,
         err: Option<&Error>,
     );
 }
 
 /// The DefaultLoggingInterceptor will log the message by the standard logging 
macro.
-#[derive(Debug)]
+#[derive(Debug, Copy, Clone, Default)]
 pub struct DefaultLoggingInterceptor;
 
 impl LoggingInterceptor for DefaultLoggingInterceptor {
+    #[inline]
     fn log(
         &self,
-        scheme: Scheme,
+        info: &AccessorInfo,
         operation: Operation,
-        context: &str,
+        context: &[(&str, &str)],
         message: &str,
         err: Option<&Error>,
     ) {
-        let Some(err) = err else {
-            let lvl = self.operation_level(operation);
+        if let Some(err) = err {
+            // Print error if it's unexpected, otherwise in warn.
+            let lvl = if err.kind() == ErrorKind::Unexpected {
+                Level::Error
+            } else {
+                Level::Warn
+            };
+
             log!(
                 target: LOGGING_TARGET,
                 lvl,
-                "service={} operation={} {} -> {}",
-                scheme,
-                operation,
-                context,
-                message,
+                "service={} name={} {}: {operation} {message} {}",
+                info.scheme(),
+                info.name(),
+                format_args!(
+                    "{}",
+                    context.iter().enumerate().map(|(i, (k, v))| {
+                        if i > 0 {
+                            format!(" {}={}", k, v)
+                        } else {
+                            format!("{}={}", k, v)
+                        }
+                    }).collect::<String>()
+                ),
+                // Print error message with debug output while unexpected 
happened.
+                //
+                // It's super sad that we can't bind `format_args!()` here.
+                // See: https://github.com/rust-lang/rust/issues/92698
+                if err.kind() != ErrorKind::Unexpected {
+                   format!("{err}")
+                } else {
+                   format!("{err:?}")
+                }
             );
-            return;
+        }
+
+        // Print debug message if operation is oneshot, otherwise in trace.
+        let lvl = if operation.is_oneshot() {
+            Level::Debug
+        } else {
+            Level::Trace
         };
 
-        let lvl = self.error_level(err);
         log!(
             target: LOGGING_TARGET,
             lvl,
-            "service={} operation={} {} -> {} {}",
-            scheme,
-            operation,
-            context,
-            message,
-            err,
+            "service={} name={} {}: {operation} {message}",
+            info.scheme(),
+            info.name(),
+            format_args!(
+                "{}",
+                context.iter().enumerate().map(|(i, (k, v))| {
+                    if i > 0 {
+                        format!(" {}={}", k, v)
+                    } else {
+                        format!("{}={}", k, v)
+                    }
+                }).collect::<String>()
+            ),
         );
     }
 }
 
-impl DefaultLoggingInterceptor {
-    fn operation_level(&self, operation: Operation) -> Level {
-        match operation {
-            Operation::ReaderRead
-            | Operation::BlockingReaderRead
-            | Operation::WriterWrite
-            | Operation::BlockingWriterWrite => Level::Trace,
-            _ => Level::Debug,
-        }
-    }
-
-    #[inline]
-    fn error_level(&self, err: &Error) -> Level {
-        if err.kind() == ErrorKind::Unexpected {
-            Level::Error
-        } else {
-            Level::Warn
-        }
-    }
-}
-
 #[derive(Clone, Debug)]
 pub struct LoggingAccessor<A: Access, I: LoggingInterceptor> {
     inner: A,
 
-    ctx: LoggingContext<I>,
+    info: Arc<AccessorInfo>,
+    logger: I,
 }
 
 static LOGGING_TARGET: &str = "opendal::services";
@@ -303,83 +266,128 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for 
LoggingAccessor<A, I> {
     }
 
     fn metadata(&self) -> Arc<AccessorInfo> {
-        self.ctx.log(Operation::Info, "", "started", None);
-        let result = self.inner.info();
-        self.ctx.log(
-            Operation::Info,
-            "",
-            &format!("finished: {:?}", result),
-            None,
-        );
+        self.logger
+            .log(&self.info, Operation::Info, &[], "started", None);
+
+        let info = self.info.clone();
+
+        self.logger
+            .log(&self.info, Operation::Info, &[], "finished", None);
 
-        result
+        info
     }
 
     async fn create_dir(&self, path: &str, args: OpCreateDir) -> 
Result<RpCreateDir> {
-        self.ctx
-            .log_with_path(Operation::CreateDir, path, "started", None);
+        self.logger.log(
+            &self.info,
+            Operation::CreateDir,
+            &[("path", path)],
+            "started",
+            None,
+        );
 
         self.inner
             .create_dir(path, args)
             .await
             .map(|v| {
-                self.ctx
-                    .log_with_path(Operation::CreateDir, path, "finished", 
None);
+                self.logger.log(
+                    &self.info,
+                    Operation::CreateDir,
+                    &[("path", path)],
+                    "finished",
+                    None,
+                );
                 v
             })
             .map_err(|err| {
-                self.ctx
-                    .log_with_path(Operation::CreateDir, path, "", Some(&err));
+                self.logger.log(
+                    &self.info,
+                    Operation::CreateDir,
+                    &[("path", path)],
+                    "failed",
+                    Some(&err),
+                );
                 err
             })
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        self.ctx
-            .log_with_path(Operation::Read, path, "started", None);
+        self.logger.log(
+            &self.info,
+            Operation::Read,
+            &[("path", path)],
+            "started",
+            None,
+        );
 
         self.inner
             .read(path, args)
             .await
             .map(|(rp, r)| {
-                self.ctx
-                    .log_with_path(Operation::Read, path, "got reader", None);
+                self.logger.log(
+                    &self.info,
+                    Operation::Read,
+                    &[("path", path)],
+                    "created reader",
+                    None,
+                );
                 (
                     rp,
-                    LoggingReader::new(self.ctx.clone(), Operation::Read, 
path, r),
+                    LoggingReader::new(self.info.clone(), self.logger.clone(), 
path, r),
                 )
             })
             .map_err(|err| {
-                self.ctx
-                    .log_with_path(Operation::Read, path, "", Some(&err));
+                self.logger.log(
+                    &self.info,
+                    Operation::Read,
+                    &[("path", path)],
+                    "failed",
+                    Some(&err),
+                );
                 err
             })
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        self.ctx
-            .log_with_path(Operation::Write, path, "started", None);
+        self.logger.log(
+            &self.info,
+            Operation::Write,
+            &[("path", path)],
+            "started",
+            None,
+        );
 
         self.inner
             .write(path, args)
             .await
             .map(|(rp, w)| {
-                self.ctx
-                    .log_with_path(Operation::Write, path, "start writing", 
None);
-                let w = LoggingWriter::new(self.ctx.clone(), Operation::Write, 
path, w);
+                self.logger.log(
+                    &self.info,
+                    Operation::Write,
+                    &[("path", path)],
+                    "created writer",
+                    None,
+                );
+                let w = LoggingWriter::new(self.info.clone(), 
self.logger.clone(), path, w);
                 (rp, w)
             })
             .map_err(|err| {
-                self.ctx
-                    .log_with_path(Operation::Write, path, "", Some(&err));
+                self.logger.log(
+                    &self.info,
+                    Operation::Write,
+                    &[("path", path)],
+                    "failed",
+                    Some(&err),
+                );
                 err
             })
     }
 
     async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> 
{
-        self.ctx.log(
+        self.logger.log(
+            &self.info,
             Operation::Copy,
-            &format!("from={from} to={to}"),
+            &[("from", from), ("to", to)],
             "started",
             None,
         );
@@ -388,19 +396,21 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for 
LoggingAccessor<A, I> {
             .copy(from, to, args)
             .await
             .map(|v| {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::Copy,
-                    &format!("from={from} to={to}"),
+                    &[("from", from), ("to", to)],
                     "finished",
                     None,
                 );
                 v
             })
             .map_err(|err| {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::Copy,
-                    &format!("from={from} to={to}"),
-                    "",
+                    &[("from", from), ("to", to)],
+                    "failed",
                     Some(&err),
                 );
                 err
@@ -408,9 +418,10 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for 
LoggingAccessor<A, I> {
     }
 
     async fn rename(&self, from: &str, to: &str, args: OpRename) -> 
Result<RpRename> {
-        self.ctx.log(
+        self.logger.log(
+            &self.info,
             Operation::Rename,
-            &format!("from={from} to={to}"),
+            &[("from", from), ("to", to)],
             "started",
             None,
         );
@@ -419,19 +430,21 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for 
LoggingAccessor<A, I> {
             .rename(from, to, args)
             .await
             .map(|v| {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::Rename,
-                    &format!("from={from} to={to}"),
+                    &[("from", from), ("to", to)],
                     "finished",
                     None,
                 );
                 v
             })
             .map_err(|err| {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::Rename,
-                    &format!("from={from} to={to}"),
-                    "",
+                    &[("from", from), ("to", to)],
+                    "failed",
                     Some(&err),
                 );
                 err
@@ -439,56 +452,105 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for 
LoggingAccessor<A, I> {
     }
 
     async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
-        self.ctx
-            .log_with_path(Operation::Stat, path, "started", None);
+        self.logger.log(
+            &self.info,
+            Operation::Stat,
+            &[("path", path)],
+            "started",
+            None,
+        );
 
         self.inner
             .stat(path, args)
             .await
             .map(|v| {
-                self.ctx
-                    .log_with_path(Operation::Stat, path, "finished", None);
+                self.logger.log(
+                    &self.info,
+                    Operation::Stat,
+                    &[("path", path)],
+                    "finished",
+                    None,
+                );
                 v
             })
             .map_err(|err| {
-                self.ctx
-                    .log_with_path(Operation::Stat, path, "", Some(&err));
+                self.logger.log(
+                    &self.info,
+                    Operation::Stat,
+                    &[("path", path)],
+                    "failed",
+                    Some(&err),
+                );
                 err
             })
     }
 
     async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
-        self.ctx
-            .log_with_path(Operation::Delete, path, "started", None);
+        self.logger.log(
+            &self.info,
+            Operation::Delete,
+            &[("path", path)],
+            "started",
+            None,
+        );
 
         self.inner
             .delete(path, args.clone())
             .inspect(|v| match v {
                 Ok(_) => {
-                    self.ctx
-                        .log_with_path(Operation::Delete, path, "finished", 
None);
+                    self.logger.log(
+                        &self.info,
+                        Operation::Delete,
+                        &[("path", path)],
+                        "finished",
+                        None,
+                    );
                 }
                 Err(err) => {
-                    self.ctx
-                        .log_with_path(Operation::Delete, path, "", Some(err));
+                    self.logger.log(
+                        &self.info,
+                        Operation::Delete,
+                        &[("path", path)],
+                        "failed",
+                        Some(err),
+                    );
                 }
             })
             .await
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Lister)> {
+        self.logger.log(
+            &self.info,
+            Operation::List,
+            &[("path", path)],
+            "started",
+            None,
+        );
+
         self.inner
             .list(path, args)
             .map(|v| match v {
                 Ok((rp, v)) => {
-                    self.ctx
-                        .log_with_path(Operation::List, path, "start listing 
dir", None);
-                    let streamer = LoggingLister::new(self.ctx.clone(), path, 
Operation::List, v);
+                    self.logger.log(
+                        &self.info,
+                        Operation::List,
+                        &[("path", path)],
+                        "created lister",
+                        None,
+                    );
+                    let streamer =
+                        LoggingLister::new(self.info.clone(), 
self.logger.clone(), path, v);
                     Ok((rp, streamer))
                 }
                 Err(err) => {
-                    self.ctx
-                        .log_with_path(Operation::List, path, "", Some(&err));
+                    self.logger.log(
+                        &self.info,
+                        Operation::List,
+                        &[("path", path)],
+                        "failed",
+                        Some(&err),
+                    );
                     Err(err)
                 }
             })
@@ -496,20 +558,35 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for 
LoggingAccessor<A, I> {
     }
 
     async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
-        self.ctx
-            .log_with_path(Operation::Presign, path, "started", None);
+        self.logger.log(
+            &self.info,
+            Operation::Presign,
+            &[("path", path)],
+            "started",
+            None,
+        );
 
         self.inner
             .presign(path, args)
             .await
             .map(|v| {
-                self.ctx
-                    .log_with_path(Operation::Presign, path, "finished", None);
+                self.logger.log(
+                    &self.info,
+                    Operation::Presign,
+                    &[("path", path)],
+                    "finished",
+                    None,
+                );
                 v
             })
             .map_err(|err| {
-                self.ctx
-                    .log_with_path(Operation::Presign, path, "", Some(&err));
+                self.logger.log(
+                    &self.info,
+                    Operation::Presign,
+                    &[("path", path)],
+                    "failed",
+                    Some(&err),
+                );
                 err
             })
     }
@@ -517,9 +594,10 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for 
LoggingAccessor<A, I> {
     async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
         let (op, count) = (args.operation()[0].1.operation(), 
args.operation().len());
 
-        self.ctx.log(
+        self.logger.log(
+            &self.info,
             Operation::Batch,
-            &format!("op={op} count={count} -> started"),
+            &[("op", op.into_static()), ("count", &count.to_string())],
             "started",
             None,
         );
@@ -527,9 +605,10 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for 
LoggingAccessor<A, I> {
         self.inner
             .batch(args)
             .map_ok(|v| {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::Batch,
-                    &format!("op={op} count={count}"),
+                    &[("op", op.into_static()), ("count", &count.to_string())],
                     &format!(
                         "finished: {}, succeed: {}, failed: {}",
                         v.results().len(),
@@ -541,10 +620,11 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for 
LoggingAccessor<A, I> {
                 v
             })
             .map_err(|err| {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::Batch,
-                    &format!("op={op} count={count}"),
-                    "",
+                    &[("op", op.into_static()), ("count", &count.to_string())],
+                    "failed",
                     Some(&err),
                 );
                 err
@@ -553,65 +633,111 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for 
LoggingAccessor<A, I> {
     }
 
     fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> 
Result<RpCreateDir> {
-        self.ctx
-            .log_with_path(Operation::BlockingCreateDir, path, "started", 
None);
+        self.logger.log(
+            &self.info,
+            Operation::BlockingCreateDir,
+            &[("path", path)],
+            "started",
+            None,
+        );
 
         self.inner
             .blocking_create_dir(path, args)
             .map(|v| {
-                self.ctx
-                    .log_with_path(Operation::BlockingCreateDir, path, 
"finished", None);
+                self.logger.log(
+                    &self.info,
+                    Operation::BlockingCreateDir,
+                    &[("path", path)],
+                    "finished",
+                    None,
+                );
                 v
             })
             .map_err(|err| {
-                self.ctx
-                    .log_with_path(Operation::BlockingCreateDir, path, "", 
Some(&err));
+                self.logger.log(
+                    &self.info,
+                    Operation::BlockingCreateDir,
+                    &[("path", path)],
+                    "failed",
+                    Some(&err),
+                );
                 err
             })
     }
 
     fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::BlockingReader)> {
-        self.ctx
-            .log_with_path(Operation::BlockingRead, path, "started", None);
+        self.logger.log(
+            &self.info,
+            Operation::BlockingRead,
+            &[("path", path)],
+            "started",
+            None,
+        );
 
         self.inner
             .blocking_read(path, args.clone())
             .map(|(rp, r)| {
-                self.ctx
-                    .log_with_path(Operation::BlockingRead, path, "got 
reader", None);
-                let r = LoggingReader::new(self.ctx.clone(), 
Operation::BlockingRead, path, r);
+                self.logger.log(
+                    &self.info,
+                    Operation::BlockingRead,
+                    &[("path", path)],
+                    "created reader",
+                    None,
+                );
+                let r = LoggingReader::new(self.info.clone(), 
self.logger.clone(), path, r);
                 (rp, r)
             })
             .map_err(|err| {
-                self.ctx
-                    .log_with_path(Operation::BlockingRead, path, "", 
Some(&err));
+                self.logger.log(
+                    &self.info,
+                    Operation::BlockingRead,
+                    &[("path", path)],
+                    "failed",
+                    Some(&err),
+                );
                 err
             })
     }
 
     fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
-        self.ctx
-            .log_with_path(Operation::BlockingWrite, path, "started", None);
+        self.logger.log(
+            &self.info,
+            Operation::BlockingWrite,
+            &[("path", path)],
+            "started",
+            None,
+        );
 
         self.inner
             .blocking_write(path, args)
             .map(|(rp, w)| {
-                self.ctx
-                    .log_with_path(Operation::BlockingWrite, path, "start 
writing", None);
-                let w = LoggingWriter::new(self.ctx.clone(), 
Operation::BlockingWrite, path, w);
+                self.logger.log(
+                    &self.info,
+                    Operation::BlockingWrite,
+                    &[("path", path)],
+                    "created writer",
+                    None,
+                );
+                let w = LoggingWriter::new(self.info.clone(), 
self.logger.clone(), path, w);
                 (rp, w)
             })
             .map_err(|err| {
-                self.ctx
-                    .log_with_path(Operation::BlockingWrite, path, "", 
Some(&err));
+                self.logger.log(
+                    &self.info,
+                    Operation::BlockingWrite,
+                    &[("path", path)],
+                    "failed",
+                    Some(&err),
+                );
                 err
             })
     }
 
     fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> 
Result<RpCopy> {
-        self.ctx.log(
+        self.logger.log(
+            &self.info,
             Operation::BlockingCopy,
-            &format!("from={from} to={to}"),
+            &[("from", from), ("to", to)],
             "started",
             None,
         );
@@ -619,18 +745,20 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for 
LoggingAccessor<A, I> {
         self.inner
             .blocking_copy(from, to, args)
             .map(|v| {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::BlockingCopy,
-                    &format!("from={from} to={to}"),
+                    &[("from", from), ("to", to)],
                     "finished",
                     None,
                 );
                 v
             })
             .map_err(|err| {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::BlockingCopy,
-                    &format!("from={from} to={to}"),
+                    &[("from", from), ("to", to)],
                     "",
                     Some(&err),
                 );
@@ -639,9 +767,10 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for 
LoggingAccessor<A, I> {
     }
 
     fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> 
Result<RpRename> {
-        self.ctx.log(
+        self.logger.log(
+            &self.info,
             Operation::BlockingRename,
-            &format!("from={from} to={to}"),
+            &[("from", from), ("to", to)],
             "started",
             None,
         );
@@ -649,19 +778,21 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for 
LoggingAccessor<A, I> {
         self.inner
             .blocking_rename(from, to, args)
             .map(|v| {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::BlockingRename,
-                    &format!("from={from} to={to}"),
+                    &[("from", from), ("to", to)],
                     "finished",
                     None,
                 );
                 v
             })
             .map_err(|err| {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::BlockingRename,
-                    &format!("from={from} to={to}"),
-                    "",
+                    &[("from", from), ("to", to)],
+                    "failed",
                     Some(&err),
                 );
                 err
@@ -669,56 +800,101 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for 
LoggingAccessor<A, I> {
     }
 
     fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
-        self.ctx
-            .log_with_path(Operation::BlockingStat, path, "started", None);
+        self.logger.log(
+            &self.info,
+            Operation::BlockingStat,
+            &[("path", path)],
+            "started",
+            None,
+        );
 
         self.inner
             .blocking_stat(path, args)
             .map(|v| {
-                self.ctx
-                    .log_with_path(Operation::BlockingStat, path, "finished", 
None);
+                self.logger.log(
+                    &self.info,
+                    Operation::BlockingStat,
+                    &[("path", path)],
+                    "finished",
+                    None,
+                );
                 v
             })
             .map_err(|err| {
-                self.ctx
-                    .log_with_path(Operation::BlockingStat, path, "", 
Some(&err));
+                self.logger.log(
+                    &self.info,
+                    Operation::BlockingStat,
+                    &[("path", path)],
+                    "failed",
+                    Some(&err),
+                );
                 err
             })
     }
 
     fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
-        self.ctx
-            .log_with_path(Operation::BlockingDelete, path, "started", None);
+        self.logger.log(
+            &self.info,
+            Operation::BlockingDelete,
+            &[("path", path)],
+            "started",
+            None,
+        );
 
         self.inner
             .blocking_delete(path, args)
             .map(|v| {
-                self.ctx
-                    .log_with_path(Operation::BlockingDelete, path, 
"finished", None);
+                self.logger.log(
+                    &self.info,
+                    Operation::BlockingDelete,
+                    &[("path", path)],
+                    "finished",
+                    None,
+                );
                 v
             })
             .map_err(|err| {
-                self.ctx
-                    .log_with_path(Operation::BlockingDelete, path, "", 
Some(&err));
+                self.logger.log(
+                    &self.info,
+                    Operation::BlockingDelete,
+                    &[("path", path)],
+                    "failed",
+                    Some(&err),
+                );
                 err
             })
     }
 
     fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::BlockingLister)> {
-        self.ctx
-            .log_with_path(Operation::BlockingList, path, "started", None);
+        self.logger.log(
+            &self.info,
+            Operation::BlockingList,
+            &[("path", path)],
+            "started",
+            None,
+        );
 
         self.inner
             .blocking_list(path, args)
             .map(|(rp, v)| {
-                self.ctx
-                    .log_with_path(Operation::BlockingList, path, "got dir", 
None);
-                let li = LoggingLister::new(self.ctx.clone(), path, 
Operation::BlockingList, v);
+                self.logger.log(
+                    &self.info,
+                    Operation::BlockingList,
+                    &[("path", path)],
+                    "created lister",
+                    None,
+                );
+                let li = LoggingLister::new(self.info.clone(), 
self.logger.clone(), path, v);
                 (rp, li)
             })
             .map_err(|err| {
-                self.ctx
-                    .log_with_path(Operation::BlockingList, path, "", 
Some(&err));
+                self.logger.log(
+                    &self.info,
+                    Operation::BlockingList,
+                    &[("path", path)],
+                    "",
+                    Some(&err),
+                );
                 err
             })
     }
@@ -726,69 +902,63 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for 
LoggingAccessor<A, I> {
 
 /// `LoggingReader` is a wrapper of `BytesReader`, with logging functionality.
 pub struct LoggingReader<R, I: LoggingInterceptor> {
-    ctx: LoggingContext<I>,
+    info: Arc<AccessorInfo>,
+    logger: I,
     path: String,
-    op: Operation,
 
-    read: AtomicU64,
+    read: u64,
     inner: R,
 }
 
 impl<R, I: LoggingInterceptor> LoggingReader<R, I> {
-    fn new(ctx: LoggingContext<I>, op: Operation, path: &str, reader: R) -> 
Self {
+    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, reader: R) -> Self {
         Self {
-            ctx,
-            op,
+            info,
+            logger,
             path: path.to_string(),
 
-            read: AtomicU64::new(0),
+            read: 0,
             inner: reader,
         }
     }
 }
 
-impl<R, I: LoggingInterceptor> Drop for LoggingReader<R, I> {
-    fn drop(&mut self) {
-        self.ctx.log(
-            self.op,
-            &format!(
-                "path={} read={}",
-                self.path,
-                self.read.load(Ordering::Relaxed)
-            ),
-            "data read finished",
+impl<R: oio::Read, I: LoggingInterceptor> oio::Read for LoggingReader<R, I> {
+    async fn read(&mut self) -> Result<Buffer> {
+        self.logger.log(
+            &self.info,
+            Operation::ReaderRead,
+            &[("path", &self.path), ("read", &self.read.to_string())],
+            "started",
             None,
         );
-    }
-}
 
-impl<R: oio::Read, I: LoggingInterceptor> oio::Read for LoggingReader<R, I> {
-    async fn read(&mut self) -> Result<Buffer> {
         match self.inner.read().await {
             Ok(bs) => {
-                self.read
-                    .fetch_add(bs.remaining() as u64, Ordering::Relaxed);
-                self.ctx.log(
+                self.read += bs.len() as u64;
+                self.logger.log(
+                    &self.info,
                     Operation::ReaderRead,
-                    &format!(
-                        "path={} read={}",
-                        self.path,
-                        self.read.load(Ordering::Relaxed),
-                    ),
-                    &format!("read returns {}B", bs.remaining()),
+                    &[
+                        ("path", &self.path),
+                        ("read", &self.read.to_string()),
+                        ("size", &bs.len().to_string()),
+                    ],
+                    if bs.is_empty() {
+                        "finished"
+                    } else {
+                        "succeeded"
+                    },
                     None,
                 );
                 Ok(bs)
             }
             Err(err) => {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::ReaderRead,
-                    &format!(
-                        "path={} read={}",
-                        self.path,
-                        self.read.load(Ordering::Relaxed)
-                    ),
-                    "read failed:",
+                    &[("path", &self.path), ("read", &self.read.to_string())],
+                    "failed",
                     Some(&err),
                 );
                 Err(err)
@@ -799,31 +969,40 @@ impl<R: oio::Read, I: LoggingInterceptor> oio::Read for 
LoggingReader<R, I> {
 
 impl<R: oio::BlockingRead, I: LoggingInterceptor> oio::BlockingRead for 
LoggingReader<R, I> {
     fn read(&mut self) -> Result<Buffer> {
+        self.logger.log(
+            &self.info,
+            Operation::BlockingReaderRead,
+            &[("path", &self.path), ("read", &self.read.to_string())],
+            "started",
+            None,
+        );
+
         match self.inner.read() {
             Ok(bs) => {
-                self.read
-                    .fetch_add(bs.remaining() as u64, Ordering::Relaxed);
-                self.ctx.log(
+                self.read += bs.len() as u64;
+                self.logger.log(
+                    &self.info,
                     Operation::BlockingReaderRead,
-                    &format!(
-                        "path={} read={}",
-                        self.path,
-                        self.read.load(Ordering::Relaxed),
-                    ),
-                    &format!("read returns {}B", bs.remaining()),
+                    &[
+                        ("path", &self.path),
+                        ("read", &self.read.to_string()),
+                        ("size", &bs.len().to_string()),
+                    ],
+                    if bs.is_empty() {
+                        "finished"
+                    } else {
+                        "succeeded"
+                    },
                     None,
                 );
                 Ok(bs)
             }
             Err(err) => {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::BlockingReaderRead,
-                    &format!(
-                        "path={} read={}",
-                        self.path,
-                        self.read.load(Ordering::Relaxed)
-                    ),
-                    "read failed:",
+                    &[("path", &self.path), ("read", &self.read.to_string())],
+                    "failed",
                     Some(&err),
                 );
                 Err(err)
@@ -833,8 +1012,8 @@ impl<R: oio::BlockingRead, I: LoggingInterceptor> 
oio::BlockingRead for LoggingR
 }
 
 pub struct LoggingWriter<W, I> {
-    ctx: LoggingContext<I>,
-    op: Operation,
+    info: Arc<AccessorInfo>,
+    logger: I,
     path: String,
 
     written: u64,
@@ -842,10 +1021,10 @@ pub struct LoggingWriter<W, I> {
 }
 
 impl<W, I> LoggingWriter<W, I> {
-    fn new(ctx: LoggingContext<I>, op: Operation, path: &str, writer: W) -> 
Self {
+    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, writer: W) -> Self {
         Self {
-            ctx,
-            op,
+            info,
+            logger,
             path: path.to_string(),
 
             written: 0,
@@ -857,21 +1036,45 @@ impl<W, I> LoggingWriter<W, I> {
 impl<W: oio::Write, I: LoggingInterceptor> oio::Write for LoggingWriter<W, I> {
     async fn write(&mut self, bs: Buffer) -> Result<()> {
         let size = bs.len();
+
+        self.logger.log(
+            &self.info,
+            Operation::WriterWrite,
+            &[
+                ("path", &self.path),
+                ("written", &self.written.to_string()),
+                ("size", &size.to_string()),
+            ],
+            "started",
+            None,
+        );
+
         match self.inner.write(bs).await {
             Ok(_) => {
-                self.ctx.log(
+                self.written += size as u64;
+                self.logger.log(
+                    &self.info,
                     Operation::WriterWrite,
-                    &format!("path={} written={}B", self.path, self.written),
-                    &format!("data write {}B", size),
+                    &[
+                        ("path", &self.path),
+                        ("written", &self.written.to_string()),
+                        ("size", &size.to_string()),
+                    ],
+                    "succeeded",
                     None,
                 );
                 Ok(())
             }
             Err(err) => {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::WriterWrite,
-                    &format!("path={} written={}B", self.path, self.written),
-                    "data write failed:",
+                    &[
+                        ("path", &self.path),
+                        ("written", &self.written.to_string()),
+                        ("size", &size.to_string()),
+                    ],
+                    "failed",
                     Some(&err),
                 );
                 Err(err)
@@ -880,21 +1083,31 @@ impl<W: oio::Write, I: LoggingInterceptor> oio::Write 
for LoggingWriter<W, I> {
     }
 
     async fn abort(&mut self) -> Result<()> {
+        self.logger.log(
+            &self.info,
+            Operation::WriterAbort,
+            &[("path", &self.path), ("written", &self.written.to_string())],
+            "started",
+            None,
+        );
+
         match self.inner.abort().await {
             Ok(_) => {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::WriterAbort,
-                    &format!("path={} written={}B", self.path, self.written),
-                    "abort writer",
+                    &[("path", &self.path), ("written", 
&self.written.to_string())],
+                    "succeeded",
                     None,
                 );
                 Ok(())
             }
             Err(err) => {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::WriterAbort,
-                    &format!("path={} written={}B", self.path, self.written),
-                    "abort writer failed:",
+                    &[("path", &self.path), ("written", 
&self.written.to_string())],
+                    "failed",
                     Some(&err),
                 );
                 Err(err)
@@ -903,21 +1116,31 @@ impl<W: oio::Write, I: LoggingInterceptor> oio::Write 
for LoggingWriter<W, I> {
     }
 
     async fn close(&mut self) -> Result<()> {
+        self.logger.log(
+            &self.info,
+            Operation::WriterClose,
+            &[("path", &self.path), ("written", &self.written.to_string())],
+            "started",
+            None,
+        );
+
         match self.inner.close().await {
             Ok(_) => {
-                self.ctx.log(
-                    self.op,
-                    &format!("path={} written={}B", self.path, self.written),
-                    "data written finished",
+                self.logger.log(
+                    &self.info,
+                    Operation::WriterClose,
+                    &[("path", &self.path), ("written", 
&self.written.to_string())],
+                    "succeeded",
                     None,
                 );
                 Ok(())
             }
             Err(err) => {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::WriterClose,
-                    &format!("path={} written={}B", self.path, self.written),
-                    "data close failed:",
+                    &[("path", &self.path), ("written", 
&self.written.to_string())],
+                    "failed",
                     Some(&err),
                 );
                 Err(err)
@@ -928,21 +1151,45 @@ impl<W: oio::Write, I: LoggingInterceptor> oio::Write 
for LoggingWriter<W, I> {
 
 impl<W: oio::BlockingWrite, I: LoggingInterceptor> oio::BlockingWrite for 
LoggingWriter<W, I> {
     fn write(&mut self, bs: Buffer) -> Result<()> {
-        match self.inner.write(bs.clone()) {
+        let size = bs.len();
+
+        self.logger.log(
+            &self.info,
+            Operation::BlockingWriterWrite,
+            &[
+                ("path", &self.path),
+                ("written", &self.written.to_string()),
+                ("size", &size.to_string()),
+            ],
+            "started",
+            None,
+        );
+
+        match self.inner.write(bs) {
             Ok(_) => {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::BlockingWriterWrite,
-                    &format!("path={} written={}B", self.path, self.written),
-                    &format!("data write {}B", bs.len()),
+                    &[
+                        ("path", &self.path),
+                        ("written", &self.written.to_string()),
+                        ("size", &size.to_string()),
+                    ],
+                    "succeeded",
                     None,
                 );
                 Ok(())
             }
             Err(err) => {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::BlockingWriterWrite,
-                    &format!("path={} written={}B", self.path, self.written),
-                    "data write failed:",
+                    &[
+                        ("path", &self.path),
+                        ("written", &self.written.to_string()),
+                        ("size", &size.to_string()),
+                    ],
+                    "failed",
                     Some(&err),
                 );
                 Err(err)
@@ -951,21 +1198,31 @@ impl<W: oio::BlockingWrite, I: LoggingInterceptor> 
oio::BlockingWrite for Loggin
     }
 
     fn close(&mut self) -> Result<()> {
+        self.logger.log(
+            &self.info,
+            Operation::BlockingWriterClose,
+            &[("path", &self.path), ("written", &self.written.to_string())],
+            "started",
+            None,
+        );
+
         match self.inner.close() {
             Ok(_) => {
-                self.ctx.log(
-                    self.op,
-                    &format!("path={} written={}B", self.path, self.written),
-                    "data written finished",
+                self.logger.log(
+                    &self.info,
+                    Operation::BlockingWriterWrite,
+                    &[("path", &self.path), ("written", 
&self.written.to_string())],
+                    "succeeded",
                     None,
                 );
                 Ok(())
             }
             Err(err) => {
-                self.ctx.log(
+                self.logger.log(
+                    &self.info,
                     Operation::BlockingWriterClose,
-                    &format!("path={} written={}B", self.path, self.written),
-                    "data close failed:",
+                    &[("path", &self.path), ("written", 
&self.written.to_string())],
+                    "failed",
                     Some(&err),
                 );
                 Err(err)
@@ -975,58 +1232,71 @@ impl<W: oio::BlockingWrite, I: LoggingInterceptor> 
oio::BlockingWrite for Loggin
 }
 
 pub struct LoggingLister<P, I: LoggingInterceptor> {
-    ctx: LoggingContext<I>,
+    info: Arc<AccessorInfo>,
+    logger: I,
     path: String,
-    op: Operation,
 
-    finished: bool,
+    listed: usize,
     inner: P,
 }
 
 impl<P, I: LoggingInterceptor> LoggingLister<P, I> {
-    fn new(ctx: LoggingContext<I>, path: &str, op: Operation, inner: P) -> 
Self {
+    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, inner: P) -> Self {
         Self {
-            ctx,
+            info,
+            logger,
             path: path.to_string(),
-            op,
-            finished: false,
-            inner,
-        }
-    }
-}
 
-impl<P, I: LoggingInterceptor> Drop for LoggingLister<P, I> {
-    fn drop(&mut self) {
-        if self.finished {
-            self.ctx
-                .log_with_path(self.op, &self.path, "all entries read 
finished", None);
-        } else {
-            self.ctx
-                .log_with_path(self.op, &self.path, "partial entries read 
finished", None);
+            listed: 0,
+            inner,
         }
     }
 }
 
 impl<P: oio::List, I: LoggingInterceptor> oio::List for LoggingLister<P, I> {
     async fn next(&mut self) -> Result<Option<oio::Entry>> {
+        self.logger.log(
+            &self.info,
+            Operation::ListerNext,
+            &[("path", &self.path), ("listed", &self.listed.to_string())],
+            "started",
+            None,
+        );
+
         let res = self.inner.next().await;
 
         match &res {
             Ok(Some(de)) => {
-                self.ctx.log_with_path(
-                    self.op,
-                    &self.path,
-                    &format!("listed entry: {}", de.path()),
+                self.listed += 1;
+                self.logger.log(
+                    &self.info,
+                    Operation::ListerNext,
+                    &[
+                        ("path", &self.path),
+                        ("listed", &self.listed.to_string()),
+                        ("entry", de.path()),
+                    ],
+                    "succeeded",
                     None,
                 );
             }
             Ok(None) => {
-                self.ctx
-                    .log_with_path(self.op, &self.path, "finished", None);
-                self.finished = true;
+                self.logger.log(
+                    &self.info,
+                    Operation::ListerNext,
+                    &[("path", &self.path), ("listed", 
&self.listed.to_string())],
+                    "finished",
+                    None,
+                );
             }
             Err(err) => {
-                self.ctx.log_with_path(self.op, &self.path, "", Some(err));
+                self.logger.log(
+                    &self.info,
+                    Operation::ListerNext,
+                    &[("path", &self.path), ("listed", 
&self.listed.to_string())],
+                    "failed",
+                    Some(err),
+                );
             }
         };
 
@@ -1036,24 +1306,47 @@ impl<P: oio::List, I: LoggingInterceptor> oio::List for 
LoggingLister<P, I> {
 
 impl<P: oio::BlockingList, I: LoggingInterceptor> oio::BlockingList for 
LoggingLister<P, I> {
     fn next(&mut self) -> Result<Option<oio::Entry>> {
-        let res = self.inner.next();
+        self.logger.log(
+            &self.info,
+            Operation::BlockingListerNext,
+            &[("path", &self.path), ("listed", &self.listed.to_string())],
+            "started",
+            None,
+        );
 
+        let res = self.inner.next();
         match &res {
-            Ok(Some(des)) => {
-                self.ctx.log_with_path(
-                    self.op,
-                    &self.path,
-                    &format!("listed entry: {}", des.path()),
+            Ok(Some(de)) => {
+                self.listed += 1;
+                self.logger.log(
+                    &self.info,
+                    Operation::BlockingListerNext,
+                    &[
+                        ("path", &self.path),
+                        ("listed", &self.listed.to_string()),
+                        ("entry", de.path()),
+                    ],
+                    "succeeded",
                     None,
                 );
             }
             Ok(None) => {
-                self.ctx
-                    .log_with_path(self.op, &self.path, "finished", None);
-                self.finished = true;
+                self.logger.log(
+                    &self.info,
+                    Operation::BlockingListerNext,
+                    &[("path", &self.path), ("listed", 
&self.listed.to_string())],
+                    "finished",
+                    None,
+                );
             }
             Err(err) => {
-                self.ctx.log_with_path(self.op, &self.path, "", Some(err));
+                self.logger.log(
+                    &self.info,
+                    Operation::BlockingListerNext,
+                    &[("path", &self.path), ("listed", 
&self.listed.to_string())],
+                    "failed",
+                    Some(err),
+                );
             }
         };
 
diff --git a/core/src/raw/operation.rs b/core/src/raw/operation.rs
index d195cb57ac..c5ef760474 100644
--- a/core/src/raw/operation.rs
+++ b/core/src/raw/operation.rs
@@ -86,6 +86,23 @@ impl Operation {
     pub fn into_static(self) -> &'static str {
         self.into()
     }
+
+    /// Check if given operation is oneshot or not.
+    ///
+    /// For example, `Stat` is oneshot but `ReaderRead` could happen multiple 
times.
+    ///
+    /// This function can be used to decide take actions based on operations 
like logging.
+    pub fn is_oneshot(&self) -> bool {
+        !matches!(
+            self,
+            Operation::ReaderRead
+                | Operation::WriterWrite
+                | Operation::ListerNext
+                | Operation::BlockingReaderRead
+                | Operation::BlockingWriterWrite
+                | Operation::BlockingListerNext
+        )
+    }
 }
 
 impl Display for Operation {
diff --git a/core/src/raw/tests/utils.rs b/core/src/raw/tests/utils.rs
index 1854576428..808788f710 100644
--- a/core/src/raw/tests/utils.rs
+++ b/core/src/raw/tests/utils.rs
@@ -19,11 +19,8 @@ use std::collections::HashMap;
 use std::env;
 use std::str::FromStr;
 
-use log::{log, Level};
 use once_cell::sync::Lazy;
 
-use crate::layers::LoggingInterceptor;
-use crate::raw::Operation;
 use crate::*;
 
 /// TEST_RUNTIME is the runtime used for running tests.
@@ -76,7 +73,7 @@ pub fn init_test_service() -> Result<Option<Operator>> {
     let op = { op.layer(layers::ChaosLayer::new(0.1)) };
 
     let mut op = op
-        .layer(layers::LoggingLayer::new(BacktraceLoggingInterceptor))
+        .layer(layers::LoggingLayer::default())
         .layer(layers::TimeoutLayer::new())
         .layer(layers::RetryLayer::new().with_max_times(4));
 
@@ -91,66 +88,3 @@ pub fn init_test_service() -> Result<Option<Operator>> {
 
     Ok(Some(op))
 }
-
-/// A logging interceptor that logs the backtrace.
-#[derive(Debug, Clone)]
-struct BacktraceLoggingInterceptor;
-
-impl LoggingInterceptor for BacktraceLoggingInterceptor {
-    fn log(
-        &self,
-        scheme: Scheme,
-        operation: raw::Operation,
-        context: &str,
-        message: &str,
-        err: Option<&Error>,
-    ) {
-        let Some(err) = err else {
-            let lvl = self.operation_level(operation);
-            log!(
-                target: "opendal::services",
-                lvl,
-                "service={} operation={} {} -> {}",
-                scheme,
-                operation,
-                context,
-                message,
-            );
-            return;
-        };
-
-        let err_msg = if err.kind() != ErrorKind::Unexpected {
-            format!("{err}")
-        } else {
-            format!("{err:?}")
-        };
-        let lvl = if err.kind() == ErrorKind::Unexpected {
-            Level::Error
-        } else {
-            Level::Warn
-        };
-
-        log!(
-            target: "opendal::services",
-            lvl,
-            "service={} operation={} {} -> {} {}",
-            scheme,
-            operation,
-            context,
-            message,
-            err_msg,
-        );
-    }
-}
-
-impl BacktraceLoggingInterceptor {
-    fn operation_level(&self, operation: Operation) -> Level {
-        match operation {
-            Operation::ReaderRead
-            | Operation::BlockingReaderRead
-            | Operation::WriterWrite
-            | Operation::BlockingWriterWrite => Level::Trace,
-            _ => Level::Debug,
-        }
-    }
-}

Reply via email to