This is an automated email from the ASF dual-hosted git repository. maciej pushed a commit to branch connectors-mcp-otel in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 95d91076961d8d1f623db7fe8938e30da49f0dd3 Author: Maciej Modzelewski <[email protected]> AuthorDate: Fri Jan 23 08:14:54 2026 +0100 feat(connectors,mcp): implement logging with telemetry --- Cargo.lock | 15 ++ Cargo.toml | 19 ++ core/ai/mcp/Cargo.toml | 7 + core/ai/mcp/config.toml | 12 ++ core/ai/mcp/src/configs.rs | 108 ++++++++++- core/ai/mcp/src/log.rs | 197 +++++++++++++++++++++ core/ai/mcp/src/main.rs | 20 +-- core/connectors/runtime/Cargo.toml | 7 + core/connectors/runtime/config.toml | 12 ++ core/connectors/runtime/example_config/config.toml | 3 + core/connectors/runtime/src/configs/runtime.rs | 106 ++++++++++- core/connectors/runtime/src/log.rs | 181 +++++++++++++++++++ core/connectors/runtime/src/main.rs | 21 ++- core/connectors/runtime/src/sink.rs | 8 +- core/connectors/runtime/src/source.rs | 2 + core/connectors/sdk/src/lib.rs | 2 + core/connectors/sdk/src/log.rs | 96 ++++++++++ core/connectors/sdk/src/sink.rs | 17 +- core/connectors/sdk/src/source.rs | 8 +- core/server/Cargo.toml | 25 +-- docker-compose.yml | 56 +++--- greptime_users | 1 + otel-collector-config.yaml | 110 ++++++++++++ 23 files changed, 956 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f7fceb1a..69a0d1650 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4578,6 +4578,7 @@ dependencies = [ "chrono", "clap", "dashmap", + "derive_more", "dirs", "dlopen2", "dotenvy", @@ -4590,6 +4591,11 @@ dependencies = [ "iggy_connector_sdk", "mimalloc", "once_cell", + "opentelemetry", + "opentelemetry-appender-tracing", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", "postcard", "reqwest", "reqwest-middleware", @@ -4606,6 +4612,7 @@ dependencies = [ "toml 0.9.11+spec-1.1.0", "tower-http", "tracing", + "tracing-opentelemetry", "tracing-subscriber", ] @@ -4621,6 +4628,12 @@ dependencies = [ "figment", "iggy", "iggy_common", + "opentelemetry", + "opentelemetry-appender-tracing", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "reqwest", "rmcp", "serde", "serde_json", @@ -4630,6 +4643,7 @@ dependencies = [ "tokio", "tower-http", "tracing", + "tracing-opentelemetry", "tracing-subscriber", ] @@ -6350,6 +6364,7 @@ dependencies = [ "rand 0.9.2", "thiserror 2.0.17", "tokio", + "tokio-stream", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 802dde2d8..ec01b4e00 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -150,6 +150,24 @@ mockall = "0.14.0" nix = { version = "0.30.1", features = ["fs", "resource", "sched"] } nonzero_lit = "0.1.2" once_cell = "1.21.3" +opentelemetry = { version = "0.31.0", features = ["trace", "logs"] } +opentelemetry-appender-tracing = { version = "0.31.1", features = ["log"] } +opentelemetry-otlp = { version = "0.31.0", features = [ + "logs", + "trace", + "grpc-tonic", + "http", + "http-proto", + "reqwest-client", +] } +opentelemetry-semantic-conventions = "0.31.0" +opentelemetry_sdk = { version = "0.31.0", features = [ + "logs", + "trace", + "experimental_async_runtime", + "experimental_logs_batch_log_processor_with_async_runtime", + "experimental_trace_batch_span_processor_with_async_runtime", +] } parquet = "=55.2.0" passterm = "=2.0.1" postcard = { version = "1.1.3", features = ["alloc"] } @@ -196,6 +214,7 @@ tower-http = { version = "0.6.8", features = [ ] } tracing = "0.1.44" tracing-appender = "0.2.4" +tracing-opentelemetry = "0.32.1" tracing-subscriber = { version = "0.3.22", default-features = false, features = [ "fmt", "env-filter", diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml index ca729329d..85d60a6f6 100644 --- a/core/ai/mcp/Cargo.toml +++ b/core/ai/mcp/Cargo.toml @@ -35,6 +35,12 @@ figlet-rs = { workspace = true } figment = { workspace = true } iggy = { workspace = true } iggy_common = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry-appender-tracing = { workspace = true } +opentelemetry-otlp = { workspace = true } +opentelemetry-semantic-conventions = { workspace = true } +opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } +reqwest = { workspace = true } rmcp = { version = "0.13.0", features = [ "server", "transport-io", @@ -47,6 +53,7 @@ thiserror = { workspace = true } tokio = { workspace = true } tower-http = { workspace = true } tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true } [dev-dependencies] diff --git a/core/ai/mcp/config.toml b/core/ai/mcp/config.toml index 69ff5f349..b034f42e6 100644 --- a/core/ai/mcp/config.toml +++ b/core/ai/mcp/config.toml @@ -52,3 +52,15 @@ create = true read = true update = true delete = true + +[telemetry] +enabled = false +service_name = "iggy-mcp" + +[telemetry.logs] +transport = "grpc" +endpoint = "http://localhost:4317" + +[telemetry.traces] +transport = "grpc" +endpoint = "http://localhost:4317" diff --git a/core/ai/mcp/src/configs.rs b/core/ai/mcp/src/configs.rs index e29b33092..7ec2846c3 100644 --- a/core/ai/mcp/src/configs.rs +++ b/core/ai/mcp/src/configs.rs @@ -26,7 +26,8 @@ use figment::{ use iggy::prelude::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME}; use iggy_common::{CustomEnvProvider, FileConfigProvider}; use serde::{Deserialize, Serialize}; -use std::fmt::Formatter; +use std::fmt::{Display as FmtDisplay, Formatter}; +use std::str::FromStr; use strum::Display; use tower_http::cors::{AllowOrigin, CorsLayer}; @@ -37,6 +38,7 @@ pub struct McpServerConfig { pub iggy: IggyConfig, pub permissions: PermissionsConfig, pub transport: McpTransport, + pub telemetry: TelemetryConfig, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -101,6 +103,105 @@ pub enum McpTransport { Stdio, } +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct TelemetryConfig { + pub enabled: bool, + pub service_name: String, + pub logs: TelemetryLogsConfig, + pub traces: TelemetryTracesConfig, +} + +impl Default for TelemetryConfig { + fn default() -> Self { + Self { + enabled: false, + service_name: "iggy-mcp".to_owned(), + logs: TelemetryLogsConfig::default(), + traces: TelemetryTracesConfig::default(), + } + } +} + +impl FmtDisplay for TelemetryConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{{ enabled: {}, service_name: {}, logs: {}, traces: {} }}", + self.enabled, self.service_name, self.logs, self.traces + ) + } +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct TelemetryLogsConfig { + pub transport: TelemetryTransport, + pub endpoint: String, +} + +impl Default for TelemetryLogsConfig { + fn default() -> Self { + Self { + transport: TelemetryTransport::Grpc, + endpoint: "http://localhost:4317".to_owned(), + } + } +} + +impl FmtDisplay for TelemetryLogsConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{{ transport: {}, endpoint: {} }}", + self.transport, self.endpoint + ) + } +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct TelemetryTracesConfig { + pub transport: TelemetryTransport, + pub endpoint: String, +} + +impl Default for TelemetryTracesConfig { + fn default() -> Self { + Self { + transport: TelemetryTransport::Grpc, + endpoint: "http://localhost:4317".to_owned(), + } + } +} + +impl FmtDisplay for TelemetryTracesConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{{ transport: {}, endpoint: {} }}", + self.transport, self.endpoint + ) + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Display, Copy, Clone)] +#[serde(rename_all = "lowercase")] +pub enum TelemetryTransport { + #[strum(to_string = "grpc")] + Grpc, + #[strum(to_string = "http")] + Http, +} + +impl FromStr for TelemetryTransport { + type Err = String; + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "grpc" => Ok(TelemetryTransport::Grpc), + "http" => Ok(TelemetryTransport::Http), + _ => Err(format!("Invalid telemetry transport: {s}")), + } + } +} + impl Default for McpServerConfig { fn default() -> Self { Self { @@ -108,6 +209,7 @@ impl Default for McpServerConfig { iggy: IggyConfig::default(), permissions: PermissionsConfig::default(), transport: McpTransport::Http, + telemetry: TelemetryConfig::default(), } } } @@ -238,8 +340,8 @@ impl std::fmt::Display for McpServerConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ http: {}, iggy: {}, permissions: {:?}, transport: {} }}", - self.http, self.iggy, self.permissions, self.transport + "{{ http: {}, iggy: {}, permissions: {:?}, transport: {}, telemetry: {} }}", + self.http, self.iggy, self.permissions, self.transport, self.telemetry ) } } diff --git a/core/ai/mcp/src/log.rs b/core/ai/mcp/src/log.rs new file mode 100644 index 000000000..7cce521e6 --- /dev/null +++ b/core/ai/mcp/src/log.rs @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::configs::{McpTransport, TelemetryConfig, TelemetryTransport}; +use opentelemetry::trace::TracerProvider; +use opentelemetry::{KeyValue, global}; +use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; +use opentelemetry_otlp::{WithExportConfig, WithHttpConfig}; +use opentelemetry_sdk::Resource; +use opentelemetry_sdk::propagation::TraceContextPropagator; +use tracing::info; +use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::EnvFilter; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; + +pub fn init_logging( + telemetry_config: &TelemetryConfig, + transport: McpTransport, + version: &'static str, +) { + // STDIO transport needs stderr output with no ANSI codes + // HTTP transport can use normal stdout with ANSI + let (default_level, use_stderr, use_ansi) = match transport { + McpTransport::Stdio => ("DEBUG", true, false), + McpTransport::Http => ("INFO", false, true), + }; + + let env_filter = + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(default_level)); + + match (telemetry_config.enabled, use_stderr) { + (true, true) => { + let (logger_provider, tracer_provider) = init_telemetry(telemetry_config, version); + + let service_name = telemetry_config.service_name.clone(); + let tracer = tracer_provider.tracer(service_name); + global::set_tracer_provider(tracer_provider); + global::set_text_map_propagator(TraceContextPropagator::new()); + + let fmt_layer = tracing_subscriber::fmt::layer() + .with_writer(std::io::stderr) + .with_ansi(use_ansi); + let otel_logs_layer = OpenTelemetryTracingBridge::new(&logger_provider); + let otel_traces_layer = OpenTelemetryLayer::new(tracer); + + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) + .with(otel_logs_layer) + .with(otel_traces_layer) + .init(); + + info!( + "Logging initialized with telemetry enabled, service name: {}", + telemetry_config.service_name + ); + } + (true, false) => { + let (logger_provider, tracer_provider) = init_telemetry(telemetry_config, version); + + let service_name = telemetry_config.service_name.clone(); + let tracer = tracer_provider.tracer(service_name); + global::set_tracer_provider(tracer_provider); + global::set_text_map_propagator(TraceContextPropagator::new()); + + let fmt_layer = tracing_subscriber::fmt::layer().with_ansi(use_ansi); + let otel_logs_layer = OpenTelemetryTracingBridge::new(&logger_provider); + let otel_traces_layer = OpenTelemetryLayer::new(tracer); + + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) + .with(otel_logs_layer) + .with(otel_traces_layer) + .init(); + + info!( + "Logging initialized with telemetry enabled, service name: {}", + telemetry_config.service_name + ); + } + (false, true) => { + tracing_subscriber::fmt() + .with_env_filter(env_filter) + .with_writer(std::io::stderr) + .with_ansi(use_ansi) + .init(); + } + (false, false) => { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().with_ansi(use_ansi)) + .with(env_filter) + .init(); + } + } +} + +fn init_telemetry( + telemetry_config: &TelemetryConfig, + version: &'static str, +) -> ( + opentelemetry_sdk::logs::SdkLoggerProvider, + opentelemetry_sdk::trace::SdkTracerProvider, +) { + let service_name = telemetry_config.service_name.clone(); + let resource = Resource::builder() + .with_service_name(service_name) + .with_attribute(KeyValue::new( + opentelemetry_semantic_conventions::resource::SERVICE_VERSION, + version, + )) + .build(); + + let logger_provider = init_logs_exporter(telemetry_config, resource.clone()); + let tracer_provider = init_traces_exporter(telemetry_config, resource); + + (logger_provider, tracer_provider) +} + +fn init_logs_exporter( + telemetry_config: &TelemetryConfig, + resource: Resource, +) -> opentelemetry_sdk::logs::SdkLoggerProvider { + match telemetry_config.logs.transport { + TelemetryTransport::Grpc => opentelemetry_sdk::logs::SdkLoggerProvider::builder() + .with_resource(resource) + .with_batch_exporter( + opentelemetry_otlp::LogExporter::builder() + .with_tonic() + .with_endpoint(telemetry_config.logs.endpoint.clone()) + .build() + .expect("Failed to initialize gRPC logger."), + ) + .build(), + TelemetryTransport::Http => { + let log_exporter = opentelemetry_otlp::LogExporter::builder() + .with_http() + .with_http_client(reqwest::Client::new()) + .with_endpoint(telemetry_config.logs.endpoint.clone()) + .with_protocol(opentelemetry_otlp::Protocol::HttpBinary) + .build() + .expect("Failed to initialize HTTP logger."); + opentelemetry_sdk::logs::SdkLoggerProvider::builder() + .with_resource(resource) + .with_batch_exporter(log_exporter) + .build() + } + } +} + +fn init_traces_exporter( + telemetry_config: &TelemetryConfig, + resource: Resource, +) -> opentelemetry_sdk::trace::SdkTracerProvider { + match telemetry_config.traces.transport { + TelemetryTransport::Grpc => opentelemetry_sdk::trace::SdkTracerProvider::builder() + .with_resource(resource) + .with_batch_exporter( + opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint(telemetry_config.traces.endpoint.clone()) + .build() + .expect("Failed to initialize gRPC tracer."), + ) + .build(), + TelemetryTransport::Http => { + let trace_exporter = opentelemetry_otlp::SpanExporter::builder() + .with_http() + .with_http_client(reqwest::Client::new()) + .with_endpoint(telemetry_config.traces.endpoint.clone()) + .with_protocol(opentelemetry_otlp::Protocol::HttpBinary) + .build() + .expect("Failed to initialize HTTP tracer."); + opentelemetry_sdk::trace::SdkTracerProvider::builder() + .with_resource(resource) + .with_batch_exporter(trace_exporter) + .build() + } + } +} diff --git a/core/ai/mcp/src/main.rs b/core/ai/mcp/src/main.rs index b5cfb4281..dbbd1bf4c 100644 --- a/core/ai/mcp/src/main.rs +++ b/core/ai/mcp/src/main.rs @@ -1,4 +1,5 @@ -/* Licensed to the Apache Software Foundation (ASF) under one +/* + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -26,14 +27,16 @@ use rmcp::{ServiceExt, model::ErrorData, transport::stdio}; use service::IggyService; use std::{env, sync::Arc}; use tracing::{error, info}; -use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt}; mod api; mod configs; mod error; +mod log; mod service; mod stream; +const VERSION: &str = env!("CARGO_PKG_VERSION"); + const DEFAULT_CONFIG_PATH: &str = "core/ai/mcp/config.toml"; #[tokio::main] @@ -62,18 +65,7 @@ async fn main() -> Result<(), McpRuntimeError> { .expect("Failed to load configuration"); let transport = config.transport; - if transport == McpTransport::Stdio { - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("DEBUG"))) - .with_writer(std::io::stderr) - .with_ansi(false) - .init(); - } else { - Registry::default() - .with(tracing_subscriber::fmt::layer()) - .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) - .init(); - } + log::init_logging(&config.telemetry, transport, VERSION); info!("Starting Iggy MCP Server, transport: {transport}..."); diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml index 4b60f74fe..2ef1770fe 100644 --- a/core/connectors/runtime/Cargo.toml +++ b/core/connectors/runtime/Cargo.toml @@ -35,6 +35,7 @@ axum-server = { workspace = true } chrono = { workspace = true } clap = { workspace = true } dashmap = { workspace = true } +derive_more = { workspace = true } dirs = { workspace = true } dlopen2 = { workspace = true } dotenvy = { workspace = true } @@ -47,6 +48,11 @@ iggy_common = { workspace = true } iggy_connector_sdk = { workspace = true } mimalloc = { workspace = true } once_cell = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry-appender-tracing = { workspace = true } +opentelemetry-otlp = { workspace = true } +opentelemetry-semantic-conventions = { workspace = true } +opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } postcard = { workspace = true } reqwest = { workspace = true } reqwest-middleware = { workspace = true } @@ -62,6 +68,7 @@ tokio = { workspace = true } toml = { workspace = true } tower-http = { workspace = true } tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true } [dev-dependencies] diff --git a/core/connectors/runtime/config.toml b/core/connectors/runtime/config.toml index dadcc0f35..abfa1ab50 100644 --- a/core/connectors/runtime/config.toml +++ b/core/connectors/runtime/config.toml @@ -51,3 +51,15 @@ path = "local_state" [connectors] config_type = "local" config_dir = "" + +[telemetry] +enabled = false +service_name = "iggy-connectors" + +[telemetry.logs] +transport = "grpc" +endpoint = "http://localhost:4317" + +[telemetry.traces] +transport = "grpc" +endpoint = "http://localhost:4317" diff --git a/core/connectors/runtime/example_config/config.toml b/core/connectors/runtime/example_config/config.toml index 6bcbc9b22..99007e436 100644 --- a/core/connectors/runtime/example_config/config.toml +++ b/core/connectors/runtime/example_config/config.toml @@ -34,6 +34,9 @@ enabled = false cert_file = "core/certs/iggy_cert.pem" key_file = "core/certs/iggy_key.pem" +[telemetry] +enabled = true + [iggy] address = "localhost:8090" username = "iggy" diff --git a/core/connectors/runtime/src/configs/runtime.rs b/core/connectors/runtime/src/configs/runtime.rs index bf81b8651..77d88ef07 100644 --- a/core/connectors/runtime/src/configs/runtime.rs +++ b/core/connectors/runtime/src/configs/runtime.rs @@ -18,6 +18,7 @@ */ use crate::api::config::HttpConfig; +use derive_more::Display; use figment::providers::{Format, Toml}; use figment::value::Dict; use figment::{Metadata, Profile, Provider}; @@ -27,12 +28,112 @@ use serde::{Deserialize, Serialize}; use serde_with::{DisplayFromStr, serde_as}; use std::collections::HashMap; use std::fmt::{Display, Formatter}; +use std::str::FromStr; const SECRET_KEYS: [&str; 2] = [ "IGGY_CONNECTORS_IGGY_PASSWORD", "IGGY_CONNECTORS_IGGY_TOKEN", ]; +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct TelemetryConfig { + pub enabled: bool, + pub service_name: String, + pub logs: TelemetryLogsConfig, + pub traces: TelemetryTracesConfig, +} + +impl Default for TelemetryConfig { + fn default() -> Self { + Self { + enabled: false, + service_name: "iggy-connectors".to_owned(), + logs: TelemetryLogsConfig::default(), + traces: TelemetryTracesConfig::default(), + } + } +} + +impl Display for TelemetryConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{{ enabled: {}, service_name: {}, logs: {}, traces: {} }}", + self.enabled, self.service_name, self.logs, self.traces + ) + } +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct TelemetryLogsConfig { + pub transport: TelemetryTransport, + pub endpoint: String, +} + +impl Default for TelemetryLogsConfig { + fn default() -> Self { + Self { + transport: TelemetryTransport::Grpc, + endpoint: "http://localhost:4317".to_owned(), + } + } +} + +impl Display for TelemetryLogsConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{{ transport: {}, endpoint: {} }}", + self.transport, self.endpoint + ) + } +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct TelemetryTracesConfig { + pub transport: TelemetryTransport, + pub endpoint: String, +} + +impl Default for TelemetryTracesConfig { + fn default() -> Self { + Self { + transport: TelemetryTransport::Grpc, + endpoint: "http://localhost:4317".to_owned(), + } + } +} + +impl Display for TelemetryTracesConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{{ transport: {}, endpoint: {} }}", + self.transport, self.endpoint + ) + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Display, Copy, Clone)] +#[serde(rename_all = "lowercase")] +pub enum TelemetryTransport { + #[display("grpc")] + Grpc, + #[display("http")] + Http, +} + +impl FromStr for TelemetryTransport { + type Err = String; + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s { + "grpc" => Ok(TelemetryTransport::Grpc), + "http" => Ok(TelemetryTransport::Http), + _ => Err(format!("Invalid telemetry transport: {s}")), + } + } +} + #[derive(Debug, Default, Clone, Deserialize, Serialize)] #[serde(default)] pub struct ConnectorsRuntimeConfig { @@ -40,6 +141,7 @@ pub struct ConnectorsRuntimeConfig { pub iggy: IggyConfig, pub connectors: ConnectorsConfig, pub state: StateConfig, + pub telemetry: TelemetryConfig, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -174,8 +276,8 @@ impl Display for ConnectorsRuntimeConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ http: {}, iggy: {}, connectors: {}, state: {:} }}", - self.http, self.iggy, self.connectors, self.state + "{{ http: {}, iggy: {}, connectors: {}, state: {}, telemetry: {} }}", + self.http, self.iggy, self.connectors, self.state, self.telemetry ) } } diff --git a/core/connectors/runtime/src/log.rs b/core/connectors/runtime/src/log.rs new file mode 100644 index 000000000..4eded2aaf --- /dev/null +++ b/core/connectors/runtime/src/log.rs @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::configs::runtime::{TelemetryConfig, TelemetryTransport}; +use iggy_connector_sdk::LogCallback; +use opentelemetry::trace::TracerProvider; +use opentelemetry::{KeyValue, global}; +use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; +use opentelemetry_otlp::{WithExportConfig, WithHttpConfig}; +use opentelemetry_sdk::Resource; +use opentelemetry_sdk::propagation::TraceContextPropagator; +use tracing::info; +use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::EnvFilter; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; + +pub fn init_logging(telemetry_config: &TelemetryConfig, version: &'static str) { + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("INFO")); + + let fmt_layer = tracing_subscriber::fmt::layer(); + + if telemetry_config.enabled { + let (logger_provider, tracer_provider) = init_telemetry(telemetry_config, version); + + let service_name = telemetry_config.service_name.clone(); + let tracer = tracer_provider.tracer(service_name); + global::set_tracer_provider(tracer_provider); + global::set_text_map_propagator(TraceContextPropagator::new()); + + let otel_logs_layer = OpenTelemetryTracingBridge::new(&logger_provider); + let otel_traces_layer = OpenTelemetryLayer::new(tracer); + + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) + .with(otel_logs_layer) + .with(otel_traces_layer) + .init(); + + info!( + "Logging initialized with telemetry enabled, service name: {}", + telemetry_config.service_name + ); + } else { + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) + .init(); + } +} + +fn init_telemetry( + telemetry_config: &TelemetryConfig, + version: &'static str, +) -> ( + opentelemetry_sdk::logs::SdkLoggerProvider, + opentelemetry_sdk::trace::SdkTracerProvider, +) { + let service_name = telemetry_config.service_name.clone(); + let resource = Resource::builder() + .with_service_name(service_name) + .with_attribute(KeyValue::new( + opentelemetry_semantic_conventions::resource::SERVICE_VERSION, + version, + )) + .build(); + + let logger_provider = init_logs_exporter(telemetry_config, resource.clone()); + let tracer_provider = init_traces_exporter(telemetry_config, resource); + + (logger_provider, tracer_provider) +} + +fn init_logs_exporter( + telemetry_config: &TelemetryConfig, + resource: Resource, +) -> opentelemetry_sdk::logs::SdkLoggerProvider { + match telemetry_config.logs.transport { + TelemetryTransport::Grpc => opentelemetry_sdk::logs::SdkLoggerProvider::builder() + .with_resource(resource) + .with_batch_exporter( + opentelemetry_otlp::LogExporter::builder() + .with_tonic() + .with_endpoint(telemetry_config.logs.endpoint.clone()) + .build() + .expect("Failed to initialize gRPC logger."), + ) + .build(), + TelemetryTransport::Http => { + let log_exporter = opentelemetry_otlp::LogExporter::builder() + .with_http() + .with_http_client(reqwest::Client::new()) + .with_endpoint(telemetry_config.logs.endpoint.clone()) + .with_protocol(opentelemetry_otlp::Protocol::HttpBinary) + .build() + .expect("Failed to initialize HTTP logger."); + opentelemetry_sdk::logs::SdkLoggerProvider::builder() + .with_resource(resource) + .with_batch_exporter(log_exporter) + .build() + } + } +} + +fn init_traces_exporter( + telemetry_config: &TelemetryConfig, + resource: Resource, +) -> opentelemetry_sdk::trace::SdkTracerProvider { + match telemetry_config.traces.transport { + TelemetryTransport::Grpc => opentelemetry_sdk::trace::SdkTracerProvider::builder() + .with_resource(resource) + .with_batch_exporter( + opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint(telemetry_config.traces.endpoint.clone()) + .build() + .expect("Failed to initialize gRPC tracer."), + ) + .build(), + TelemetryTransport::Http => { + let trace_exporter = opentelemetry_otlp::SpanExporter::builder() + .with_http() + .with_http_client(reqwest::Client::new()) + .with_endpoint(telemetry_config.traces.endpoint.clone()) + .with_protocol(opentelemetry_otlp::Protocol::HttpBinary) + .build() + .expect("Failed to initialize HTTP tracer."); + opentelemetry_sdk::trace::SdkTracerProvider::builder() + .with_resource(resource) + .with_batch_exporter(trace_exporter) + .build() + } + } +} + +/// Log callback that routes plugin logs through the runtime's tracing subscriber. +/// This function is passed to plugins via FFI so their logs appear in the runtime's +/// output and OTEL telemetry. +pub extern "C" fn runtime_log_callback( + level: u8, + target_ptr: *const u8, + target_len: usize, + message_ptr: *const u8, + message_len: usize, +) { + let target = unsafe { + std::str::from_utf8(std::slice::from_raw_parts(target_ptr, target_len)) + .unwrap_or("connector") + }; + let message = unsafe { + std::str::from_utf8(std::slice::from_raw_parts(message_ptr, message_len)) + .unwrap_or("<invalid utf8>") + }; + + match level { + 0 => tracing::trace!(target: "connector", connector_target = target, message), + 1 => tracing::debug!(target: "connector", connector_target = target, message), + 2 => tracing::info!(target: "connector", connector_target = target, message), + 3 => tracing::warn!(target: "connector", connector_target = target, message), + _ => tracing::error!(target: "connector", connector_target = target, message), + } +} + +pub const LOG_CALLBACK: LogCallback = runtime_log_callback; diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs index d600fc7eb..ab4a77208 100644 --- a/core/connectors/runtime/src/main.rs +++ b/core/connectors/runtime/src/main.rs @@ -41,12 +41,12 @@ use std::{ sync::{Arc, atomic::AtomicU32}, }; use tracing::{debug, info}; -use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt}; mod api; pub(crate) mod configs; pub(crate) mod context; pub(crate) mod error; +mod log; mod manager; mod sink; mod source; @@ -54,6 +54,8 @@ mod state; mod stream; mod transform; +const VERSION: &str = env!("CARGO_PKG_VERSION"); + #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; @@ -73,6 +75,7 @@ struct SourceApi { config_len: usize, state_ptr: *const u8, state_len: usize, + log_callback: iggy_connector_sdk::LogCallback, ) -> i32, handle: extern "C" fn(id: u32, callback: SendCallback) -> i32, close: extern "C" fn(id: u32) -> i32, @@ -80,7 +83,12 @@ struct SourceApi { #[derive(WrapperApi)] struct SinkApi { - open: extern "C" fn(id: u32, config_ptr: *const u8, config_len: usize) -> i32, + open: extern "C" fn( + id: u32, + config_ptr: *const u8, + config_len: usize, + log_callback: iggy_connector_sdk::LogCallback, + ) -> i32, #[allow(clippy::too_many_arguments)] consume: extern "C" fn( id: u32, @@ -116,20 +124,17 @@ async fn main() -> Result<(), RuntimeError> { ); } - Registry::default() - .with(tracing_subscriber::fmt::layer()) - .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) - .init(); - let config_path = env::var("IGGY_CONNECTORS_CONFIG_PATH").unwrap_or_else(|_| DEFAULT_CONFIG_PATH.to_string()); - info!("Starting Iggy Connectors Runtime, loading configuration from: {config_path}..."); + println!("Starting Iggy Connectors Runtime, loading configuration from: {config_path}..."); let config: ConnectorsRuntimeConfig = ConnectorsRuntimeConfig::config_provider(config_path) .load_config() .await .expect("Failed to load configuration"); + log::init_logging(&config.telemetry, VERSION); + std::fs::create_dir_all(&config.state.path).expect("Failed to create state directory"); info!("State will be stored in: {}", config.state.path); diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs index c9d8bd3da..3f55d4d8a 100644 --- a/core/connectors/runtime/src/sink.rs +++ b/core/connectors/runtime/src/sink.rs @@ -19,6 +19,7 @@ use crate::configs::connectors::SinkConfig; use crate::context::RuntimeContext; +use crate::log::LOG_CALLBACK; use crate::manager::status::ConnectorStatus; use crate::{ PLUGIN_ID, RuntimeError, SinkApi, SinkConnector, SinkConnectorConsumer, SinkConnectorPlugin, @@ -319,7 +320,12 @@ fn init_sink( id: u32, ) -> Result<(), RuntimeError> { let plugin_config = serde_json::to_string(plugin_config).expect("Invalid sink plugin config."); - let result = (container.open)(id, plugin_config.as_ptr(), plugin_config.len()); + let result = (container.open)( + id, + plugin_config.as_ptr(), + plugin_config.len(), + LOG_CALLBACK, + ); if result != 0 { let err = format!("Plugin initialization failed (ID: {id})"); error!("{err}"); diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index 5d6d88744..d366a5de5 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -37,6 +37,7 @@ use tracing::{debug, error, info, trace, warn}; use crate::configs::connectors::SourceConfig; use crate::context::RuntimeContext; +use crate::log::LOG_CALLBACK; use crate::manager::status::ConnectorStatus; use crate::{ PLUGIN_ID, RuntimeError, SourceApi, SourceConnector, SourceConnectorPlugin, @@ -202,6 +203,7 @@ fn init_source( plugin_config.len(), state_ptr, state_len, + LOG_CALLBACK, ); if result != 0 { let err = format!("Plugin initialization failed (ID: {id})"); diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs index b91ade3dc..16738ef57 100644 --- a/core/connectors/sdk/src/lib.rs +++ b/core/connectors/sdk/src/lib.rs @@ -38,10 +38,12 @@ use tokio::runtime::Runtime; pub mod decoders; pub mod encoders; +pub mod log; pub mod sink; pub mod source; pub mod transforms; +pub use log::LogCallback; pub use transforms::Transform; static RUNTIME: OnceCell<Runtime> = OnceCell::new(); diff --git a/core/connectors/sdk/src/log.rs b/core/connectors/sdk/src/log.rs new file mode 100644 index 000000000..af0f6fcf9 --- /dev/null +++ b/core/connectors/sdk/src/log.rs @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use tracing::field::{Field, Visit}; +use tracing::{Event, Level, Subscriber}; +use tracing_subscriber::layer::{Context, Layer}; + +/// Callback for plugins to send log records to the runtime. +/// +/// # Arguments +/// * `level` - Log level (0=TRACE, 1=DEBUG, 2=INFO, 3=WARN, 4=ERROR) +/// * `target_ptr` - Pointer to the target string (module path) +/// * `target_len` - Length of the target string +/// * `message_ptr` - Pointer to the message string +/// * `message_len` - Length of the message string +pub type LogCallback = extern "C" fn( + level: u8, + target_ptr: *const u8, + target_len: usize, + message_ptr: *const u8, + message_len: usize, +); + +/// A tracing Layer that forwards events to the runtime via callback. +/// This allows plugin logs to be routed through the runtime's OTEL subscriber. +pub struct CallbackLayer { + callback: LogCallback, +} + +impl CallbackLayer { + pub fn new(callback: LogCallback) -> Self { + Self { callback } + } +} + +impl<S: Subscriber> Layer<S> for CallbackLayer { + fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) { + let level = match *event.metadata().level() { + Level::TRACE => 0u8, + Level::DEBUG => 1, + Level::INFO => 2, + Level::WARN => 3, + Level::ERROR => 4, + }; + + let target = event.metadata().target(); + + let mut visitor = MessageVisitor::default(); + event.record(&mut visitor); + let message = visitor.message; + + (self.callback)( + level, + target.as_ptr(), + target.len(), + message.as_ptr(), + message.len(), + ); + } +} + +/// Visitor to extract the message field from a tracing Event +#[derive(Default)] +struct MessageVisitor { + message: String, +} + +impl Visit for MessageVisitor { + fn record_str(&mut self, field: &Field, value: &str) { + if field.name() == "message" || self.message.is_empty() { + self.message = value.to_string(); + } + } + + fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) { + if field.name() == "message" || self.message.is_empty() { + self.message = format!("{:?}", value); + } + } +} diff --git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs index 8e04f6f05..457618e75 100644 --- a/core/connectors/sdk/src/sink.rs +++ b/core/connectors/sdk/src/sink.rs @@ -1,4 +1,5 @@ -/* Licensed to the Apache Software Foundation (ASF) under one +/* + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -21,6 +22,7 @@ use tokio::sync::watch; use tracing::{error, info}; use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt}; +use crate::log::{CallbackLayer, LogCallback}; use crate::{ConsumedMessage, MessagesMetadata, RawMessages, Sink, TopicMetadata, get_runtime}; pub type ConsumeCallback = extern "C" fn( @@ -56,6 +58,7 @@ impl<T: Sink + std::fmt::Debug> SinkContainer<T> { id: u32, config_ptr: *const u8, config_len: usize, + log_callback: LogCallback, factory: F, ) -> i32 where @@ -64,7 +67,7 @@ impl<T: Sink + std::fmt::Debug> SinkContainer<T> { { unsafe { _ = Registry::default() - .with(tracing_subscriber::fmt::layer()) + .with(CallbackLayer::new(log_callback)) .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) .try_init(); let slice = std::slice::from_raw_parts(config_ptr, config_len); @@ -219,15 +222,21 @@ macro_rules! sink_connector { use dashmap::DashMap; use once_cell::sync::Lazy; + use $crate::LogCallback; use $crate::sink::SinkContainer; static INSTANCES: Lazy<DashMap<u32, SinkContainer<$type>>> = Lazy::new(DashMap::new); #[cfg(not(test))] #[unsafe(no_mangle)] - unsafe extern "C" fn open(id: u32, config_ptr: *const u8, config_len: usize) -> i32 { + unsafe extern "C" fn open( + id: u32, + config_ptr: *const u8, + config_len: usize, + log_callback: LogCallback, + ) -> i32 { let mut container = SinkContainer::new(id); - let result = container.open(id, config_ptr, config_len, <$type>::new); + let result = container.open(id, config_ptr, config_len, log_callback, <$type>::new); INSTANCES.insert(id, container); result } diff --git a/core/connectors/sdk/src/source.rs b/core/connectors/sdk/src/source.rs index f4f2d1b39..16b45aee7 100644 --- a/core/connectors/sdk/src/source.rs +++ b/core/connectors/sdk/src/source.rs @@ -17,6 +17,7 @@ * under the License. */ +use crate::log::{CallbackLayer, LogCallback}; use crate::{ConnectorState, Error, Source, get_runtime}; use serde::de::DeserializeOwned; use std::sync::Arc; @@ -57,6 +58,7 @@ impl<T: Source + std::fmt::Debug + 'static> SourceContainer<T> { /// # Safety /// Do not copy the configuration pointer + #[allow(clippy::too_many_arguments)] pub unsafe fn open<F, C>( &mut self, id: u32, @@ -64,6 +66,7 @@ impl<T: Source + std::fmt::Debug + 'static> SourceContainer<T> { config_len: usize, state_ptr: *const u8, state_len: usize, + log_callback: LogCallback, factory: F, ) -> i32 where @@ -72,7 +75,7 @@ impl<T: Source + std::fmt::Debug + 'static> SourceContainer<T> { { unsafe { _ = Registry::default() - .with(tracing_subscriber::fmt::layer()) + .with(CallbackLayer::new(log_callback)) .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) .try_init(); let slice = std::slice::from_raw_parts(config_ptr, config_len); @@ -207,6 +210,7 @@ macro_rules! source_connector { use dashmap::DashMap; use once_cell::sync::Lazy; + use $crate::LogCallback; use $crate::source::SendCallback; use $crate::source::SourceContainer; @@ -220,6 +224,7 @@ macro_rules! source_connector { config_len: usize, state_ptr: *const u8, state_len: usize, + log_callback: LogCallback, ) -> i32 { let mut container = SourceContainer::new(id); let result = container.open( @@ -228,6 +233,7 @@ macro_rules! source_connector { config_len, state_ptr, state_len, + log_callback, <$type>::new, ); INSTANCES.insert(id, container); diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 3ba2cecb5..b98765b5b 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -78,24 +78,11 @@ mimalloc = { workspace = true, optional = true } mime_guess = { version = "2.0", optional = true } moka = { version = "0.12.12", features = ["future"] } nix = { workspace = true } -opentelemetry = { version = "0.31.0", features = ["trace", "logs"] } -opentelemetry-appender-tracing = { version = "0.31.1", features = ["log"] } -opentelemetry-otlp = { version = "0.31.0", features = [ - "logs", - "trace", - "grpc-tonic", - "http", - "http-proto", - "reqwest-client", -] } -opentelemetry-semantic-conventions = "0.31.0" -opentelemetry_sdk = { version = "0.31.0", features = [ - "logs", - "trace", - "experimental_async_runtime", - "experimental_logs_batch_log_processor_with_async_runtime", - "experimental_trace_batch_span_processor_with_async_runtime", -] } +opentelemetry = { workspace = true } +opentelemetry-appender-tracing = { workspace = true } +opentelemetry-otlp = { workspace = true } +opentelemetry-semantic-conventions = { workspace = true } +opentelemetry_sdk = { workspace = true } papaya = "0.2.3" prometheus-client = "0.24.0" rand = { workspace = true } @@ -120,7 +107,7 @@ toml = { workspace = true } tower-http = { workspace = true } tracing = { workspace = true } tracing-appender = { workspace = true } -tracing-opentelemetry = "0.32.1" +tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true } tungstenite = { workspace = true } ulid = "1.2.1" diff --git a/docker-compose.yml b/docker-compose.yml index afc9e14e6..4c377e66d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,34 +16,40 @@ # under the License. services: - iggy-server: - build: - context: . - dockerfile: Dockerfile - container_name: iggy-server + greptime: + image: "greptime/greptimedb:latest" + container_name: greptime restart: unless-stopped - cap_add: - - SYS_NICE - security_opt: - - seccomp:unconfined - ulimits: - memlock: - soft: -1 - hard: -1 - networks: - - iggy + command: "standalone start --http-addr=0.0.0.0:4000 --rpc-addr 0.0.0.0:4001 --postgres-addr 0.0.0.0:4003 --user-provider=static_user_provider:file:./greptime_users" ports: - - 3000:3000 - - 8080:8080 - - 8090:8090 - - 8092:8092 + - "4000:4000" + - "4001:4001" + - "4003:4003" + networks: + - iggy-net volumes: - - iggy-server:/local_data + - greptime:/tmp/greptimedb + - ./greptime_users:/greptime_users -volumes: - iggy-server: - driver: local + otel-collector: + image: otel/opentelemetry-collector-contrib:latest + container_name: otel-collector + restart: unless-stopped + command: ["--config=/etc/otel-collector-config.yaml"] + volumes: + - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml + - /var/log:/var/log:ro + ports: + - "4317:4317" # OTLP gRPC receiver + - "4318:4318" # OTLP HTTP receiver + - "13133:13133" # health_check extension + depends_on: + - greptime + networks: + - iggy-net networks: - iggy: - name: iggy-network + iggy-net: + +volumes: + greptime: diff --git a/greptime_users b/greptime_users new file mode 100644 index 000000000..227e38635 --- /dev/null +++ b/greptime_users @@ -0,0 +1 @@ +root=secret diff --git a/otel-collector-config.yaml b/otel-collector-config.yaml new file mode 100644 index 000000000..4b9c121bf --- /dev/null +++ b/otel-collector-config.yaml @@ -0,0 +1,110 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + + filelog: + include: [/var/log/*.log] + start_at: end + operators: + - type: add + field: attributes.service_name + value: "system-logs" + +extensions: + basicauth/client: + client_auth: + username: root + password: secret + health_check: + endpoint: 0.0.0.0:13133 + +processors: + batch: + timeout: 1s + send_batch_size: 1024 + send_batch_max_size: 2048 + + memory_limiter: + limit_mib: 512 + check_interval: 1s + + resource: + attributes: + - key: service.name + from_attribute: service_name + action: upsert + - key: host.name + value: "docker-host" + action: upsert + +exporters: + otlphttp/traces: + endpoint: "http://greptime:4000/v1/otlp" + auth: + authenticator: basicauth/client + headers: + # x-greptime-db-name: '<your_db_name>' + x-greptime-pipeline-name: "greptime_trace_v1" + tls: + insecure: true + otlphttp/logs: + endpoint: "http://greptime:4000/v1/otlp" + auth: + authenticator: basicauth/client + headers: + # x-greptime-db-name: '<your_db_name>' + # x-greptime-log-table-name: '<table_name>' + # x-greptime-pipeline-name: '<pipeline_name>' + tls: + insecure: true + + otlphttp/metrics: + endpoint: "http://greptime:4000/v1/otlp" + auth: + authenticator: basicauth/client + headers: + # x-greptime-db-name: '<your_db_name>' + tls: + insecure: true + + debug: + verbosity: basic + sampling_initial: 5 + sampling_thereafter: 200 + +service: + extensions: [basicauth/client, health_check] + pipelines: + traces: + receivers: [otlp] + processors: [memory_limiter, batch] + exporters: [otlphttp/traces, debug] + logs: + receivers: [otlp] + processors: [memory_limiter, batch] + exporters: [otlphttp/logs, debug] + metrics: + receivers: [otlp] + processors: [memory_limiter, batch] + exporters: [otlphttp/metrics, debug]
