hubcio commented on code in PR #2925: URL: https://github.com/apache/iggy/pull/2925#discussion_r2936683260
########## core/connectors/sinks/http_sink/src/lib.rs: ########## @@ -0,0 +1,2099 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use async_trait::async_trait; +use base64::Engine; +use base64::engine::general_purpose; +use bytes::Bytes; +use humantime::Duration as HumanDuration; +use iggy_connector_sdk::{ + ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, sink_connector, +}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::str::FromStr; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tracing::{debug, error, info, warn}; + +sink_connector!(HttpSink); + +const DEFAULT_TIMEOUT: &str = "30s"; +const DEFAULT_RETRY_DELAY: &str = "1s"; +const DEFAULT_MAX_RETRY_DELAY: &str = "30s"; +const DEFAULT_MAX_RETRIES: u32 = 3; +const DEFAULT_BACKOFF_MULTIPLIER: f64 = 2.0; +const DEFAULT_MAX_PAYLOAD_SIZE: u64 = 10 * 1024 * 1024; // 10 MB +const DEFAULT_MAX_CONNECTIONS: usize = 10; +/// TCP keep-alive interval for detecting dead connections behind load balancers. +/// Cloud LBs silently drop idle connections (AWS ALB ~60s, GCP ~600s); +/// probing at 30s detects these before requests fail. 
+const DEFAULT_TCP_KEEPALIVE_SECS: u64 = 30; +/// Close pooled connections unused for this long. Prevents stale connections +/// from accumulating when traffic is bursty. +const DEFAULT_POOL_IDLE_TIMEOUT_SECS: u64 = 90; +/// Abort remaining messages in individual/raw mode after this many consecutive HTTP failures. +/// Prevents hammering a dead endpoint with N sequential retry cycles per poll. +const MAX_CONSECUTIVE_FAILURES: u32 = 3; + +/// HTTP method enum — validated at deserialization, prevents invalid values like "DELEET" or "GETX". +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "UPPERCASE")] +pub enum HttpMethod { + Get, + Head, + #[default] + Post, + Put, + Patch, + Delete, +} + +/// Payload formatting mode for HTTP requests. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum BatchMode { + /// One HTTP request per message (default). Note: with batch_length=50, this produces 50 + /// sequential HTTP round trips per poll cycle. Use ndjson or json_array for higher throughput. + #[default] + Individual, + /// All messages in one request, newline-delimited JSON. + Ndjson, + /// All messages as a single JSON array. + JsonArray, + /// Raw bytes, one request per message (for non-JSON payloads). + Raw, +} + +/// Configuration for the HTTP sink connector, deserialized from [plugin_config] in config.toml. +#[derive(Debug, Serialize, Deserialize)] +pub struct HttpSinkConfig { + /// Target URL for HTTP requests (required). + pub url: String, + /// HTTP method (default: POST). + pub method: Option<HttpMethod>, + /// Request timeout as a human-readable duration string, e.g. "30s" (default: 30s). + pub timeout: Option<String>, + /// Maximum HTTP body size in bytes (default: 10MB). Set to 0 to disable. + pub max_payload_size_bytes: Option<u64>, + /// Custom HTTP headers. 
+ pub headers: Option<HashMap<String, String>>, + /// Payload formatting mode (default: individual). + pub batch_mode: Option<BatchMode>, + /// Include Iggy metadata envelope in payload (default: true). + pub include_metadata: Option<bool>, + /// Include message checksum in metadata (default: false). + pub include_checksum: Option<bool>, + /// Include origin timestamp in metadata (default: false). + pub include_origin_timestamp: Option<bool>, + /// Enable health check request in open() (default: false). + pub health_check_enabled: Option<bool>, + /// HTTP method for health check (default: HEAD). + pub health_check_method: Option<HttpMethod>, + /// Maximum number of retries for transient errors (default: 3). + pub max_retries: Option<u32>, + /// Retry delay as a human-readable duration string, e.g. "1s" (default: 1s). + pub retry_delay: Option<String>, + /// Backoff multiplier for exponential retry delay (default: 2.0). + pub retry_backoff_multiplier: Option<f64>, + /// Maximum retry delay cap as a human-readable duration string (default: 30s). + pub max_retry_delay: Option<String>, + /// HTTP status codes considered successful (default: [200, 201, 202, 204]). + pub success_status_codes: Option<Vec<u16>>, + /// Accept invalid TLS certificates (default: false). Named to signal danger. + pub tls_danger_accept_invalid_certs: Option<bool>, + /// Maximum idle connections per host (default: 10). + pub max_connections: Option<usize>, + /// Enable verbose request/response logging (default: false). + pub verbose_logging: Option<bool>, +} + +/// HTTP sink connector that delivers consumed messages to any HTTP endpoint. +/// +/// Lifecycle: `new()` → `open()` → `consume()` (repeated) → `close()`. +/// The `reqwest::Client` is built in `open()` (not `new()`) so that config-derived +/// settings (timeout, TLS, connection pool) are applied. This matches the +/// MongoDB/Elasticsearch/PostgreSQL sink initialization pattern. 
+#[derive(Debug)] +pub struct HttpSink { + id: u32, + url: String, + method: HttpMethod, + timeout: Duration, + max_payload_size_bytes: u64, + headers: HashMap<String, String>, + batch_mode: BatchMode, + include_metadata: bool, + include_checksum: bool, + include_origin_timestamp: bool, + health_check_enabled: bool, + health_check_method: HttpMethod, + max_retries: u32, + retry_delay: Duration, + retry_backoff_multiplier: f64, + max_retry_delay: Duration, + success_status_codes: Vec<u16>, + tls_danger_accept_invalid_certs: bool, + max_connections: usize, + verbose: bool, + /// Initialized in `open()` with config-derived settings. `None` before `open()` is called. + client: Option<reqwest::Client>, + requests_sent: AtomicU64, + messages_delivered: AtomicU64, + errors_count: AtomicU64, + retries_count: AtomicU64, + /// Epoch seconds of last successful HTTP request. + last_success_timestamp: AtomicU64, +} + +/// Parse a human-readable duration string, falling back to a default on failure. +fn parse_duration(input: Option<&str>, default: &str) -> Duration { + let raw = input.unwrap_or(default); + HumanDuration::from_str(raw) + .map(|d| *d) + .unwrap_or_else(|e| { + warn!( + "Invalid duration '{}': {}, using default '{}'", + raw, e, default + ); + *HumanDuration::from_str(default).expect("default duration must be valid") + }) +} + +impl HttpSink { + pub fn new(id: u32, config: HttpSinkConfig) -> Self { + let url = config.url; + let method = config.method.unwrap_or_default(); + let timeout = parse_duration(config.timeout.as_deref(), DEFAULT_TIMEOUT); + let max_payload_size_bytes = config + .max_payload_size_bytes + .unwrap_or(DEFAULT_MAX_PAYLOAD_SIZE); + let headers = config.headers.unwrap_or_default(); + let batch_mode = config.batch_mode.unwrap_or_default(); + let include_metadata = config.include_metadata.unwrap_or(true); + let include_checksum = config.include_checksum.unwrap_or(false); + let include_origin_timestamp = 
config.include_origin_timestamp.unwrap_or(false); + let health_check_enabled = config.health_check_enabled.unwrap_or(false); + let health_check_method = config.health_check_method.unwrap_or(HttpMethod::Head); + let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES); + let retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY); + let retry_backoff_multiplier = config + .retry_backoff_multiplier + .unwrap_or(DEFAULT_BACKOFF_MULTIPLIER) + .max(1.0); + let max_retry_delay = + parse_duration(config.max_retry_delay.as_deref(), DEFAULT_MAX_RETRY_DELAY); + let success_status_codes = config + .success_status_codes + .unwrap_or_else(|| vec![200, 201, 202, 204]); + let tls_danger_accept_invalid_certs = + config.tls_danger_accept_invalid_certs.unwrap_or(false); + let max_connections = config.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS); + let verbose = config.verbose_logging.unwrap_or(false); + + if retry_delay > max_retry_delay { + warn!( + "HTTP sink ID: {} — retry_delay ({:?}) exceeds max_retry_delay ({:?}). \ + All retry delays will be capped to max_retry_delay.", + id, retry_delay, max_retry_delay, + ); + } + + if tls_danger_accept_invalid_certs { + warn!( + "HTTP sink ID: {} — tls_danger_accept_invalid_certs is enabled. \ + TLS certificate validation is DISABLED.", + id + ); + } + + if batch_mode == BatchMode::Raw && include_metadata { + warn!( + "HTTP sink ID: {} — batch_mode=raw ignores include_metadata. \ + Raw mode sends payload bytes directly without metadata envelope.", + id + ); + } + + if matches!(method, HttpMethod::Get | HttpMethod::Head) + && batch_mode != BatchMode::Individual + { + warn!( + "HTTP sink ID: {} — {:?} with batch_mode={:?} will send a request body. 
\ + Some servers may reject GET/HEAD requests with a body.", + id, method, batch_mode, + ); + } + + HttpSink { + id, + url, + method, + timeout, + max_payload_size_bytes, + headers, + batch_mode, + include_metadata, + include_checksum, + include_origin_timestamp, + health_check_enabled, + health_check_method, + max_retries, + retry_delay, + retry_backoff_multiplier, + max_retry_delay, + success_status_codes, + tls_danger_accept_invalid_certs, + max_connections, + verbose, + client: None, + requests_sent: AtomicU64::new(0), + messages_delivered: AtomicU64::new(0), + errors_count: AtomicU64::new(0), + retries_count: AtomicU64::new(0), + last_success_timestamp: AtomicU64::new(0), + } + } + + /// Build the `reqwest::Client` from resolved config. + fn build_client(&self) -> Result<reqwest::Client, Error> { + let builder = reqwest::Client::builder() + .timeout(self.timeout) + .pool_max_idle_per_host(self.max_connections) + .pool_idle_timeout(Duration::from_secs(DEFAULT_POOL_IDLE_TIMEOUT_SECS)) + .tcp_keepalive(Duration::from_secs(DEFAULT_TCP_KEEPALIVE_SECS)) + .danger_accept_invalid_certs(self.tls_danger_accept_invalid_certs); + + builder + .build() + .map_err(|e| Error::InitError(format!("Failed to build HTTP client: {}", e))) + } + + /// Apply the configured HTTP method to a `reqwest::Client` for the target URL, + /// including custom headers (excluding Content-Type, which is set per-request by batch mode). + fn request_builder(&self, client: &reqwest::Client) -> reqwest::RequestBuilder { + let mut builder = build_request(self.method, client, &self.url); + for (key, value) in &self.headers { + if key.eq_ignore_ascii_case("content-type") { + continue; // Content-Type is set by batch mode in send_with_retry + } + builder = builder.header(key, value); + } + builder + } + + /// Determine the Content-Type header based on batch mode. 
+ fn content_type(&self) -> &'static str { + match self.batch_mode { + BatchMode::Individual | BatchMode::JsonArray => "application/json", + BatchMode::Ndjson => "application/x-ndjson", + BatchMode::Raw => "application/octet-stream", + } + } + + /// Convert a `Payload` to a JSON value for metadata wrapping. + /// Non-JSON payloads are base64-encoded with a `iggy_payload_encoding` marker. + /// + /// Note: All current `Payload` variants produce infallible conversions. + /// The `Result` return type exists as a safety net for future variants. + fn payload_to_json(&self, payload: Payload) -> Result<serde_json::Value, Error> { + match payload { + Payload::Json(value) => { + // Direct structural conversion (not serialization roundtrip). + // Follows the Elasticsearch sink pattern. NaN/Infinity f64 → null. + Ok(owned_value_to_serde_json(&value)) + } + Payload::Text(text) => Ok(serde_json::Value::String(text)), + Payload::Raw(bytes) | Payload::FlatBuffer(bytes) => Ok(serde_json::json!({ + "data": general_purpose::STANDARD.encode(&bytes), + "iggy_payload_encoding": "base64" + })), + Payload::Proto(proto_str) => Ok(serde_json::json!({ + "data": general_purpose::STANDARD.encode(proto_str.as_bytes()), + "iggy_payload_encoding": "base64" + })), + } + } + + /// Build a message envelope with optional metadata wrapping. + fn build_envelope( + &self, + message: &ConsumedMessage, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + payload_json: serde_json::Value, + ) -> serde_json::Value { + if !self.include_metadata { + return payload_json; + } + + let mut metadata = serde_json::json!({ + "iggy_id": format_u128_as_uuid(message.id), + "iggy_offset": message.offset, + "iggy_timestamp": message.timestamp, Review Comment: `iggy_timestamp` (server-assigned) vs `iggy_origin_timestamp` (producer-supplied) -- the difference isn't obvious from the names. these are new public api surface. once shipped, the names become a contract. 
Consider `iggy_server_timestamp` or at least document the distinction prominently. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
