This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 603ce61d4 fix(service/minitrace): should set local parent (#2620)
603ce61d4 is described below
commit 603ce61d443493df4e9594bb5e205bc152c2aeea
Author: Andy Lok <[email protected]>
AuthorDate: Wed Jul 12 15:32:49 2023 +0800
fix(service/minitrace): should set local parent (#2620)
---
core/src/layers/minitrace.rs | 115 +++++++++++++++++++++++++++++++++----------
1 file changed, 89 insertions(+), 26 deletions(-)
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index c7d7165e6..9f6516cb8 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -25,6 +25,7 @@ use bytes::Bytes;
use futures::FutureExt;
use minitrace::prelude::*;
+use crate::raw::oio::AppendOperation;
use crate::raw::oio::PageOperation;
use crate::raw::oio::ReadOperation;
use crate::raw::oio::WriteOperation;
@@ -134,7 +135,7 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
type BlockingReader = MinitraceWrapper<A::BlockingReader>;
type Writer = MinitraceWrapper<A::Writer>;
type BlockingWriter = MinitraceWrapper<A::BlockingWriter>;
- type Appender = A::Appender;
+ type Appender = MinitraceWrapper<A::Appender>;
type Pager = MinitraceWrapper<A::Pager>;
type BlockingPager = MinitraceWrapper<A::BlockingPager>;
@@ -142,7 +143,7 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
&self.inner
}
- #[trace("metadata")]
+ #[trace]
fn metadata(&self) -> AccessorInfo {
self.inner.info()
}
@@ -152,24 +153,49 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
self.inner.create_dir(path, args).await
}
+ #[trace(enter_on_poll = true)]
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
- let span = Span::enter_with_local_parent("read");
self.inner
.read(path, args)
- .map(|v| v.map(|(rp, r)| (rp, MinitraceWrapper::new(span, r))))
+ .map(|v| {
+ v.map(|(rp, r)| {
+ (
+ rp,
+ MinitraceWrapper::new(Span::enter_with_local_parent("ReadOperation"), r),
+ )
+ })
+ })
.await
}
+ #[trace(enter_on_poll = true)]
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
- let span = Span::enter_with_local_parent("write");
self.inner
.write(path, args)
- .map(|v| v.map(|(rp, r)| (rp, MinitraceWrapper::new(span, r))))
+ .map(|v| {
+ v.map(|(rp, r)| {
+ (
+ rp,
+ MinitraceWrapper::new(Span::enter_with_local_parent("WriteOperation"), r),
+ )
+ })
+ })
.await
}
+ #[trace(enter_on_poll = true)]
async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
- self.inner.append(path, args).await
+ self.inner
+ .append(path, args)
+ .map(|v| {
+ v.map(|(rp, r)| {
+ (
+ rp,
+ MinitraceWrapper::new(Span::enter_with_local_parent("AppendOperation"), r),
+ )
+ })
+ })
+ .await
}
#[trace(enter_on_poll = true)]
@@ -192,11 +218,18 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
self.inner.delete(path, args).await
}
+ #[trace(enter_on_poll = true)]
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
- let span = Span::enter_with_local_parent("list");
self.inner
.list(path, args)
- .map(|v| v.map(|(rp, s)| (rp, MinitraceWrapper::new(span, s))))
+ .map(|v| {
+ v.map(|(rp, s)| {
+ (
+ rp,
+ MinitraceWrapper::new(Span::enter_with_local_parent("ListOperation"), s),
+ )
+ })
+ })
.await
}
@@ -215,22 +248,22 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
self.inner.blocking_create_dir(path, args)
}
+ #[trace]
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
- let span = Span::enter_with_local_parent("blocking_read");
self.inner.blocking_read(path, args).map(|(rp, r)| {
(
rp,
- MinitraceWrapper::new(Span::enter_with_parent("ReadOperation", &span), r),
+ MinitraceWrapper::new(Span::enter_with_local_parent("ReadOperation"), r),
)
})
}
+ #[trace]
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
- let span = Span::enter_with_local_parent("blocking_write");
self.inner.blocking_write(path, args).map(|(rp, r)| {
(
rp,
- MinitraceWrapper::new(Span::enter_with_parent("WriteOperation", &span), r),
+ MinitraceWrapper::new(Span::enter_with_local_parent("WriteOperation"), r),
)
})
}
@@ -255,12 +288,12 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
self.inner.blocking_delete(path, args)
}
+ #[trace]
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
- let span = Span::enter_with_local_parent("blocking_list");
self.inner.blocking_list(path, args).map(|(rp, it)| {
(
rp,
- MinitraceWrapper::new(Span::enter_with_parent("PageOperation", &span), it),
+ MinitraceWrapper::new(Span::enter_with_local_parent("PageOperation"), it),
)
})
}
@@ -279,34 +312,40 @@ impl<R> MinitraceWrapper<R> {
impl<R: oio::Read> oio::Read for MinitraceWrapper<R> {
fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
- let _span = Span::enter_with_parent(ReadOperation::Read.into_static(), &self.span);
+ let _g = self.span.set_local_parent();
+ let _span = LocalSpan::enter_with_local_parent(ReadOperation::Read.into_static());
self.inner.poll_read(cx, buf)
}
fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll<Result<u64>> {
- let _span = Span::enter_with_parent(ReadOperation::Seek.into_static(), &self.span);
+ let _g = self.span.set_local_parent();
+ let _span = LocalSpan::enter_with_local_parent(ReadOperation::Seek.into_static());
self.inner.poll_seek(cx, pos)
}
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
- let _span = Span::enter_with_parent(ReadOperation::Next.into_static(), &self.span);
+ let _g = self.span.set_local_parent();
+ let _span = LocalSpan::enter_with_local_parent(ReadOperation::Next.into_static());
self.inner.poll_next(cx)
}
}
impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- let _span = Span::enter_with_parent(ReadOperation::BlockingRead.into_static(), &self.span);
+ let _g = self.span.set_local_parent();
+ let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingRead.into_static());
self.inner.read(buf)
}
fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
- let _span = Span::enter_with_parent(ReadOperation::BlockingSeek.into_static(), &self.span);
+ let _g = self.span.set_local_parent();
+ let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingSeek.into_static());
self.inner.seek(pos)
}
fn next(&mut self) -> Option<Result<Bytes>> {
- let _span = Span::enter_with_parent(ReadOperation::BlockingNext.into_static(), &self.span);
+ let _g = self.span.set_local_parent();
+ let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingNext.into_static());
self.inner.next()
}
}
@@ -356,18 +395,41 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
fn write(&mut self, bs: Bytes) -> Result<()> {
- let _span =
- Span::enter_with_parent(WriteOperation::BlockingWrite.into_static(), &self.span);
+ let _g = self.span.set_local_parent();
+ let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
self.inner.write(bs)
}
fn close(&mut self) -> Result<()> {
- let _span =
- Span::enter_with_parent(WriteOperation::BlockingClose.into_static(), &self.span);
+ let _g = self.span.set_local_parent();
+ let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingClose.into_static());
self.inner.close()
}
}
+#[async_trait]
+impl<R: oio::Append> oio::Append for MinitraceWrapper<R> {
+ async fn append(&mut self, bs: Bytes) -> Result<()> {
+ self.inner
+ .append(bs)
+ .in_span(Span::enter_with_parent(
+ AppendOperation::Append.into_static(),
+ &self.span,
+ ))
+ .await
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ self.inner
+ .close()
+ .in_span(Span::enter_with_parent(
+ AppendOperation::Close.into_static(),
+ &self.span,
+ ))
+ .await
+ }
+}
+
#[async_trait]
impl<R: oio::Page> oio::Page for MinitraceWrapper<R> {
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
@@ -383,7 +445,8 @@ impl<R: oio::Page> oio::Page for MinitraceWrapper<R> {
impl<R: oio::BlockingPage> oio::BlockingPage for MinitraceWrapper<R> {
fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
- let _span = Span::enter_with_parent(PageOperation::BlockingNext.into_static(), &self.span);
+ let _g = self.span.set_local_parent();
+ let _span = LocalSpan::enter_with_local_parent(PageOperation::BlockingNext.into_static());
self.inner.next()
}
}