This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 787c3c8d feat: Add OpenTelemetry Trace Layer (#2001)
787c3c8d is described below

commit 787c3c8dd0eed1fabd676e943163e1a3a70a70fc
Author: teckick <[email protected]>
AuthorDate: Tue Apr 18 22:01:11 2023 +0800

    feat: Add OpenTelemetry Trace Layer (#2001)
    
    * temp commit
    
    * temp commit
    
    * temp commit
    
    * temp commit
    
    * temp commit
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * temp commit
    
    * fix
    
    * fix impl
    
    * fix
    
    * implement OtelTraceWrapper
    
    * fix feature name
    
    * remove unused import
    
    * fix OtelTraceWrapper
---
 core/Cargo.toml              |   3 +
 core/src/layers/mod.rs       |   5 +
 core/src/layers/oteltrace.rs | 380 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 388 insertions(+)

diff --git a/core/Cargo.toml b/core/Cargo.toml
index ead02145..13c4e9e2 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -88,6 +88,8 @@ layers-madsim = ["dep:madsim"]
 layers-minitrace = ["dep:minitrace"]
 # Enable layers tracing support.
 layers-tracing = ["dep:tracing"]
+# Enable layers oteltrace support.
+layers-otel-trace = ["dep:opentelemetry"]
 
 services-azblob = [
   "dep:reqsign",
@@ -172,6 +174,7 @@ metrics = { version = "0.20", optional = true }
 minitrace = { version = "0.4.0", optional = true }
 moka = { version = "0.10", optional = true, features = ["future"] }
 once_cell = "1"
+opentelemetry = { version = "0.19.0", optional = true }
 parking_lot = "0.12"
 percent-encoding = "2"
 pin-project = "1"
diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs
index e18045f0..c0b5ba63 100644
--- a/core/src/layers/mod.rs
+++ b/core/src/layers/mod.rs
@@ -73,3 +73,8 @@ pub use self::madsim::MadsimLayer;
 #[cfg(feature = "layers-madsim")]
 #[cfg(madsim)]
 pub use self::madsim::MadsimServer;
+
+#[cfg(feature = "layers-otel-trace")]
+mod oteltrace;
+#[cfg(feature = "layers-otel-trace")]
+pub use self::oteltrace::OtelTraceLayer;
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
new file mode 100644
index 00000000..02b8129c
--- /dev/null
+++ b/core/src/layers/oteltrace.rs
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::io;
+use std::task;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::FutureExt;
+use opentelemetry::global;
+use opentelemetry::global::BoxedSpan;
+use opentelemetry::trace::FutureExt as TraceFutureExt;
+use opentelemetry::trace::Span;
+use opentelemetry::trace::TraceContextExt;
+use opentelemetry::trace::Tracer;
+use opentelemetry::Context;
+use opentelemetry::KeyValue;
+
+use crate::ops::*;
+use crate::raw::*;
+use crate::*;
+
+/// Add 
[opentelemetry::trace](https://docs.rs/opentelemetry/latest/opentelemetry/trace/index.html)
 for every operations.
+///
+/// Examples
+///
+/// ## Basic Setup
+///
+/// ```
+/// use anyhow::Result;
+/// use opendal::layers::OtelTraceLayer;
+/// use opendal::services;
+/// use opendal::Operator;
+///
+/// let _ = Operator::new(services::Memory::default())
+///     .expect("must init")
+///     .layer(OtelTraceLayer)
+///     .finish();
+/// ```
+pub struct OtelTraceLayer;
+
+impl<A: Accessor> Layer<A> for OtelTraceLayer {
+    type LayeredAccessor = OtelTraceAccessor<A>;
+
+    fn layer(&self, inner: A) -> Self::LayeredAccessor {
+        OtelTraceAccessor { inner }
+    }
+}
+
+#[derive(Debug)]
+pub struct OtelTraceAccessor<A> {
+    inner: A,
+}
+
+#[async_trait]
+impl<A: Accessor> LayeredAccessor for OtelTraceAccessor<A> {
+    type Inner = A;
+    type Reader = OtelTraceWrapper<A::Reader>;
+    type BlockingReader = OtelTraceWrapper<A::BlockingReader>;
+    type Writer = OtelTraceWrapper<A::Writer>;
+    type BlockingWriter = OtelTraceWrapper<A::BlockingWriter>;
+    type Pager = OtelTraceWrapper<A::Pager>;
+    type BlockingPager = OtelTraceWrapper<A::BlockingPager>;
+
+    fn inner(&self) -> &Self::Inner {
+        &self.inner
+    }
+
+    fn metadata(&self) -> AccessorInfo {
+        let tracer = global::tracer("opendal");
+        tracer.in_span("metadata", |_cx| self.inner.info())
+    }
+
+    async fn create_dir(&self, path: &str, args: OpCreate) -> Result<RpCreate> 
{
+        let tracer = global::tracer("opendal");
+        let mut span = tracer.start("create");
+        span.set_attribute(KeyValue::new("path", path.to_string()));
+        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+        let cx = Context::current_with_span(span);
+        self.inner.create_dir(path, args).with_context(cx).await
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        let tracer = global::tracer("opendal");
+        let mut span = tracer.start("read");
+        span.set_attribute(KeyValue::new("path", path.to_string()));
+        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+        self.inner
+            .read(path, args)
+            .map(|v| v.map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r))))
+            .await
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        let tracer = global::tracer("opendal");
+        let mut span = tracer.start("write");
+        span.set_attribute(KeyValue::new("path", path.to_string()));
+        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+        self.inner
+            .write(path, args)
+            .await
+            .map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r)))
+    }
+
+    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> 
{
+        let tracer = global::tracer("opendal");
+        let mut span = tracer.start("copy");
+        span.set_attribute(KeyValue::new("from", from.to_string()));
+        span.set_attribute(KeyValue::new("to", to.to_string()));
+        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+        let cx = Context::current_with_span(span);
+        self.inner().copy(from, to, args).with_context(cx).await
+    }
+
+    async fn rename(&self, from: &str, to: &str, args: OpRename) -> 
Result<RpRename> {
+        let tracer = global::tracer("opendal");
+        let mut span = tracer.start("rename");
+        span.set_attribute(KeyValue::new("from", from.to_string()));
+        span.set_attribute(KeyValue::new("to", to.to_string()));
+        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+        let cx = Context::current_with_span(span);
+        self.inner().rename(from, to, args).with_context(cx).await
+    }
+
+    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        let tracer = global::tracer("opendal");
+        let mut span = tracer.start("stat");
+        span.set_attribute(KeyValue::new("path", path.to_string()));
+        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+        let cx = Context::current_with_span(span);
+        self.inner().stat(path, args).with_context(cx).await
+    }
+
+    async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        let tracer = global::tracer("opendal");
+        let mut span = tracer.start("delete");
+        span.set_attribute(KeyValue::new("path", path.to_string()));
+        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+        let cx = Context::current_with_span(span);
+        self.inner().delete(path, args).with_context(cx).await
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Pager)> {
+        let tracer = global::tracer("opendal");
+        let mut span = tracer.start("list");
+        span.set_attribute(KeyValue::new("path", path.to_string()));
+        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+        self.inner
+            .list(path, args)
+            .map(|v| v.map(|(rp, s)| (rp, OtelTraceWrapper::new(span, s))))
+            .await
+    }
+
+    async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, 
Self::Pager)> {
+        let tracer = global::tracer("opendal");
+        let mut span = tracer.start("scan");
+        span.set_attribute(KeyValue::new("path", path.to_string()));
+        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+        self.inner
+            .scan(path, args)
+            .map(|v| v.map(|(rp, s)| (rp, OtelTraceWrapper::new(span, s))))
+            .await
+    }
+
+    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+        let tracer = global::tracer("opendal");
+        let mut span = tracer.start("batch");
+        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+        let cx = Context::current_with_span(span);
+        self.inner().batch(args).with_context(cx).await
+    }
+
+    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+        let tracer = global::tracer("opendal");
+        let mut span = tracer.start("presign");
+        span.set_attribute(KeyValue::new("path", path.to_string()));
+        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+        let cx = Context::current_with_span(span);
+        self.inner().presign(path, args).with_context(cx).await
+    }
+
+    fn blocking_create_dir(&self, path: &str, args: OpCreate) -> 
Result<RpCreate> {
+        let tracer = global::tracer("opendal");
+        tracer.in_span("blocking_create_dir", |cx| {
+            let span = cx.span(); // let mut span = cx.();
+            span.set_attribute(KeyValue::new("path", path.to_string()));
+            span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+            self.inner().blocking_create_dir(path, args)
+        })
+    }
+
+    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::BlockingReader)> {
+        let tracer = global::tracer("opendal");
+        let mut span = tracer.start("blocking_read");
+        span.set_attribute(KeyValue::new("path", path.to_string()));
+        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+        self.inner
+            .blocking_read(path, args)
+            .map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r)))
+    }
+
+    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
+        let tracer = global::tracer("opendal");
+        let mut span = tracer.start("blocking_write");
+        span.set_attribute(KeyValue::new("path", path.to_string()));
+        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+        self.inner
+            .blocking_write(path, args)
+            .map(|(rp, r)| (rp, OtelTraceWrapper::new(span, r)))
+    }
+
+    fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> 
Result<RpCopy> {
+        let tracer = global::tracer("opendal");
+        tracer.in_span("blocking_copy", |cx| {
+            let span = cx.span();
+            span.set_attribute(KeyValue::new("from", from.to_string()));
+            span.set_attribute(KeyValue::new("to", to.to_string()));
+            span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+            self.inner().blocking_copy(from, to, args)
+        })
+    }
+
+    fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> 
Result<RpRename> {
+        let tracer = global::tracer("opendal");
+        tracer.in_span("blocking_rename", |cx| {
+            let span = cx.span();
+            span.set_attribute(KeyValue::new("from", from.to_string()));
+            span.set_attribute(KeyValue::new("to", to.to_string()));
+            span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+            self.inner().blocking_rename(from, to, args)
+        })
+    }
+
+    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        let tracer = global::tracer("opendal");
+        tracer.in_span("blocking_stat", |cx| {
+            let span = cx.span();
+            span.set_attribute(KeyValue::new("path", path.to_string()));
+            span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+            self.inner().blocking_stat(path, args)
+        })
+    }
+
+    fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        let tracer = global::tracer("opendal");
+        tracer.in_span("blocking_delete", |cx| {
+            let span = cx.span();
+            span.set_attribute(KeyValue::new("path", path.to_string()));
+            span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+            self.inner().blocking_delete(path, args)
+        })
+    }
+
+    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::BlockingPager)> {
+        let tracer = global::tracer("opendal");
+        let mut span = tracer.start("blocking_list");
+        span.set_attribute(KeyValue::new("path", path.to_string()));
+        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+        self.inner
+            .blocking_list(path, args)
+            .map(|(rp, it)| (rp, OtelTraceWrapper::new(span, it)))
+    }
+
+    fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, 
Self::BlockingPager)> {
+        let tracer = global::tracer("opendal");
+        let mut span = tracer.start("blocking_scan");
+        span.set_attribute(KeyValue::new("path", path.to_string()));
+        span.set_attribute(KeyValue::new("args", format!("{:?}", args)));
+        self.inner
+            .blocking_scan(path, args)
+            .map(|(rp, it)| (rp, OtelTraceWrapper::new(span, it)))
+    }
+}
+
+pub struct OtelTraceWrapper<R> {
+    _span: BoxedSpan,
+    inner: R,
+}
+
+impl<R> OtelTraceWrapper<R> {
+    fn new(_span: BoxedSpan, inner: R) -> Self {
+        Self { _span, inner }
+    }
+}
+
+impl<R: oio::Read> oio::Read for OtelTraceWrapper<R> {
+    fn poll_read(
+        &mut self,
+        cx: &mut task::Context<'_>,
+        buf: &mut [u8],
+    ) -> task::Poll<Result<usize>> {
+        self.inner.poll_read(cx, buf)
+    }
+
+    fn poll_seek(
+        &mut self,
+        cx: &mut task::Context<'_>,
+        pos: io::SeekFrom,
+    ) -> task::Poll<Result<u64>> {
+        self.inner.poll_seek(cx, pos)
+    }
+
+    fn poll_next(&mut self, cx: &mut task::Context<'_>) -> 
task::Poll<Option<Result<Bytes>>> {
+        self.inner.poll_next(cx)
+    }
+}
+
+impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> {
+    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+        self.inner.read(buf)
+    }
+
+    fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64> {
+        self.inner.seek(pos)
+    }
+
+    fn next(&mut self) -> Option<Result<Bytes>> {
+        self.inner.next()
+    }
+}
+
+#[async_trait]
+impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        self.inner.write(bs).with_context(cx).await
+    }
+
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        self.inner.append(bs).with_context(cx).await
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.inner.abort().with_context(cx).await
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        self.inner.close().with_context(cx).await
+    }
+}
+
+impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
+    fn write(&mut self, bs: Bytes) -> Result<()> {
+        self.inner.write(bs)
+    }
+
+    fn append(&mut self, bs: Bytes) -> Result<()> {
+        self.inner.append(bs)
+    }
+
+    fn close(&mut self) -> Result<()> {
+        self.inner.close()
+    }
+}
+
+#[async_trait]
+impl<R: oio::Page> oio::Page for OtelTraceWrapper<R> {
+    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
+        self.inner.next().await
+    }
+}
+
+impl<R: oio::BlockingPage> oio::BlockingPage for OtelTraceWrapper<R> {
+    fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
+        self.inner.next()
+    }
+}

Reply via email to