This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new dac28d1263 refactor(core): Use kv based context to avoid allocations (#4986)
dac28d1263 is described below
commit dac28d1263717672c43c3fe23ed87bd92d8b98f7
Author: Xuanwo <[email protected]>
AuthorDate: Fri Aug 9 20:22:42 2024 +0800
refactor(core): Use kv based context to avoid allocations (#4986)
* refactor(core): Use kv based context to avoid allocations
Signed-off-by: Xuanwo <[email protected]>
* Remove logger test
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
bindings/haskell/test/BasicTest.hs | 12 +-
core/src/layers/logging.rs | 1021 +++++++++++++++++++++++-------------
core/src/raw/operation.rs | 17 +
core/src/raw/tests/utils.rs | 68 +--
4 files changed, 676 insertions(+), 442 deletions(-)
diff --git a/bindings/haskell/test/BasicTest.hs b/bindings/haskell/test/BasicTest.hs
index 6c817d046e..9733f7a128 100644
--- a/bindings/haskell/test/BasicTest.hs
+++ b/bindings/haskell/test/BasicTest.hs
@@ -31,8 +31,7 @@ basicTests =
"Basic Tests"
[ testCase "testBasicOperation" testRawOperation,
testCase "testMonad" testMonad,
- testCase "testError" testError,
- testCase "testLogger" testLogger
+ testCase "testError" testError
]
testRawOperation :: Assertion
@@ -102,15 +101,6 @@ testError = do
where
operation = readOp "non-exist-path"
-testLogger :: Assertion
-testLogger = do
- state <- newIORef ""
- let logger initStr msg = modifyIORef' initStr (<> msgText msg)
- let logFn = LogAction $ logger state
- Right _ <- newOperator "memory" {ocLogAction = Just logFn}
- logStr <- readIORef state
- T.take 78 logStr @?= "service=memory operation=metadata ->
startedservice=memory operation=metadata"
-
-- helper function
(?=) :: (MonadIO m, Eq a, Show a) => m a -> a -> m ()
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 1dfdba85ef..277c9dfa0c 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -16,11 +16,8 @@
// under the License.
use std::fmt::Debug;
-use std::sync::atomic::AtomicU64;
-use std::sync::atomic::Ordering;
use std::sync::Arc;
-use bytes::Buf;
use futures::FutureExt;
use futures::TryFutureExt;
use log::log;
@@ -36,18 +33,12 @@ use crate::*;
/// - OpenDAL will log in structural way.
/// - Every operation will start with a `started` log entry.
/// - Every operation will finish with the following status:
-/// - `finished`: the operation is successful.
-/// - `errored`: the operation returns an expected error like `NotFound`.
+/// - `succeeded`: the operation is successful, but might have more to take.
+/// - `finished`: the whole operation is finished.
/// - `failed`: the operation returns an unexpected error.
/// - The default log level while expected error happened is `Warn`.
/// - The default log level while unexpected failure happened is `Error`.
///
-/// # Todo
-///
-/// We should migrate to log's kv api after it's ready.
-///
-/// Tracking issue: <https://github.com/rust-lang/log/issues/328>
-///
/// # Examples
///
/// ```no_run
@@ -86,7 +77,7 @@ use crate::*;
/// ```no_run
/// use opendal::layers::LoggingInterceptor;
/// use opendal::layers::LoggingLayer;
-/// use opendal::raw::Operation;
+/// use opendal::raw;
/// use opendal::services;
/// use opendal::Error;
/// use opendal::Operator;
@@ -98,9 +89,9 @@ use crate::*;
/// impl LoggingInterceptor for MyLoggingInterceptor {
/// fn log(
/// &self,
-/// scheme: Scheme,
-/// operation: Operation,
-/// context: &str,
+/// info: &raw::AccessorInfo,
+/// operation: raw::Operation,
+/// context: &[(&str, &str)],
/// message: &str,
/// err: Option<&Error>,
/// ) {
@@ -115,31 +106,21 @@ use crate::*;
/// ```
#[derive(Debug)]
pub struct LoggingLayer<I = DefaultLoggingInterceptor> {
- notify: Arc<I>,
-}
-
-impl<I> Clone for LoggingLayer<I> {
- fn clone(&self) -> Self {
- Self {
- notify: self.notify.clone(),
- }
- }
+ logger: I,
}
impl Default for LoggingLayer {
fn default() -> Self {
Self {
- notify: Arc::new(DefaultLoggingInterceptor),
+ logger: DefaultLoggingInterceptor,
}
}
}
impl LoggingLayer {
/// Create the layer with specific logging interceptor.
- pub fn new<I: LoggingInterceptor>(notify: I) -> LoggingLayer<I> {
- LoggingLayer {
- notify: Arc::new(notify),
- }
+ pub fn new<I: LoggingInterceptor>(logger: I) -> LoggingLayer<I> {
+ LoggingLayer { logger }
}
}
@@ -147,59 +128,25 @@ impl<A: Access, I: LoggingInterceptor> Layer<A> for
LoggingLayer<I> {
type LayeredAccess = LoggingAccessor<A, I>;
fn layer(&self, inner: A) -> Self::LayeredAccess {
- let meta = inner.info();
+ let info = inner.info();
LoggingAccessor {
inner,
- ctx: LoggingContext {
- scheme: meta.scheme(),
- notify: self.notify.clone(),
- },
- }
- }
-}
-
-#[derive(Debug)]
-pub struct LoggingContext<I> {
- scheme: Scheme,
- notify: Arc<I>,
-}
-
-impl<I> Clone for LoggingContext<I> {
- fn clone(&self) -> Self {
- Self {
- scheme: self.scheme,
- notify: self.notify.clone(),
+ info,
+ logger: self.logger.clone(),
}
}
}
-impl<I: LoggingInterceptor> LoggingContext<I> {
- fn log(&self, operation: Operation, context: &str, message: &str, err:
Option<&Error>) {
- self.notify
- .log(self.scheme, operation, context, message, err)
- }
-
- fn log_with_path(&self, operation: Operation, path: &str, message: &str,
err: Option<&Error>) {
- self.notify.log(
- self.scheme,
- operation,
- &format!("path={path}"),
- message,
- err,
- )
- }
-}
-
/// LoggingInterceptor is used to intercept the log.
-pub trait LoggingInterceptor: Debug + Send + Sync + 'static {
+pub trait LoggingInterceptor: Debug + Clone + Send + Sync + Unpin + 'static {
/// Everytime there is a log, this function will be called.
///
/// # Inputs
///
- /// - scheme: The service generates the log.
+ /// - info: The service's access info.
/// - operation: The operation to log.
- /// - context: Additional context of the log.
+ /// - context: Additional context of the log like path, etc.
/// - message: The log message.
/// - err: The error to log.
///
@@ -210,81 +157,97 @@ pub trait LoggingInterceptor: Debug + Send + Sync +
'static {
/// could perform unexpectedly slow.
fn log(
&self,
- scheme: Scheme,
+ info: &AccessorInfo,
operation: Operation,
- context: &str,
+ context: &[(&str, &str)],
message: &str,
err: Option<&Error>,
);
}
/// The DefaultLoggingInterceptor will log the message by the standard logging
macro.
-#[derive(Debug)]
+#[derive(Debug, Copy, Clone, Default)]
pub struct DefaultLoggingInterceptor;
impl LoggingInterceptor for DefaultLoggingInterceptor {
+ #[inline]
fn log(
&self,
- scheme: Scheme,
+ info: &AccessorInfo,
operation: Operation,
- context: &str,
+ context: &[(&str, &str)],
message: &str,
err: Option<&Error>,
) {
- let Some(err) = err else {
- let lvl = self.operation_level(operation);
+ if let Some(err) = err {
+ // Print error if it's unexpected, otherwise in warn.
+ let lvl = if err.kind() == ErrorKind::Unexpected {
+ Level::Error
+ } else {
+ Level::Warn
+ };
+
log!(
target: LOGGING_TARGET,
lvl,
- "service={} operation={} {} -> {}",
- scheme,
- operation,
- context,
- message,
+ "service={} name={} {}: {operation} {message} {}",
+ info.scheme(),
+ info.name(),
+ format_args!(
+ "{}",
+ context.iter().enumerate().map(|(i, (k, v))| {
+ if i > 0 {
+ format!(" {}={}", k, v)
+ } else {
+ format!("{}={}", k, v)
+ }
+ }).collect::<String>()
+ ),
+ // Print error message with debug output while unexpected
happened.
+ //
+ // It's super sad that we can't bind `format_args!()` here.
+ // See: https://github.com/rust-lang/rust/issues/92698
+ if err.kind() != ErrorKind::Unexpected {
+ format!("{err}")
+ } else {
+ format!("{err:?}")
+ }
);
- return;
+ }
+
+ // Print debug message if operation is oneshot, otherwise in trace.
+ let lvl = if operation.is_oneshot() {
+ Level::Debug
+ } else {
+ Level::Trace
};
- let lvl = self.error_level(err);
log!(
target: LOGGING_TARGET,
lvl,
- "service={} operation={} {} -> {} {}",
- scheme,
- operation,
- context,
- message,
- err,
+ "service={} name={} {}: {operation} {message}",
+ info.scheme(),
+ info.name(),
+ format_args!(
+ "{}",
+ context.iter().enumerate().map(|(i, (k, v))| {
+ if i > 0 {
+ format!(" {}={}", k, v)
+ } else {
+ format!("{}={}", k, v)
+ }
+ }).collect::<String>()
+ ),
);
}
}
-impl DefaultLoggingInterceptor {
- fn operation_level(&self, operation: Operation) -> Level {
- match operation {
- Operation::ReaderRead
- | Operation::BlockingReaderRead
- | Operation::WriterWrite
- | Operation::BlockingWriterWrite => Level::Trace,
- _ => Level::Debug,
- }
- }
-
- #[inline]
- fn error_level(&self, err: &Error) -> Level {
- if err.kind() == ErrorKind::Unexpected {
- Level::Error
- } else {
- Level::Warn
- }
- }
-}
-
#[derive(Clone, Debug)]
pub struct LoggingAccessor<A: Access, I: LoggingInterceptor> {
inner: A,
- ctx: LoggingContext<I>,
+ info: Arc<AccessorInfo>,
+ logger: I,
}
static LOGGING_TARGET: &str = "opendal::services";
@@ -303,83 +266,128 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for
LoggingAccessor<A, I> {
}
fn metadata(&self) -> Arc<AccessorInfo> {
- self.ctx.log(Operation::Info, "", "started", None);
- let result = self.inner.info();
- self.ctx.log(
- Operation::Info,
- "",
- &format!("finished: {:?}", result),
- None,
- );
+ self.logger
+ .log(&self.info, Operation::Info, &[], "started", None);
+
+ let info = self.info.clone();
+
+ self.logger
+ .log(&self.info, Operation::Info, &[], "finished", None);
- result
+ info
}
async fn create_dir(&self, path: &str, args: OpCreateDir) ->
Result<RpCreateDir> {
- self.ctx
- .log_with_path(Operation::CreateDir, path, "started", None);
+ self.logger.log(
+ &self.info,
+ Operation::CreateDir,
+ &[("path", path)],
+ "started",
+ None,
+ );
self.inner
.create_dir(path, args)
.await
.map(|v| {
- self.ctx
- .log_with_path(Operation::CreateDir, path, "finished",
None);
+ self.logger.log(
+ &self.info,
+ Operation::CreateDir,
+ &[("path", path)],
+ "finished",
+ None,
+ );
v
})
.map_err(|err| {
- self.ctx
- .log_with_path(Operation::CreateDir, path, "", Some(&err));
+ self.logger.log(
+ &self.info,
+ Operation::CreateDir,
+ &[("path", path)],
+ "failed",
+ Some(&err),
+ );
err
})
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
- self.ctx
- .log_with_path(Operation::Read, path, "started", None);
+ self.logger.log(
+ &self.info,
+ Operation::Read,
+ &[("path", path)],
+ "started",
+ None,
+ );
self.inner
.read(path, args)
.await
.map(|(rp, r)| {
- self.ctx
- .log_with_path(Operation::Read, path, "got reader", None);
+ self.logger.log(
+ &self.info,
+ Operation::Read,
+ &[("path", path)],
+ "created reader",
+ None,
+ );
(
rp,
- LoggingReader::new(self.ctx.clone(), Operation::Read,
path, r),
+ LoggingReader::new(self.info.clone(), self.logger.clone(),
path, r),
)
})
.map_err(|err| {
- self.ctx
- .log_with_path(Operation::Read, path, "", Some(&err));
+ self.logger.log(
+ &self.info,
+ Operation::Read,
+ &[("path", path)],
+ "failed",
+ Some(&err),
+ );
err
})
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- self.ctx
- .log_with_path(Operation::Write, path, "started", None);
+ self.logger.log(
+ &self.info,
+ Operation::Write,
+ &[("path", path)],
+ "started",
+ None,
+ );
self.inner
.write(path, args)
.await
.map(|(rp, w)| {
- self.ctx
- .log_with_path(Operation::Write, path, "start writing",
None);
- let w = LoggingWriter::new(self.ctx.clone(), Operation::Write,
path, w);
+ self.logger.log(
+ &self.info,
+ Operation::Write,
+ &[("path", path)],
+ "created writer",
+ None,
+ );
+ let w = LoggingWriter::new(self.info.clone(),
self.logger.clone(), path, w);
(rp, w)
})
.map_err(|err| {
- self.ctx
- .log_with_path(Operation::Write, path, "", Some(&err));
+ self.logger.log(
+ &self.info,
+ Operation::Write,
+ &[("path", path)],
+ "failed",
+ Some(&err),
+ );
err
})
}
async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy>
{
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::Copy,
- &format!("from={from} to={to}"),
+ &[("from", from), ("to", to)],
"started",
None,
);
@@ -388,19 +396,21 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for
LoggingAccessor<A, I> {
.copy(from, to, args)
.await
.map(|v| {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::Copy,
- &format!("from={from} to={to}"),
+ &[("from", from), ("to", to)],
"finished",
None,
);
v
})
.map_err(|err| {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::Copy,
- &format!("from={from} to={to}"),
- "",
+ &[("from", from), ("to", to)],
+ "failed",
Some(&err),
);
err
@@ -408,9 +418,10 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for
LoggingAccessor<A, I> {
}
async fn rename(&self, from: &str, to: &str, args: OpRename) ->
Result<RpRename> {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::Rename,
- &format!("from={from} to={to}"),
+ &[("from", from), ("to", to)],
"started",
None,
);
@@ -419,19 +430,21 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for
LoggingAccessor<A, I> {
.rename(from, to, args)
.await
.map(|v| {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::Rename,
- &format!("from={from} to={to}"),
+ &[("from", from), ("to", to)],
"finished",
None,
);
v
})
.map_err(|err| {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::Rename,
- &format!("from={from} to={to}"),
- "",
+ &[("from", from), ("to", to)],
+ "failed",
Some(&err),
);
err
@@ -439,56 +452,105 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for
LoggingAccessor<A, I> {
}
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
- self.ctx
- .log_with_path(Operation::Stat, path, "started", None);
+ self.logger.log(
+ &self.info,
+ Operation::Stat,
+ &[("path", path)],
+ "started",
+ None,
+ );
self.inner
.stat(path, args)
.await
.map(|v| {
- self.ctx
- .log_with_path(Operation::Stat, path, "finished", None);
+ self.logger.log(
+ &self.info,
+ Operation::Stat,
+ &[("path", path)],
+ "finished",
+ None,
+ );
v
})
.map_err(|err| {
- self.ctx
- .log_with_path(Operation::Stat, path, "", Some(&err));
+ self.logger.log(
+ &self.info,
+ Operation::Stat,
+ &[("path", path)],
+ "failed",
+ Some(&err),
+ );
err
})
}
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
- self.ctx
- .log_with_path(Operation::Delete, path, "started", None);
+ self.logger.log(
+ &self.info,
+ Operation::Delete,
+ &[("path", path)],
+ "started",
+ None,
+ );
self.inner
.delete(path, args.clone())
.inspect(|v| match v {
Ok(_) => {
- self.ctx
- .log_with_path(Operation::Delete, path, "finished",
None);
+ self.logger.log(
+ &self.info,
+ Operation::Delete,
+ &[("path", path)],
+ "finished",
+ None,
+ );
}
Err(err) => {
- self.ctx
- .log_with_path(Operation::Delete, path, "", Some(err));
+ self.logger.log(
+ &self.info,
+ Operation::Delete,
+ &[("path", path)],
+ "failed",
+ Some(err),
+ );
}
})
.await
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Lister)> {
+ self.logger.log(
+ &self.info,
+ Operation::List,
+ &[("path", path)],
+ "started",
+ None,
+ );
+
self.inner
.list(path, args)
.map(|v| match v {
Ok((rp, v)) => {
- self.ctx
- .log_with_path(Operation::List, path, "start listing
dir", None);
- let streamer = LoggingLister::new(self.ctx.clone(), path,
Operation::List, v);
+ self.logger.log(
+ &self.info,
+ Operation::List,
+ &[("path", path)],
+ "created lister",
+ None,
+ );
+ let streamer =
+ LoggingLister::new(self.info.clone(),
self.logger.clone(), path, v);
Ok((rp, streamer))
}
Err(err) => {
- self.ctx
- .log_with_path(Operation::List, path, "", Some(&err));
+ self.logger.log(
+ &self.info,
+ Operation::List,
+ &[("path", path)],
+ "failed",
+ Some(&err),
+ );
Err(err)
}
})
@@ -496,20 +558,35 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for
LoggingAccessor<A, I> {
}
async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
- self.ctx
- .log_with_path(Operation::Presign, path, "started", None);
+ self.logger.log(
+ &self.info,
+ Operation::Presign,
+ &[("path", path)],
+ "started",
+ None,
+ );
self.inner
.presign(path, args)
.await
.map(|v| {
- self.ctx
- .log_with_path(Operation::Presign, path, "finished", None);
+ self.logger.log(
+ &self.info,
+ Operation::Presign,
+ &[("path", path)],
+ "finished",
+ None,
+ );
v
})
.map_err(|err| {
- self.ctx
- .log_with_path(Operation::Presign, path, "", Some(&err));
+ self.logger.log(
+ &self.info,
+ Operation::Presign,
+ &[("path", path)],
+ "failed",
+ Some(&err),
+ );
err
})
}
@@ -517,9 +594,10 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for
LoggingAccessor<A, I> {
async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
let (op, count) = (args.operation()[0].1.operation(),
args.operation().len());
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::Batch,
- &format!("op={op} count={count} -> started"),
+ &[("op", op.into_static()), ("count", &count.to_string())],
"started",
None,
);
@@ -527,9 +605,10 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for
LoggingAccessor<A, I> {
self.inner
.batch(args)
.map_ok(|v| {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::Batch,
- &format!("op={op} count={count}"),
+ &[("op", op.into_static()), ("count", &count.to_string())],
&format!(
"finished: {}, succeed: {}, failed: {}",
v.results().len(),
@@ -541,10 +620,11 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for
LoggingAccessor<A, I> {
v
})
.map_err(|err| {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::Batch,
- &format!("op={op} count={count}"),
- "",
+ &[("op", op.into_static()), ("count", &count.to_string())],
+ "failed",
Some(&err),
);
err
@@ -553,65 +633,111 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for
LoggingAccessor<A, I> {
}
fn blocking_create_dir(&self, path: &str, args: OpCreateDir) ->
Result<RpCreateDir> {
- self.ctx
- .log_with_path(Operation::BlockingCreateDir, path, "started",
None);
+ self.logger.log(
+ &self.info,
+ Operation::BlockingCreateDir,
+ &[("path", path)],
+ "started",
+ None,
+ );
self.inner
.blocking_create_dir(path, args)
.map(|v| {
- self.ctx
- .log_with_path(Operation::BlockingCreateDir, path,
"finished", None);
+ self.logger.log(
+ &self.info,
+ Operation::BlockingCreateDir,
+ &[("path", path)],
+ "finished",
+ None,
+ );
v
})
.map_err(|err| {
- self.ctx
- .log_with_path(Operation::BlockingCreateDir, path, "",
Some(&err));
+ self.logger.log(
+ &self.info,
+ Operation::BlockingCreateDir,
+ &[("path", path)],
+ "failed",
+ Some(&err),
+ );
err
})
}
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::BlockingReader)> {
- self.ctx
- .log_with_path(Operation::BlockingRead, path, "started", None);
+ self.logger.log(
+ &self.info,
+ Operation::BlockingRead,
+ &[("path", path)],
+ "started",
+ None,
+ );
self.inner
.blocking_read(path, args.clone())
.map(|(rp, r)| {
- self.ctx
- .log_with_path(Operation::BlockingRead, path, "got
reader", None);
- let r = LoggingReader::new(self.ctx.clone(),
Operation::BlockingRead, path, r);
+ self.logger.log(
+ &self.info,
+ Operation::BlockingRead,
+ &[("path", path)],
+ "created reader",
+ None,
+ );
+ let r = LoggingReader::new(self.info.clone(),
self.logger.clone(), path, r);
(rp, r)
})
.map_err(|err| {
- self.ctx
- .log_with_path(Operation::BlockingRead, path, "",
Some(&err));
+ self.logger.log(
+ &self.info,
+ Operation::BlockingRead,
+ &[("path", path)],
+ "failed",
+ Some(&err),
+ );
err
})
}
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
- self.ctx
- .log_with_path(Operation::BlockingWrite, path, "started", None);
+ self.logger.log(
+ &self.info,
+ Operation::BlockingWrite,
+ &[("path", path)],
+ "started",
+ None,
+ );
self.inner
.blocking_write(path, args)
.map(|(rp, w)| {
- self.ctx
- .log_with_path(Operation::BlockingWrite, path, "start
writing", None);
- let w = LoggingWriter::new(self.ctx.clone(),
Operation::BlockingWrite, path, w);
+ self.logger.log(
+ &self.info,
+ Operation::BlockingWrite,
+ &[("path", path)],
+ "created writer",
+ None,
+ );
+ let w = LoggingWriter::new(self.info.clone(),
self.logger.clone(), path, w);
(rp, w)
})
.map_err(|err| {
- self.ctx
- .log_with_path(Operation::BlockingWrite, path, "",
Some(&err));
+ self.logger.log(
+ &self.info,
+ Operation::BlockingWrite,
+ &[("path", path)],
+ "failed",
+ Some(&err),
+ );
err
})
}
fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) ->
Result<RpCopy> {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::BlockingCopy,
- &format!("from={from} to={to}"),
+ &[("from", from), ("to", to)],
"started",
None,
);
@@ -619,18 +745,20 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for
LoggingAccessor<A, I> {
self.inner
.blocking_copy(from, to, args)
.map(|v| {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::BlockingCopy,
- &format!("from={from} to={to}"),
+ &[("from", from), ("to", to)],
"finished",
None,
);
v
})
.map_err(|err| {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::BlockingCopy,
- &format!("from={from} to={to}"),
+ &[("from", from), ("to", to)],
"",
Some(&err),
);
@@ -639,9 +767,10 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for
LoggingAccessor<A, I> {
}
fn blocking_rename(&self, from: &str, to: &str, args: OpRename) ->
Result<RpRename> {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::BlockingRename,
- &format!("from={from} to={to}"),
+ &[("from", from), ("to", to)],
"started",
None,
);
@@ -649,19 +778,21 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for
LoggingAccessor<A, I> {
self.inner
.blocking_rename(from, to, args)
.map(|v| {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::BlockingRename,
- &format!("from={from} to={to}"),
+ &[("from", from), ("to", to)],
"finished",
None,
);
v
})
.map_err(|err| {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::BlockingRename,
- &format!("from={from} to={to}"),
- "",
+ &[("from", from), ("to", to)],
+ "failed",
Some(&err),
);
err
@@ -669,56 +800,101 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for
LoggingAccessor<A, I> {
}
fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
- self.ctx
- .log_with_path(Operation::BlockingStat, path, "started", None);
+ self.logger.log(
+ &self.info,
+ Operation::BlockingStat,
+ &[("path", path)],
+ "started",
+ None,
+ );
self.inner
.blocking_stat(path, args)
.map(|v| {
- self.ctx
- .log_with_path(Operation::BlockingStat, path, "finished",
None);
+ self.logger.log(
+ &self.info,
+ Operation::BlockingStat,
+ &[("path", path)],
+ "finished",
+ None,
+ );
v
})
.map_err(|err| {
- self.ctx
- .log_with_path(Operation::BlockingStat, path, "",
Some(&err));
+ self.logger.log(
+ &self.info,
+ Operation::BlockingStat,
+ &[("path", path)],
+ "failed",
+ Some(&err),
+ );
err
})
}
fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
- self.ctx
- .log_with_path(Operation::BlockingDelete, path, "started", None);
+ self.logger.log(
+ &self.info,
+ Operation::BlockingDelete,
+ &[("path", path)],
+ "started",
+ None,
+ );
self.inner
.blocking_delete(path, args)
.map(|v| {
- self.ctx
- .log_with_path(Operation::BlockingDelete, path,
"finished", None);
+ self.logger.log(
+ &self.info,
+ Operation::BlockingDelete,
+ &[("path", path)],
+ "finished",
+ None,
+ );
v
})
.map_err(|err| {
- self.ctx
- .log_with_path(Operation::BlockingDelete, path, "",
Some(&err));
+ self.logger.log(
+ &self.info,
+ Operation::BlockingDelete,
+ &[("path", path)],
+ "failed",
+ Some(&err),
+ );
err
})
}
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::BlockingLister)> {
- self.ctx
- .log_with_path(Operation::BlockingList, path, "started", None);
+ self.logger.log(
+ &self.info,
+ Operation::BlockingList,
+ &[("path", path)],
+ "started",
+ None,
+ );
self.inner
.blocking_list(path, args)
.map(|(rp, v)| {
- self.ctx
- .log_with_path(Operation::BlockingList, path, "got dir",
None);
- let li = LoggingLister::new(self.ctx.clone(), path,
Operation::BlockingList, v);
+ self.logger.log(
+ &self.info,
+ Operation::BlockingList,
+ &[("path", path)],
+ "created lister",
+ None,
+ );
+ let li = LoggingLister::new(self.info.clone(),
self.logger.clone(), path, v);
(rp, li)
})
.map_err(|err| {
- self.ctx
- .log_with_path(Operation::BlockingList, path, "",
Some(&err));
+ self.logger.log(
+ &self.info,
+ Operation::BlockingList,
+ &[("path", path)],
+ "",
+ Some(&err),
+ );
err
})
}
@@ -726,69 +902,63 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for
LoggingAccessor<A, I> {
/// `LoggingReader` is a wrapper of `BytesReader`, with logging functionality.
pub struct LoggingReader<R, I: LoggingInterceptor> {
- ctx: LoggingContext<I>,
+ info: Arc<AccessorInfo>,
+ logger: I,
path: String,
- op: Operation,
- read: AtomicU64,
+ read: u64,
inner: R,
}
impl<R, I: LoggingInterceptor> LoggingReader<R, I> {
- fn new(ctx: LoggingContext<I>, op: Operation, path: &str, reader: R) ->
Self {
+ fn new(info: Arc<AccessorInfo>, logger: I, path: &str, reader: R) -> Self {
Self {
- ctx,
- op,
+ info,
+ logger,
path: path.to_string(),
- read: AtomicU64::new(0),
+ read: 0,
inner: reader,
}
}
}
-impl<R, I: LoggingInterceptor> Drop for LoggingReader<R, I> {
- fn drop(&mut self) {
- self.ctx.log(
- self.op,
- &format!(
- "path={} read={}",
- self.path,
- self.read.load(Ordering::Relaxed)
- ),
- "data read finished",
+impl<R: oio::Read, I: LoggingInterceptor> oio::Read for LoggingReader<R, I> {
+ async fn read(&mut self) -> Result<Buffer> {
+ self.logger.log(
+ &self.info,
+ Operation::ReaderRead,
+ &[("path", &self.path), ("read", &self.read.to_string())],
+ "started",
None,
);
- }
-}
-impl<R: oio::Read, I: LoggingInterceptor> oio::Read for LoggingReader<R, I> {
- async fn read(&mut self) -> Result<Buffer> {
match self.inner.read().await {
Ok(bs) => {
- self.read
- .fetch_add(bs.remaining() as u64, Ordering::Relaxed);
- self.ctx.log(
+ self.read += bs.len() as u64;
+ self.logger.log(
+ &self.info,
Operation::ReaderRead,
- &format!(
- "path={} read={}",
- self.path,
- self.read.load(Ordering::Relaxed),
- ),
- &format!("read returns {}B", bs.remaining()),
+ &[
+ ("path", &self.path),
+ ("read", &self.read.to_string()),
+ ("size", &bs.len().to_string()),
+ ],
+ if bs.is_empty() {
+ "finished"
+ } else {
+ "succeeded"
+ },
None,
);
Ok(bs)
}
Err(err) => {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::ReaderRead,
- &format!(
- "path={} read={}",
- self.path,
- self.read.load(Ordering::Relaxed)
- ),
- "read failed:",
+ &[("path", &self.path), ("read", &self.read.to_string())],
+ "failed",
Some(&err),
);
Err(err)
@@ -799,31 +969,40 @@ impl<R: oio::Read, I: LoggingInterceptor> oio::Read for
LoggingReader<R, I> {
impl<R: oio::BlockingRead, I: LoggingInterceptor> oio::BlockingRead for
LoggingReader<R, I> {
fn read(&mut self) -> Result<Buffer> {
+ self.logger.log(
+ &self.info,
+ Operation::BlockingReaderRead,
+ &[("path", &self.path), ("read", &self.read.to_string())],
+ "started",
+ None,
+ );
+
match self.inner.read() {
Ok(bs) => {
- self.read
- .fetch_add(bs.remaining() as u64, Ordering::Relaxed);
- self.ctx.log(
+ self.read += bs.len() as u64;
+ self.logger.log(
+ &self.info,
Operation::BlockingReaderRead,
- &format!(
- "path={} read={}",
- self.path,
- self.read.load(Ordering::Relaxed),
- ),
- &format!("read returns {}B", bs.remaining()),
+ &[
+ ("path", &self.path),
+ ("read", &self.read.to_string()),
+ ("size", &bs.len().to_string()),
+ ],
+ if bs.is_empty() {
+ "finished"
+ } else {
+ "succeeded"
+ },
None,
);
Ok(bs)
}
Err(err) => {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::BlockingReaderRead,
- &format!(
- "path={} read={}",
- self.path,
- self.read.load(Ordering::Relaxed)
- ),
- "read failed:",
+ &[("path", &self.path), ("read", &self.read.to_string())],
+ "failed",
Some(&err),
);
Err(err)
@@ -833,8 +1012,8 @@ impl<R: oio::BlockingRead, I: LoggingInterceptor>
oio::BlockingRead for LoggingR
}
pub struct LoggingWriter<W, I> {
- ctx: LoggingContext<I>,
- op: Operation,
+ info: Arc<AccessorInfo>,
+ logger: I,
path: String,
written: u64,
@@ -842,10 +1021,10 @@ pub struct LoggingWriter<W, I> {
}
impl<W, I> LoggingWriter<W, I> {
- fn new(ctx: LoggingContext<I>, op: Operation, path: &str, writer: W) ->
Self {
+ fn new(info: Arc<AccessorInfo>, logger: I, path: &str, writer: W) -> Self {
Self {
- ctx,
- op,
+ info,
+ logger,
path: path.to_string(),
written: 0,
@@ -857,21 +1036,45 @@ impl<W, I> LoggingWriter<W, I> {
impl<W: oio::Write, I: LoggingInterceptor> oio::Write for LoggingWriter<W, I> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
let size = bs.len();
+
+ self.logger.log(
+ &self.info,
+ Operation::WriterWrite,
+ &[
+ ("path", &self.path),
+ ("written", &self.written.to_string()),
+ ("size", &size.to_string()),
+ ],
+ "started",
+ None,
+ );
+
match self.inner.write(bs).await {
Ok(_) => {
- self.ctx.log(
+ self.written += size as u64;
+ self.logger.log(
+ &self.info,
Operation::WriterWrite,
- &format!("path={} written={}B", self.path, self.written),
- &format!("data write {}B", size),
+ &[
+ ("path", &self.path),
+ ("written", &self.written.to_string()),
+ ("size", &size.to_string()),
+ ],
+ "succeeded",
None,
);
Ok(())
}
Err(err) => {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::WriterWrite,
- &format!("path={} written={}B", self.path, self.written),
- "data write failed:",
+ &[
+ ("path", &self.path),
+ ("written", &self.written.to_string()),
+ ("size", &size.to_string()),
+ ],
+ "failed",
Some(&err),
);
Err(err)
@@ -880,21 +1083,31 @@ impl<W: oio::Write, I: LoggingInterceptor> oio::Write
for LoggingWriter<W, I> {
}
async fn abort(&mut self) -> Result<()> {
+ self.logger.log(
+ &self.info,
+ Operation::WriterAbort,
+ &[("path", &self.path), ("written", &self.written.to_string())],
+ "started",
+ None,
+ );
+
match self.inner.abort().await {
Ok(_) => {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::WriterAbort,
- &format!("path={} written={}B", self.path, self.written),
- "abort writer",
+ &[("path", &self.path), ("written",
&self.written.to_string())],
+ "succeeded",
None,
);
Ok(())
}
Err(err) => {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::WriterAbort,
- &format!("path={} written={}B", self.path, self.written),
- "abort writer failed:",
+ &[("path", &self.path), ("written",
&self.written.to_string())],
+ "failed",
Some(&err),
);
Err(err)
@@ -903,21 +1116,31 @@ impl<W: oio::Write, I: LoggingInterceptor> oio::Write
for LoggingWriter<W, I> {
}
async fn close(&mut self) -> Result<()> {
+ self.logger.log(
+ &self.info,
+ Operation::WriterClose,
+ &[("path", &self.path), ("written", &self.written.to_string())],
+ "started",
+ None,
+ );
+
match self.inner.close().await {
Ok(_) => {
- self.ctx.log(
- self.op,
- &format!("path={} written={}B", self.path, self.written),
- "data written finished",
+ self.logger.log(
+ &self.info,
+ Operation::WriterClose,
+ &[("path", &self.path), ("written",
&self.written.to_string())],
+ "succeeded",
None,
);
Ok(())
}
Err(err) => {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::WriterClose,
- &format!("path={} written={}B", self.path, self.written),
- "data close failed:",
+ &[("path", &self.path), ("written",
&self.written.to_string())],
+ "failed",
Some(&err),
);
Err(err)
@@ -928,21 +1151,45 @@ impl<W: oio::Write, I: LoggingInterceptor> oio::Write
for LoggingWriter<W, I> {
impl<W: oio::BlockingWrite, I: LoggingInterceptor> oio::BlockingWrite for
LoggingWriter<W, I> {
fn write(&mut self, bs: Buffer) -> Result<()> {
- match self.inner.write(bs.clone()) {
+ let size = bs.len();
+
+ self.logger.log(
+ &self.info,
+ Operation::BlockingWriterWrite,
+ &[
+ ("path", &self.path),
+ ("written", &self.written.to_string()),
+ ("size", &size.to_string()),
+ ],
+ "started",
+ None,
+ );
+
+ match self.inner.write(bs) {
Ok(_) => {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::BlockingWriterWrite,
- &format!("path={} written={}B", self.path, self.written),
- &format!("data write {}B", bs.len()),
+ &[
+ ("path", &self.path),
+ ("written", &self.written.to_string()),
+ ("size", &size.to_string()),
+ ],
+ "succeeded",
None,
);
Ok(())
}
Err(err) => {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::BlockingWriterWrite,
- &format!("path={} written={}B", self.path, self.written),
- "data write failed:",
+ &[
+ ("path", &self.path),
+ ("written", &self.written.to_string()),
+ ("size", &size.to_string()),
+ ],
+ "failed",
Some(&err),
);
Err(err)
@@ -951,21 +1198,31 @@ impl<W: oio::BlockingWrite, I: LoggingInterceptor>
oio::BlockingWrite for Loggin
}
fn close(&mut self) -> Result<()> {
+ self.logger.log(
+ &self.info,
+ Operation::BlockingWriterClose,
+ &[("path", &self.path), ("written", &self.written.to_string())],
+ "started",
+ None,
+ );
+
match self.inner.close() {
Ok(_) => {
- self.ctx.log(
- self.op,
- &format!("path={} written={}B", self.path, self.written),
- "data written finished",
+ self.logger.log(
+ &self.info,
+ Operation::BlockingWriterWrite,
+ &[("path", &self.path), ("written",
&self.written.to_string())],
+ "succeeded",
None,
);
Ok(())
}
Err(err) => {
- self.ctx.log(
+ self.logger.log(
+ &self.info,
Operation::BlockingWriterClose,
- &format!("path={} written={}B", self.path, self.written),
- "data close failed:",
+ &[("path", &self.path), ("written",
&self.written.to_string())],
+ "failed",
Some(&err),
);
Err(err)
@@ -975,58 +1232,71 @@ impl<W: oio::BlockingWrite, I: LoggingInterceptor>
oio::BlockingWrite for Loggin
}
pub struct LoggingLister<P, I: LoggingInterceptor> {
- ctx: LoggingContext<I>,
+ info: Arc<AccessorInfo>,
+ logger: I,
path: String,
- op: Operation,
- finished: bool,
+ listed: usize,
inner: P,
}
impl<P, I: LoggingInterceptor> LoggingLister<P, I> {
- fn new(ctx: LoggingContext<I>, path: &str, op: Operation, inner: P) ->
Self {
+ fn new(info: Arc<AccessorInfo>, logger: I, path: &str, inner: P) -> Self {
Self {
- ctx,
+ info,
+ logger,
path: path.to_string(),
- op,
- finished: false,
- inner,
- }
- }
-}
-impl<P, I: LoggingInterceptor> Drop for LoggingLister<P, I> {
- fn drop(&mut self) {
- if self.finished {
- self.ctx
- .log_with_path(self.op, &self.path, "all entries read
finished", None);
- } else {
- self.ctx
- .log_with_path(self.op, &self.path, "partial entries read
finished", None);
+ listed: 0,
+ inner,
}
}
}
impl<P: oio::List, I: LoggingInterceptor> oio::List for LoggingLister<P, I> {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
+ self.logger.log(
+ &self.info,
+ Operation::ListerNext,
+ &[("path", &self.path), ("listed", &self.listed.to_string())],
+ "started",
+ None,
+ );
+
let res = self.inner.next().await;
match &res {
Ok(Some(de)) => {
- self.ctx.log_with_path(
- self.op,
- &self.path,
- &format!("listed entry: {}", de.path()),
+ self.listed += 1;
+ self.logger.log(
+ &self.info,
+ Operation::ListerNext,
+ &[
+ ("path", &self.path),
+ ("listed", &self.listed.to_string()),
+ ("entry", de.path()),
+ ],
+ "succeeded",
None,
);
}
Ok(None) => {
- self.ctx
- .log_with_path(self.op, &self.path, "finished", None);
- self.finished = true;
+ self.logger.log(
+ &self.info,
+ Operation::ListerNext,
+ &[("path", &self.path), ("listed",
&self.listed.to_string())],
+ "finished",
+ None,
+ );
}
Err(err) => {
- self.ctx.log_with_path(self.op, &self.path, "", Some(err));
+ self.logger.log(
+ &self.info,
+ Operation::ListerNext,
+ &[("path", &self.path), ("listed",
&self.listed.to_string())],
+ "failed",
+ Some(err),
+ );
}
};
@@ -1036,24 +1306,47 @@ impl<P: oio::List, I: LoggingInterceptor> oio::List for
LoggingLister<P, I> {
impl<P: oio::BlockingList, I: LoggingInterceptor> oio::BlockingList for
LoggingLister<P, I> {
fn next(&mut self) -> Result<Option<oio::Entry>> {
- let res = self.inner.next();
+ self.logger.log(
+ &self.info,
+ Operation::BlockingListerNext,
+ &[("path", &self.path), ("listed", &self.listed.to_string())],
+ "started",
+ None,
+ );
+ let res = self.inner.next();
match &res {
- Ok(Some(des)) => {
- self.ctx.log_with_path(
- self.op,
- &self.path,
- &format!("listed entry: {}", des.path()),
+ Ok(Some(de)) => {
+ self.listed += 1;
+ self.logger.log(
+ &self.info,
+ Operation::BlockingListerNext,
+ &[
+ ("path", &self.path),
+ ("listed", &self.listed.to_string()),
+ ("entry", de.path()),
+ ],
+ "succeeded",
None,
);
}
Ok(None) => {
- self.ctx
- .log_with_path(self.op, &self.path, "finished", None);
- self.finished = true;
+ self.logger.log(
+ &self.info,
+ Operation::BlockingListerNext,
+ &[("path", &self.path), ("listed",
&self.listed.to_string())],
+ "finished",
+ None,
+ );
}
Err(err) => {
- self.ctx.log_with_path(self.op, &self.path, "", Some(err));
+ self.logger.log(
+ &self.info,
+ Operation::BlockingListerNext,
+ &[("path", &self.path), ("listed",
&self.listed.to_string())],
+ "failed",
+ Some(err),
+ );
}
};
diff --git a/core/src/raw/operation.rs b/core/src/raw/operation.rs
index d195cb57ac..c5ef760474 100644
--- a/core/src/raw/operation.rs
+++ b/core/src/raw/operation.rs
@@ -86,6 +86,23 @@ impl Operation {
pub fn into_static(self) -> &'static str {
self.into()
}
+
+ /// Check whether the given operation is oneshot.
+ ///
+ /// For example, `Stat` is oneshot but `ReaderRead` could happen multiple
times.
+ ///
+ /// This function can be used to decide whether to take actions, such as
logging, based on the operation.
+ pub fn is_oneshot(&self) -> bool {
+ !matches!(
+ self,
+ Operation::ReaderRead
+ | Operation::WriterWrite
+ | Operation::ListerNext
+ | Operation::BlockingReaderRead
+ | Operation::BlockingWriterWrite
+ | Operation::BlockingListerNext
+ )
+ }
}
impl Display for Operation {
diff --git a/core/src/raw/tests/utils.rs b/core/src/raw/tests/utils.rs
index 1854576428..808788f710 100644
--- a/core/src/raw/tests/utils.rs
+++ b/core/src/raw/tests/utils.rs
@@ -19,11 +19,8 @@ use std::collections::HashMap;
use std::env;
use std::str::FromStr;
-use log::{log, Level};
use once_cell::sync::Lazy;
-use crate::layers::LoggingInterceptor;
-use crate::raw::Operation;
use crate::*;
/// TEST_RUNTIME is the runtime used for running tests.
@@ -76,7 +73,7 @@ pub fn init_test_service() -> Result<Option<Operator>> {
let op = { op.layer(layers::ChaosLayer::new(0.1)) };
let mut op = op
- .layer(layers::LoggingLayer::new(BacktraceLoggingInterceptor))
+ .layer(layers::LoggingLayer::default())
.layer(layers::TimeoutLayer::new())
.layer(layers::RetryLayer::new().with_max_times(4));
@@ -91,66 +88,3 @@ pub fn init_test_service() -> Result<Option<Operator>> {
Ok(Some(op))
}
-
-/// A logging interceptor that logs the backtrace.
-#[derive(Debug, Clone)]
-struct BacktraceLoggingInterceptor;
-
-impl LoggingInterceptor for BacktraceLoggingInterceptor {
- fn log(
- &self,
- scheme: Scheme,
- operation: raw::Operation,
- context: &str,
- message: &str,
- err: Option<&Error>,
- ) {
- let Some(err) = err else {
- let lvl = self.operation_level(operation);
- log!(
- target: "opendal::services",
- lvl,
- "service={} operation={} {} -> {}",
- scheme,
- operation,
- context,
- message,
- );
- return;
- };
-
- let err_msg = if err.kind() != ErrorKind::Unexpected {
- format!("{err}")
- } else {
- format!("{err:?}")
- };
- let lvl = if err.kind() == ErrorKind::Unexpected {
- Level::Error
- } else {
- Level::Warn
- };
-
- log!(
- target: "opendal::services",
- lvl,
- "service={} operation={} {} -> {} {}",
- scheme,
- operation,
- context,
- message,
- err_msg,
- );
- }
-}
-
-impl BacktraceLoggingInterceptor {
- fn operation_level(&self, operation: Operation) -> Level {
- match operation {
- Operation::ReaderRead
- | Operation::BlockingReaderRead
- | Operation::WriterWrite
- | Operation::BlockingWriterWrite => Level::Trace,
- _ => Level::Debug,
- }
- }
-}