This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 13a1bf339 feat(connectors,mcp): implement logging with telemetry
(#2612)
13a1bf339 is described below
commit 13a1bf339c0a7c7f75672062ec4709e0337e310c
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Mon Jan 26 21:13:22 2026 +0100
feat(connectors,mcp): implement logging with telemetry (#2612)
---
Cargo.lock | 37 +++--
Cargo.toml | 19 +++
DEPENDENCIES.md | 22 +--
core/ai/mcp/Cargo.toml | 9 +-
core/ai/mcp/README.md | 18 ++
core/ai/mcp/config.toml | 12 ++
core/ai/mcp/src/configs.rs | 108 +++++++++++-
core/ai/mcp/src/log.rs | 180 ++++++++++++++++++++
core/ai/mcp/src/main.rs | 20 +--
core/connectors/runtime/Cargo.toml | 9 +-
core/connectors/runtime/README.md | 18 ++
core/connectors/runtime/config.toml | 12 ++
core/connectors/runtime/src/configs/runtime.rs | 106 +++++++++++-
core/connectors/runtime/src/log.rs | 181 +++++++++++++++++++++
core/connectors/runtime/src/main.rs | 21 ++-
core/connectors/runtime/src/sink.rs | 8 +-
core/connectors/runtime/src/source.rs | 2 +
core/connectors/sdk/Cargo.toml | 2 +-
core/connectors/sdk/src/lib.rs | 2 +
core/connectors/sdk/src/log.rs | 96 +++++++++++
core/connectors/sdk/src/sink.rs | 17 +-
core/connectors/sdk/src/source.rs | 8 +-
.../connectors/sinks/elasticsearch_sink/Cargo.toml | 2 +-
core/connectors/sinks/iceberg_sink/Cargo.toml | 2 +-
core/connectors/sinks/postgres_sink/Cargo.toml | 2 +-
core/connectors/sinks/quickwit_sink/Cargo.toml | 2 +-
core/connectors/sinks/stdout_sink/Cargo.toml | 2 +-
.../sources/elasticsearch_source/Cargo.toml | 2 +-
core/connectors/sources/postgres_source/Cargo.toml | 2 +-
core/connectors/sources/random_source/Cargo.toml | 2 +-
core/server/Cargo.toml | 25 +--
justfile | 4 +-
32 files changed, 865 insertions(+), 87 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 1167a2ea9..24d5d02d8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4527,7 +4527,7 @@ dependencies = [
[[package]]
name = "iggy-connectors"
-version = "0.2.1-edge.3"
+version = "0.2.1-edge.4"
dependencies = [
"async-trait",
"axum",
@@ -4535,6 +4535,7 @@ dependencies = [
"chrono",
"clap",
"dashmap",
+ "derive_more",
"dirs",
"dlopen2",
"dotenvy",
@@ -4547,6 +4548,11 @@ dependencies = [
"iggy_connector_sdk",
"mimalloc",
"once_cell",
+ "opentelemetry",
+ "opentelemetry-appender-tracing",
+ "opentelemetry-otlp",
+ "opentelemetry-semantic-conventions",
+ "opentelemetry_sdk",
"postcard",
"reqwest",
"reqwest-middleware",
@@ -4563,12 +4569,13 @@ dependencies = [
"toml 0.9.11+spec-1.1.0",
"tower-http",
"tracing",
+ "tracing-opentelemetry",
"tracing-subscriber",
]
[[package]]
name = "iggy-mcp"
-version = "0.2.1-edge.3"
+version = "0.2.1-edge.4"
dependencies = [
"axum",
"axum-server",
@@ -4578,6 +4585,12 @@ dependencies = [
"figment",
"iggy",
"iggy_common",
+ "opentelemetry",
+ "opentelemetry-appender-tracing",
+ "opentelemetry-otlp",
+ "opentelemetry-semantic-conventions",
+ "opentelemetry_sdk",
+ "reqwest",
"rmcp",
"serde",
"serde_json",
@@ -4587,6 +4600,7 @@ dependencies = [
"tokio",
"tower-http",
"tracing",
+ "tracing-opentelemetry",
"tracing-subscriber",
]
@@ -4657,7 +4671,7 @@ dependencies = [
[[package]]
name = "iggy_connector_elasticsearch_sink"
-version = "0.1.0"
+version = "0.2.0-edge.1"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -4675,7 +4689,7 @@ dependencies = [
[[package]]
name = "iggy_connector_elasticsearch_source"
-version = "0.1.0"
+version = "0.2.0-edge.1"
dependencies = [
"async-trait",
"chrono",
@@ -4693,7 +4707,7 @@ dependencies = [
[[package]]
name = "iggy_connector_iceberg_sink"
-version = "0.1.0"
+version = "0.2.0-edge.1"
dependencies = [
"arrow-json",
"async-trait",
@@ -4712,7 +4726,7 @@ dependencies = [
[[package]]
name = "iggy_connector_postgres_sink"
-version = "0.1.0"
+version = "0.2.0-edge.1"
dependencies = [
"async-trait",
"chrono",
@@ -4731,7 +4745,7 @@ dependencies = [
[[package]]
name = "iggy_connector_postgres_source"
-version = "0.1.0"
+version = "0.2.0-edge.1"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -4752,7 +4766,7 @@ dependencies = [
[[package]]
name = "iggy_connector_quickwit_sink"
-version = "0.1.0"
+version = "0.2.0-edge.1"
dependencies = [
"async-trait",
"dashmap",
@@ -4767,7 +4781,7 @@ dependencies = [
[[package]]
name = "iggy_connector_random_source"
-version = "0.1.0"
+version = "0.2.0-edge.1"
dependencies = [
"async-trait",
"dashmap",
@@ -4784,7 +4798,7 @@ dependencies = [
[[package]]
name = "iggy_connector_sdk"
-version = "0.1.1-edge.1"
+version = "0.1.1-edge.2"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -4812,7 +4826,7 @@ dependencies = [
[[package]]
name = "iggy_connector_stdout_sink"
-version = "0.1.0"
+version = "0.2.0-edge.1"
dependencies = [
"async-trait",
"dashmap",
@@ -6306,6 +6320,7 @@ dependencies = [
"rand 0.9.2",
"thiserror 2.0.17",
"tokio",
+ "tokio-stream",
]
[[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 1a3cbdb4f..c76235a11 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -150,6 +150,24 @@ mockall = "0.14.0"
nix = { version = "0.30.1", features = ["fs", "resource", "sched"] }
nonzero_lit = "0.1.2"
once_cell = "1.21.3"
+opentelemetry = { version = "0.31.0", features = ["trace", "logs"] }
+opentelemetry-appender-tracing = { version = "0.31.1", features = ["log"] }
+opentelemetry-otlp = { version = "0.31.0", features = [
+ "logs",
+ "trace",
+ "grpc-tonic",
+ "http",
+ "http-proto",
+ "reqwest-client",
+] }
+opentelemetry-semantic-conventions = "0.31.0"
+opentelemetry_sdk = { version = "0.31.0", features = [
+ "logs",
+ "trace",
+ "experimental_async_runtime",
+ "experimental_logs_batch_log_processor_with_async_runtime",
+ "experimental_trace_batch_span_processor_with_async_runtime",
+] }
parquet = "=55.2.0"
passterm = "=2.0.1"
postcard = { version = "1.1.3", features = ["alloc"] }
@@ -197,6 +215,7 @@ tower-http = { version = "0.6.8", features = [
] }
tracing = "0.1.44"
tracing-appender = "0.2.4"
+tracing-opentelemetry = "0.32.1"
tracing-subscriber = { version = "0.3.22", default-features = false, features
= [
"fmt",
"env-filter",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 3150cc05e..5404013af 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -390,19 +390,19 @@ iggy: 0.8.1-edge.5, "Apache-2.0",
iggy-bench: 0.3.1-edge.1, "Apache-2.0",
iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0",
iggy-cli: 0.10.1-edge.1, "Apache-2.0",
-iggy-connectors: 0.2.1-edge.3, "Apache-2.0",
-iggy-mcp: 0.2.1-edge.3, "Apache-2.0",
+iggy-connectors: 0.2.1-edge.4, "Apache-2.0",
+iggy-mcp: 0.2.1-edge.4, "Apache-2.0",
iggy_binary_protocol: 0.8.1-edge.2, "Apache-2.0",
iggy_common: 0.8.1-edge.1, "Apache-2.0",
-iggy_connector_elasticsearch_sink: 0.1.0, "Apache-2.0",
-iggy_connector_elasticsearch_source: 0.1.0, "Apache-2.0",
-iggy_connector_iceberg_sink: 0.1.0, "Apache-2.0",
-iggy_connector_postgres_sink: 0.1.0, "Apache-2.0",
-iggy_connector_postgres_source: 0.1.0, "Apache-2.0",
-iggy_connector_quickwit_sink: 0.1.0, "Apache-2.0",
-iggy_connector_random_source: 0.1.0, "Apache-2.0",
-iggy_connector_sdk: 0.1.1-edge.1, "Apache-2.0",
-iggy_connector_stdout_sink: 0.1.0, "Apache-2.0",
+iggy_connector_elasticsearch_sink: 0.2.0-edge.1, "Apache-2.0",
+iggy_connector_elasticsearch_source: 0.2.0-edge.1, "Apache-2.0",
+iggy_connector_iceberg_sink: 0.2.0-edge.1, "Apache-2.0",
+iggy_connector_postgres_sink: 0.2.0-edge.1, "Apache-2.0",
+iggy_connector_postgres_source: 0.2.0-edge.1, "Apache-2.0",
+iggy_connector_quickwit_sink: 0.2.0-edge.1, "Apache-2.0",
+iggy_connector_random_source: 0.2.0-edge.1, "Apache-2.0",
+iggy_connector_sdk: 0.1.1-edge.2, "Apache-2.0",
+iggy_connector_stdout_sink: 0.2.0-edge.1, "Apache-2.0",
iggy_examples: 0.0.5, "Apache-2.0",
ignore: 0.4.25, "MIT OR Unlicense",
impl-more: 0.1.9, "Apache-2.0 OR MIT",
diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml
index ca729329d..f8f8c138e 100644
--- a/core/ai/mcp/Cargo.toml
+++ b/core/ai/mcp/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy-mcp"
-version = "0.2.1-edge.3"
+version = "0.2.1-edge.4"
description = "MCP Server for Iggy message streaming platform"
edition = "2024"
license = "Apache-2.0"
@@ -35,6 +35,12 @@ figlet-rs = { workspace = true }
figment = { workspace = true }
iggy = { workspace = true }
iggy_common = { workspace = true }
+opentelemetry = { workspace = true }
+opentelemetry-appender-tracing = { workspace = true }
+opentelemetry-otlp = { workspace = true }
+opentelemetry-semantic-conventions = { workspace = true }
+opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
+reqwest = { workspace = true }
rmcp = { version = "0.13.0", features = [
"server",
"transport-io",
@@ -47,6 +53,7 @@ thiserror = { workspace = true }
tokio = { workspace = true }
tower-http = { workspace = true }
tracing = { workspace = true }
+tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
[dev-dependencies]
diff --git a/core/ai/mcp/README.md b/core/ai/mcp/README.md
index ae5537180..7c2097815 100644
--- a/core/ai/mcp/README.md
+++ b/core/ai/mcp/README.md
@@ -69,3 +69,21 @@ Here's the example configuration to be used with Claude
Desktop:
**Remember to use the appropriate Iggy account credentials for your
environment** (e.g. create the user with read-only permissions to avoid
modifying the data). On top of this, you can also configure the `permissions`
for the MCP server to control which operations are allowed (this will be
checked first, before forwarding the actual request to the Iggy server).

+
+## Telemetry
+
+The MCP server supports OpenTelemetry for logs and traces. To enable
telemetry, add the following configuration:
+
+```toml
+[telemetry]
+enabled = true
+service_name = "iggy-mcp"
+
+[telemetry.logs]
+transport = "grpc" # Options: "grpc", "http"
+endpoint = "http://localhost:4317"
+
+[telemetry.traces]
+transport = "grpc" # Options: "grpc", "http"
+endpoint = "http://localhost:4317"
+```
diff --git a/core/ai/mcp/config.toml b/core/ai/mcp/config.toml
index 69ff5f349..b034f42e6 100644
--- a/core/ai/mcp/config.toml
+++ b/core/ai/mcp/config.toml
@@ -52,3 +52,15 @@ create = true
read = true
update = true
delete = true
+
+[telemetry]
+enabled = false
+service_name = "iggy-mcp"
+
+[telemetry.logs]
+transport = "grpc"
+endpoint = "http://localhost:4317"
+
+[telemetry.traces]
+transport = "grpc"
+endpoint = "http://localhost:4317"
diff --git a/core/ai/mcp/src/configs.rs b/core/ai/mcp/src/configs.rs
index e29b33092..7ec2846c3 100644
--- a/core/ai/mcp/src/configs.rs
+++ b/core/ai/mcp/src/configs.rs
@@ -26,7 +26,8 @@ use figment::{
use iggy::prelude::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME};
use iggy_common::{CustomEnvProvider, FileConfigProvider};
use serde::{Deserialize, Serialize};
-use std::fmt::Formatter;
+use std::fmt::{Display as FmtDisplay, Formatter};
+use std::str::FromStr;
use strum::Display;
use tower_http::cors::{AllowOrigin, CorsLayer};
@@ -37,6 +38,7 @@ pub struct McpServerConfig {
pub iggy: IggyConfig,
pub permissions: PermissionsConfig,
pub transport: McpTransport,
+ pub telemetry: TelemetryConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -101,6 +103,105 @@ pub enum McpTransport {
Stdio,
}
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct TelemetryConfig {
+ pub enabled: bool,
+ pub service_name: String,
+ pub logs: TelemetryLogsConfig,
+ pub traces: TelemetryTracesConfig,
+}
+
+impl Default for TelemetryConfig {
+ fn default() -> Self {
+ Self {
+ enabled: false,
+ service_name: "iggy-mcp".to_owned(),
+ logs: TelemetryLogsConfig::default(),
+ traces: TelemetryTracesConfig::default(),
+ }
+ }
+}
+
+impl FmtDisplay for TelemetryConfig {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{{ enabled: {}, service_name: {}, logs: {}, traces: {} }}",
+ self.enabled, self.service_name, self.logs, self.traces
+ )
+ }
+}
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct TelemetryLogsConfig {
+ pub transport: TelemetryTransport,
+ pub endpoint: String,
+}
+
+impl Default for TelemetryLogsConfig {
+ fn default() -> Self {
+ Self {
+ transport: TelemetryTransport::Grpc,
+ endpoint: "http://localhost:4317".to_owned(),
+ }
+ }
+}
+
+impl FmtDisplay for TelemetryLogsConfig {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{{ transport: {}, endpoint: {} }}",
+ self.transport, self.endpoint
+ )
+ }
+}
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct TelemetryTracesConfig {
+ pub transport: TelemetryTransport,
+ pub endpoint: String,
+}
+
+impl Default for TelemetryTracesConfig {
+ fn default() -> Self {
+ Self {
+ transport: TelemetryTransport::Grpc,
+ endpoint: "http://localhost:4317".to_owned(),
+ }
+ }
+}
+
+impl FmtDisplay for TelemetryTracesConfig {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{{ transport: {}, endpoint: {} }}",
+ self.transport, self.endpoint
+ )
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Display, Copy, Clone)]
+#[serde(rename_all = "lowercase")]
+pub enum TelemetryTransport {
+ #[strum(to_string = "grpc")]
+ Grpc,
+ #[strum(to_string = "http")]
+ Http,
+}
+
+impl FromStr for TelemetryTransport {
+ type Err = String;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "grpc" => Ok(TelemetryTransport::Grpc),
+ "http" => Ok(TelemetryTransport::Http),
+ _ => Err(format!("Invalid telemetry transport: {s}")),
+ }
+ }
+}
+
impl Default for McpServerConfig {
fn default() -> Self {
Self {
@@ -108,6 +209,7 @@ impl Default for McpServerConfig {
iggy: IggyConfig::default(),
permissions: PermissionsConfig::default(),
transport: McpTransport::Http,
+ telemetry: TelemetryConfig::default(),
}
}
}
@@ -238,8 +340,8 @@ impl std::fmt::Display for McpServerConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
- "{{ http: {}, iggy: {}, permissions: {:?}, transport: {} }}",
- self.http, self.iggy, self.permissions, self.transport
+ "{{ http: {}, iggy: {}, permissions: {:?}, transport: {},
telemetry: {} }}",
+ self.http, self.iggy, self.permissions, self.transport,
self.telemetry
)
}
}
diff --git a/core/ai/mcp/src/log.rs b/core/ai/mcp/src/log.rs
new file mode 100644
index 000000000..7ff0049df
--- /dev/null
+++ b/core/ai/mcp/src/log.rs
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::configs::{McpTransport, TelemetryConfig, TelemetryTransport};
+use opentelemetry::trace::TracerProvider;
+use opentelemetry::{KeyValue, global};
+use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
+use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
+use opentelemetry_sdk::Resource;
+use opentelemetry_sdk::propagation::TraceContextPropagator;
+use tracing::info;
+use tracing_opentelemetry::OpenTelemetryLayer;
+use tracing_subscriber::EnvFilter;
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+
+pub fn init_logging(
+ telemetry_config: &TelemetryConfig,
+ transport: McpTransport,
+ version: &'static str,
+) {
+ // STDIO transport needs stderr output with no ANSI codes
+ // HTTP transport can use normal stdout with ANSI
+ let (default_level, use_stderr, use_ansi) = match transport {
+ McpTransport::Stdio => ("DEBUG", true, false),
+ McpTransport::Http => ("INFO", false, true),
+ };
+
+ let env_filter =
+ EnvFilter::try_from_default_env().unwrap_or_else(|_|
EnvFilter::new(default_level));
+
+ if telemetry_config.enabled {
+ let (logger_provider, tracer_provider) =
init_telemetry(telemetry_config, version);
+
+ let service_name = telemetry_config.service_name.clone();
+ let tracer = tracer_provider.tracer(service_name);
+ global::set_tracer_provider(tracer_provider);
+ global::set_text_map_propagator(TraceContextPropagator::new());
+
+ if use_stderr {
+ let fmt_layer = tracing_subscriber::fmt::layer()
+ .with_writer(std::io::stderr)
+ .with_ansi(use_ansi);
+ let otel_logs_layer =
OpenTelemetryTracingBridge::new(&logger_provider);
+ let otel_traces_layer = OpenTelemetryLayer::new(tracer);
+ tracing_subscriber::registry()
+ .with(env_filter)
+ .with(fmt_layer)
+ .with(otel_logs_layer)
+ .with(otel_traces_layer)
+ .init();
+ } else {
+ let fmt_layer =
tracing_subscriber::fmt::layer().with_ansi(use_ansi);
+ let otel_logs_layer =
OpenTelemetryTracingBridge::new(&logger_provider);
+ let otel_traces_layer = OpenTelemetryLayer::new(tracer);
+ tracing_subscriber::registry()
+ .with(env_filter)
+ .with(fmt_layer)
+ .with(otel_logs_layer)
+ .with(otel_traces_layer)
+ .init();
+ }
+
+ info!(
+ "Logging initialized with telemetry enabled, service name: {}",
+ telemetry_config.service_name
+ );
+ } else if use_stderr {
+ tracing_subscriber::fmt()
+ .with_env_filter(env_filter)
+ .with_writer(std::io::stderr)
+ .with_ansi(use_ansi)
+ .init();
+ } else {
+ tracing_subscriber::registry()
+ .with(tracing_subscriber::fmt::layer().with_ansi(use_ansi))
+ .with(env_filter)
+ .init();
+ }
+}
+
+fn init_telemetry(
+ telemetry_config: &TelemetryConfig,
+ version: &'static str,
+) -> (
+ opentelemetry_sdk::logs::SdkLoggerProvider,
+ opentelemetry_sdk::trace::SdkTracerProvider,
+) {
+ let service_name = telemetry_config.service_name.clone();
+ let resource = Resource::builder()
+ .with_service_name(service_name)
+ .with_attribute(KeyValue::new(
+ opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
+ version,
+ ))
+ .build();
+
+ let logger_provider = init_logs_exporter(telemetry_config,
resource.clone());
+ let tracer_provider = init_traces_exporter(telemetry_config, resource);
+
+ (logger_provider, tracer_provider)
+}
+
+fn init_logs_exporter(
+ telemetry_config: &TelemetryConfig,
+ resource: Resource,
+) -> opentelemetry_sdk::logs::SdkLoggerProvider {
+ match telemetry_config.logs.transport {
+ TelemetryTransport::Grpc =>
opentelemetry_sdk::logs::SdkLoggerProvider::builder()
+ .with_resource(resource)
+ .with_batch_exporter(
+ opentelemetry_otlp::LogExporter::builder()
+ .with_tonic()
+ .with_endpoint(telemetry_config.logs.endpoint.clone())
+ .build()
+ .expect("Failed to initialize gRPC logger."),
+ )
+ .build(),
+ TelemetryTransport::Http => {
+ let log_exporter = opentelemetry_otlp::LogExporter::builder()
+ .with_http()
+ .with_http_client(reqwest::Client::new())
+ .with_endpoint(telemetry_config.logs.endpoint.clone())
+ .with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
+ .build()
+ .expect("Failed to initialize HTTP logger.");
+ opentelemetry_sdk::logs::SdkLoggerProvider::builder()
+ .with_resource(resource)
+ .with_batch_exporter(log_exporter)
+ .build()
+ }
+ }
+}
+
+fn init_traces_exporter(
+ telemetry_config: &TelemetryConfig,
+ resource: Resource,
+) -> opentelemetry_sdk::trace::SdkTracerProvider {
+ match telemetry_config.traces.transport {
+ TelemetryTransport::Grpc =>
opentelemetry_sdk::trace::SdkTracerProvider::builder()
+ .with_resource(resource)
+ .with_batch_exporter(
+ opentelemetry_otlp::SpanExporter::builder()
+ .with_tonic()
+ .with_endpoint(telemetry_config.traces.endpoint.clone())
+ .build()
+ .expect("Failed to initialize gRPC tracer."),
+ )
+ .build(),
+ TelemetryTransport::Http => {
+ let trace_exporter = opentelemetry_otlp::SpanExporter::builder()
+ .with_http()
+ .with_http_client(reqwest::Client::new())
+ .with_endpoint(telemetry_config.traces.endpoint.clone())
+ .with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
+ .build()
+ .expect("Failed to initialize HTTP tracer.");
+ opentelemetry_sdk::trace::SdkTracerProvider::builder()
+ .with_resource(resource)
+ .with_batch_exporter(trace_exporter)
+ .build()
+ }
+ }
+}
diff --git a/core/ai/mcp/src/main.rs b/core/ai/mcp/src/main.rs
index b5cfb4281..dbbd1bf4c 100644
--- a/core/ai/mcp/src/main.rs
+++ b/core/ai/mcp/src/main.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -26,14 +27,16 @@ use rmcp::{ServiceExt, model::ErrorData, transport::stdio};
use service::IggyService;
use std::{env, sync::Arc};
use tracing::{error, info};
-use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt,
util::SubscriberInitExt};
mod api;
mod configs;
mod error;
+mod log;
mod service;
mod stream;
+const VERSION: &str = env!("CARGO_PKG_VERSION");
+
const DEFAULT_CONFIG_PATH: &str = "core/ai/mcp/config.toml";
#[tokio::main]
@@ -62,18 +65,7 @@ async fn main() -> Result<(), McpRuntimeError> {
.expect("Failed to load configuration");
let transport = config.transport;
- if transport == McpTransport::Stdio {
- tracing_subscriber::fmt()
-
.with_env_filter(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("DEBUG")))
- .with_writer(std::io::stderr)
- .with_ansi(false)
- .init();
- } else {
- Registry::default()
- .with(tracing_subscriber::fmt::layer())
-
.with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
- .init();
- }
+ log::init_logging(&config.telemetry, transport, VERSION);
info!("Starting Iggy MCP Server, transport: {transport}...");
diff --git a/core/connectors/runtime/Cargo.toml
b/core/connectors/runtime/Cargo.toml
index 4b60f74fe..0ec17f5b3 100644
--- a/core/connectors/runtime/Cargo.toml
+++ b/core/connectors/runtime/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy-connectors"
-version = "0.2.1-edge.3"
+version = "0.2.1-edge.4"
description = "Connectors runtime for Iggy message streaming platform"
edition = "2024"
license = "Apache-2.0"
@@ -35,6 +35,7 @@ axum-server = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
dashmap = { workspace = true }
+derive_more = { workspace = true }
dirs = { workspace = true }
dlopen2 = { workspace = true }
dotenvy = { workspace = true }
@@ -47,6 +48,11 @@ iggy_common = { workspace = true }
iggy_connector_sdk = { workspace = true }
mimalloc = { workspace = true }
once_cell = { workspace = true }
+opentelemetry = { workspace = true }
+opentelemetry-appender-tracing = { workspace = true }
+opentelemetry-otlp = { workspace = true }
+opentelemetry-semantic-conventions = { workspace = true }
+opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
postcard = { workspace = true }
reqwest = { workspace = true }
reqwest-middleware = { workspace = true }
@@ -62,6 +68,7 @@ tokio = { workspace = true }
toml = { workspace = true }
tower-http = { workspace = true }
tracing = { workspace = true }
+tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
[dev-dependencies]
diff --git a/core/connectors/runtime/README.md
b/core/connectors/runtime/README.md
index ac77dc287..8b0d2584c 100644
--- a/core/connectors/runtime/README.md
+++ b/core/connectors/runtime/README.md
@@ -162,3 +162,21 @@ Currently, it does expose the following endpoints:
- `PUT /sources/{key}/configs/active`: activate a specific configuration
version for the source.
- `GET /sources/{key}/configs/plugin`: source plugin config, including the
optional `format` query parameter to specify the config format.
- `GET /sources/{key}/transforms`: source transforms to be applied to the
fields.
+
+## Telemetry
+
+The connector runtime supports OpenTelemetry for logs and traces. To enable
telemetry, add the following configuration:
+
+```toml
+[telemetry]
+enabled = true
+service_name = "iggy-connectors"
+
+[telemetry.logs]
+transport = "grpc" # Options: "grpc", "http"
+endpoint = "http://localhost:4317"
+
+[telemetry.traces]
+transport = "grpc" # Options: "grpc", "http"
+endpoint = "http://localhost:4317"
+```
diff --git a/core/connectors/runtime/config.toml
b/core/connectors/runtime/config.toml
index dadcc0f35..abfa1ab50 100644
--- a/core/connectors/runtime/config.toml
+++ b/core/connectors/runtime/config.toml
@@ -51,3 +51,15 @@ path = "local_state"
[connectors]
config_type = "local"
config_dir = ""
+
+[telemetry]
+enabled = false
+service_name = "iggy-connectors"
+
+[telemetry.logs]
+transport = "grpc"
+endpoint = "http://localhost:4317"
+
+[telemetry.traces]
+transport = "grpc"
+endpoint = "http://localhost:4317"
diff --git a/core/connectors/runtime/src/configs/runtime.rs
b/core/connectors/runtime/src/configs/runtime.rs
index bf81b8651..77d88ef07 100644
--- a/core/connectors/runtime/src/configs/runtime.rs
+++ b/core/connectors/runtime/src/configs/runtime.rs
@@ -18,6 +18,7 @@
*/
use crate::api::config::HttpConfig;
+use derive_more::Display;
use figment::providers::{Format, Toml};
use figment::value::Dict;
use figment::{Metadata, Profile, Provider};
@@ -27,12 +28,112 @@ use serde::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
+use std::str::FromStr;
const SECRET_KEYS: [&str; 2] = [
"IGGY_CONNECTORS_IGGY_PASSWORD",
"IGGY_CONNECTORS_IGGY_TOKEN",
];
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct TelemetryConfig {
+ pub enabled: bool,
+ pub service_name: String,
+ pub logs: TelemetryLogsConfig,
+ pub traces: TelemetryTracesConfig,
+}
+
+impl Default for TelemetryConfig {
+ fn default() -> Self {
+ Self {
+ enabled: false,
+ service_name: "iggy-connectors".to_owned(),
+ logs: TelemetryLogsConfig::default(),
+ traces: TelemetryTracesConfig::default(),
+ }
+ }
+}
+
+impl Display for TelemetryConfig {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{{ enabled: {}, service_name: {}, logs: {}, traces: {} }}",
+ self.enabled, self.service_name, self.logs, self.traces
+ )
+ }
+}
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct TelemetryLogsConfig {
+ pub transport: TelemetryTransport,
+ pub endpoint: String,
+}
+
+impl Default for TelemetryLogsConfig {
+ fn default() -> Self {
+ Self {
+ transport: TelemetryTransport::Grpc,
+ endpoint: "http://localhost:4317".to_owned(),
+ }
+ }
+}
+
+impl Display for TelemetryLogsConfig {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{{ transport: {}, endpoint: {} }}",
+ self.transport, self.endpoint
+ )
+ }
+}
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct TelemetryTracesConfig {
+ pub transport: TelemetryTransport,
+ pub endpoint: String,
+}
+
+impl Default for TelemetryTracesConfig {
+ fn default() -> Self {
+ Self {
+ transport: TelemetryTransport::Grpc,
+ endpoint: "http://localhost:4317".to_owned(),
+ }
+ }
+}
+
+impl Display for TelemetryTracesConfig {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{{ transport: {}, endpoint: {} }}",
+ self.transport, self.endpoint
+ )
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Display, Copy, Clone)]
+#[serde(rename_all = "lowercase")]
+pub enum TelemetryTransport {
+ #[display("grpc")]
+ Grpc,
+ #[display("http")]
+ Http,
+}
+
+impl FromStr for TelemetryTransport {
+ type Err = String;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "grpc" => Ok(TelemetryTransport::Grpc),
+ "http" => Ok(TelemetryTransport::Http),
+ _ => Err(format!("Invalid telemetry transport: {s}")),
+ }
+ }
+}
+
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
#[serde(default)]
pub struct ConnectorsRuntimeConfig {
@@ -40,6 +141,7 @@ pub struct ConnectorsRuntimeConfig {
pub iggy: IggyConfig,
pub connectors: ConnectorsConfig,
pub state: StateConfig,
+ pub telemetry: TelemetryConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -174,8 +276,8 @@ impl Display for ConnectorsRuntimeConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
- "{{ http: {}, iggy: {}, connectors: {}, state: {:} }}",
- self.http, self.iggy, self.connectors, self.state
+ "{{ http: {}, iggy: {}, connectors: {}, state: {}, telemetry: {}
}}",
+ self.http, self.iggy, self.connectors, self.state, self.telemetry
)
}
}
diff --git a/core/connectors/runtime/src/log.rs
b/core/connectors/runtime/src/log.rs
new file mode 100644
index 000000000..4eded2aaf
--- /dev/null
+++ b/core/connectors/runtime/src/log.rs
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::configs::runtime::{TelemetryConfig, TelemetryTransport};
+use iggy_connector_sdk::LogCallback;
+use opentelemetry::trace::TracerProvider;
+use opentelemetry::{KeyValue, global};
+use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
+use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
+use opentelemetry_sdk::Resource;
+use opentelemetry_sdk::propagation::TraceContextPropagator;
+use tracing::info;
+use tracing_opentelemetry::OpenTelemetryLayer;
+use tracing_subscriber::EnvFilter;
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+
+pub fn init_logging(telemetry_config: &TelemetryConfig, version: &'static str)
{
+ let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_|
EnvFilter::new("INFO"));
+
+ let fmt_layer = tracing_subscriber::fmt::layer();
+
+ if telemetry_config.enabled {
+ let (logger_provider, tracer_provider) =
init_telemetry(telemetry_config, version);
+
+ let service_name = telemetry_config.service_name.clone();
+ let tracer = tracer_provider.tracer(service_name);
+ global::set_tracer_provider(tracer_provider);
+ global::set_text_map_propagator(TraceContextPropagator::new());
+
+ let otel_logs_layer =
OpenTelemetryTracingBridge::new(&logger_provider);
+ let otel_traces_layer = OpenTelemetryLayer::new(tracer);
+
+ tracing_subscriber::registry()
+ .with(env_filter)
+ .with(fmt_layer)
+ .with(otel_logs_layer)
+ .with(otel_traces_layer)
+ .init();
+
+ info!(
+ "Logging initialized with telemetry enabled, service name: {}",
+ telemetry_config.service_name
+ );
+ } else {
+ tracing_subscriber::registry()
+ .with(env_filter)
+ .with(fmt_layer)
+ .init();
+ }
+}
+
+fn init_telemetry(
+ telemetry_config: &TelemetryConfig,
+ version: &'static str,
+) -> (
+ opentelemetry_sdk::logs::SdkLoggerProvider,
+ opentelemetry_sdk::trace::SdkTracerProvider,
+) {
+ let service_name = telemetry_config.service_name.clone();
+ let resource = Resource::builder()
+ .with_service_name(service_name)
+ .with_attribute(KeyValue::new(
+ opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
+ version,
+ ))
+ .build();
+
+ let logger_provider = init_logs_exporter(telemetry_config,
resource.clone());
+ let tracer_provider = init_traces_exporter(telemetry_config, resource);
+
+ (logger_provider, tracer_provider)
+}
+
+fn init_logs_exporter(
+ telemetry_config: &TelemetryConfig,
+ resource: Resource,
+) -> opentelemetry_sdk::logs::SdkLoggerProvider {
+ match telemetry_config.logs.transport {
+ TelemetryTransport::Grpc =>
opentelemetry_sdk::logs::SdkLoggerProvider::builder()
+ .with_resource(resource)
+ .with_batch_exporter(
+ opentelemetry_otlp::LogExporter::builder()
+ .with_tonic()
+ .with_endpoint(telemetry_config.logs.endpoint.clone())
+ .build()
+ .expect("Failed to initialize gRPC logger."),
+ )
+ .build(),
+ TelemetryTransport::Http => {
+ let log_exporter = opentelemetry_otlp::LogExporter::builder()
+ .with_http()
+ .with_http_client(reqwest::Client::new())
+ .with_endpoint(telemetry_config.logs.endpoint.clone())
+ .with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
+ .build()
+ .expect("Failed to initialize HTTP logger.");
+ opentelemetry_sdk::logs::SdkLoggerProvider::builder()
+ .with_resource(resource)
+ .with_batch_exporter(log_exporter)
+ .build()
+ }
+ }
+}
+
+fn init_traces_exporter(
+ telemetry_config: &TelemetryConfig,
+ resource: Resource,
+) -> opentelemetry_sdk::trace::SdkTracerProvider {
+ match telemetry_config.traces.transport {
+ TelemetryTransport::Grpc =>
opentelemetry_sdk::trace::SdkTracerProvider::builder()
+ .with_resource(resource)
+ .with_batch_exporter(
+ opentelemetry_otlp::SpanExporter::builder()
+ .with_tonic()
+ .with_endpoint(telemetry_config.traces.endpoint.clone())
+ .build()
+ .expect("Failed to initialize gRPC tracer."),
+ )
+ .build(),
+ TelemetryTransport::Http => {
+ let trace_exporter = opentelemetry_otlp::SpanExporter::builder()
+ .with_http()
+ .with_http_client(reqwest::Client::new())
+ .with_endpoint(telemetry_config.traces.endpoint.clone())
+ .with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
+ .build()
+ .expect("Failed to initialize HTTP tracer.");
+ opentelemetry_sdk::trace::SdkTracerProvider::builder()
+ .with_resource(resource)
+ .with_batch_exporter(trace_exporter)
+ .build()
+ }
+ }
+}
+
+/// Log callback that routes plugin logs through the runtime's tracing
subscriber.
+/// This function is passed to plugins via FFI so their logs appear in the
runtime's
+/// output and OTEL telemetry.
+pub extern "C" fn runtime_log_callback(
+ level: u8,
+ target_ptr: *const u8,
+ target_len: usize,
+ message_ptr: *const u8,
+ message_len: usize,
+) {
+ let target = unsafe {
+ std::str::from_utf8(std::slice::from_raw_parts(target_ptr, target_len))
+ .unwrap_or("connector")
+ };
+ let message = unsafe {
+ std::str::from_utf8(std::slice::from_raw_parts(message_ptr,
message_len))
+ .unwrap_or("<invalid utf8>")
+ };
+
+ match level {
+ 0 => tracing::trace!(target: "connector", connector_target = target,
message),
+ 1 => tracing::debug!(target: "connector", connector_target = target,
message),
+ 2 => tracing::info!(target: "connector", connector_target = target,
message),
+ 3 => tracing::warn!(target: "connector", connector_target = target,
message),
+ _ => tracing::error!(target: "connector", connector_target = target,
message),
+ }
+}
+
+pub const LOG_CALLBACK: LogCallback = runtime_log_callback;
diff --git a/core/connectors/runtime/src/main.rs
b/core/connectors/runtime/src/main.rs
index d600fc7eb..ab4a77208 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -41,12 +41,12 @@ use std::{
sync::{Arc, atomic::AtomicU32},
};
use tracing::{debug, info};
-use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt,
util::SubscriberInitExt};
mod api;
pub(crate) mod configs;
pub(crate) mod context;
pub(crate) mod error;
+mod log;
mod manager;
mod sink;
mod source;
@@ -54,6 +54,8 @@ mod state;
mod stream;
mod transform;
+const VERSION: &str = env!("CARGO_PKG_VERSION");
+
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
@@ -73,6 +75,7 @@ struct SourceApi {
config_len: usize,
state_ptr: *const u8,
state_len: usize,
+ log_callback: iggy_connector_sdk::LogCallback,
) -> i32,
handle: extern "C" fn(id: u32, callback: SendCallback) -> i32,
close: extern "C" fn(id: u32) -> i32,
@@ -80,7 +83,12 @@ struct SourceApi {
#[derive(WrapperApi)]
struct SinkApi {
- open: extern "C" fn(id: u32, config_ptr: *const u8, config_len: usize) ->
i32,
+ open: extern "C" fn(
+ id: u32,
+ config_ptr: *const u8,
+ config_len: usize,
+ log_callback: iggy_connector_sdk::LogCallback,
+ ) -> i32,
#[allow(clippy::too_many_arguments)]
consume: extern "C" fn(
id: u32,
@@ -116,20 +124,17 @@ async fn main() -> Result<(), RuntimeError> {
);
}
- Registry::default()
- .with(tracing_subscriber::fmt::layer())
-
.with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
- .init();
-
let config_path =
env::var("IGGY_CONNECTORS_CONFIG_PATH").unwrap_or_else(|_|
DEFAULT_CONFIG_PATH.to_string());
- info!("Starting Iggy Connectors Runtime, loading configuration from:
{config_path}...");
+ println!("Starting Iggy Connectors Runtime, loading configuration from:
{config_path}...");
let config: ConnectorsRuntimeConfig =
ConnectorsRuntimeConfig::config_provider(config_path)
.load_config()
.await
.expect("Failed to load configuration");
+ log::init_logging(&config.telemetry, VERSION);
+
std::fs::create_dir_all(&config.state.path).expect("Failed to create state
directory");
info!("State will be stored in: {}", config.state.path);
diff --git a/core/connectors/runtime/src/sink.rs
b/core/connectors/runtime/src/sink.rs
index c9d8bd3da..3f55d4d8a 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -19,6 +19,7 @@
use crate::configs::connectors::SinkConfig;
use crate::context::RuntimeContext;
+use crate::log::LOG_CALLBACK;
use crate::manager::status::ConnectorStatus;
use crate::{
PLUGIN_ID, RuntimeError, SinkApi, SinkConnector, SinkConnectorConsumer,
SinkConnectorPlugin,
@@ -319,7 +320,12 @@ fn init_sink(
id: u32,
) -> Result<(), RuntimeError> {
let plugin_config = serde_json::to_string(plugin_config).expect("Invalid
sink plugin config.");
- let result = (container.open)(id, plugin_config.as_ptr(),
plugin_config.len());
+ let result = (container.open)(
+ id,
+ plugin_config.as_ptr(),
+ plugin_config.len(),
+ LOG_CALLBACK,
+ );
if result != 0 {
let err = format!("Plugin initialization failed (ID: {id})");
error!("{err}");
diff --git a/core/connectors/runtime/src/source.rs
b/core/connectors/runtime/src/source.rs
index 5d6d88744..d366a5de5 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -37,6 +37,7 @@ use tracing::{debug, error, info, trace, warn};
use crate::configs::connectors::SourceConfig;
use crate::context::RuntimeContext;
+use crate::log::LOG_CALLBACK;
use crate::manager::status::ConnectorStatus;
use crate::{
PLUGIN_ID, RuntimeError, SourceApi, SourceConnector, SourceConnectorPlugin,
@@ -202,6 +203,7 @@ fn init_source(
plugin_config.len(),
state_ptr,
state_len,
+ LOG_CALLBACK,
);
if result != 0 {
let err = format!("Plugin initialization failed (ID: {id})");
diff --git a/core/connectors/sdk/Cargo.toml b/core/connectors/sdk/Cargo.toml
index 94c5d6192..72bafef01 100644
--- a/core/connectors/sdk/Cargo.toml
+++ b/core/connectors/sdk/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_sdk"
-version = "0.1.1-edge.1"
+version = "0.1.1-edge.2"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs
index b91ade3dc..16738ef57 100644
--- a/core/connectors/sdk/src/lib.rs
+++ b/core/connectors/sdk/src/lib.rs
@@ -38,10 +38,12 @@ use tokio::runtime::Runtime;
pub mod decoders;
pub mod encoders;
+pub mod log;
pub mod sink;
pub mod source;
pub mod transforms;
+pub use log::LogCallback;
pub use transforms::Transform;
static RUNTIME: OnceCell<Runtime> = OnceCell::new();
diff --git a/core/connectors/sdk/src/log.rs b/core/connectors/sdk/src/log.rs
new file mode 100644
index 000000000..af0f6fcf9
--- /dev/null
+++ b/core/connectors/sdk/src/log.rs
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tracing::field::{Field, Visit};
+use tracing::{Event, Level, Subscriber};
+use tracing_subscriber::layer::{Context, Layer};
+
+/// Callback for plugins to send log records to the runtime.
+///
+/// # Arguments
+/// * `level` - Log level (0=TRACE, 1=DEBUG, 2=INFO, 3=WARN, 4=ERROR)
+/// * `target_ptr` - Pointer to the target string (module path)
+/// * `target_len` - Length of the target string
+/// * `message_ptr` - Pointer to the message string
+/// * `message_len` - Length of the message string
+pub type LogCallback = extern "C" fn(
+ level: u8,
+ target_ptr: *const u8,
+ target_len: usize,
+ message_ptr: *const u8,
+ message_len: usize,
+);
+
+/// A tracing Layer that forwards events to the runtime via callback.
+/// This allows plugin logs to be routed through the runtime's OTEL subscriber.
+pub struct CallbackLayer {
+ callback: LogCallback,
+}
+
+impl CallbackLayer {
+ pub fn new(callback: LogCallback) -> Self {
+ Self { callback }
+ }
+}
+
+impl<S: Subscriber> Layer<S> for CallbackLayer {
+ fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
+ let level = match *event.metadata().level() {
+ Level::TRACE => 0u8,
+ Level::DEBUG => 1,
+ Level::INFO => 2,
+ Level::WARN => 3,
+ Level::ERROR => 4,
+ };
+
+ let target = event.metadata().target();
+
+ let mut visitor = MessageVisitor::default();
+ event.record(&mut visitor);
+ let message = visitor.message;
+
+ (self.callback)(
+ level,
+ target.as_ptr(),
+ target.len(),
+ message.as_ptr(),
+ message.len(),
+ );
+ }
+}
+
+/// Visitor to extract the message field from a tracing Event
+#[derive(Default)]
+struct MessageVisitor {
+ message: String,
+}
+
+impl Visit for MessageVisitor {
+ fn record_str(&mut self, field: &Field, value: &str) {
+ if field.name() == "message" || self.message.is_empty() {
+ self.message = value.to_string();
+ }
+ }
+
+ fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
+ if field.name() == "message" || self.message.is_empty() {
+ self.message = format!("{:?}", value);
+ }
+ }
+}
diff --git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs
index 8e04f6f05..457618e75 100644
--- a/core/connectors/sdk/src/sink.rs
+++ b/core/connectors/sdk/src/sink.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -21,6 +22,7 @@ use tokio::sync::watch;
use tracing::{error, info};
use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt,
util::SubscriberInitExt};
+use crate::log::{CallbackLayer, LogCallback};
use crate::{ConsumedMessage, MessagesMetadata, RawMessages, Sink,
TopicMetadata, get_runtime};
pub type ConsumeCallback = extern "C" fn(
@@ -56,6 +58,7 @@ impl<T: Sink + std::fmt::Debug> SinkContainer<T> {
id: u32,
config_ptr: *const u8,
config_len: usize,
+ log_callback: LogCallback,
factory: F,
) -> i32
where
@@ -64,7 +67,7 @@ impl<T: Sink + std::fmt::Debug> SinkContainer<T> {
{
unsafe {
_ = Registry::default()
- .with(tracing_subscriber::fmt::layer())
+ .with(CallbackLayer::new(log_callback))
.with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
.try_init();
let slice = std::slice::from_raw_parts(config_ptr, config_len);
@@ -219,15 +222,21 @@ macro_rules! sink_connector {
use dashmap::DashMap;
use once_cell::sync::Lazy;
+ use $crate::LogCallback;
use $crate::sink::SinkContainer;
static INSTANCES: Lazy<DashMap<u32, SinkContainer<$type>>> =
Lazy::new(DashMap::new);
#[cfg(not(test))]
#[unsafe(no_mangle)]
- unsafe extern "C" fn open(id: u32, config_ptr: *const u8, config_len:
usize) -> i32 {
+ unsafe extern "C" fn open(
+ id: u32,
+ config_ptr: *const u8,
+ config_len: usize,
+ log_callback: LogCallback,
+ ) -> i32 {
let mut container = SinkContainer::new(id);
- let result = container.open(id, config_ptr, config_len,
<$type>::new);
+ let result = container.open(id, config_ptr, config_len,
log_callback, <$type>::new);
INSTANCES.insert(id, container);
result
}
diff --git a/core/connectors/sdk/src/source.rs
b/core/connectors/sdk/src/source.rs
index f4f2d1b39..16b45aee7 100644
--- a/core/connectors/sdk/src/source.rs
+++ b/core/connectors/sdk/src/source.rs
@@ -17,6 +17,7 @@
* under the License.
*/
+use crate::log::{CallbackLayer, LogCallback};
use crate::{ConnectorState, Error, Source, get_runtime};
use serde::de::DeserializeOwned;
use std::sync::Arc;
@@ -57,6 +58,7 @@ impl<T: Source + std::fmt::Debug + 'static>
SourceContainer<T> {
/// # Safety
/// Do not copy the configuration pointer
+ #[allow(clippy::too_many_arguments)]
pub unsafe fn open<F, C>(
&mut self,
id: u32,
@@ -64,6 +66,7 @@ impl<T: Source + std::fmt::Debug + 'static>
SourceContainer<T> {
config_len: usize,
state_ptr: *const u8,
state_len: usize,
+ log_callback: LogCallback,
factory: F,
) -> i32
where
@@ -72,7 +75,7 @@ impl<T: Source + std::fmt::Debug + 'static>
SourceContainer<T> {
{
unsafe {
_ = Registry::default()
- .with(tracing_subscriber::fmt::layer())
+ .with(CallbackLayer::new(log_callback))
.with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
.try_init();
let slice = std::slice::from_raw_parts(config_ptr, config_len);
@@ -207,6 +210,7 @@ macro_rules! source_connector {
use dashmap::DashMap;
use once_cell::sync::Lazy;
+ use $crate::LogCallback;
use $crate::source::SendCallback;
use $crate::source::SourceContainer;
@@ -220,6 +224,7 @@ macro_rules! source_connector {
config_len: usize,
state_ptr: *const u8,
state_len: usize,
+ log_callback: LogCallback,
) -> i32 {
let mut container = SourceContainer::new(id);
let result = container.open(
@@ -228,6 +233,7 @@ macro_rules! source_connector {
config_len,
state_ptr,
state_len,
+ log_callback,
<$type>::new,
);
INSTANCES.insert(id, container);
diff --git a/core/connectors/sinks/elasticsearch_sink/Cargo.toml
b/core/connectors/sinks/elasticsearch_sink/Cargo.toml
index f3224459b..6d238eeca 100644
--- a/core/connectors/sinks/elasticsearch_sink/Cargo.toml
+++ b/core/connectors/sinks/elasticsearch_sink/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_elasticsearch_sink"
-version = "0.1.0"
+version = "0.2.0-edge.1"
description = "Iggy Elasticsearch sink connector"
edition = "2024"
license = "Apache-2.0"
diff --git a/core/connectors/sinks/iceberg_sink/Cargo.toml
b/core/connectors/sinks/iceberg_sink/Cargo.toml
index 4da947bf7..8eb33b345 100644
--- a/core/connectors/sinks/iceberg_sink/Cargo.toml
+++ b/core/connectors/sinks/iceberg_sink/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_iceberg_sink"
-version = "0.1.0"
+version = "0.2.0-edge.1"
edition = "2024"
license = "Apache-2.0"
keywords = ["iggy", "messaging", "streaming"]
diff --git a/core/connectors/sinks/postgres_sink/Cargo.toml
b/core/connectors/sinks/postgres_sink/Cargo.toml
index b38de8a85..4c9549830 100644
--- a/core/connectors/sinks/postgres_sink/Cargo.toml
+++ b/core/connectors/sinks/postgres_sink/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_postgres_sink"
-version = "0.1.0"
+version = "0.2.0-edge.1"
description = "Iggy PostgreSQL sink connector for storing stream messages into
PostgreSQL database"
edition = "2024"
license = "Apache-2.0"
diff --git a/core/connectors/sinks/quickwit_sink/Cargo.toml
b/core/connectors/sinks/quickwit_sink/Cargo.toml
index aa1388581..228b0e3be 100644
--- a/core/connectors/sinks/quickwit_sink/Cargo.toml
+++ b/core/connectors/sinks/quickwit_sink/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_quickwit_sink"
-version = "0.1.0"
+version = "0.2.0-edge.1"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/connectors/sinks/stdout_sink/Cargo.toml
b/core/connectors/sinks/stdout_sink/Cargo.toml
index 2797e04ba..ce63bc1ba 100644
--- a/core/connectors/sinks/stdout_sink/Cargo.toml
+++ b/core/connectors/sinks/stdout_sink/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_stdout_sink"
-version = "0.1.0"
+version = "0.2.0-edge.1"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/connectors/sources/elasticsearch_source/Cargo.toml
b/core/connectors/sources/elasticsearch_source/Cargo.toml
index 4acc5f928..e871ba326 100644
--- a/core/connectors/sources/elasticsearch_source/Cargo.toml
+++ b/core/connectors/sources/elasticsearch_source/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_elasticsearch_source"
-version = "0.1.0"
+version = "0.2.0-edge.1"
description = "Iggy Elasticsearch source connector"
edition = "2024"
license = "Apache-2.0"
diff --git a/core/connectors/sources/postgres_source/Cargo.toml
b/core/connectors/sources/postgres_source/Cargo.toml
index dc6b1dfc1..6c8245707 100644
--- a/core/connectors/sources/postgres_source/Cargo.toml
+++ b/core/connectors/sources/postgres_source/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_postgres_source"
-version = "0.1.0"
+version = "0.2.0-edge.1"
description = "Iggy PostgreSQL source connector supporting CDC and table
polling for message streaming platform"
edition = "2024"
license = "Apache-2.0"
diff --git a/core/connectors/sources/random_source/Cargo.toml
b/core/connectors/sources/random_source/Cargo.toml
index ad645601d..ba5acc657 100644
--- a/core/connectors/sources/random_source/Cargo.toml
+++ b/core/connectors/sources/random_source/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy_connector_random_source"
-version = "0.1.0"
+version = "0.2.0-edge.1"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index d9bd6ac46..26170f137 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -77,24 +77,11 @@ mimalloc = { workspace = true, optional = true }
mime_guess = { version = "2.0", optional = true }
moka = { version = "0.12.12", features = ["future"] }
nix = { workspace = true }
-opentelemetry = { version = "0.31.0", features = ["trace", "logs"] }
-opentelemetry-appender-tracing = { version = "0.31.1", features = ["log"] }
-opentelemetry-otlp = { version = "0.31.0", features = [
- "logs",
- "trace",
- "grpc-tonic",
- "http",
- "http-proto",
- "reqwest-client",
-] }
-opentelemetry-semantic-conventions = "0.31.0"
-opentelemetry_sdk = { version = "0.31.0", features = [
- "logs",
- "trace",
- "experimental_async_runtime",
- "experimental_logs_batch_log_processor_with_async_runtime",
- "experimental_trace_batch_span_processor_with_async_runtime",
-] }
+opentelemetry = { workspace = true }
+opentelemetry-appender-tracing = { workspace = true }
+opentelemetry-otlp = { workspace = true }
+opentelemetry-semantic-conventions = { workspace = true }
+opentelemetry_sdk = { workspace = true }
papaya = "0.2.3"
prometheus-client = "0.24.0"
rand = { workspace = true }
@@ -119,7 +106,7 @@ toml = { workspace = true }
tower-http = { workspace = true }
tracing = { workspace = true }
tracing-appender = { workspace = true }
-tracing-opentelemetry = "0.32.1"
+tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
tungstenite = { workspace = true }
ulid = "1.2.1"
diff --git a/justfile b/justfile
index 578d0e4dd..e03e8656f 100644
--- a/justfile
+++ b/justfile
@@ -48,8 +48,8 @@ nextest: build
nextests TEST: build
cargo nextest run --nocapture -- {{TEST}}
-server:
- cargo run --bin iggy-server
+server *ARGS:
+ cargo run --bin iggy-server {{ARGS}}
run-benches:
./scripts/run-benches.sh