This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch impl_opentel_runtime_sdk
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 0e03b65d4caa3424ef019d5ecec0dbe48226f815
Author: numminex <[email protected]>
AuthorDate: Sat Sep 27 13:38:29 2025 +0200

    feat(io_uring): implement the runtime trait from opentelemetry_sdk
---
 core/server/Cargo.toml         |  2 -
 core/server/src/log/logger.rs  |  5 ++-
 core/server/src/log/mod.rs     |  1 +
 core/server/src/log/runtime.rs | 83 ++++++++++++++++++++++++++++++------------
 4 files changed, 64 insertions(+), 27 deletions(-)

diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index c2c241e8..64f71a31 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -92,10 +92,8 @@ opentelemetry-otlp = { version = "0.30.0", features = [
 ] }
 opentelemetry-semantic-conventions = "0.30.0"
 opentelemetry_sdk = { version = "0.30.0", features = [
-    "rt-tokio",
     "logs",
     "trace",
-    "tokio",
     "experimental_async_runtime",
     "experimental_logs_batch_log_processor_with_async_runtime",
     "experimental_trace_batch_span_processor_with_async_runtime",
diff --git a/core/server/src/log/logger.rs b/core/server/src/log/logger.rs
index c83da4c8..59725685 100644
--- a/core/server/src/log/logger.rs
+++ b/core/server/src/log/logger.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use crate::log::runtime::CompioRuntime;
 use crate::VERSION;
 use crate::configs::server::{TelemetryConfig, TelemetryTransport};
 use crate::configs::system::LoggingConfig;
@@ -217,7 +218,7 @@ impl Logging {
                     .with_log_processor(
                         
log_processor_with_async_runtime::BatchLogProcessor::builder(
                             log_exporter,
-                            runtime::Tokio,
+                            CompioRuntime,
                         )
                         .build(),
                     )
@@ -249,7 +250,7 @@ impl Logging {
                     .with_span_processor(
                         
span_processor_with_async_runtime::BatchSpanProcessor::builder(
                             trace_exporter,
-                            runtime::Tokio,
+                            CompioRuntime
                         )
                         .build(),
                     )
diff --git a/core/server/src/log/mod.rs b/core/server/src/log/mod.rs
index 2af4bb33..f3acb50b 100644
--- a/core/server/src/log/mod.rs
+++ b/core/server/src/log/mod.rs
@@ -16,3 +16,4 @@
  * under the License.
  */
 pub mod logger;
+pub mod runtime;
diff --git a/core/server/src/log/runtime.rs b/core/server/src/log/runtime.rs
index 36bbdd80..1683b34a 100644
--- a/core/server/src/log/runtime.rs
+++ b/core/server/src/log/runtime.rs
@@ -1,48 +1,85 @@
-use std::{pin::Pin, time::Duration};
+use std::{pin::Pin, task::Poll, time::Duration};
 
-use futures::{FutureExt, SinkExt, Stream};
+use futures::{channel::mpsc, future::poll_fn, FutureExt, SinkExt, Stream, 
StreamExt};
 use opentelemetry_sdk::runtime::{Runtime, RuntimeChannel, TrySend};
 
 #[derive(Clone)]
-pub struct MonoioRuntime;
+pub struct CompioRuntime;
 
-impl Runtime for MonoioRuntime {
+impl Runtime for CompioRuntime {
     fn spawn<F>(&self, future: F)
     where
         F: Future<Output = ()> + Send + 'static,
     {
-        // TODO: This wont' work, we init Opentelemetry in the main thread, 
when there is no instance of monoio runtime
-        // running yet....
-        monoio::spawn(future);
+        // It's fine to detach this task; the documentation for the `spawn` method 
on the `Runtime` trait says:
+        //
+        //
+        // "This is mainly used to run batch span processing in the 
background. Note, that the function
+        // does not return a handle. OpenTelemetry will use a different way to 
wait for the future to
+        // finish when the caller shuts down."
+        compio::runtime::spawn(future).detach();
     }
 
     fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 
'static {
-        let sleep = Sleep::new(duration);
-        sleep
+        compio::time::sleep(duration)
     }
 }
 
+#[derive(Debug)]
+pub struct CompioSender<T> {
+    sender: mpsc::UnboundedSender<T>,
+}
+
+impl<T> CompioSender<T> {
+    pub fn new(sender: mpsc::UnboundedSender<T>) -> Self {
+        Self { sender }
+    }
+}   
 
-pub struct Sleep {
-    pub inner: Pin<Box<monoio::time::Sleep>>,
+// Safety: the compio runtime is single-threaded (its futures are 
!Send + !Sync), 
+// so we implement these traits only to satisfy the bounds required by the 
`Runtime` and `RuntimeChannel` traits.
+unsafe impl<T> Send for CompioSender<T> {}
+unsafe impl<T> Sync for CompioSender<T> {}
+
+impl<T: std::fmt::Debug + Send> TrySend for CompioSender<T>  {
+    type Message = T;
+
+    fn try_send(&self, item: Self::Message) -> Result<(), 
opentelemetry_sdk::runtime::TrySendError> {
+        self.sender.unbounded_send(item).map_err(|_err| {
+            // Unbounded channels can only fail if disconnected, never full
+            opentelemetry_sdk::runtime::TrySendError::ChannelClosed
+        })
+    }
 }
 
-impl Sleep {
-    pub fn new(duration: Duration) -> Self {
-        Self {
-            inner: Box::pin(monoio::time::sleep(duration)),
-        }
+pub struct CompioReceiver<T> {
+    receiver: mpsc::UnboundedReceiver<T>,
+}
+
+impl<T> CompioReceiver<T> {
+    pub fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
+        Self { receiver }
     }
 }
 
-impl Future for Sleep {
-    type Output = ();
+impl<T: std::fmt::Debug + Send> Stream for CompioReceiver<T> {
+    type Item = T;
 
-    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut 
std::task::Context<'_>) -> std::task::Poll<Self::Output> {
-        self.inner.as_mut().poll_unpin(cx)
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> 
std::task::Poll<Option<Self::Item>> {
+        self.receiver.poll_next_unpin(cx)
     }
 }
 
-// Safety: There is no way for `Sleep` future to be flipped like a burger to 
another thread,
-// because we create instance of OpenTelemetry SDK runtime in the main thread, 
and monoio futures don't require Send & Sync bounds.
-unsafe impl Send for Sleep {}
\ No newline at end of file
+impl RuntimeChannel for CompioRuntime {
+    type Receiver<T: std::fmt::Debug + Send> = CompioReceiver<T>;
+    type Sender<T: std::fmt::Debug + Send> = CompioSender<T>;
+
+    fn batch_message_channel<T: std::fmt::Debug + Send>(
+        &self,
+        _capacity: usize,
+    ) -> (Self::Sender<T>, Self::Receiver<T>) {
+        // Use an unbounded channel: this trait is used for batch processing, 
which naturally limits the number of messages.
+        let (sender, receiver) = mpsc::unbounded();
+        (CompioSender::new(sender), CompioReceiver::new(receiver))
+    }
+}

Reply via email to