This is an automated email from the ASF dual-hosted git repository.

maciej pushed a commit to branch connectors-mcp-otel
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 95d91076961d8d1f623db7fe8938e30da49f0dd3
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Fri Jan 23 08:14:54 2026 +0100

    feat(connectors,mcp): implement logging with telemetry
---
 Cargo.lock                                         |  15 ++
 Cargo.toml                                         |  19 ++
 core/ai/mcp/Cargo.toml                             |   7 +
 core/ai/mcp/config.toml                            |  12 ++
 core/ai/mcp/src/configs.rs                         | 108 ++++++++++-
 core/ai/mcp/src/log.rs                             | 197 +++++++++++++++++++++
 core/ai/mcp/src/main.rs                            |  20 +--
 core/connectors/runtime/Cargo.toml                 |   7 +
 core/connectors/runtime/config.toml                |  12 ++
 core/connectors/runtime/example_config/config.toml |   3 +
 core/connectors/runtime/src/configs/runtime.rs     | 106 ++++++++++-
 core/connectors/runtime/src/log.rs                 | 181 +++++++++++++++++++
 core/connectors/runtime/src/main.rs                |  21 ++-
 core/connectors/runtime/src/sink.rs                |   8 +-
 core/connectors/runtime/src/source.rs              |   2 +
 core/connectors/sdk/src/lib.rs                     |   2 +
 core/connectors/sdk/src/log.rs                     |  96 ++++++++++
 core/connectors/sdk/src/sink.rs                    |  17 +-
 core/connectors/sdk/src/source.rs                  |   8 +-
 core/server/Cargo.toml                             |  25 +--
 docker-compose.yml                                 |  56 +++---
 greptime_users                                     |   1 +
 otel-collector-config.yaml                         | 110 ++++++++++++
 23 files changed, 956 insertions(+), 77 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 5f7fceb1a..69a0d1650 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4578,6 +4578,7 @@ dependencies = [
  "chrono",
  "clap",
  "dashmap",
+ "derive_more",
  "dirs",
  "dlopen2",
  "dotenvy",
@@ -4590,6 +4591,11 @@ dependencies = [
  "iggy_connector_sdk",
  "mimalloc",
  "once_cell",
+ "opentelemetry",
+ "opentelemetry-appender-tracing",
+ "opentelemetry-otlp",
+ "opentelemetry-semantic-conventions",
+ "opentelemetry_sdk",
  "postcard",
  "reqwest",
  "reqwest-middleware",
@@ -4606,6 +4612,7 @@ dependencies = [
  "toml 0.9.11+spec-1.1.0",
  "tower-http",
  "tracing",
+ "tracing-opentelemetry",
  "tracing-subscriber",
 ]
 
@@ -4621,6 +4628,12 @@ dependencies = [
  "figment",
  "iggy",
  "iggy_common",
+ "opentelemetry",
+ "opentelemetry-appender-tracing",
+ "opentelemetry-otlp",
+ "opentelemetry-semantic-conventions",
+ "opentelemetry_sdk",
+ "reqwest",
  "rmcp",
  "serde",
  "serde_json",
@@ -4630,6 +4643,7 @@ dependencies = [
  "tokio",
  "tower-http",
  "tracing",
+ "tracing-opentelemetry",
  "tracing-subscriber",
 ]
 
@@ -6350,6 +6364,7 @@ dependencies = [
  "rand 0.9.2",
  "thiserror 2.0.17",
  "tokio",
+ "tokio-stream",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 802dde2d8..ec01b4e00 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -150,6 +150,24 @@ mockall = "0.14.0"
 nix = { version = "0.30.1", features = ["fs", "resource", "sched"] }
 nonzero_lit = "0.1.2"
 once_cell = "1.21.3"
+opentelemetry = { version = "0.31.0", features = ["trace", "logs"] }
+opentelemetry-appender-tracing = { version = "0.31.1", features = ["log"] }
+opentelemetry-otlp = { version = "0.31.0", features = [
+    "logs",
+    "trace",
+    "grpc-tonic",
+    "http",
+    "http-proto",
+    "reqwest-client",
+] }
+opentelemetry-semantic-conventions = "0.31.0"
+opentelemetry_sdk = { version = "0.31.0", features = [
+    "logs",
+    "trace",
+    "experimental_async_runtime",
+    "experimental_logs_batch_log_processor_with_async_runtime",
+    "experimental_trace_batch_span_processor_with_async_runtime",
+] }
 parquet = "=55.2.0"
 passterm = "=2.0.1"
 postcard = { version = "1.1.3", features = ["alloc"] }
@@ -196,6 +214,7 @@ tower-http = { version = "0.6.8", features = [
 ] }
 tracing = "0.1.44"
 tracing-appender = "0.2.4"
+tracing-opentelemetry = "0.32.1"
 tracing-subscriber = { version = "0.3.22", default-features = false, features 
= [
     "fmt",
     "env-filter",
diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml
index ca729329d..85d60a6f6 100644
--- a/core/ai/mcp/Cargo.toml
+++ b/core/ai/mcp/Cargo.toml
@@ -35,6 +35,12 @@ figlet-rs = { workspace = true }
 figment = { workspace = true }
 iggy = { workspace = true }
 iggy_common = { workspace = true }
+opentelemetry = { workspace = true }
+opentelemetry-appender-tracing = { workspace = true }
+opentelemetry-otlp = { workspace = true }
+opentelemetry-semantic-conventions = { workspace = true }
+opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
+reqwest = { workspace = true }
 rmcp = { version = "0.13.0", features = [
     "server",
     "transport-io",
@@ -47,6 +53,7 @@ thiserror = { workspace = true }
 tokio = { workspace = true }
 tower-http = { workspace = true }
 tracing = { workspace = true }
+tracing-opentelemetry = { workspace = true }
 tracing-subscriber = { workspace = true }
 
 [dev-dependencies]
diff --git a/core/ai/mcp/config.toml b/core/ai/mcp/config.toml
index 69ff5f349..b034f42e6 100644
--- a/core/ai/mcp/config.toml
+++ b/core/ai/mcp/config.toml
@@ -52,3 +52,15 @@ create = true
 read = true
 update = true
 delete = true
+
+[telemetry]
+enabled = false
+service_name = "iggy-mcp"
+
+[telemetry.logs]
+transport = "grpc"
+endpoint = "http://localhost:4317";
+
+[telemetry.traces]
+transport = "grpc"
+endpoint = "http://localhost:4317";
diff --git a/core/ai/mcp/src/configs.rs b/core/ai/mcp/src/configs.rs
index e29b33092..7ec2846c3 100644
--- a/core/ai/mcp/src/configs.rs
+++ b/core/ai/mcp/src/configs.rs
@@ -26,7 +26,8 @@ use figment::{
 use iggy::prelude::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME};
 use iggy_common::{CustomEnvProvider, FileConfigProvider};
 use serde::{Deserialize, Serialize};
-use std::fmt::Formatter;
+use std::fmt::{Display as FmtDisplay, Formatter};
+use std::str::FromStr;
 use strum::Display;
 use tower_http::cors::{AllowOrigin, CorsLayer};
 
@@ -37,6 +38,7 @@ pub struct McpServerConfig {
     pub iggy: IggyConfig,
     pub permissions: PermissionsConfig,
     pub transport: McpTransport,
+    pub telemetry: TelemetryConfig,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -101,6 +103,105 @@ pub enum McpTransport {
     Stdio,
 }
 
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct TelemetryConfig {
+    pub enabled: bool,
+    pub service_name: String,
+    pub logs: TelemetryLogsConfig,
+    pub traces: TelemetryTracesConfig,
+}
+
+impl Default for TelemetryConfig {
+    fn default() -> Self {
+        Self {
+            enabled: false,
+            service_name: "iggy-mcp".to_owned(),
+            logs: TelemetryLogsConfig::default(),
+            traces: TelemetryTracesConfig::default(),
+        }
+    }
+}
+
+impl FmtDisplay for TelemetryConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{{ enabled: {}, service_name: {}, logs: {}, traces: {} }}",
+            self.enabled, self.service_name, self.logs, self.traces
+        )
+    }
+}
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct TelemetryLogsConfig {
+    pub transport: TelemetryTransport,
+    pub endpoint: String,
+}
+
+impl Default for TelemetryLogsConfig {
+    fn default() -> Self {
+        Self {
+            transport: TelemetryTransport::Grpc,
+            endpoint: "http://localhost:4317".to_owned(),
+        }
+    }
+}
+
+impl FmtDisplay for TelemetryLogsConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{{ transport: {}, endpoint: {} }}",
+            self.transport, self.endpoint
+        )
+    }
+}
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct TelemetryTracesConfig {
+    pub transport: TelemetryTransport,
+    pub endpoint: String,
+}
+
+impl Default for TelemetryTracesConfig {
+    fn default() -> Self {
+        Self {
+            transport: TelemetryTransport::Grpc,
+            endpoint: "http://localhost:4317".to_owned(),
+        }
+    }
+}
+
+impl FmtDisplay for TelemetryTracesConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{{ transport: {}, endpoint: {} }}",
+            self.transport, self.endpoint
+        )
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Display, Copy, Clone)]
+#[serde(rename_all = "lowercase")]
+pub enum TelemetryTransport {
+    #[strum(to_string = "grpc")]
+    Grpc,
+    #[strum(to_string = "http")]
+    Http,
+}
+
+impl FromStr for TelemetryTransport {
+    type Err = String;
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "grpc" => Ok(TelemetryTransport::Grpc),
+            "http" => Ok(TelemetryTransport::Http),
+            _ => Err(format!("Invalid telemetry transport: {s}")),
+        }
+    }
+}
+
 impl Default for McpServerConfig {
     fn default() -> Self {
         Self {
@@ -108,6 +209,7 @@ impl Default for McpServerConfig {
             iggy: IggyConfig::default(),
             permissions: PermissionsConfig::default(),
             transport: McpTransport::Http,
+            telemetry: TelemetryConfig::default(),
         }
     }
 }
@@ -238,8 +340,8 @@ impl std::fmt::Display for McpServerConfig {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "{{ http: {}, iggy: {}, permissions: {:?}, transport: {} }}",
-            self.http, self.iggy, self.permissions, self.transport
+            "{{ http: {}, iggy: {}, permissions: {:?}, transport: {}, 
telemetry: {} }}",
+            self.http, self.iggy, self.permissions, self.transport, 
self.telemetry
         )
     }
 }
diff --git a/core/ai/mcp/src/log.rs b/core/ai/mcp/src/log.rs
new file mode 100644
index 000000000..7cce521e6
--- /dev/null
+++ b/core/ai/mcp/src/log.rs
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::configs::{McpTransport, TelemetryConfig, TelemetryTransport};
+use opentelemetry::trace::TracerProvider;
+use opentelemetry::{KeyValue, global};
+use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
+use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
+use opentelemetry_sdk::Resource;
+use opentelemetry_sdk::propagation::TraceContextPropagator;
+use tracing::info;
+use tracing_opentelemetry::OpenTelemetryLayer;
+use tracing_subscriber::EnvFilter;
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+
+pub fn init_logging(
+    telemetry_config: &TelemetryConfig,
+    transport: McpTransport,
+    version: &'static str,
+) {
+    // STDIO transport needs stderr output with no ANSI codes
+    // HTTP transport can use normal stdout with ANSI
+    let (default_level, use_stderr, use_ansi) = match transport {
+        McpTransport::Stdio => ("DEBUG", true, false),
+        McpTransport::Http => ("INFO", false, true),
+    };
+
+    let env_filter =
+        EnvFilter::try_from_default_env().unwrap_or_else(|_| 
EnvFilter::new(default_level));
+
+    match (telemetry_config.enabled, use_stderr) {
+        (true, true) => {
+            let (logger_provider, tracer_provider) = 
init_telemetry(telemetry_config, version);
+
+            let service_name = telemetry_config.service_name.clone();
+            let tracer = tracer_provider.tracer(service_name);
+            global::set_tracer_provider(tracer_provider);
+            global::set_text_map_propagator(TraceContextPropagator::new());
+
+            let fmt_layer = tracing_subscriber::fmt::layer()
+                .with_writer(std::io::stderr)
+                .with_ansi(use_ansi);
+            let otel_logs_layer = 
OpenTelemetryTracingBridge::new(&logger_provider);
+            let otel_traces_layer = OpenTelemetryLayer::new(tracer);
+
+            tracing_subscriber::registry()
+                .with(env_filter)
+                .with(fmt_layer)
+                .with(otel_logs_layer)
+                .with(otel_traces_layer)
+                .init();
+
+            info!(
+                "Logging initialized with telemetry enabled, service name: {}",
+                telemetry_config.service_name
+            );
+        }
+        (true, false) => {
+            let (logger_provider, tracer_provider) = 
init_telemetry(telemetry_config, version);
+
+            let service_name = telemetry_config.service_name.clone();
+            let tracer = tracer_provider.tracer(service_name);
+            global::set_tracer_provider(tracer_provider);
+            global::set_text_map_propagator(TraceContextPropagator::new());
+
+            let fmt_layer = 
tracing_subscriber::fmt::layer().with_ansi(use_ansi);
+            let otel_logs_layer = 
OpenTelemetryTracingBridge::new(&logger_provider);
+            let otel_traces_layer = OpenTelemetryLayer::new(tracer);
+
+            tracing_subscriber::registry()
+                .with(env_filter)
+                .with(fmt_layer)
+                .with(otel_logs_layer)
+                .with(otel_traces_layer)
+                .init();
+
+            info!(
+                "Logging initialized with telemetry enabled, service name: {}",
+                telemetry_config.service_name
+            );
+        }
+        (false, true) => {
+            tracing_subscriber::fmt()
+                .with_env_filter(env_filter)
+                .with_writer(std::io::stderr)
+                .with_ansi(use_ansi)
+                .init();
+        }
+        (false, false) => {
+            tracing_subscriber::registry()
+                .with(tracing_subscriber::fmt::layer().with_ansi(use_ansi))
+                .with(env_filter)
+                .init();
+        }
+    }
+}
+
+fn init_telemetry(
+    telemetry_config: &TelemetryConfig,
+    version: &'static str,
+) -> (
+    opentelemetry_sdk::logs::SdkLoggerProvider,
+    opentelemetry_sdk::trace::SdkTracerProvider,
+) {
+    let service_name = telemetry_config.service_name.clone();
+    let resource = Resource::builder()
+        .with_service_name(service_name)
+        .with_attribute(KeyValue::new(
+            opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
+            version,
+        ))
+        .build();
+
+    let logger_provider = init_logs_exporter(telemetry_config, 
resource.clone());
+    let tracer_provider = init_traces_exporter(telemetry_config, resource);
+
+    (logger_provider, tracer_provider)
+}
+
+fn init_logs_exporter(
+    telemetry_config: &TelemetryConfig,
+    resource: Resource,
+) -> opentelemetry_sdk::logs::SdkLoggerProvider {
+    match telemetry_config.logs.transport {
+        TelemetryTransport::Grpc => 
opentelemetry_sdk::logs::SdkLoggerProvider::builder()
+            .with_resource(resource)
+            .with_batch_exporter(
+                opentelemetry_otlp::LogExporter::builder()
+                    .with_tonic()
+                    .with_endpoint(telemetry_config.logs.endpoint.clone())
+                    .build()
+                    .expect("Failed to initialize gRPC logger."),
+            )
+            .build(),
+        TelemetryTransport::Http => {
+            let log_exporter = opentelemetry_otlp::LogExporter::builder()
+                .with_http()
+                .with_http_client(reqwest::Client::new())
+                .with_endpoint(telemetry_config.logs.endpoint.clone())
+                .with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
+                .build()
+                .expect("Failed to initialize HTTP logger.");
+            opentelemetry_sdk::logs::SdkLoggerProvider::builder()
+                .with_resource(resource)
+                .with_batch_exporter(log_exporter)
+                .build()
+        }
+    }
+}
+
+fn init_traces_exporter(
+    telemetry_config: &TelemetryConfig,
+    resource: Resource,
+) -> opentelemetry_sdk::trace::SdkTracerProvider {
+    match telemetry_config.traces.transport {
+        TelemetryTransport::Grpc => 
opentelemetry_sdk::trace::SdkTracerProvider::builder()
+            .with_resource(resource)
+            .with_batch_exporter(
+                opentelemetry_otlp::SpanExporter::builder()
+                    .with_tonic()
+                    .with_endpoint(telemetry_config.traces.endpoint.clone())
+                    .build()
+                    .expect("Failed to initialize gRPC tracer."),
+            )
+            .build(),
+        TelemetryTransport::Http => {
+            let trace_exporter = opentelemetry_otlp::SpanExporter::builder()
+                .with_http()
+                .with_http_client(reqwest::Client::new())
+                .with_endpoint(telemetry_config.traces.endpoint.clone())
+                .with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
+                .build()
+                .expect("Failed to initialize HTTP tracer.");
+            opentelemetry_sdk::trace::SdkTracerProvider::builder()
+                .with_resource(resource)
+                .with_batch_exporter(trace_exporter)
+                .build()
+        }
+    }
+}
diff --git a/core/ai/mcp/src/main.rs b/core/ai/mcp/src/main.rs
index b5cfb4281..dbbd1bf4c 100644
--- a/core/ai/mcp/src/main.rs
+++ b/core/ai/mcp/src/main.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -26,14 +27,16 @@ use rmcp::{ServiceExt, model::ErrorData, transport::stdio};
 use service::IggyService;
 use std::{env, sync::Arc};
 use tracing::{error, info};
-use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, 
util::SubscriberInitExt};
 
 mod api;
 mod configs;
 mod error;
+mod log;
 mod service;
 mod stream;
 
+const VERSION: &str = env!("CARGO_PKG_VERSION");
+
 const DEFAULT_CONFIG_PATH: &str = "core/ai/mcp/config.toml";
 
 #[tokio::main]
@@ -62,18 +65,7 @@ async fn main() -> Result<(), McpRuntimeError> {
         .expect("Failed to load configuration");
 
     let transport = config.transport;
-    if transport == McpTransport::Stdio {
-        tracing_subscriber::fmt()
-            
.with_env_filter(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("DEBUG")))
-            .with_writer(std::io::stderr)
-            .with_ansi(false)
-            .init();
-    } else {
-        Registry::default()
-            .with(tracing_subscriber::fmt::layer())
-            
.with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
-            .init();
-    }
+    log::init_logging(&config.telemetry, transport, VERSION);
 
     info!("Starting Iggy MCP Server, transport: {transport}...");
 
diff --git a/core/connectors/runtime/Cargo.toml 
b/core/connectors/runtime/Cargo.toml
index 4b60f74fe..2ef1770fe 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -35,6 +35,7 @@ axum-server = { workspace = true }
 chrono = { workspace = true }
 clap = { workspace = true }
 dashmap = { workspace = true }
+derive_more = { workspace = true }
 dirs = { workspace = true }
 dlopen2 = { workspace = true }
 dotenvy = { workspace = true }
@@ -47,6 +48,11 @@ iggy_common = { workspace = true }
 iggy_connector_sdk = { workspace = true }
 mimalloc = { workspace = true }
 once_cell = { workspace = true }
+opentelemetry = { workspace = true }
+opentelemetry-appender-tracing = { workspace = true }
+opentelemetry-otlp = { workspace = true }
+opentelemetry-semantic-conventions = { workspace = true }
+opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
 postcard = { workspace = true }
 reqwest = { workspace = true }
 reqwest-middleware = { workspace = true }
@@ -62,6 +68,7 @@ tokio = { workspace = true }
 toml = { workspace = true }
 tower-http = { workspace = true }
 tracing = { workspace = true }
+tracing-opentelemetry = { workspace = true }
 tracing-subscriber = { workspace = true }
 
 [dev-dependencies]
diff --git a/core/connectors/runtime/config.toml 
b/core/connectors/runtime/config.toml
index dadcc0f35..abfa1ab50 100644
--- a/core/connectors/runtime/config.toml
+++ b/core/connectors/runtime/config.toml
@@ -51,3 +51,15 @@ path = "local_state"
 [connectors]
 config_type = "local"
 config_dir = ""
+
+[telemetry]
+enabled = false
+service_name = "iggy-connectors"
+
+[telemetry.logs]
+transport = "grpc"
+endpoint = "http://localhost:4317";
+
+[telemetry.traces]
+transport = "grpc"
+endpoint = "http://localhost:4317";
diff --git a/core/connectors/runtime/example_config/config.toml 
b/core/connectors/runtime/example_config/config.toml
index 6bcbc9b22..99007e436 100644
--- a/core/connectors/runtime/example_config/config.toml
+++ b/core/connectors/runtime/example_config/config.toml
@@ -34,6 +34,9 @@ enabled = false
 cert_file = "core/certs/iggy_cert.pem"
 key_file = "core/certs/iggy_key.pem"
 
+[telemetry]
+enabled = true
+
 [iggy]
 address = "localhost:8090"
 username = "iggy"
diff --git a/core/connectors/runtime/src/configs/runtime.rs 
b/core/connectors/runtime/src/configs/runtime.rs
index bf81b8651..77d88ef07 100644
--- a/core/connectors/runtime/src/configs/runtime.rs
+++ b/core/connectors/runtime/src/configs/runtime.rs
@@ -18,6 +18,7 @@
  */
 
 use crate::api::config::HttpConfig;
+use derive_more::Display;
 use figment::providers::{Format, Toml};
 use figment::value::Dict;
 use figment::{Metadata, Profile, Provider};
@@ -27,12 +28,112 @@ use serde::{Deserialize, Serialize};
 use serde_with::{DisplayFromStr, serde_as};
 use std::collections::HashMap;
 use std::fmt::{Display, Formatter};
+use std::str::FromStr;
 
 const SECRET_KEYS: [&str; 2] = [
     "IGGY_CONNECTORS_IGGY_PASSWORD",
     "IGGY_CONNECTORS_IGGY_TOKEN",
 ];
 
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct TelemetryConfig {
+    pub enabled: bool,
+    pub service_name: String,
+    pub logs: TelemetryLogsConfig,
+    pub traces: TelemetryTracesConfig,
+}
+
+impl Default for TelemetryConfig {
+    fn default() -> Self {
+        Self {
+            enabled: false,
+            service_name: "iggy-connectors".to_owned(),
+            logs: TelemetryLogsConfig::default(),
+            traces: TelemetryTracesConfig::default(),
+        }
+    }
+}
+
+impl Display for TelemetryConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{{ enabled: {}, service_name: {}, logs: {}, traces: {} }}",
+            self.enabled, self.service_name, self.logs, self.traces
+        )
+    }
+}
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct TelemetryLogsConfig {
+    pub transport: TelemetryTransport,
+    pub endpoint: String,
+}
+
+impl Default for TelemetryLogsConfig {
+    fn default() -> Self {
+        Self {
+            transport: TelemetryTransport::Grpc,
+            endpoint: "http://localhost:4317".to_owned(),
+        }
+    }
+}
+
+impl Display for TelemetryLogsConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{{ transport: {}, endpoint: {} }}",
+            self.transport, self.endpoint
+        )
+    }
+}
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct TelemetryTracesConfig {
+    pub transport: TelemetryTransport,
+    pub endpoint: String,
+}
+
+impl Default for TelemetryTracesConfig {
+    fn default() -> Self {
+        Self {
+            transport: TelemetryTransport::Grpc,
+            endpoint: "http://localhost:4317".to_owned(),
+        }
+    }
+}
+
+impl Display for TelemetryTracesConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{{ transport: {}, endpoint: {} }}",
+            self.transport, self.endpoint
+        )
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Display, Copy, Clone)]
+#[serde(rename_all = "lowercase")]
+pub enum TelemetryTransport {
+    #[display("grpc")]
+    Grpc,
+    #[display("http")]
+    Http,
+}
+
+impl FromStr for TelemetryTransport {
+    type Err = String;
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "grpc" => Ok(TelemetryTransport::Grpc),
+            "http" => Ok(TelemetryTransport::Http),
+            _ => Err(format!("Invalid telemetry transport: {s}")),
+        }
+    }
+}
+
 #[derive(Debug, Default, Clone, Deserialize, Serialize)]
 #[serde(default)]
 pub struct ConnectorsRuntimeConfig {
@@ -40,6 +141,7 @@ pub struct ConnectorsRuntimeConfig {
     pub iggy: IggyConfig,
     pub connectors: ConnectorsConfig,
     pub state: StateConfig,
+    pub telemetry: TelemetryConfig,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -174,8 +276,8 @@ impl Display for ConnectorsRuntimeConfig {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "{{ http: {}, iggy: {}, connectors: {}, state: {:} }}",
-            self.http, self.iggy, self.connectors, self.state
+            "{{ http: {}, iggy: {}, connectors: {}, state: {}, telemetry: {} 
}}",
+            self.http, self.iggy, self.connectors, self.state, self.telemetry
         )
     }
 }
diff --git a/core/connectors/runtime/src/log.rs 
b/core/connectors/runtime/src/log.rs
new file mode 100644
index 000000000..4eded2aaf
--- /dev/null
+++ b/core/connectors/runtime/src/log.rs
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::configs::runtime::{TelemetryConfig, TelemetryTransport};
+use iggy_connector_sdk::LogCallback;
+use opentelemetry::trace::TracerProvider;
+use opentelemetry::{KeyValue, global};
+use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
+use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
+use opentelemetry_sdk::Resource;
+use opentelemetry_sdk::propagation::TraceContextPropagator;
+use tracing::info;
+use tracing_opentelemetry::OpenTelemetryLayer;
+use tracing_subscriber::EnvFilter;
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+
+pub fn init_logging(telemetry_config: &TelemetryConfig, version: &'static str) 
{
+    let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| 
EnvFilter::new("INFO"));
+
+    let fmt_layer = tracing_subscriber::fmt::layer();
+
+    if telemetry_config.enabled {
+        let (logger_provider, tracer_provider) = 
init_telemetry(telemetry_config, version);
+
+        let service_name = telemetry_config.service_name.clone();
+        let tracer = tracer_provider.tracer(service_name);
+        global::set_tracer_provider(tracer_provider);
+        global::set_text_map_propagator(TraceContextPropagator::new());
+
+        let otel_logs_layer = 
OpenTelemetryTracingBridge::new(&logger_provider);
+        let otel_traces_layer = OpenTelemetryLayer::new(tracer);
+
+        tracing_subscriber::registry()
+            .with(env_filter)
+            .with(fmt_layer)
+            .with(otel_logs_layer)
+            .with(otel_traces_layer)
+            .init();
+
+        info!(
+            "Logging initialized with telemetry enabled, service name: {}",
+            telemetry_config.service_name
+        );
+    } else {
+        tracing_subscriber::registry()
+            .with(env_filter)
+            .with(fmt_layer)
+            .init();
+    }
+}
+
+fn init_telemetry(
+    telemetry_config: &TelemetryConfig,
+    version: &'static str,
+) -> (
+    opentelemetry_sdk::logs::SdkLoggerProvider,
+    opentelemetry_sdk::trace::SdkTracerProvider,
+) {
+    let service_name = telemetry_config.service_name.clone();
+    let resource = Resource::builder()
+        .with_service_name(service_name)
+        .with_attribute(KeyValue::new(
+            opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
+            version,
+        ))
+        .build();
+
+    let logger_provider = init_logs_exporter(telemetry_config, 
resource.clone());
+    let tracer_provider = init_traces_exporter(telemetry_config, resource);
+
+    (logger_provider, tracer_provider)
+}
+
+fn init_logs_exporter(
+    telemetry_config: &TelemetryConfig,
+    resource: Resource,
+) -> opentelemetry_sdk::logs::SdkLoggerProvider {
+    match telemetry_config.logs.transport {
+        TelemetryTransport::Grpc => 
opentelemetry_sdk::logs::SdkLoggerProvider::builder()
+            .with_resource(resource)
+            .with_batch_exporter(
+                opentelemetry_otlp::LogExporter::builder()
+                    .with_tonic()
+                    .with_endpoint(telemetry_config.logs.endpoint.clone())
+                    .build()
+                    .expect("Failed to initialize gRPC logger."),
+            )
+            .build(),
+        TelemetryTransport::Http => {
+            let log_exporter = opentelemetry_otlp::LogExporter::builder()
+                .with_http()
+                .with_http_client(reqwest::Client::new())
+                .with_endpoint(telemetry_config.logs.endpoint.clone())
+                .with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
+                .build()
+                .expect("Failed to initialize HTTP logger.");
+            opentelemetry_sdk::logs::SdkLoggerProvider::builder()
+                .with_resource(resource)
+                .with_batch_exporter(log_exporter)
+                .build()
+        }
+    }
+}
+
+fn init_traces_exporter(
+    telemetry_config: &TelemetryConfig,
+    resource: Resource,
+) -> opentelemetry_sdk::trace::SdkTracerProvider {
+    match telemetry_config.traces.transport {
+        TelemetryTransport::Grpc => 
opentelemetry_sdk::trace::SdkTracerProvider::builder()
+            .with_resource(resource)
+            .with_batch_exporter(
+                opentelemetry_otlp::SpanExporter::builder()
+                    .with_tonic()
+                    .with_endpoint(telemetry_config.traces.endpoint.clone())
+                    .build()
+                    .expect("Failed to initialize gRPC tracer."),
+            )
+            .build(),
+        TelemetryTransport::Http => {
+            let trace_exporter = opentelemetry_otlp::SpanExporter::builder()
+                .with_http()
+                .with_http_client(reqwest::Client::new())
+                .with_endpoint(telemetry_config.traces.endpoint.clone())
+                .with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
+                .build()
+                .expect("Failed to initialize HTTP tracer.");
+            opentelemetry_sdk::trace::SdkTracerProvider::builder()
+                .with_resource(resource)
+                .with_batch_exporter(trace_exporter)
+                .build()
+        }
+    }
+}
+
+/// Log callback that routes plugin logs through the runtime's tracing 
subscriber.
+/// This function is passed to plugins via FFI so their logs appear in the 
runtime's
+/// output and OTEL telemetry.
+pub extern "C" fn runtime_log_callback(
+    level: u8,
+    target_ptr: *const u8,
+    target_len: usize,
+    message_ptr: *const u8,
+    message_len: usize,
+) {
+    let target = unsafe {
+        std::str::from_utf8(std::slice::from_raw_parts(target_ptr, target_len))
+            .unwrap_or("connector")
+    };
+    let message = unsafe {
+        std::str::from_utf8(std::slice::from_raw_parts(message_ptr, 
message_len))
+            .unwrap_or("<invalid utf8>")
+    };
+
+    match level {
+        0 => tracing::trace!(target: "connector", connector_target = target,  
message),
+        1 => tracing::debug!(target: "connector", connector_target = target,  
message),
+        2 => tracing::info!(target: "connector", connector_target = target,  
message),
+        3 => tracing::warn!(target: "connector", connector_target = target,  
message),
+        _ => tracing::error!(target: "connector", connector_target = target,  
message),
+    }
+}
+
+pub const LOG_CALLBACK: LogCallback = runtime_log_callback;
diff --git a/core/connectors/runtime/src/main.rs 
b/core/connectors/runtime/src/main.rs
index d600fc7eb..ab4a77208 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -41,12 +41,12 @@ use std::{
     sync::{Arc, atomic::AtomicU32},
 };
 use tracing::{debug, info};
-use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, 
util::SubscriberInitExt};
 
 mod api;
 pub(crate) mod configs;
 pub(crate) mod context;
 pub(crate) mod error;
+mod log;
 mod manager;
 mod sink;
 mod source;
@@ -54,6 +54,8 @@ mod state;
 mod stream;
 mod transform;
 
+const VERSION: &str = env!("CARGO_PKG_VERSION");
+
 #[global_allocator]
 static GLOBAL: MiMalloc = MiMalloc;
 
@@ -73,6 +75,7 @@ struct SourceApi {
         config_len: usize,
         state_ptr: *const u8,
         state_len: usize,
+        log_callback: iggy_connector_sdk::LogCallback,
     ) -> i32,
     handle: extern "C" fn(id: u32, callback: SendCallback) -> i32,
     close: extern "C" fn(id: u32) -> i32,
@@ -80,7 +83,12 @@ struct SourceApi {
 
 #[derive(WrapperApi)]
 struct SinkApi {
-    open: extern "C" fn(id: u32, config_ptr: *const u8, config_len: usize) -> 
i32,
+    open: extern "C" fn(
+        id: u32,
+        config_ptr: *const u8,
+        config_len: usize,
+        log_callback: iggy_connector_sdk::LogCallback,
+    ) -> i32,
     #[allow(clippy::too_many_arguments)]
     consume: extern "C" fn(
         id: u32,
@@ -116,20 +124,17 @@ async fn main() -> Result<(), RuntimeError> {
         );
     }
 
-    Registry::default()
-        .with(tracing_subscriber::fmt::layer())
-        
.with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
-        .init();
-
     let config_path =
         env::var("IGGY_CONNECTORS_CONFIG_PATH").unwrap_or_else(|_| 
DEFAULT_CONFIG_PATH.to_string());
-    info!("Starting Iggy Connectors Runtime, loading configuration from: 
{config_path}...");
+    println!("Starting Iggy Connectors Runtime, loading configuration from: 
{config_path}...");
 
     let config: ConnectorsRuntimeConfig = 
ConnectorsRuntimeConfig::config_provider(config_path)
         .load_config()
         .await
         .expect("Failed to load configuration");
 
+    log::init_logging(&config.telemetry, VERSION);
+
     std::fs::create_dir_all(&config.state.path).expect("Failed to create state 
directory");
 
     info!("State will be stored in: {}", config.state.path);
diff --git a/core/connectors/runtime/src/sink.rs 
b/core/connectors/runtime/src/sink.rs
index c9d8bd3da..3f55d4d8a 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -19,6 +19,7 @@
 
 use crate::configs::connectors::SinkConfig;
 use crate::context::RuntimeContext;
+use crate::log::LOG_CALLBACK;
 use crate::manager::status::ConnectorStatus;
 use crate::{
     PLUGIN_ID, RuntimeError, SinkApi, SinkConnector, SinkConnectorConsumer, 
SinkConnectorPlugin,
@@ -319,7 +320,12 @@ fn init_sink(
     id: u32,
 ) -> Result<(), RuntimeError> {
     let plugin_config = serde_json::to_string(plugin_config).expect("Invalid 
sink plugin config.");
-    let result = (container.open)(id, plugin_config.as_ptr(), 
plugin_config.len());
+    let result = (container.open)(
+        id,
+        plugin_config.as_ptr(),
+        plugin_config.len(),
+        LOG_CALLBACK,
+    );
     if result != 0 {
         let err = format!("Plugin initialization failed (ID: {id})");
         error!("{err}");
diff --git a/core/connectors/runtime/src/source.rs 
b/core/connectors/runtime/src/source.rs
index 5d6d88744..d366a5de5 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -37,6 +37,7 @@ use tracing::{debug, error, info, trace, warn};
 
 use crate::configs::connectors::SourceConfig;
 use crate::context::RuntimeContext;
+use crate::log::LOG_CALLBACK;
 use crate::manager::status::ConnectorStatus;
 use crate::{
     PLUGIN_ID, RuntimeError, SourceApi, SourceConnector, SourceConnectorPlugin,
@@ -202,6 +203,7 @@ fn init_source(
         plugin_config.len(),
         state_ptr,
         state_len,
+        LOG_CALLBACK,
     );
     if result != 0 {
         let err = format!("Plugin initialization failed (ID: {id})");
diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs
index b91ade3dc..16738ef57 100644
--- a/core/connectors/sdk/src/lib.rs
+++ b/core/connectors/sdk/src/lib.rs
@@ -38,10 +38,12 @@ use tokio::runtime::Runtime;
 
 pub mod decoders;
 pub mod encoders;
+pub mod log;
 pub mod sink;
 pub mod source;
 pub mod transforms;
 
+pub use log::LogCallback;
 pub use transforms::Transform;
 
 static RUNTIME: OnceCell<Runtime> = OnceCell::new();
diff --git a/core/connectors/sdk/src/log.rs b/core/connectors/sdk/src/log.rs
new file mode 100644
index 000000000..af0f6fcf9
--- /dev/null
+++ b/core/connectors/sdk/src/log.rs
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tracing::field::{Field, Visit};
+use tracing::{Event, Level, Subscriber};
+use tracing_subscriber::layer::{Context, Layer};
+
+/// Callback for plugins to send log records to the runtime.
+///
+/// # Arguments
+/// * `level` - Log level (0=TRACE, 1=DEBUG, 2=INFO, 3=WARN, 4=ERROR)
+/// * `target_ptr` - Pointer to the target string (module path)
+/// * `target_len` - Length of the target string
+/// * `message_ptr` - Pointer to the message string
+/// * `message_len` - Length of the message string
+pub type LogCallback = extern "C" fn(
+    level: u8,
+    target_ptr: *const u8,
+    target_len: usize,
+    message_ptr: *const u8,
+    message_len: usize,
+);
+
+/// A tracing Layer that forwards events to the runtime via callback.
+/// This allows plugin logs to be routed through the runtime's OTEL subscriber.
+pub struct CallbackLayer {
+    callback: LogCallback,
+}
+
+impl CallbackLayer {
+    pub fn new(callback: LogCallback) -> Self {
+        Self { callback }
+    }
+}
+
+impl<S: Subscriber> Layer<S> for CallbackLayer {
+    fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
+        let level = match *event.metadata().level() {
+            Level::TRACE => 0u8,
+            Level::DEBUG => 1,
+            Level::INFO => 2,
+            Level::WARN => 3,
+            Level::ERROR => 4,
+        };
+
+        let target = event.metadata().target();
+
+        let mut visitor = MessageVisitor::default();
+        event.record(&mut visitor);
+        let message = visitor.message;
+
+        (self.callback)(
+            level,
+            target.as_ptr(),
+            target.len(),
+            message.as_ptr(),
+            message.len(),
+        );
+    }
+}
+
+/// Visitor to extract the message field from a tracing Event
+#[derive(Default)]
+struct MessageVisitor {
+    message: String,
+}
+
+impl Visit for MessageVisitor {
+    fn record_str(&mut self, field: &Field, value: &str) {
+        if field.name() == "message" || self.message.is_empty() {
+            self.message = value.to_string();
+        }
+    }
+
+    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
+        if field.name() == "message" || self.message.is_empty() {
+            self.message = format!("{:?}", value);
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs
index 8e04f6f05..457618e75 100644
--- a/core/connectors/sdk/src/sink.rs
+++ b/core/connectors/sdk/src/sink.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -21,6 +22,7 @@ use tokio::sync::watch;
 use tracing::{error, info};
 use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, 
util::SubscriberInitExt};
 
+use crate::log::{CallbackLayer, LogCallback};
 use crate::{ConsumedMessage, MessagesMetadata, RawMessages, Sink, 
TopicMetadata, get_runtime};
 
 pub type ConsumeCallback = extern "C" fn(
@@ -56,6 +58,7 @@ impl<T: Sink + std::fmt::Debug> SinkContainer<T> {
         id: u32,
         config_ptr: *const u8,
         config_len: usize,
+        log_callback: LogCallback,
         factory: F,
     ) -> i32
     where
@@ -64,7 +67,7 @@ impl<T: Sink + std::fmt::Debug> SinkContainer<T> {
     {
         unsafe {
             _ = Registry::default()
-                .with(tracing_subscriber::fmt::layer())
+                .with(CallbackLayer::new(log_callback))
                 
.with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
                 .try_init();
             let slice = std::slice::from_raw_parts(config_ptr, config_len);
@@ -219,15 +222,21 @@ macro_rules! sink_connector {
 
         use dashmap::DashMap;
         use once_cell::sync::Lazy;
+        use $crate::LogCallback;
         use $crate::sink::SinkContainer;
 
         static INSTANCES: Lazy<DashMap<u32, SinkContainer<$type>>> = 
Lazy::new(DashMap::new);
 
         #[cfg(not(test))]
         #[unsafe(no_mangle)]
-        unsafe extern "C" fn open(id: u32, config_ptr: *const u8, config_len: 
usize) -> i32 {
+        unsafe extern "C" fn open(
+            id: u32,
+            config_ptr: *const u8,
+            config_len: usize,
+            log_callback: LogCallback,
+        ) -> i32 {
             let mut container = SinkContainer::new(id);
-            let result = container.open(id, config_ptr, config_len, 
<$type>::new);
+            let result = container.open(id, config_ptr, config_len, 
log_callback, <$type>::new);
             INSTANCES.insert(id, container);
             result
         }
diff --git a/core/connectors/sdk/src/source.rs 
b/core/connectors/sdk/src/source.rs
index f4f2d1b39..16b45aee7 100644
--- a/core/connectors/sdk/src/source.rs
+++ b/core/connectors/sdk/src/source.rs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+use crate::log::{CallbackLayer, LogCallback};
 use crate::{ConnectorState, Error, Source, get_runtime};
 use serde::de::DeserializeOwned;
 use std::sync::Arc;
@@ -57,6 +58,7 @@ impl<T: Source + std::fmt::Debug + 'static> 
SourceContainer<T> {
 
     /// # Safety
     /// Do not copy the configuration pointer
+    #[allow(clippy::too_many_arguments)]
     pub unsafe fn open<F, C>(
         &mut self,
         id: u32,
@@ -64,6 +66,7 @@ impl<T: Source + std::fmt::Debug + 'static> 
SourceContainer<T> {
         config_len: usize,
         state_ptr: *const u8,
         state_len: usize,
+        log_callback: LogCallback,
         factory: F,
     ) -> i32
     where
@@ -72,7 +75,7 @@ impl<T: Source + std::fmt::Debug + 'static> 
SourceContainer<T> {
     {
         unsafe {
             _ = Registry::default()
-                .with(tracing_subscriber::fmt::layer())
+                .with(CallbackLayer::new(log_callback))
                 
.with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
                 .try_init();
             let slice = std::slice::from_raw_parts(config_ptr, config_len);
@@ -207,6 +210,7 @@ macro_rules! source_connector {
 
         use dashmap::DashMap;
         use once_cell::sync::Lazy;
+        use $crate::LogCallback;
         use $crate::source::SendCallback;
         use $crate::source::SourceContainer;
 
@@ -220,6 +224,7 @@ macro_rules! source_connector {
             config_len: usize,
             state_ptr: *const u8,
             state_len: usize,
+            log_callback: LogCallback,
         ) -> i32 {
             let mut container = SourceContainer::new(id);
             let result = container.open(
@@ -228,6 +233,7 @@ macro_rules! source_connector {
                 config_len,
                 state_ptr,
                 state_len,
+                log_callback,
                 <$type>::new,
             );
             INSTANCES.insert(id, container);
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 3ba2cecb5..b98765b5b 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -78,24 +78,11 @@ mimalloc = { workspace = true, optional = true }
 mime_guess = { version = "2.0", optional = true }
 moka = { version = "0.12.12", features = ["future"] }
 nix = { workspace = true }
-opentelemetry = { version = "0.31.0", features = ["trace", "logs"] }
-opentelemetry-appender-tracing = { version = "0.31.1", features = ["log"] }
-opentelemetry-otlp = { version = "0.31.0", features = [
-    "logs",
-    "trace",
-    "grpc-tonic",
-    "http",
-    "http-proto",
-    "reqwest-client",
-] }
-opentelemetry-semantic-conventions = "0.31.0"
-opentelemetry_sdk = { version = "0.31.0", features = [
-    "logs",
-    "trace",
-    "experimental_async_runtime",
-    "experimental_logs_batch_log_processor_with_async_runtime",
-    "experimental_trace_batch_span_processor_with_async_runtime",
-] }
+opentelemetry = { workspace = true }
+opentelemetry-appender-tracing = { workspace = true }
+opentelemetry-otlp = { workspace = true }
+opentelemetry-semantic-conventions = { workspace = true }
+opentelemetry_sdk = { workspace = true }
 papaya = "0.2.3"
 prometheus-client = "0.24.0"
 rand = { workspace = true }
@@ -120,7 +107,7 @@ toml = { workspace = true }
 tower-http = { workspace = true }
 tracing = { workspace = true }
 tracing-appender = { workspace = true }
-tracing-opentelemetry = "0.32.1"
+tracing-opentelemetry = { workspace = true }
 tracing-subscriber = { workspace = true }
 tungstenite = { workspace = true }
 ulid = "1.2.1"
diff --git a/docker-compose.yml b/docker-compose.yml
index afc9e14e6..4c377e66d 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -16,34 +16,40 @@
 # under the License.
 
 services:
-  iggy-server:
-    build:
-      context: .
-      dockerfile: Dockerfile
-    container_name: iggy-server
+  greptime:
+    image: "greptime/greptimedb:latest"
+    container_name: greptime
     restart: unless-stopped
-    cap_add:
-      - SYS_NICE
-    security_opt:
-      - seccomp:unconfined
-    ulimits:
-      memlock:
-        soft: -1
-        hard: -1
-    networks:
-      - iggy
+    command: "standalone start --http-addr=0.0.0.0:4000 --rpc-addr 
0.0.0.0:4001 --postgres-addr 0.0.0.0:4003 
--user-provider=static_user_provider:file:./greptime_users"
     ports:
-      - 3000:3000
-      - 8080:8080
-      - 8090:8090
-      - 8092:8092
+      - "4000:4000"
+      - "4001:4001"
+      - "4003:4003"
+    networks:
+      - iggy-net
     volumes:
-      - iggy-server:/local_data
+      - greptime:/tmp/greptimedb
+      - ./greptime_users:/greptime_users
 
-volumes:
-  iggy-server:
-    driver: local
+  otel-collector:
+    image: otel/opentelemetry-collector-contrib:latest
+    container_name: otel-collector
+    restart: unless-stopped
+    command: ["--config=/etc/otel-collector-config.yaml"]
+    volumes:
+      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
+      - /var/log:/var/log:ro
+    ports:
+      - "4317:4317" # OTLP gRPC receiver
+      - "4318:4318" # OTLP HTTP receiver
+      - "13133:13133" # health_check extension
+    depends_on:
+      - greptime
+    networks:
+      - iggy-net
 
 networks:
-  iggy:
-    name: iggy-network
+  iggy-net:
+
+volumes:
+  greptime:
diff --git a/greptime_users b/greptime_users
new file mode 100644
index 000000000..227e38635
--- /dev/null
+++ b/greptime_users
@@ -0,0 +1 @@
+root=secret
diff --git a/otel-collector-config.yaml b/otel-collector-config.yaml
new file mode 100644
index 000000000..4b9c121bf
--- /dev/null
+++ b/otel-collector-config.yaml
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+receivers:
+  otlp:
+    protocols:
+      grpc:
+        endpoint: 0.0.0.0:4317
+      http:
+        endpoint: 0.0.0.0:4318
+
+  filelog:
+    include: [/var/log/*.log]
+    start_at: end
+    operators:
+      - type: add
+        field: attributes.service_name
+        value: "system-logs"
+
+extensions:
+  basicauth/client:
+    client_auth:
+      username: root
+      password: secret
+  health_check:
+    endpoint: 0.0.0.0:13133
+
+processors:
+  batch:
+    timeout: 1s
+    send_batch_size: 1024
+    send_batch_max_size: 2048
+
+  memory_limiter:
+    limit_mib: 512
+    check_interval: 1s
+
+  resource:
+    attributes:
+      - key: service.name
+        from_attribute: service_name
+        action: upsert
+      - key: host.name
+        value: "docker-host"
+        action: upsert
+
+exporters:
+  otlphttp/traces:
+    endpoint: "http://greptime:4000/v1/otlp";
+    auth:
+      authenticator: basicauth/client
+    headers:
+      # x-greptime-db-name: '<your_db_name>'
+      x-greptime-pipeline-name: "greptime_trace_v1"
+    tls:
+      insecure: true
+  otlphttp/logs:
+    endpoint: "http://greptime:4000/v1/otlp";
+    auth:
+      authenticator: basicauth/client
+    headers:
+      # x-greptime-db-name: '<your_db_name>'
+      # x-greptime-log-table-name: '<table_name>'
+      # x-greptime-pipeline-name: '<pipeline_name>'
+    tls:
+      insecure: true
+
+  otlphttp/metrics:
+    endpoint: "http://greptime:4000/v1/otlp";
+    auth:
+      authenticator: basicauth/client
+    headers:
+      # x-greptime-db-name: '<your_db_name>'
+    tls:
+      insecure: true
+
+  debug:
+    verbosity: basic
+    sampling_initial: 5
+    sampling_thereafter: 200
+
+service:
+  extensions: [basicauth/client, health_check]
+  pipelines:
+    traces:
+      receivers: [otlp]
+      processors: [memory_limiter, batch]
+      exporters: [otlphttp/traces, debug]
+    logs:
+      receivers: [otlp]
+      processors: [memory_limiter, batch]
+      exporters: [otlphttp/logs, debug]
+    metrics:
+      receivers: [otlp]
+      processors: [memory_limiter, batch]
+      exporters: [otlphttp/metrics, debug]

Reply via email to