This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new c5cc990780 feat(core)!: implement an interceptor for the logging layer
(#4961)
c5cc990780 is described below
commit c5cc990780b13cc328e01fd2924baa11bc7999dd
Author: Yingwen <[email protected]>
AuthorDate: Fri Aug 9 17:03:38 2024 +0800
feat(core)!: implement an interceptor for the logging layer (#4961)
* feat: init
* feat: accessor log
* feat: more logs
* feat: compile
* feat: use interceptor
* feat: use ctx to log
* feat: avoid formatting err first
* style: fix clippy
* chore: update comments
* refactor: pass operation
* feat: add context
* feat: remove some apis from DefaultLoggingLayer
* refactor: remove path
* docs: fix example
* test: fix haskell test
---
bindings/haskell/test/BasicTest.hs | 2 +-
core/src/layers/logging.rs | 1216 ++++++++++++++----------------------
core/src/layers/mod.rs | 1 +
core/src/raw/tests/utils.rs | 68 +-
4 files changed, 526 insertions(+), 761 deletions(-)
diff --git a/bindings/haskell/test/BasicTest.hs
b/bindings/haskell/test/BasicTest.hs
index 1311f9a05f..6c817d046e 100644
--- a/bindings/haskell/test/BasicTest.hs
+++ b/bindings/haskell/test/BasicTest.hs
@@ -109,7 +109,7 @@ testLogger = do
let logFn = LogAction $ logger state
Right _ <- newOperator "memory" {ocLogAction = Just logFn}
logStr <- readIORef state
- T.take 77 logStr @?= "service=memory operation=metadata ->
startedservice=memory operation=metadata"
+ T.take 78 logStr @?= "service=memory operation=metadata ->
startedservice=memory operation=metadata"
-- helper function
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 40294204f6..1dfdba85ef 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -23,9 +23,7 @@ use std::sync::Arc;
use bytes::Buf;
use futures::FutureExt;
use futures::TryFutureExt;
-use log::debug;
use log::log;
-use log::trace;
use log::Level;
use crate::raw::*;
@@ -80,74 +78,73 @@ use crate::*;
/// ```shell
/// RUST_LOG="info,opendal::services=debug" ./app
/// ```
-#[derive(Debug, Copy, Clone)]
-pub struct LoggingLayer {
- error_level: Option<Level>,
- failure_level: Option<Level>,
- backtrace_output: bool,
+///
+/// # Logging Interceptor
+///
+/// You can implement your own logging interceptor to customize the logging
behavior.
+///
+/// ```no_run
+/// use opendal::layers::LoggingInterceptor;
+/// use opendal::layers::LoggingLayer;
+/// use opendal::raw::Operation;
+/// use opendal::services;
+/// use opendal::Error;
+/// use opendal::Operator;
+/// use opendal::Scheme;
+///
+/// #[derive(Debug, Clone)]
+/// struct MyLoggingInterceptor;
+///
+/// impl LoggingInterceptor for MyLoggingInterceptor {
+/// fn log(
+/// &self,
+/// scheme: Scheme,
+/// operation: Operation,
+/// context: &str,
+/// message: &str,
+/// err: Option<&Error>,
+/// ) {
+/// // log something
+/// }
+/// }
+///
+/// let _ = Operator::new(services::Memory::default())
+/// .expect("must init")
+/// .layer(LoggingLayer::new(MyLoggingInterceptor))
+/// .finish();
+/// ```
+#[derive(Debug)]
+pub struct LoggingLayer<I = DefaultLoggingInterceptor> {
+ notify: Arc<I>,
}
-impl Default for LoggingLayer {
- fn default() -> Self {
+impl<I> Clone for LoggingLayer<I> {
+ fn clone(&self) -> Self {
Self {
- error_level: Some(Level::Warn),
- failure_level: Some(Level::Error),
- backtrace_output: false,
+ notify: self.notify.clone(),
}
}
}
-impl LoggingLayer {
- /// Setting the log level while expected error happened.
- ///
- /// For example: accessor returns NotFound.
- ///
- /// `None` means disable the log for error.
- pub fn with_error_level(mut self, level: Option<&str>) -> Result<Self> {
- if let Some(level_str) = level {
- let level = level_str.parse().map_err(|_| {
- Error::new(ErrorKind::ConfigInvalid, "invalid log level")
- .with_context("level", level_str)
- })?;
- self.error_level = Some(level);
- } else {
- self.error_level = None;
+impl Default for LoggingLayer {
+ fn default() -> Self {
+ Self {
+ notify: Arc::new(DefaultLoggingInterceptor),
}
- Ok(self)
}
+}
- /// Setting the log level while unexpected failure happened.
- ///
- /// For example: accessor returns Unexpected network error.
- ///
- /// `None` means disable the log for failure.
- pub fn with_failure_level(mut self, level: Option<&str>) -> Result<Self> {
- if let Some(level_str) = level {
- let level = level_str.parse().map_err(|_| {
- Error::new(ErrorKind::ConfigInvalid, "invalid log level")
- .with_context("level", level_str)
- })?;
- self.failure_level = Some(level);
- } else {
- self.failure_level = None;
+impl LoggingLayer {
+ /// Create the layer with specific logging interceptor.
+ pub fn new<I: LoggingInterceptor>(notify: I) -> LoggingLayer<I> {
+ LoggingLayer {
+ notify: Arc::new(notify),
}
- Ok(self)
- }
-
- /// Setting whether to output backtrace while unexpected failure happened.
- ///
- /// # Notes
- ///
- /// - When the error is an expected error, backtrace will not be output.
- /// - backtrace output is disable by default.
- pub fn with_backtrace_output(mut self, enable: bool) -> Self {
- self.backtrace_output = enable;
- self
}
}
-impl<A: Access> Layer<A> for LoggingLayer {
- type LayeredAccess = LoggingAccessor<A>;
+impl<A: Access, I: LoggingInterceptor> Layer<A> for LoggingLayer<I> {
+ type LayeredAccess = LoggingAccessor<A, I>;
fn layer(&self, inner: A) -> Self::LayeredAccess {
let meta = inner.info();
@@ -156,400 +153,342 @@ impl<A: Access> Layer<A> for LoggingLayer {
ctx: LoggingContext {
scheme: meta.scheme(),
- error_level: self.error_level,
- failure_level: self.failure_level,
- backtrace_output: self.backtrace_output,
+ notify: self.notify.clone(),
},
}
}
}
-#[derive(Clone, Debug)]
-pub struct LoggingContext {
+#[derive(Debug)]
+pub struct LoggingContext<I> {
scheme: Scheme,
- error_level: Option<Level>,
- failure_level: Option<Level>,
- backtrace_output: bool,
+ notify: Arc<I>,
}
-impl LoggingContext {
- #[inline]
- fn error_level(&self, err: &Error) -> Option<Level> {
- if err.kind() == ErrorKind::Unexpected {
- self.failure_level
- } else {
- self.error_level
+impl<I> Clone for LoggingContext<I> {
+ fn clone(&self) -> Self {
+ Self {
+ scheme: self.scheme,
+ notify: self.notify.clone(),
}
}
+}
- /// Print error with backtrace if it's unexpected error.
- #[inline]
- fn error_print(&self, err: &Error) -> String {
- // Don't print backtrace if it's not unexpected error.
- if err.kind() != ErrorKind::Unexpected {
- return format!("{err}");
+impl<I: LoggingInterceptor> LoggingContext<I> {
+ fn log(&self, operation: Operation, context: &str, message: &str, err:
Option<&Error>) {
+ self.notify
+ .log(self.scheme, operation, context, message, err)
+ }
+
+ fn log_with_path(&self, operation: Operation, path: &str, message: &str,
err: Option<&Error>) {
+ self.notify.log(
+ self.scheme,
+ operation,
+ &format!("path={path}"),
+ message,
+ err,
+ )
+ }
+}
+
+/// LoggingInterceptor is used to intercept the log.
+pub trait LoggingInterceptor: Debug + Send + Sync + 'static {
+ /// Everytime there is a log, this function will be called.
+ ///
+ /// # Inputs
+ ///
+ /// - scheme: The service generates the log.
+ /// - operation: The operation to log.
+ /// - context: Additional context of the log.
+ /// - message: The log message.
+ /// - err: The error to log.
+ ///
+ /// # Note
+ ///
+ /// Users should avoid calling resource-intensive operations such as I/O
or network
+ /// functions here, especially anything that takes longer than 10ms.
Otherwise, Opendal
+ /// could perform unexpectedly slow.
+ fn log(
+ &self,
+ scheme: Scheme,
+ operation: Operation,
+ context: &str,
+ message: &str,
+ err: Option<&Error>,
+ );
+}
+
+/// The DefaultLoggingInterceptor will log the message by the standard logging
macro.
+#[derive(Debug)]
+pub struct DefaultLoggingInterceptor;
+
+impl LoggingInterceptor for DefaultLoggingInterceptor {
+ fn log(
+ &self,
+ scheme: Scheme,
+ operation: Operation,
+ context: &str,
+ message: &str,
+ err: Option<&Error>,
+ ) {
+ let Some(err) = err else {
+ let lvl = self.operation_level(operation);
+ log!(
+ target: LOGGING_TARGET,
+ lvl,
+ "service={} operation={} {} -> {}",
+ scheme,
+ operation,
+ context,
+ message,
+ );
+ return;
+ };
+
+ let lvl = self.error_level(err);
+ log!(
+ target: LOGGING_TARGET,
+ lvl,
+ "service={} operation={} {} -> {} {}",
+ scheme,
+ operation,
+ context,
+ message,
+ err,
+ );
+ }
+}
+
+impl DefaultLoggingInterceptor {
+ fn operation_level(&self, operation: Operation) -> Level {
+ match operation {
+ Operation::ReaderRead
+ | Operation::BlockingReaderRead
+ | Operation::WriterWrite
+ | Operation::BlockingWriterWrite => Level::Trace,
+ _ => Level::Debug,
}
+ }
- if self.backtrace_output {
- format!("{err:?}")
+ #[inline]
+ fn error_level(&self, err: &Error) -> Level {
+ if err.kind() == ErrorKind::Unexpected {
+ Level::Error
} else {
- format!("{err}")
+ Level::Warn
}
}
}
#[derive(Clone, Debug)]
-pub struct LoggingAccessor<A: Access> {
+pub struct LoggingAccessor<A: Access, I: LoggingInterceptor> {
inner: A,
- ctx: LoggingContext,
+ ctx: LoggingContext<I>,
}
static LOGGING_TARGET: &str = "opendal::services";
-impl<A: Access> LayeredAccess for LoggingAccessor<A> {
+impl<A: Access, I: LoggingInterceptor> LayeredAccess for LoggingAccessor<A, I>
{
type Inner = A;
- type Reader = LoggingReader<A::Reader>;
- type BlockingReader = LoggingReader<A::BlockingReader>;
- type Writer = LoggingWriter<A::Writer>;
- type BlockingWriter = LoggingWriter<A::BlockingWriter>;
- type Lister = LoggingLister<A::Lister>;
- type BlockingLister = LoggingLister<A::BlockingLister>;
+ type Reader = LoggingReader<A::Reader, I>;
+ type BlockingReader = LoggingReader<A::BlockingReader, I>;
+ type Writer = LoggingWriter<A::Writer, I>;
+ type BlockingWriter = LoggingWriter<A::BlockingWriter, I>;
+ type Lister = LoggingLister<A::Lister, I>;
+ type BlockingLister = LoggingLister<A::BlockingLister, I>;
fn inner(&self) -> &Self::Inner {
&self.inner
}
fn metadata(&self) -> Arc<AccessorInfo> {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} -> started",
- self.ctx.scheme,
- Operation::Info
- );
+ self.ctx.log(Operation::Info, "", "started", None);
let result = self.inner.info();
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} -> finished: {:?}",
- self.ctx.scheme,
+ self.ctx.log(
Operation::Info,
- result
+ "",
+ &format!("finished: {:?}", result),
+ None,
);
result
}
async fn create_dir(&self, path: &str, args: OpCreateDir) ->
Result<RpCreateDir> {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> started",
- self.ctx.scheme,
- Operation::CreateDir,
- path
- );
+ self.ctx
+ .log_with_path(Operation::CreateDir, path, "started", None);
self.inner
.create_dir(path, args)
.await
.map(|v| {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> finished",
- self.ctx.scheme,
- Operation::CreateDir,
- path
- );
+ self.ctx
+ .log_with_path(Operation::CreateDir, path, "finished",
None);
v
})
.map_err(|err| {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} -> {}",
- self.ctx.scheme,
- Operation::CreateDir,
- path,
- self.ctx.error_print(&err)
- )
- };
+ self.ctx
+ .log_with_path(Operation::CreateDir, path, "", Some(&err));
err
})
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> started",
- self.ctx.scheme,
- Operation::Read,
- path,
- );
+ self.ctx
+ .log_with_path(Operation::Read, path, "started", None);
self.inner
.read(path, args)
.await
.map(|(rp, r)| {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> got reader",
- self.ctx.scheme,
- Operation::Read,
- path,
- );
+ self.ctx
+ .log_with_path(Operation::Read, path, "got reader", None);
(
rp,
LoggingReader::new(self.ctx.clone(), Operation::Read,
path, r),
)
})
.map_err(|err| {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} -> {}",
- self.ctx.scheme,
- Operation::Read,
- path,
- self.ctx.error_print(&err)
- )
- }
+ self.ctx
+ .log_with_path(Operation::Read, path, "", Some(&err));
err
})
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> started",
- self.ctx.scheme,
- Operation::Write,
- path
- );
+ self.ctx
+ .log_with_path(Operation::Write, path, "started", None);
self.inner
.write(path, args)
.await
.map(|(rp, w)| {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> start writing",
- self.ctx.scheme,
- Operation::Write,
- path,
- );
+ self.ctx
+ .log_with_path(Operation::Write, path, "start writing",
None);
let w = LoggingWriter::new(self.ctx.clone(), Operation::Write,
path, w);
(rp, w)
})
.map_err(|err| {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} -> {}",
- self.ctx.scheme,
- Operation::Write,
- path,
- self.ctx.error_print(&err)
- )
- };
+ self.ctx
+ .log_with_path(Operation::Write, path, "", Some(&err));
err
})
}
async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy>
{
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} from={} to={} -> started",
- self.ctx.scheme,
+ self.ctx.log(
Operation::Copy,
- from,
- to
+ &format!("from={from} to={to}"),
+ "started",
+ None,
);
self.inner
.copy(from, to, args)
.await
.map(|v| {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} from={} to={} -> finished",
- self.ctx.scheme,
+ self.ctx.log(
Operation::Copy,
- from,
- to
+ &format!("from={from} to={to}"),
+ "finished",
+ None,
);
v
})
.map_err(|err| {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} from={} to={} -> {}",
- self.ctx.scheme,
- Operation::Copy,
- from,
- to,
- self.ctx.error_print(&err),
- )
- };
+ self.ctx.log(
+ Operation::Copy,
+ &format!("from={from} to={to}"),
+ "",
+ Some(&err),
+ );
err
})
}
async fn rename(&self, from: &str, to: &str, args: OpRename) ->
Result<RpRename> {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} from={} to={} -> started",
- self.ctx.scheme,
+ self.ctx.log(
Operation::Rename,
- from,
- to
+ &format!("from={from} to={to}"),
+ "started",
+ None,
);
self.inner
.rename(from, to, args)
.await
.map(|v| {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} from={} to={} -> finished",
- self.ctx.scheme,
+ self.ctx.log(
Operation::Rename,
- from,
- to
+ &format!("from={from} to={to}"),
+ "finished",
+ None,
);
v
})
.map_err(|err| {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} from={} to={} -> {}",
- self.ctx.scheme,
- Operation::Rename,
- from,
- to,
- self.ctx.error_print(&err)
- )
- };
+ self.ctx.log(
+ Operation::Rename,
+ &format!("from={from} to={to}"),
+ "",
+ Some(&err),
+ );
err
})
}
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> started",
- self.ctx.scheme,
- Operation::Stat,
- path
- );
+ self.ctx
+ .log_with_path(Operation::Stat, path, "started", None);
self.inner
.stat(path, args)
.await
.map(|v| {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> finished: {v:?}",
- self.ctx.scheme,
- Operation::Stat,
- path
- );
+ self.ctx
+ .log_with_path(Operation::Stat, path, "finished", None);
v
})
.map_err(|err| {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} -> {}",
- self.ctx.scheme,
- Operation::Stat,
- path,
- self.ctx.error_print(&err)
- );
- };
+ self.ctx
+ .log_with_path(Operation::Stat, path, "", Some(&err));
err
})
}
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> started",
- self.ctx.scheme,
- Operation::Delete,
- path
- );
+ self.ctx
+ .log_with_path(Operation::Delete, path, "started", None);
self.inner
.delete(path, args.clone())
.inspect(|v| match v {
Ok(_) => {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> finished",
- self.ctx.scheme,
- Operation::Delete,
- path
- );
+ self.ctx
+ .log_with_path(Operation::Delete, path, "finished",
None);
}
Err(err) => {
- if let Some(lvl) = self.ctx.error_level(err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} -> {}",
- self.ctx.scheme,
- Operation::Delete,
- path,
- self.ctx.error_print(err)
- );
- }
+ self.ctx
+ .log_with_path(Operation::Delete, path, "", Some(err));
}
})
.await
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Lister)> {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> started",
- self.ctx.scheme,
- Operation::List,
- path
- );
-
self.inner
.list(path, args)
.map(|v| match v {
Ok((rp, v)) => {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> start listing dir",
- self.ctx.scheme,
- Operation::List,
- path
- );
+ self.ctx
+ .log_with_path(Operation::List, path, "start listing
dir", None);
let streamer = LoggingLister::new(self.ctx.clone(), path,
Operation::List, v);
Ok((rp, streamer))
}
Err(err) => {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} -> {}",
- self.ctx.scheme,
- Operation::List,
- path,
- self.ctx.error_print(&err)
- );
- }
+ self.ctx
+ .log_with_path(Operation::List, path, "", Some(&err));
Err(err)
}
})
@@ -557,39 +496,20 @@ impl<A: Access> LayeredAccess for LoggingAccessor<A> {
}
async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> started",
- self.ctx.scheme,
- Operation::Presign,
- path
- );
+ self.ctx
+ .log_with_path(Operation::Presign, path, "started", None);
self.inner
.presign(path, args)
.await
.map(|v| {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> finished: {v:?}",
- self.ctx.scheme,
- Operation::Presign,
- path
- );
+ self.ctx
+ .log_with_path(Operation::Presign, path, "finished", None);
v
})
.map_err(|err| {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} -> {}",
- self.ctx.scheme,
- Operation::Presign,
- path,
- self.ctx.error_print(&err)
- );
- }
+ self.ctx
+ .log_with_path(Operation::Presign, path, "", Some(&err));
err
})
}
@@ -597,352 +517,216 @@ impl<A: Access> LayeredAccess for LoggingAccessor<A> {
async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
let (op, count) = (args.operation()[0].1.operation(),
args.operation().len());
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={}-{op} count={count} -> started",
- self.ctx.scheme,
+ self.ctx.log(
Operation::Batch,
+ &format!("op={op} count={count} -> started"),
+ "started",
+ None,
);
self.inner
.batch(args)
.map_ok(|v| {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={}-{op} count={count} -> finished:
{}, succeed: {}, failed: {}",
- self.ctx.scheme,
+ self.ctx.log(
Operation::Batch,
- v.results().len(),
- v.results().iter().filter(|(_, v)|v.is_ok()).count(),
- v.results().iter().filter(|(_, v)|v.is_err()).count(),
+ &format!("op={op} count={count}"),
+ &format!(
+ "finished: {}, succeed: {}, failed: {}",
+ v.results().len(),
+ v.results().iter().filter(|(_, v)| v.is_ok()).count(),
+ v.results().iter().filter(|(_, v)| v.is_err()).count(),
+ ),
+ None,
);
v
})
.map_err(|err| {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={}-{op} count={count} -> {}",
- self.ctx.scheme,
- Operation::Batch,
- self.ctx.error_print(&err)
- );
- }
+ self.ctx.log(
+ Operation::Batch,
+ &format!("op={op} count={count}"),
+ "",
+ Some(&err),
+ );
err
})
.await
}
fn blocking_create_dir(&self, path: &str, args: OpCreateDir) ->
Result<RpCreateDir> {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> started",
- self.ctx.scheme,
- Operation::BlockingCreateDir,
- path
- );
+ self.ctx
+ .log_with_path(Operation::BlockingCreateDir, path, "started",
None);
self.inner
.blocking_create_dir(path, args)
.map(|v| {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> finished",
- self.ctx.scheme,
- Operation::BlockingCreateDir,
- path
- );
+ self.ctx
+ .log_with_path(Operation::BlockingCreateDir, path,
"finished", None);
v
})
.map_err(|err| {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} -> {}",
- self.ctx.scheme,
- Operation::BlockingCreateDir,
- path,
- self.ctx.error_print(&err)
- );
- }
+ self.ctx
+ .log_with_path(Operation::BlockingCreateDir, path, "",
Some(&err));
err
})
}
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::BlockingReader)> {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> started",
- self.ctx.scheme,
- Operation::BlockingRead,
- path,
- );
+ self.ctx
+ .log_with_path(Operation::BlockingRead, path, "started", None);
self.inner
.blocking_read(path, args.clone())
.map(|(rp, r)| {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> got reader",
- self.ctx.scheme,
- Operation::BlockingRead,
- path,
- );
+ self.ctx
+ .log_with_path(Operation::BlockingRead, path, "got
reader", None);
let r = LoggingReader::new(self.ctx.clone(),
Operation::BlockingRead, path, r);
(rp, r)
})
.map_err(|err| {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} -> {}",
- self.ctx.scheme,
- Operation::BlockingRead,
- path,
- self.ctx.error_print(&err)
- );
- }
+ self.ctx
+ .log_with_path(Operation::BlockingRead, path, "",
Some(&err));
err
})
}
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> started",
- self.ctx.scheme,
- Operation::BlockingWrite,
- path,
- );
+ self.ctx
+ .log_with_path(Operation::BlockingWrite, path, "started", None);
self.inner
.blocking_write(path, args)
.map(|(rp, w)| {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> start writing",
- self.ctx.scheme,
- Operation::BlockingWrite,
- path,
- );
+ self.ctx
+ .log_with_path(Operation::BlockingWrite, path, "start
writing", None);
let w = LoggingWriter::new(self.ctx.clone(),
Operation::BlockingWrite, path, w);
(rp, w)
})
.map_err(|err| {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} -> {}",
- self.ctx.scheme,
- Operation::BlockingWrite,
- path,
- self.ctx.error_print(&err)
- );
- }
+ self.ctx
+ .log_with_path(Operation::BlockingWrite, path, "",
Some(&err));
err
})
}
fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) ->
Result<RpCopy> {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} from={} to={} -> started",
- self.ctx.scheme,
+ self.ctx.log(
Operation::BlockingCopy,
- from,
- to,
+ &format!("from={from} to={to}"),
+ "started",
+ None,
);
self.inner
.blocking_copy(from, to, args)
.map(|v| {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} from={} to={} -> finished",
- self.ctx.scheme,
+ self.ctx.log(
Operation::BlockingCopy,
- from,
- to,
+ &format!("from={from} to={to}"),
+ "finished",
+ None,
);
v
})
.map_err(|err| {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} from={} to={} -> {}",
- self.ctx.scheme,
- Operation::BlockingCopy,
- from,
- to,
- self.ctx.error_print(&err)
- );
- }
+ self.ctx.log(
+ Operation::BlockingCopy,
+ &format!("from={from} to={to}"),
+ "",
+ Some(&err),
+ );
err
})
}
fn blocking_rename(&self, from: &str, to: &str, args: OpRename) ->
Result<RpRename> {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} from={} to={} -> started",
- self.ctx.scheme,
+ self.ctx.log(
Operation::BlockingRename,
- from,
- to,
+ &format!("from={from} to={to}"),
+ "started",
+ None,
);
self.inner
.blocking_rename(from, to, args)
.map(|v| {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} from={} to={} -> finished",
- self.ctx.scheme,
+ self.ctx.log(
Operation::BlockingRename,
- from,
- to,
+ &format!("from={from} to={to}"),
+ "finished",
+ None,
);
v
})
.map_err(|err| {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} from={} to={} -> {}",
- self.ctx.scheme,
- Operation::BlockingRename,
- from,
- to,
- self.ctx.error_print(&err)
- );
- }
+ self.ctx.log(
+ Operation::BlockingRename,
+ &format!("from={from} to={to}"),
+ "",
+ Some(&err),
+ );
err
})
}
fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> started",
- self.ctx.scheme,
- Operation::BlockingStat,
- path
- );
+ self.ctx
+ .log_with_path(Operation::BlockingStat, path, "started", None);
self.inner
.blocking_stat(path, args)
.map(|v| {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> finished: {v:?}",
- self.ctx.scheme,
- Operation::BlockingStat,
- path
- );
+ self.ctx
+ .log_with_path(Operation::BlockingStat, path, "finished",
None);
v
})
.map_err(|err| {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} -> {}",
- self.ctx.scheme,
- Operation::BlockingStat,
- path,
- self.ctx.error_print(&err)
- );
- }
+ self.ctx
+ .log_with_path(Operation::BlockingStat, path, "",
Some(&err));
err
})
}
fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> started",
- self.ctx.scheme,
- Operation::BlockingDelete,
- path
- );
+ self.ctx
+ .log_with_path(Operation::BlockingDelete, path, "started", None);
self.inner
.blocking_delete(path, args)
.map(|v| {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> finished",
- self.ctx.scheme,
- Operation::BlockingDelete,
- path
- );
+ self.ctx
+ .log_with_path(Operation::BlockingDelete, path,
"finished", None);
v
})
.map_err(|err| {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} -> {}",
- self.ctx.scheme,
- Operation::BlockingDelete,
- path,
- self.ctx.error_print(&err)
- );
- }
+ self.ctx
+ .log_with_path(Operation::BlockingDelete, path, "",
Some(&err));
err
})
}
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::BlockingLister)> {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> started",
- self.ctx.scheme,
- Operation::BlockingList,
- path
- );
+ self.ctx
+ .log_with_path(Operation::BlockingList, path, "started", None);
self.inner
.blocking_list(path, args)
.map(|(rp, v)| {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> got dir",
- self.ctx.scheme,
- Operation::BlockingList,
- path
- );
+ self.ctx
+ .log_with_path(Operation::BlockingList, path, "got dir",
None);
let li = LoggingLister::new(self.ctx.clone(), path,
Operation::BlockingList, v);
(rp, li)
})
.map_err(|err| {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} -> {}",
- self.ctx.scheme,
- Operation::BlockingList,
- path,
- self.ctx.error_print(&err)
- );
- }
+ self.ctx
+ .log_with_path(Operation::BlockingList, path, "",
Some(&err));
err
})
}
}
/// `LoggingReader` is a wrapper of `BytesReader`, with logging functionality.
-pub struct LoggingReader<R> {
- ctx: LoggingContext,
+pub struct LoggingReader<R, I: LoggingInterceptor> {
+ ctx: LoggingContext<I>,
path: String,
op: Operation,
@@ -950,8 +734,8 @@ pub struct LoggingReader<R> {
inner: R,
}
-impl<R> LoggingReader<R> {
- fn new(ctx: LoggingContext, op: Operation, path: &str, reader: R) -> Self {
+impl<R, I: LoggingInterceptor> LoggingReader<R, I> {
+ fn new(ctx: LoggingContext<I>, op: Operation, path: &str, reader: R) ->
Self {
Self {
ctx,
op,
@@ -963,93 +747,93 @@ impl<R> LoggingReader<R> {
}
}
-impl<R> Drop for LoggingReader<R> {
+impl<R, I: LoggingInterceptor> Drop for LoggingReader<R, I> {
fn drop(&mut self) {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} read={} -> data read finished",
- self.ctx.scheme,
+ self.ctx.log(
self.op,
- self.path,
- self.read.load(Ordering::Relaxed)
+ &format!(
+ "path={} read={}",
+ self.path,
+ self.read.load(Ordering::Relaxed)
+ ),
+ "data read finished",
+ None,
);
}
}
-impl<R: oio::Read> oio::Read for LoggingReader<R> {
+impl<R: oio::Read, I: LoggingInterceptor> oio::Read for LoggingReader<R, I> {
async fn read(&mut self) -> Result<Buffer> {
match self.inner.read().await {
Ok(bs) => {
self.read
.fetch_add(bs.remaining() as u64, Ordering::Relaxed);
- trace!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} read={} -> read returns
{}B",
- self.ctx.scheme,
+ self.ctx.log(
Operation::ReaderRead,
- self.path,
- self.read.load(Ordering::Relaxed),
- bs.remaining()
+ &format!(
+ "path={} read={}",
+ self.path,
+ self.read.load(Ordering::Relaxed),
+ ),
+ &format!("read returns {}B", bs.remaining()),
+ None,
);
Ok(bs)
}
Err(err) => {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} read={} -> read
failed: {}",
- self.ctx.scheme,
- Operation::ReaderRead,
+ self.ctx.log(
+ Operation::ReaderRead,
+ &format!(
+ "path={} read={}",
self.path,
- self.read.load(Ordering::Relaxed),
- self.ctx.error_print(&err),
- )
- }
+ self.read.load(Ordering::Relaxed)
+ ),
+ "read failed:",
+ Some(&err),
+ );
Err(err)
}
}
}
}
-impl<R: oio::BlockingRead> oio::BlockingRead for LoggingReader<R> {
+impl<R: oio::BlockingRead, I: LoggingInterceptor> oio::BlockingRead for
LoggingReader<R, I> {
fn read(&mut self) -> Result<Buffer> {
match self.inner.read() {
Ok(bs) => {
self.read
.fetch_add(bs.remaining() as u64, Ordering::Relaxed);
- trace!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} read={} -> read returns
{}B",
- self.ctx.scheme,
+ self.ctx.log(
Operation::BlockingReaderRead,
- self.path,
- self.read.load(Ordering::Relaxed),
- bs.remaining()
+ &format!(
+ "path={} read={}",
+ self.path,
+ self.read.load(Ordering::Relaxed),
+ ),
+ &format!("read returns {}B", bs.remaining()),
+ None,
);
Ok(bs)
}
Err(err) => {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} read={} -> read
failed: {}",
- self.ctx.scheme,
- Operation::BlockingReaderRead,
+ self.ctx.log(
+ Operation::BlockingReaderRead,
+ &format!(
+ "path={} read={}",
self.path,
- self.read.load(Ordering::Relaxed),
- self.ctx.error_print(&err),
- );
- }
+ self.read.load(Ordering::Relaxed)
+ ),
+ "read failed:",
+ Some(&err),
+ );
Err(err)
}
}
}
}
-pub struct LoggingWriter<W> {
- ctx: LoggingContext,
+pub struct LoggingWriter<W, I> {
+ ctx: LoggingContext<I>,
op: Operation,
path: String,
@@ -1057,8 +841,8 @@ pub struct LoggingWriter<W> {
inner: W,
}
-impl<W> LoggingWriter<W> {
- fn new(ctx: LoggingContext, op: Operation, path: &str, writer: W) -> Self {
+impl<W, I> LoggingWriter<W, I> {
+ fn new(ctx: LoggingContext<I>, op: Operation, path: &str, writer: W) ->
Self {
Self {
ctx,
op,
@@ -1070,35 +854,26 @@ impl<W> LoggingWriter<W> {
}
}
-impl<W: oio::Write> oio::Write for LoggingWriter<W> {
+impl<W: oio::Write, I: LoggingInterceptor> oio::Write for LoggingWriter<W, I> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
let size = bs.len();
match self.inner.write(bs).await {
Ok(_) => {
- trace!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} written={}B -> data write
{}B",
- self.ctx.scheme,
+ self.ctx.log(
Operation::WriterWrite,
- self.path,
- self.written,
- size,
+ &format!("path={} written={}B", self.path, self.written),
+ &format!("data write {}B", size),
+ None,
);
Ok(())
}
Err(err) => {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} written={}B -> data
write failed: {}",
- self.ctx.scheme,
- Operation::WriterWrite,
- self.path,
- self.written,
- self.ctx.error_print(&err),
- )
- }
+ self.ctx.log(
+ Operation::WriterWrite,
+ &format!("path={} written={}B", self.path, self.written),
+ "data write failed:",
+ Some(&err),
+ );
Err(err)
}
}
@@ -1107,29 +882,21 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
async fn abort(&mut self) -> Result<()> {
match self.inner.abort().await {
Ok(_) => {
- trace!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} written={}B -> abort
writer",
- self.ctx.scheme,
+ self.ctx.log(
Operation::WriterAbort,
- self.path,
- self.written,
+ &format!("path={} written={}B", self.path, self.written),
+ "abort writer",
+ None,
);
Ok(())
}
Err(err) => {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} written={}B -> abort
writer failed: {}",
- self.ctx.scheme,
- Operation::WriterAbort,
- self.path,
- self.written,
- self.ctx.error_print(&err),
- )
- }
+ self.ctx.log(
+ Operation::WriterAbort,
+ &format!("path={} written={}B", self.path, self.written),
+ "abort writer failed:",
+ Some(&err),
+ );
Err(err)
}
}
@@ -1138,63 +905,46 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
async fn close(&mut self) -> Result<()> {
match self.inner.close().await {
Ok(_) => {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} written={}B -> data
written finished",
- self.ctx.scheme,
+ self.ctx.log(
self.op,
- self.path,
- self.written
+ &format!("path={} written={}B", self.path, self.written),
+ "data written finished",
+ None,
);
Ok(())
}
Err(err) => {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} written={}B -> data
close failed: {}",
- self.ctx.scheme,
- Operation::WriterClose,
- self.path,
- self.written,
- self.ctx.error_print(&err),
- )
- }
+ self.ctx.log(
+ Operation::WriterClose,
+ &format!("path={} written={}B", self.path, self.written),
+ "data close failed:",
+ Some(&err),
+ );
Err(err)
}
}
}
}
-impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
+impl<W: oio::BlockingWrite, I: LoggingInterceptor> oio::BlockingWrite for
LoggingWriter<W, I> {
fn write(&mut self, bs: Buffer) -> Result<()> {
match self.inner.write(bs.clone()) {
Ok(_) => {
- trace!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} written={}B -> data write
{}B",
- self.ctx.scheme,
+ self.ctx.log(
Operation::BlockingWriterWrite,
- self.path,
- self.written,
- bs.len(),
+ &format!("path={} written={}B", self.path, self.written),
+ &format!("data write {}B", bs.len()),
+ None,
);
Ok(())
}
Err(err) => {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} written={}B -> data
write failed: {}",
- self.ctx.scheme,
- Operation::BlockingWriterWrite,
- self.path,
- self.written,
- self.ctx.error_print(&err),
- )
- }
+ self.ctx.log(
+ Operation::BlockingWriterWrite,
+ &format!("path={} written={}B", self.path, self.written),
+ "data write failed:",
+ Some(&err),
+ );
Err(err)
}
}
@@ -1203,37 +953,29 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for
LoggingWriter<W> {
fn close(&mut self) -> Result<()> {
match self.inner.close() {
Ok(_) => {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} written={}B -> data
written finished",
- self.ctx.scheme,
+ self.ctx.log(
self.op,
- self.path,
- self.written
+ &format!("path={} written={}B", self.path, self.written),
+ "data written finished",
+ None,
);
Ok(())
}
Err(err) => {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} written={}B -> data
close failed: {}",
- self.ctx.scheme,
- Operation::BlockingWriterClose,
- self.path,
- self.written,
- self.ctx.error_print(&err),
- )
- }
+ self.ctx.log(
+ Operation::BlockingWriterClose,
+ &format!("path={} written={}B", self.path, self.written),
+ "data close failed:",
+ Some(&err),
+ );
Err(err)
}
}
}
}
-pub struct LoggingLister<P> {
- ctx: LoggingContext,
+pub struct LoggingLister<P, I: LoggingInterceptor> {
+ ctx: LoggingContext<I>,
path: String,
op: Operation,
@@ -1241,8 +983,8 @@ pub struct LoggingLister<P> {
inner: P,
}
-impl<P> LoggingLister<P> {
- fn new(ctx: LoggingContext, path: &str, op: Operation, inner: P) -> Self {
+impl<P, I: LoggingInterceptor> LoggingLister<P, I> {
+ fn new(ctx: LoggingContext<I>, path: &str, op: Operation, inner: P) ->
Self {
Self {
ctx,
path: path.to_string(),
@@ -1253,65 +995,38 @@ impl<P> LoggingLister<P> {
}
}
-impl<P> Drop for LoggingLister<P> {
+impl<P, I: LoggingInterceptor> Drop for LoggingLister<P, I> {
fn drop(&mut self) {
if self.finished {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> all entries read finished",
- self.ctx.scheme,
- self.op,
- self.path
- );
+ self.ctx
+ .log_with_path(self.op, &self.path, "all entries read
finished", None);
} else {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> partial entries read
finished",
- self.ctx.scheme,
- self.op,
- self.path
- );
+ self.ctx
+ .log_with_path(self.op, &self.path, "partial entries read
finished", None);
}
}
}
-impl<P: oio::List> oio::List for LoggingLister<P> {
+impl<P: oio::List, I: LoggingInterceptor> oio::List for LoggingLister<P, I> {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
let res = self.inner.next().await;
match &res {
Ok(Some(de)) => {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> listed entry: {}",
- self.ctx.scheme,
+ self.ctx.log_with_path(
self.op,
- self.path,
- de.path(),
+ &self.path,
+ &format!("listed entry: {}", de.path()),
+ None,
);
}
Ok(None) => {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> finished",
- self.ctx.scheme,
- self.op,
- self.path
- );
+ self.ctx
+ .log_with_path(self.op, &self.path, "finished", None);
self.finished = true;
}
Err(err) => {
- if let Some(lvl) = self.ctx.error_level(err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} -> {}",
- self.ctx.scheme,
- self.op,
- self.path,
- self.ctx.error_print(err)
- )
- }
+ self.ctx.log_with_path(self.op, &self.path, "", Some(err));
}
};
@@ -1319,43 +1034,26 @@ impl<P: oio::List> oio::List for LoggingLister<P> {
}
}
-impl<P: oio::BlockingList> oio::BlockingList for LoggingLister<P> {
+impl<P: oio::BlockingList, I: LoggingInterceptor> oio::BlockingList for
LoggingLister<P, I> {
fn next(&mut self) -> Result<Option<oio::Entry>> {
let res = self.inner.next();
match &res {
Ok(Some(des)) => {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> listed entry: {}",
- self.ctx.scheme,
+ self.ctx.log_with_path(
self.op,
- self.path,
- des.path(),
+ &self.path,
+ &format!("listed entry: {}", des.path()),
+ None,
);
}
Ok(None) => {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} -> finished",
- self.ctx.scheme,
- self.op,
- self.path
- );
+ self.ctx
+ .log_with_path(self.op, &self.path, "finished", None);
self.finished = true;
}
Err(err) => {
- if let Some(lvl) = self.ctx.error_level(err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} -> {}",
- self.ctx.scheme,
- self.op,
- self.path,
- self.ctx.error_print(err)
- )
- }
+ self.ctx.log_with_path(self.op, &self.path, "", Some(err));
}
};
diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs
index eff0bfb8b7..98372b2203 100644
--- a/core/src/layers/mod.rs
+++ b/core/src/layers/mod.rs
@@ -33,6 +33,7 @@ mod immutable_index;
pub use immutable_index::ImmutableIndexLayer;
mod logging;
+pub use logging::LoggingInterceptor;
pub use logging::LoggingLayer;
mod timeout;
diff --git a/core/src/raw/tests/utils.rs b/core/src/raw/tests/utils.rs
index c9eb875d20..1854576428 100644
--- a/core/src/raw/tests/utils.rs
+++ b/core/src/raw/tests/utils.rs
@@ -19,8 +19,11 @@ use std::collections::HashMap;
use std::env;
use std::str::FromStr;
+use log::{log, Level};
use once_cell::sync::Lazy;
+use crate::layers::LoggingInterceptor;
+use crate::raw::Operation;
use crate::*;
/// TEST_RUNTIME is the runtime used for running tests.
@@ -73,7 +76,7 @@ pub fn init_test_service() -> Result<Option<Operator>> {
let op = { op.layer(layers::ChaosLayer::new(0.1)) };
let mut op = op
- .layer(layers::LoggingLayer::default().with_backtrace_output(true))
+ .layer(layers::LoggingLayer::new(BacktraceLoggingInterceptor))
.layer(layers::TimeoutLayer::new())
.layer(layers::RetryLayer::new().with_max_times(4));
@@ -88,3 +91,66 @@ pub fn init_test_service() -> Result<Option<Operator>> {
Ok(Some(op))
}
+
+/// A logging interceptor that logs the backtrace.
+#[derive(Debug, Clone)]
+struct BacktraceLoggingInterceptor;
+
+impl LoggingInterceptor for BacktraceLoggingInterceptor {
+ fn log(
+ &self,
+ scheme: Scheme,
+ operation: raw::Operation,
+ context: &str,
+ message: &str,
+ err: Option<&Error>,
+ ) {
+ let Some(err) = err else {
+ let lvl = self.operation_level(operation);
+ log!(
+ target: "opendal::services",
+ lvl,
+ "service={} operation={} {} -> {}",
+ scheme,
+ operation,
+ context,
+ message,
+ );
+ return;
+ };
+
+ let err_msg = if err.kind() != ErrorKind::Unexpected {
+ format!("{err}")
+ } else {
+ format!("{err:?}")
+ };
+ let lvl = if err.kind() == ErrorKind::Unexpected {
+ Level::Error
+ } else {
+ Level::Warn
+ };
+
+ log!(
+ target: "opendal::services",
+ lvl,
+ "service={} operation={} {} -> {} {}",
+ scheme,
+ operation,
+ context,
+ message,
+ err_msg,
+ );
+ }
+}
+
+impl BacktraceLoggingInterceptor {
+ fn operation_level(&self, operation: Operation) -> Level {
+ match operation {
+ Operation::ReaderRead
+ | Operation::BlockingReaderRead
+ | Operation::WriterWrite
+ | Operation::BlockingWriterWrite => Level::Trace,
+ _ => Level::Debug,
+ }
+ }
+}