This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 9510ca62 feat(layers): add a basic minitrace layer (#1931)
9510ca62 is described below
commit 9510ca6202fc18b1432a27b7b427e530e6ce542a
Author: Chojan Shang <[email protected]>
AuthorDate: Thu Apr 13 11:38:51 2023 +0800
feat(layers): add a basic minitrace layer (#1931)
* feat(layers): add minitrace layers
Signed-off-by: Chojan Shang <[email protected]>
* chore(layers/minitrace): add inner and blocking prefix
Signed-off-by: Chojan Shang <[email protected]>
* refactor(layers/minitrace): add relationship for blocking op
Signed-off-by: Chojan Shang <[email protected]>
* refactor(layers/minitrace): add relationship for async op
Signed-off-by: Chojan Shang <[email protected]>
* refactor(layers/minitrace): use enter with parent
Signed-off-by: Chojan Shang <[email protected]>
* refactor(layers/minitrace): enter on poll and better name
Signed-off-by: Chojan Shang <[email protected]>
* refactor(layers/minitrace): use into_static to get operation name
Signed-off-by: Chojan Shang <[email protected]>
* refactor(layers/minitrace): reuse span
Signed-off-by: Chojan Shang <[email protected]>
---------
Signed-off-by: Chojan Shang <[email protected]>
---
Cargo.lock | 68 ++++++++
core/Cargo.toml | 10 +-
core/src/layers/minitrace.rs | 377 +++++++++++++++++++++++++++++++++++++++++++
core/src/layers/mod.rs | 5 +
4 files changed, 459 insertions(+), 1 deletion(-)
diff --git a/Cargo.lock b/Cargo.lock
index 14bc3df7..08454add 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1920,6 +1920,43 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
+[[package]]
+name = "minitrace"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "317e28b8c337ada2fd437611c241ce053d5b7f5480b79e945597996b87b1de96"
+dependencies = [
+ "futures",
+ "minitrace-macro",
+ "minstant",
+ "once_cell",
+ "parking_lot 0.12.1",
+ "pin-project",
+]
+
+[[package]]
+name = "minitrace-macro"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "77814d165883613a1846517efdc50b88fabd9c210b7ff4d3745b38b99d539652"
+dependencies = [
+ "proc-macro-error",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
+[[package]]
+name = "minstant"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bc5dcfca9a0725105ac948b84cfeb69c3942814c696326743797215413f854b9"
+dependencies = [
+ "ctor",
+ "libc",
+ "wasi 0.7.0",
+]
+
[[package]]
name = "mio"
version = "0.8.6"
@@ -2218,6 +2255,7 @@ dependencies = [
"log",
"md-5",
"metrics",
+ "minitrace",
"moka",
"once_cell",
"opentelemetry 0.19.0",
@@ -2738,6 +2776,30 @@ dependencies = [
"yansi",
]
+[[package]]
+name = "proc-macro-error"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
+dependencies = [
+ "proc-macro-error-attr",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+ "version_check",
+]
+
+[[package]]
+name = "proc-macro-error-attr"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "version_check",
+]
+
[[package]]
name = "proc-macro2"
version = "1.0.52"
@@ -4269,6 +4331,12 @@ dependencies = [
"try-lock",
]
+[[package]]
+name = "wasi"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b89c3ce4ce14bdc6fb6beaf9ec7928ca331de5df7e5ea278375642a2f478570d"
+
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 6c749ac8..c99fa4da 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -68,11 +68,18 @@ native-tls = ["reqwest/native-tls"]
native-tls-vendored = ["reqwest/native-tls-vendored"]
# Enable all layers.
-layers-all = ["layers-chaos", "layers-metrics", "layers-tracing"]
+layers-all = [
+ "layers-chaos",
+ "layers-metrics",
+ "layers-tracing",
+ "layers-minitrace",
+]
# Enable layers chaos support
layers-chaos = ["dep:rand"]
# Enable layers metrics support
layers-metrics = ["dep:metrics"]
+# Enable layers minitrace support.
+layers-minitrace = ["dep:minitrace"]
# Enable layers tracing support.
layers-tracing = ["dep:tracing"]
@@ -150,6 +157,7 @@ lazy-regex = { version = "2.5.0", optional = true }
log = "0.4"
md-5 = "0.10"
metrics = { version = "0.20", optional = true }
+minitrace = { version = "0.4.0", optional = true }
moka = { version = "0.10", optional = true, features = ["future"] }
once_cell = "1"
parking_lot = "0.12"
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
new file mode 100644
index 00000000..7585745f
--- /dev/null
+++ b/core/src/layers/minitrace.rs
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::io;
+use std::task::Context;
+use std::task::Poll;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::FutureExt;
+use minitrace::prelude::*;
+
+use crate::ops::*;
+use crate::raw::oio::{PageOperation, ReadOperation, WriteOperation};
+use crate::raw::*;
+use crate::*;
+
+/// Add [minitrace](https://docs.rs/minitrace/) for every operations.
+///
+/// # Examples
+///
+/// ## Basic Setup
+///
+/// ```
+/// use anyhow::Result;
+/// use opendal::layers::MinitraceLayer;
+/// use opendal::services;
+/// use opendal::Operator;
+///
+/// let _ = Operator::new(services::Memory::default())
+/// .expect("must init")
+/// .layer(MinitraceLayer)
+/// .finish();
+/// ```
+///
+/// ## Real usage
+///
+/// ```no_run
+/// use std::error::Error;
+///
+/// use anyhow::Result;
+/// use futures::executor::block_on;
+/// use opendal::layers::MinitraceLayer;
+/// use opendal::services;
+/// use opendal::Operator;
+///
+/// fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
+/// let collector = {
+/// let (span, collector) = minitrace::Span::root("op");
+/// let _g = span.set_local_parent();
+/// let runtime = tokio::runtime::Runtime::new()?;
+///
+/// runtime.block_on(async {
+/// let _ = dotenvy::dotenv();
+/// let op = Operator::from_env::<services::Memory>()
+/// .expect("init operator must succeed")
+/// .layer(MinitraceLayer)
+/// .finish();
+///
+/// op.write("test", "0".repeat(16 * 1024 * 1024).into_bytes())
+/// .await
+/// .expect("must succeed");
+/// op.stat("test").await.expect("must succeed");
+/// op.read("test").await.expect("must succeed");
+/// });
+/// collector
+/// };
+///
+/// let spans = block_on(collector.collect());
+///
+/// let bytes =
+/// minitrace_jaeger::encode("opendal".to_owned(), rand::random(), 0,
0, &spans).unwrap();
+/// minitrace_jaeger::report_blocking("127.0.0.1:6831".parse().unwrap(),
&bytes)
+/// .expect("report error");
+///
+/// Ok(())
+/// }
+/// ```
+///
+/// # Output
+///
+/// OpenDAL is using
[`minitrace`](https://docs.rs/minitrace/latest/minitrace/) for tracing
internally.
+///
+/// To enable minitrace output, please init one of the reporter that
`minitrace` supports.
+///
+/// For example:
+///
+/// ```ignore
+/// extern crate minitrace_jaeger;
+///
+/// let spans = block_on(collector.collect());
+///
+/// let bytes =
+/// minitrace_jaeger::encode("opendal".to_owned(), rand::random(), 0, 0,
&spans).unwrap();
+/// minitrace_jaeger::report_blocking("127.0.0.1:6831".parse().unwrap(),
&bytes).expect("report error");
+/// ```
+///
+/// For real-world usage, please take a look at
[`minitrace-datadog`](https://crates.io/crates/minitrace-datadog) or
[`minitrace-jaeger`](https://crates.io/crates/minitrace-jaeger) .
+pub struct MinitraceLayer;
+
+impl<A: Accessor> Layer<A> for MinitraceLayer {
+ type LayeredAccessor = MinitraceAccessor<A>;
+
+ fn layer(&self, inner: A) -> Self::LayeredAccessor {
+ MinitraceAccessor { inner }
+ }
+}
+
+#[derive(Debug)]
+pub struct MinitraceAccessor<A> {
+ inner: A,
+}
+
+#[async_trait]
+impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
+ type Inner = A;
+ type Reader = MinitraceWrapper<A::Reader>;
+ type BlockingReader = MinitraceWrapper<A::BlockingReader>;
+ type Writer = MinitraceWrapper<A::Writer>;
+ type BlockingWriter = MinitraceWrapper<A::BlockingWriter>;
+ type Pager = MinitraceWrapper<A::Pager>;
+ type BlockingPager = MinitraceWrapper<A::BlockingPager>;
+
+ fn inner(&self) -> &Self::Inner {
+ &self.inner
+ }
+
+ #[trace("metadata")]
+ fn metadata(&self) -> AccessorInfo {
+ self.inner.info()
+ }
+
+ #[trace("create", enter_on_poll = true)]
+ async fn create(&self, path: &str, args: OpCreate) -> Result<RpCreate> {
+ self.inner.create(path, args).await
+ }
+
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ let span = Span::enter_with_local_parent("read");
+ self.inner
+ .read(path, args)
+ .map(|v| v.map(|(rp, r)| (rp, MinitraceWrapper::new(span, r))))
+ .await
+ }
+
+ async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ let span = Span::enter_with_local_parent("write");
+ self.inner
+ .write(path, args)
+ .map(|v| v.map(|(rp, r)| (rp, MinitraceWrapper::new(span, r))))
+ .await
+ }
+
+ #[trace("stat", enter_on_poll = true)]
+ async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+ self.inner.stat(path, args).await
+ }
+
+ #[trace("delete", enter_on_poll = true)]
+ async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+ self.inner.delete(path, args).await
+ }
+
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
+ let span = Span::enter_with_local_parent("list");
+ self.inner
+ .list(path, args)
+ .map(|v| v.map(|(rp, s)| (rp, MinitraceWrapper::new(span, s))))
+ .await
+ }
+
+ async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan,
Self::Pager)> {
+ let span = Span::enter_with_local_parent("scan");
+ self.inner
+ .scan(path, args)
+ .map(|v| v.map(|(rp, s)| (rp, MinitraceWrapper::new(span, s))))
+ .await
+ }
+
+ #[trace("presign", enter_on_poll = true)]
+ async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+ self.inner.presign(path, args).await
+ }
+
+ #[trace("batch", enter_on_poll = true)]
+ async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+ self.inner.batch(args).await
+ }
+
+ #[trace("blocking_create")]
+ fn blocking_create(&self, path: &str, args: OpCreate) -> Result<RpCreate> {
+ self.inner.blocking_create(path, args)
+ }
+
+ fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::BlockingReader)> {
+ let span = Span::enter_with_local_parent("blocking_read");
+ self.inner.blocking_read(path, args).map(|(rp, r)| {
+ (
+ rp,
+ MinitraceWrapper::new(Span::enter_with_parent("ReadOperation",
&span), r),
+ )
+ })
+ }
+
+ fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
+ let span = Span::enter_with_local_parent("blocking_write");
+ self.inner.blocking_write(path, args).map(|(rp, r)| {
+ (
+ rp,
+
MinitraceWrapper::new(Span::enter_with_parent("WriteOperation", &span), r),
+ )
+ })
+ }
+
+ #[trace("blocking_stat")]
+ fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+ self.inner.blocking_stat(path, args)
+ }
+
+ #[trace("blocking_delete")]
+ fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+ self.inner.blocking_delete(path, args)
+ }
+
+ fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::BlockingPager)> {
+ let span = Span::enter_with_local_parent("blocking_list");
+ self.inner.blocking_list(path, args).map(|(rp, it)| {
+ (
+ rp,
+ MinitraceWrapper::new(Span::enter_with_parent("PageOperation",
&span), it),
+ )
+ })
+ }
+
+ fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan,
Self::BlockingPager)> {
+ let span = Span::enter_with_local_parent("blocking_scan");
+ self.inner.blocking_scan(path, args).map(|(rp, it)| {
+ (
+ rp,
+ MinitraceWrapper::new(Span::enter_with_parent("PageOperation",
&span), it),
+ )
+ })
+ }
+}
+
+pub struct MinitraceWrapper<R> {
+ span: Span,
+ inner: R,
+}
+
+impl<R> MinitraceWrapper<R> {
+ fn new(span: Span, inner: R) -> Self {
+ Self { span, inner }
+ }
+}
+
+impl<R: oio::Read> oio::Read for MinitraceWrapper<R> {
+ fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) ->
Poll<Result<usize>> {
+ let _span = Span::enter_with_parent(ReadOperation::Read.into_static(),
&self.span);
+ self.inner.poll_read(cx, buf)
+ }
+
+ fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) ->
Poll<Result<u64>> {
+ let _span = Span::enter_with_parent(ReadOperation::Seek.into_static(),
&self.span);
+ self.inner.poll_seek(cx, pos)
+ }
+
+ fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
+ let _span = Span::enter_with_parent(ReadOperation::Next.into_static(),
&self.span);
+ self.inner.poll_next(cx)
+ }
+}
+
+impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> {
+ fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+ let _span =
Span::enter_with_parent(ReadOperation::BlockingRead.into_static(), &self.span);
+ self.inner.read(buf)
+ }
+
+ fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
+ let _span =
Span::enter_with_parent(ReadOperation::BlockingSeek.into_static(), &self.span);
+ self.inner.seek(pos)
+ }
+
+ fn next(&mut self) -> Option<Result<Bytes>> {
+ let _span =
Span::enter_with_parent(ReadOperation::BlockingNext.into_static(), &self.span);
+ self.inner.next()
+ }
+}
+
+#[async_trait]
+impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
+ async fn write(&mut self, bs: Bytes) -> Result<()> {
+ self.inner
+ .write(bs)
+ .in_span(Span::enter_with_parent(
+ WriteOperation::Write.into_static(),
+ &self.span,
+ ))
+ .await
+ }
+
+ async fn append(&mut self, bs: Bytes) -> Result<()> {
+ self.inner
+ .append(bs)
+ .in_span(Span::enter_with_parent(
+ WriteOperation::Append.into_static(),
+ &self.span,
+ ))
+ .await
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ self.inner
+ .close()
+ .in_span(Span::enter_with_parent(
+ WriteOperation::Close.into_static(),
+ &self.span,
+ ))
+ .await
+ }
+}
+
+impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
+ fn write(&mut self, bs: Bytes) -> Result<()> {
+ let _span =
+
Span::enter_with_parent(WriteOperation::BlockingWrite.into_static(),
&self.span);
+ self.inner.write(bs)
+ }
+
+ fn append(&mut self, bs: Bytes) -> Result<()> {
+ let _span =
+
Span::enter_with_parent(WriteOperation::BlockingAppend.into_static(),
&self.span);
+ self.inner.append(bs)
+ }
+
+ fn close(&mut self) -> Result<()> {
+ let _span =
+
Span::enter_with_parent(WriteOperation::BlockingClose.into_static(),
&self.span);
+ self.inner.close()
+ }
+}
+
+#[async_trait]
+impl<R: oio::Page> oio::Page for MinitraceWrapper<R> {
+ async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
+ self.inner
+ .next()
+ .in_span(Span::enter_with_parent(
+ PageOperation::Next.into_static(),
+ &self.span,
+ ))
+ .await
+ }
+}
+
+impl<R: oio::BlockingPage> oio::BlockingPage for MinitraceWrapper<R> {
+ fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
+ let _span =
Span::enter_with_parent(PageOperation::BlockingNext.into_static(), &self.span);
+ self.inner.next()
+ }
+}
diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs
index 26a949dc..8c64455f 100644
--- a/core/src/layers/mod.rs
+++ b/core/src/layers/mod.rs
@@ -44,6 +44,11 @@ mod tracing;
#[cfg(feature = "layers-tracing")]
pub use self::tracing::TracingLayer;
+#[cfg(feature = "layers-minitrace")]
+mod minitrace;
+#[cfg(feature = "layers-minitrace")]
+pub use self::minitrace::MinitraceLayer;
+
mod type_eraser;
pub(crate) use type_eraser::TypeEraseLayer;