This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 603ce61d4 fix(service/minitrace): should set local parent (#2620)
603ce61d4 is described below

commit 603ce61d443493df4e9594bb5e205bc152c2aeea
Author: Andy Lok <[email protected]>
AuthorDate: Wed Jul 12 15:32:49 2023 +0800

    fix(service/minitrace): should set local parent (#2620)
---
 core/src/layers/minitrace.rs | 115 +++++++++++++++++++++++++++++++++----------
 1 file changed, 89 insertions(+), 26 deletions(-)

diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index c7d7165e6..9f6516cb8 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -25,6 +25,7 @@ use bytes::Bytes;
 use futures::FutureExt;
 use minitrace::prelude::*;
 
+use crate::raw::oio::AppendOperation;
 use crate::raw::oio::PageOperation;
 use crate::raw::oio::ReadOperation;
 use crate::raw::oio::WriteOperation;
@@ -134,7 +135,7 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
     type BlockingReader = MinitraceWrapper<A::BlockingReader>;
     type Writer = MinitraceWrapper<A::Writer>;
     type BlockingWriter = MinitraceWrapper<A::BlockingWriter>;
-    type Appender = A::Appender;
+    type Appender = MinitraceWrapper<A::Appender>;
     type Pager = MinitraceWrapper<A::Pager>;
     type BlockingPager = MinitraceWrapper<A::BlockingPager>;
 
@@ -142,7 +143,7 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
         &self.inner
     }
 
-    #[trace("metadata")]
+    #[trace]
     fn metadata(&self) -> AccessorInfo {
         self.inner.info()
     }
@@ -152,24 +153,49 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
         self.inner.create_dir(path, args).await
     }
 
+    #[trace(enter_on_poll = true)]
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
-        let span = Span::enter_with_local_parent("read");
         self.inner
             .read(path, args)
-            .map(|v| v.map(|(rp, r)| (rp, MinitraceWrapper::new(span, r))))
+            .map(|v| {
+                v.map(|(rp, r)| {
+                    (
+                        rp,
+                        MinitraceWrapper::new(Span::enter_with_local_parent("ReadOperation"), r),
+                    )
+                })
+            })
             .await
     }
 
+    #[trace(enter_on_poll = true)]
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let span = Span::enter_with_local_parent("write");
         self.inner
             .write(path, args)
-            .map(|v| v.map(|(rp, r)| (rp, MinitraceWrapper::new(span, r))))
+            .map(|v| {
+                v.map(|(rp, r)| {
+                    (
+                        rp,
+                        MinitraceWrapper::new(Span::enter_with_local_parent("WriteOperation"), r),
+                    )
+                })
+            })
             .await
     }
 
+    #[trace(enter_on_poll = true)]
     async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
-        self.inner.append(path, args).await
+        self.inner
+            .append(path, args)
+            .map(|v| {
+                v.map(|(rp, r)| {
+                    (
+                        rp,
+                        MinitraceWrapper::new(Span::enter_with_local_parent("AppendOperation"), r),
+                    )
+                })
+            })
+            .await
     }
 
     #[trace(enter_on_poll = true)]
@@ -192,11 +218,18 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
         self.inner.delete(path, args).await
     }
 
+    #[trace(enter_on_poll = true)]
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
-        let span = Span::enter_with_local_parent("list");
         self.inner
             .list(path, args)
-            .map(|v| v.map(|(rp, s)| (rp, MinitraceWrapper::new(span, s))))
+            .map(|v| {
+                v.map(|(rp, s)| {
+                    (
+                        rp,
+                        MinitraceWrapper::new(Span::enter_with_local_parent("ListOperation"), s),
+                    )
+                })
+            })
             .await
     }
 
@@ -215,22 +248,22 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
         self.inner.blocking_create_dir(path, args)
     }
 
+    #[trace]
     fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
-        let span = Span::enter_with_local_parent("blocking_read");
         self.inner.blocking_read(path, args).map(|(rp, r)| {
             (
                 rp,
-                MinitraceWrapper::new(Span::enter_with_parent("ReadOperation", &span), r),
+                MinitraceWrapper::new(Span::enter_with_local_parent("ReadOperation"), r),
             )
         })
     }
 
+    #[trace]
     fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
-        let span = Span::enter_with_local_parent("blocking_write");
         self.inner.blocking_write(path, args).map(|(rp, r)| {
             (
                 rp,
-                MinitraceWrapper::new(Span::enter_with_parent("WriteOperation", &span), r),
+                MinitraceWrapper::new(Span::enter_with_local_parent("WriteOperation"), r),
             )
         })
     }
@@ -255,12 +288,12 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
         self.inner.blocking_delete(path, args)
     }
 
+    #[trace]
     fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
-        let span = Span::enter_with_local_parent("blocking_list");
         self.inner.blocking_list(path, args).map(|(rp, it)| {
             (
                 rp,
-                MinitraceWrapper::new(Span::enter_with_parent("PageOperation", &span), it),
+                MinitraceWrapper::new(Span::enter_with_local_parent("PageOperation"), it),
             )
         })
     }
@@ -279,34 +312,40 @@ impl<R> MinitraceWrapper<R> {
 
 impl<R: oio::Read> oio::Read for MinitraceWrapper<R> {
     fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
-        let _span = Span::enter_with_parent(ReadOperation::Read.into_static(), &self.span);
+        let _g = self.span.set_local_parent();
+        let _span = LocalSpan::enter_with_local_parent(ReadOperation::Read.into_static());
         self.inner.poll_read(cx, buf)
     }
 
     fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll<Result<u64>> {
-        let _span = Span::enter_with_parent(ReadOperation::Seek.into_static(), &self.span);
+        let _g = self.span.set_local_parent();
+        let _span = LocalSpan::enter_with_local_parent(ReadOperation::Seek.into_static());
         self.inner.poll_seek(cx, pos)
     }
 
     fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
-        let _span = Span::enter_with_parent(ReadOperation::Next.into_static(), &self.span);
+        let _g = self.span.set_local_parent();
+        let _span = LocalSpan::enter_with_local_parent(ReadOperation::Next.into_static());
         self.inner.poll_next(cx)
     }
 }
 
 impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> {
     fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        let _span = Span::enter_with_parent(ReadOperation::BlockingRead.into_static(), &self.span);
+        let _g = self.span.set_local_parent();
+        let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingRead.into_static());
         self.inner.read(buf)
     }
 
     fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
-        let _span = Span::enter_with_parent(ReadOperation::BlockingSeek.into_static(), &self.span);
+        let _g = self.span.set_local_parent();
+        let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingSeek.into_static());
         self.inner.seek(pos)
     }
 
     fn next(&mut self) -> Option<Result<Bytes>> {
-        let _span = Span::enter_with_parent(ReadOperation::BlockingNext.into_static(), &self.span);
+        let _g = self.span.set_local_parent();
+        let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingNext.into_static());
         self.inner.next()
     }
 }
@@ -356,18 +395,41 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
     fn write(&mut self, bs: Bytes) -> Result<()> {
-        let _span =
-            Span::enter_with_parent(WriteOperation::BlockingWrite.into_static(), &self.span);
+        let _g = self.span.set_local_parent();
+        let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
         self.inner.write(bs)
     }
 
     fn close(&mut self) -> Result<()> {
-        let _span =
-            Span::enter_with_parent(WriteOperation::BlockingClose.into_static(), &self.span);
+        let _g = self.span.set_local_parent();
+        let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingClose.into_static());
         self.inner.close()
     }
 }
 
+#[async_trait]
+impl<R: oio::Append> oio::Append for MinitraceWrapper<R> {
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        self.inner
+            .append(bs)
+            .in_span(Span::enter_with_parent(
+                AppendOperation::Append.into_static(),
+                &self.span,
+            ))
+            .await
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        self.inner
+            .close()
+            .in_span(Span::enter_with_parent(
+                AppendOperation::Close.into_static(),
+                &self.span,
+            ))
+            .await
+    }
+}
+
 #[async_trait]
 impl<R: oio::Page> oio::Page for MinitraceWrapper<R> {
     async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
@@ -383,7 +445,8 @@ impl<R: oio::Page> oio::Page for MinitraceWrapper<R> {
 
 impl<R: oio::BlockingPage> oio::BlockingPage for MinitraceWrapper<R> {
     fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        let _span = Span::enter_with_parent(PageOperation::BlockingNext.into_static(), &self.span);
+        let _g = self.span.set_local_parent();
+        let _span = LocalSpan::enter_with_local_parent(PageOperation::BlockingNext.into_static());
         self.inner.next()
     }
 }

Reply via email to