This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 9510ca62 feat(layers): add a basic minitrace layer (#1931)
9510ca62 is described below

commit 9510ca6202fc18b1432a27b7b427e530e6ce542a
Author: Chojan Shang <[email protected]>
AuthorDate: Thu Apr 13 11:38:51 2023 +0800

    feat(layers): add a basic minitrace layer (#1931)
    
    * feat(layers): add minitrace layers
    
    Signed-off-by: Chojan Shang <[email protected]>
    
    * chore(layers/minitrace): add inner and blocking prefix
    
    Signed-off-by: Chojan Shang <[email protected]>
    
    * refactor(layers/minitrace): add relationship for blocking op
    
    Signed-off-by: Chojan Shang <[email protected]>
    
    * refactor(layers/minitrace): add relationship for async op
    
    Signed-off-by: Chojan Shang <[email protected]>
    
    * refactor(layers/minitrace): use enter with parent
    
    Signed-off-by: Chojan Shang <[email protected]>
    
    * refactor(layers/minitrace): enter on poll and better name
    
    Signed-off-by: Chojan Shang <[email protected]>
    
    * refactor(layers/minitrace): use into_static to get operation name
    
    Signed-off-by: Chojan Shang <[email protected]>
    
    * refactor(layers/minitrace): reuse span
    
    Signed-off-by: Chojan Shang <[email protected]>
    
    ---------
    
    Signed-off-by: Chojan Shang <[email protected]>
---
 Cargo.lock                   |  68 ++++++++
 core/Cargo.toml              |  10 +-
 core/src/layers/minitrace.rs | 377 +++++++++++++++++++++++++++++++++++++++++++
 core/src/layers/mod.rs       |   5 +
 4 files changed, 459 insertions(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index 14bc3df7..08454add 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1920,6 +1920,43 @@ version = "0.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
 
+[[package]]
+name = "minitrace"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "317e28b8c337ada2fd437611c241ce053d5b7f5480b79e945597996b87b1de96"
+dependencies = [
+ "futures",
+ "minitrace-macro",
+ "minstant",
+ "once_cell",
+ "parking_lot 0.12.1",
+ "pin-project",
+]
+
+[[package]]
+name = "minitrace-macro"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "77814d165883613a1846517efdc50b88fabd9c210b7ff4d3745b38b99d539652"
+dependencies = [
+ "proc-macro-error",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
+[[package]]
+name = "minstant"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "bc5dcfca9a0725105ac948b84cfeb69c3942814c696326743797215413f854b9"
+dependencies = [
+ "ctor",
+ "libc",
+ "wasi 0.7.0",
+]
+
 [[package]]
 name = "mio"
 version = "0.8.6"
@@ -2218,6 +2255,7 @@ dependencies = [
  "log",
  "md-5",
  "metrics",
+ "minitrace",
  "moka",
  "once_cell",
  "opentelemetry 0.19.0",
@@ -2738,6 +2776,30 @@ dependencies = [
  "yansi",
 ]
 
+[[package]]
+name = "proc-macro-error"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
+dependencies = [
+ "proc-macro-error-attr",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+ "version_check",
+]
+
+[[package]]
+name = "proc-macro-error-attr"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "version_check",
+]
+
 [[package]]
 name = "proc-macro2"
 version = "1.0.52"
@@ -4269,6 +4331,12 @@ dependencies = [
  "try-lock",
 ]
 
+[[package]]
+name = "wasi"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b89c3ce4ce14bdc6fb6beaf9ec7928ca331de5df7e5ea278375642a2f478570d"
+
 [[package]]
 name = "wasi"
 version = "0.9.0+wasi-snapshot-preview1"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 6c749ac8..c99fa4da 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -68,11 +68,18 @@ native-tls = ["reqwest/native-tls"]
 native-tls-vendored = ["reqwest/native-tls-vendored"]
 
 # Enable all layers.
-layers-all = ["layers-chaos", "layers-metrics", "layers-tracing"]
+layers-all = [
+  "layers-chaos",
+  "layers-metrics",
+  "layers-tracing",
+  "layers-minitrace",
+]
 # Enable layers chaos support
 layers-chaos = ["dep:rand"]
 # Enable layers metrics support
 layers-metrics = ["dep:metrics"]
+# Enable layers minitrace support.
+layers-minitrace = ["dep:minitrace"]
 # Enable layers tracing support.
 layers-tracing = ["dep:tracing"]
 
@@ -150,6 +157,7 @@ lazy-regex = { version = "2.5.0", optional = true }
 log = "0.4"
 md-5 = "0.10"
 metrics = { version = "0.20", optional = true }
+minitrace = { version = "0.4.0", optional = true }
 moka = { version = "0.10", optional = true, features = ["future"] }
 once_cell = "1"
 parking_lot = "0.12"
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
new file mode 100644
index 00000000..7585745f
--- /dev/null
+++ b/core/src/layers/minitrace.rs
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::io;
+use std::task::Context;
+use std::task::Poll;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::FutureExt;
+use minitrace::prelude::*;
+
+use crate::ops::*;
+use crate::raw::oio::{PageOperation, ReadOperation, WriteOperation};
+use crate::raw::*;
+use crate::*;
+
+/// Add [minitrace](https://docs.rs/minitrace/) for every operations.
+///
+/// # Examples
+///
+/// ## Basic Setup
+///
+/// ```
+/// use anyhow::Result;
+/// use opendal::layers::MinitraceLayer;
+/// use opendal::services;
+/// use opendal::Operator;
+///
+/// let _ = Operator::new(services::Memory::default())
+///     .expect("must init")
+///     .layer(MinitraceLayer)
+///     .finish();
+/// ```
+///
+/// ## Real usage
+///
+/// ```no_run
+/// use std::error::Error;
+///
+/// use anyhow::Result;
+/// use futures::executor::block_on;
+/// use opendal::layers::MinitraceLayer;
+/// use opendal::services;
+/// use opendal::Operator;
+///
+/// fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
+///     let collector = {
+///         let (span, collector) = minitrace::Span::root("op");
+///         let _g = span.set_local_parent();
+///         let runtime = tokio::runtime::Runtime::new()?;
+///
+///         runtime.block_on(async {
+///             let _ = dotenvy::dotenv();
+///             let op = Operator::from_env::<services::Memory>()
+///                 .expect("init operator must succeed")
+///                 .layer(MinitraceLayer)
+///                 .finish();
+///
+///             op.write("test", "0".repeat(16 * 1024 * 1024).into_bytes())
+///                 .await
+///                 .expect("must succeed");
+///             op.stat("test").await.expect("must succeed");
+///             op.read("test").await.expect("must succeed");
+///         });
+///         collector
+///     };
+///
+///     let spans = block_on(collector.collect());
+///
+///     let bytes =
+///         minitrace_jaeger::encode("opendal".to_owned(), rand::random(), 0, 
0, &spans).unwrap();
+///     minitrace_jaeger::report_blocking("127.0.0.1:6831".parse().unwrap(), 
&bytes)
+///         .expect("report error");
+///
+///     Ok(())
+/// }
+/// ```
+///
+/// # Output
+///
+/// OpenDAL is using 
[`minitrace`](https://docs.rs/minitrace/latest/minitrace/) for tracing 
internally.
+///
+/// To enable minitrace output, please init one of the reporter that 
`minitrace` supports.
+///
+/// For example:
+///
+/// ```ignore
+/// extern crate minitrace_jaeger;
+///
+/// let spans = block_on(collector.collect());
+///
+/// let bytes =
+///     minitrace_jaeger::encode("opendal".to_owned(), rand::random(), 0, 0, 
&spans).unwrap();
+/// minitrace_jaeger::report_blocking("127.0.0.1:6831".parse().unwrap(), 
&bytes).expect("report error");
+/// ```
+///
+/// For real-world usage, please take a look at 
[`minitrace-datadog`](https://crates.io/crates/minitrace-datadog) or 
[`minitrace-jaeger`](https://crates.io/crates/minitrace-jaeger) .
+pub struct MinitraceLayer;
+
+impl<A: Accessor> Layer<A> for MinitraceLayer {
+    type LayeredAccessor = MinitraceAccessor<A>;
+
+    fn layer(&self, inner: A) -> Self::LayeredAccessor {
+        MinitraceAccessor { inner }
+    }
+}
+
+#[derive(Debug)]
+pub struct MinitraceAccessor<A> {
+    inner: A,
+}
+
+#[async_trait]
+impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
+    type Inner = A;
+    type Reader = MinitraceWrapper<A::Reader>;
+    type BlockingReader = MinitraceWrapper<A::BlockingReader>;
+    type Writer = MinitraceWrapper<A::Writer>;
+    type BlockingWriter = MinitraceWrapper<A::BlockingWriter>;
+    type Pager = MinitraceWrapper<A::Pager>;
+    type BlockingPager = MinitraceWrapper<A::BlockingPager>;
+
+    fn inner(&self) -> &Self::Inner {
+        &self.inner
+    }
+
+    #[trace("metadata")]
+    fn metadata(&self) -> AccessorInfo {
+        self.inner.info()
+    }
+
+    #[trace("create", enter_on_poll = true)]
+    async fn create(&self, path: &str, args: OpCreate) -> Result<RpCreate> {
+        self.inner.create(path, args).await
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        let span = Span::enter_with_local_parent("read");
+        self.inner
+            .read(path, args)
+            .map(|v| v.map(|(rp, r)| (rp, MinitraceWrapper::new(span, r))))
+            .await
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        let span = Span::enter_with_local_parent("write");
+        self.inner
+            .write(path, args)
+            .map(|v| v.map(|(rp, r)| (rp, MinitraceWrapper::new(span, r))))
+            .await
+    }
+
+    #[trace("stat", enter_on_poll = true)]
+    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        self.inner.stat(path, args).await
+    }
+
+    #[trace("delete", enter_on_poll = true)]
+    async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        self.inner.delete(path, args).await
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Pager)> {
+        let span = Span::enter_with_local_parent("list");
+        self.inner
+            .list(path, args)
+            .map(|v| v.map(|(rp, s)| (rp, MinitraceWrapper::new(span, s))))
+            .await
+    }
+
+    async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, 
Self::Pager)> {
+        let span = Span::enter_with_local_parent("scan");
+        self.inner
+            .scan(path, args)
+            .map(|v| v.map(|(rp, s)| (rp, MinitraceWrapper::new(span, s))))
+            .await
+    }
+
+    #[trace("presign", enter_on_poll = true)]
+    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+        self.inner.presign(path, args).await
+    }
+
+    #[trace("batch", enter_on_poll = true)]
+    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+        self.inner.batch(args).await
+    }
+
+    #[trace("blocking_create")]
+    fn blocking_create(&self, path: &str, args: OpCreate) -> Result<RpCreate> {
+        self.inner.blocking_create(path, args)
+    }
+
+    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::BlockingReader)> {
+        let span = Span::enter_with_local_parent("blocking_read");
+        self.inner.blocking_read(path, args).map(|(rp, r)| {
+            (
+                rp,
+                MinitraceWrapper::new(Span::enter_with_parent("ReadOperation", 
&span), r),
+            )
+        })
+    }
+
+    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
+        let span = Span::enter_with_local_parent("blocking_write");
+        self.inner.blocking_write(path, args).map(|(rp, r)| {
+            (
+                rp,
+                
MinitraceWrapper::new(Span::enter_with_parent("WriteOperation", &span), r),
+            )
+        })
+    }
+
+    #[trace("blocking_stat")]
+    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        self.inner.blocking_stat(path, args)
+    }
+
+    #[trace("blocking_delete")]
+    fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        self.inner.blocking_delete(path, args)
+    }
+
+    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::BlockingPager)> {
+        let span = Span::enter_with_local_parent("blocking_list");
+        self.inner.blocking_list(path, args).map(|(rp, it)| {
+            (
+                rp,
+                MinitraceWrapper::new(Span::enter_with_parent("PageOperation", 
&span), it),
+            )
+        })
+    }
+
+    fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, 
Self::BlockingPager)> {
+        let span = Span::enter_with_local_parent("blocking_scan");
+        self.inner.blocking_scan(path, args).map(|(rp, it)| {
+            (
+                rp,
+                MinitraceWrapper::new(Span::enter_with_parent("PageOperation", 
&span), it),
+            )
+        })
+    }
+}
+
+pub struct MinitraceWrapper<R> {
+    span: Span,
+    inner: R,
+}
+
+impl<R> MinitraceWrapper<R> {
+    fn new(span: Span, inner: R) -> Self {
+        Self { span, inner }
+    }
+}
+
+impl<R: oio::Read> oio::Read for MinitraceWrapper<R> {
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> 
Poll<Result<usize>> {
+        let _span = Span::enter_with_parent(ReadOperation::Read.into_static(), 
&self.span);
+        self.inner.poll_read(cx, buf)
+    }
+
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> 
Poll<Result<u64>> {
+        let _span = Span::enter_with_parent(ReadOperation::Seek.into_static(), 
&self.span);
+        self.inner.poll_seek(cx, pos)
+    }
+
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
+        let _span = Span::enter_with_parent(ReadOperation::Next.into_static(), 
&self.span);
+        self.inner.poll_next(cx)
+    }
+}
+
+impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> {
+    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+        let _span = 
Span::enter_with_parent(ReadOperation::BlockingRead.into_static(), &self.span);
+        self.inner.read(buf)
+    }
+
+    fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
+        let _span = 
Span::enter_with_parent(ReadOperation::BlockingSeek.into_static(), &self.span);
+        self.inner.seek(pos)
+    }
+
+    fn next(&mut self) -> Option<Result<Bytes>> {
+        let _span = 
Span::enter_with_parent(ReadOperation::BlockingNext.into_static(), &self.span);
+        self.inner.next()
+    }
+}
+
+#[async_trait]
+impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        self.inner
+            .write(bs)
+            .in_span(Span::enter_with_parent(
+                WriteOperation::Write.into_static(),
+                &self.span,
+            ))
+            .await
+    }
+
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        self.inner
+            .append(bs)
+            .in_span(Span::enter_with_parent(
+                WriteOperation::Append.into_static(),
+                &self.span,
+            ))
+            .await
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        self.inner
+            .close()
+            .in_span(Span::enter_with_parent(
+                WriteOperation::Close.into_static(),
+                &self.span,
+            ))
+            .await
+    }
+}
+
+impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
+    fn write(&mut self, bs: Bytes) -> Result<()> {
+        let _span =
+            
Span::enter_with_parent(WriteOperation::BlockingWrite.into_static(), 
&self.span);
+        self.inner.write(bs)
+    }
+
+    fn append(&mut self, bs: Bytes) -> Result<()> {
+        let _span =
+            
Span::enter_with_parent(WriteOperation::BlockingAppend.into_static(), 
&self.span);
+        self.inner.append(bs)
+    }
+
+    fn close(&mut self) -> Result<()> {
+        let _span =
+            
Span::enter_with_parent(WriteOperation::BlockingClose.into_static(), 
&self.span);
+        self.inner.close()
+    }
+}
+
+#[async_trait]
+impl<R: oio::Page> oio::Page for MinitraceWrapper<R> {
+    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
+        self.inner
+            .next()
+            .in_span(Span::enter_with_parent(
+                PageOperation::Next.into_static(),
+                &self.span,
+            ))
+            .await
+    }
+}
+
+impl<R: oio::BlockingPage> oio::BlockingPage for MinitraceWrapper<R> {
+    fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
+        let _span = 
Span::enter_with_parent(PageOperation::BlockingNext.into_static(), &self.span);
+        self.inner.next()
+    }
+}
diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs
index 26a949dc..8c64455f 100644
--- a/core/src/layers/mod.rs
+++ b/core/src/layers/mod.rs
@@ -44,6 +44,11 @@ mod tracing;
 #[cfg(feature = "layers-tracing")]
 pub use self::tracing::TracingLayer;
 
+#[cfg(feature = "layers-minitrace")]
+mod minitrace;
+#[cfg(feature = "layers-minitrace")]
+pub use self::minitrace::MinitraceLayer;
+
 mod type_eraser;
 pub(crate) use type_eraser::TypeEraseLayer;
 

Reply via email to