ryerraguntla commented on code in PR #3140: URL: https://github.com/apache/iggy/pull/3140#discussion_r3232988729
########## core/connectors/sources/influxdb_source/src/v3.rs: ##########
@@ -0,0 +1,1298 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! InfluxDB V3 source — SQL queries, JSONL responses, Bearer auth.
+//!
+//! V3 uses strict `> cursor` semantics. DataFusion/Parquet does not guarantee
+//! stable ordering for rows that share the same timestamp, so the V2 skip-N
+//! approach is not safe here. If all rows in a batch share the same timestamp,
+//! the cursor cannot advance — the effective batch size is doubled each poll
+//! up to `stuck_batch_cap_factor × batch_size`. If the cap is reached, the
+//! circuit breaker is tripped.
+
+use crate::common::{
+    DEFAULT_V3_CURSOR_FIELD, PayloadFormat, Row, RowContext, V3SourceConfig, V3State,
+    apply_query_params, is_timestamp_after, parse_jsonl_rows, timestamps_equal, validate_cursor,
+};
+use base64::{Engine as _, engine::general_purpose};
+use chrono::{DateTime, Utc};
+use iggy_connector_sdk::{Error, ProducedMessage, Schema};
+use reqwest::Url;
+use reqwest_middleware::ClientWithMiddleware;
+use serde_json::json;
+use tracing::warn;
+use uuid::Uuid;
+
+pub(crate) const DEFAULT_STUCK_CAP_FACTOR: u32 = 10;
+/// Upper bound for `stuck_batch_cap_factor`. A value of 1000 with batch_size=1000
+/// would issue 1,000,000-row queries before tripping the circuit breaker.
+pub(crate) const MAX_STUCK_CAP_FACTOR: u32 = 100;
+
+/// Hard cap on buffered JSONL response body size.
+///
+/// `MAX_STUCK_CAP_FACTOR` can inflate the effective batch to 100 × `batch_size`,
+/// making unbounded `response.text()` a real OOM vector under misconfiguration.
+/// Streaming stops and returns an error once this many bytes have been read.
+const MAX_RESPONSE_BODY_BYTES: usize = 256 * 1024 * 1024; // 256 MiB
+
+/// InfluxDB V3 query endpoint expects this exact string for JSONL response format.
+const QUERY_FORMAT_JSONL: &str = "jsonl";
+
+fn build_query(base: &str, query: &str, db: &str) -> Result<(Url, serde_json::Value), Error> {
+    let url = Url::parse(&format!("{base}/api/v3/query_sql"))
+        .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}")))?;
+    let body = json!({
+        "db": db,
+        "q": query,
+        "format": QUERY_FORMAT_JSONL
+    });
+    Ok((url, body))
+}
+
+// ── Query execution ───────────────────────────────────────────────────────────
+
+pub(crate) async fn run_query(
+    client: &ClientWithMiddleware,
+    config: &V3SourceConfig,
+    auth: &str,
+    cursor: &str,
+    effective_batch: u32,
+    offset: u64,
+) -> Result<String, Error> {
+    validate_cursor(cursor)?;
+    let q = apply_query_params(
+        &config.query,
+        cursor,
+        &effective_batch.to_string(),
+        &offset.to_string(), /* &str */
+    );
+    let base = config.url.trim_end_matches('/');
+    let (url, body) = build_query(base, &q, &config.db)?;
+
+    let mut response = client
+        .post(url)
+        .header("Authorization", auth)
+        .header("Content-Type", "application/json")
+        .header("Accept", "application/json")
+        .json(&body)
+        .send()
+        .await
+        .map_err(|e| Error::Storage(format!("InfluxDB V3 query failed: {e}")))?;
+
+    let status = response.status();
+    if status.is_success() {
+        // Stream chunk-by-chunk with a hard byte cap to prevent OOM when
+        // MAX_STUCK_CAP_FACTOR inflates the effective batch to 100 × batch_size.
+        if response
+            .content_length()
+            .is_some_and(|n| n as usize > MAX_RESPONSE_BODY_BYTES)
+        {
+            return Err(Error::Storage(format!(
+                "InfluxDB V3 response body exceeds {MAX_RESPONSE_BODY_BYTES} byte cap; \
+                 reduce batch_size to avoid OOM"
+            )));
+        }
+        let mut buf: Vec<u8> = Vec::new();
+        while let Some(chunk) = response
+            .chunk()
+            .await
+            .map_err(|e| Error::Storage(format!("Failed to read V3 response: {e}")))?
+        {
+            buf.extend_from_slice(&chunk);
+            if buf.len() > MAX_RESPONSE_BODY_BYTES {
+                return Err(Error::Storage(format!(
+                    "InfluxDB V3 response body exceeded {MAX_RESPONSE_BODY_BYTES} byte cap \
+                     while streaming; reduce batch_size to avoid OOM"
+                )));
+            }
+        }
+        return String::from_utf8(buf)
+            .map_err(|e| Error::Storage(format!("V3 response body is not valid UTF-8: {e}")));
+    }
+
+    let body_text = response
+        .text()
+        .await
+        .unwrap_or_else(|_| "failed to read response body".to_string());
+
+    // 404 "database not found" means the namespace has not been written to yet;
+    // treat it as empty rather than a failure so the circuit breaker stays healthy.
+    // Any other 404 (e.g. "table not found") is a permanent error — don't swallow it.
+    if status.as_u16() == 404 {
+        if body_text.to_lowercase().contains("database not found") {
+            return Ok(String::new());
+        }
+        return Err(Error::PermanentHttpError(format!(
+            "InfluxDB V3 query failed with status {status}: {body_text}"
+        )));
+    }
+
+    if iggy_connector_sdk::retry::is_transient_status(status) {
+        Err(Error::Storage(format!(
+            "InfluxDB V3 query failed with status {status}: {body_text}"
+        )))
+    } else {
+        Err(Error::PermanentHttpError(format!(
+            "InfluxDB V3 query failed with status {status}: {body_text}"
+        )))
+    }
+}
+
+// ── Message building ──────────────────────────────────────────────────────────
+
+fn build_payload(
+    row: &Row,
+    payload_column: Option<&str>,
+    payload_format: PayloadFormat,
+    include_metadata: bool,
+    cursor_field: &str,
+) -> Result<Vec<u8>, Error> {
+    if let Some(col) = payload_column {
+        let raw = row
+            .get(col)
+            .cloned()
+            .ok_or_else(|| Error::InvalidRecordValue(format!("Missing payload column '{col}'")))?;
+        return match payload_format {
+            // raw is already a serde_json::Value — serialize directly, no re-parse.
+            PayloadFormat::Json => serde_json::to_vec(&raw)
+                .map_err(|e| Error::Serialization(format!("JSON serialization failed: {e}"))),
+            PayloadFormat::Text => match raw {
+                serde_json::Value::String(s) => Ok(s.into_bytes()),
+                other => serde_json::to_vec(&other)
+                    .map_err(|e| Error::Serialization(format!("JSON serialization failed: {e}"))),
+            },
+            PayloadFormat::Raw => {
+                let s = raw.as_str().ok_or_else(|| {
+                    Error::InvalidRecordValue(format!(
+                        "Payload column '{col}' must be a string value for Raw format"
+                    ))
+                })?;
+                general_purpose::STANDARD.decode(s.as_bytes()).map_err(|e| {
+                    Error::InvalidRecordValue(format!("Failed to decode payload as base64: {e}"))
+                })
+            }
+        };
+    }
+
+    // V3 rows carry typed serde_json::Values — clone directly, no parse_scalar needed.
+    // When include_metadata=false, exclude the cursor column (timestamp).
+    let json_row: serde_json::Map<_, _> = row
+        .iter()
+        .filter(|(k, _)| include_metadata || k.as_str() != cursor_field)
+        .map(|(k, v)| (k.clone(), v.clone()))
+        .collect();
+    serde_json::to_vec(&json_row)
+        .map_err(|e| Error::Serialization(format!("JSON serialization failed: {e}")))
+}
+
+/// Compute the next effective batch size when the batch is stuck.
+/// Doubles until it reaches `cap`. Returns `None` if already at cap.
+pub(crate) fn next_stuck_batch_size(current: u32, base: u32, cap_factor: u32) -> Option<u32> {
+    let cap = base.saturating_mul(cap_factor);
+    if current >= cap {
+        None
+    } else {
+        Some(current.saturating_mul(2).min(cap))
+    }
+}
+
+// ── Poll ──────────────────────────────────────────────────────────────────────
+
+pub(crate) struct PollResult {
+    pub messages: Vec<ProducedMessage>,
+    pub new_state: V3State,
+    pub schema: Schema,
+    /// Set to true when the stuck-timestamp cap was reached and the circuit
+    /// breaker should be tripped by the caller.
+    pub trip_circuit_breaker: bool,
+}
+
+// ── Row processing (pure, testable without HTTP) ──────────────────────────────
+
+/// Normalize a raw timestamp from InfluxDB V3 JSONL into a cursor-safe RFC 3339 string.
+///
+/// InfluxDB 3 Core returns timestamps without a timezone suffix and with nanosecond
+/// precision (e.g. `"2026-04-26T02:32:20.526360865"`). The only required fix is
+/// appending `"Z"` when no timezone suffix is present (InfluxDB always stores UTC).
+///
+/// Full nanosecond precision is intentionally preserved — truncating to milliseconds
+/// would place the cursor BEFORE the actual row timestamps within the same millisecond,
+/// causing `WHERE time > '$cursor'` to re-deliver already-seen rows on subsequent polls.
+/// InfluxDB 3's DataFusion SQL engine handles RFC 3339 strings with any number of
+/// fractional digits in WHERE clause timestamp comparisons.
+fn normalize_v3_timestamp(ts: &str) -> String {
+    // Fast path: already a valid RFC 3339 timestamp with timezone suffix.
+    if chrono::DateTime::parse_from_rfc3339(ts).is_ok() {
+        return ts.to_string();
+    }
+    // Slow path: no timezone suffix — append "Z" to make it RFC 3339 compliant.
+    format!("{ts}Z")
+}
+
+/// Result of processing a batch of V3 rows into Iggy messages.
+#[derive(Debug)]
+pub(crate) struct RowProcessingResult {
+    pub messages: Vec<ProducedMessage>,
+    pub max_cursor: Option<String>,
+    /// `true` when every row's `cursor_field` value equals `current_cursor`.
+    /// Combined with `rows.len() >= effective_batch`, this signals a stuck batch:
+    /// all returned rows are at the current cursor, meaning the cursor cannot
+    /// advance with `> cursor` semantics.
+    pub all_at_cursor: bool,
+    /// Count of rows whose cursor == max_cursor (for tiebreaker offset).
+    pub rows_at_max_cursor: u64,
+}
+
+/// Convert a slice of V3 query rows into Iggy messages.
+///
+/// Also detects whether all rows share the same cursor value as `ctx.current_cursor`
+/// (the `all_at_cursor` flag). The caller uses this together with batch fullness
+/// to decide whether to inflate the batch size for the next poll.
+///
+/// Unlike V2, V3 uses strict `> cursor` semantics, so there is no row-skipping.
+/// All rows in the slice are emitted as messages.
+pub(crate) fn process_rows(
+    rows: &[Row],
+    ctx: &RowContext<'_>,
+) -> Result<RowProcessingResult, Error> {
+    let mut messages = Vec::with_capacity(rows.len());
+    let mut max_cursor: Option<String> = None;
+    let mut max_cursor_parsed: Option<DateTime<Utc>> = None; // cache parsed form
+    // Starts true for non-empty batches; flipped to false as soon as any row
+    // either has a different cursor value or has no cursor field at all.
+    let mut all_at_cursor = !rows.is_empty();
+    // Generate the base UUID once per poll; derive per-message IDs by addition.
+    // This is O(1) PRNG calls per batch instead of O(n), measurable at batch ≥ 100.
+    let id_base = Uuid::new_v4().as_u128();
+    for row in rows.iter() {
+        if let Some(raw_cv) = row.get(ctx.cursor_field).and_then(|v| v.as_str()) {
+            let cv_owned = normalize_v3_timestamp(raw_cv);
+            let cv = cv_owned.as_str();
+            if !timestamps_equal(cv, ctx.current_cursor) {
+                all_at_cursor = false;
+            }
+            validate_cursor(cv)?;
+            let cv_parsed = cv.parse::<DateTime<Utc>>().ok();
+            match (cv_parsed, max_cursor_parsed) {
+                (Some(new_dt), Some(cur_dt)) if new_dt > cur_dt => {
+                    max_cursor = Some(cv.to_string());
+                    max_cursor_parsed = Some(new_dt);
+                }
+                (Some(new_dt), None) => {
+                    max_cursor = Some(cv.to_string());
+                    max_cursor_parsed = Some(new_dt);
+                }
+                (None, _) if max_cursor_parsed.is_none() => {
+                    // Unparsable cursor — still track it (string fallback) if no
+                    // parsable cursor has been seen yet.
+                    max_cursor = Some(cv.to_string());
+                }
+                _ => {}
+            }
+        } else {
+            all_at_cursor = false;
+        }
+
+        let payload = build_payload(
+            row,
+            ctx.payload_col,
+            ctx.payload_format,
+            ctx.include_metadata,
+            ctx.cursor_field,
+        )?;
+        messages.push(ProducedMessage {
+            id: Some(id_base.wrapping_add(messages.len() as u128)),
+            checksum: None,
+            timestamp: Some(ctx.now_micros),
+            origin_timestamp: Some(ctx.now_micros),
+            headers: None,
+            payload,
+        });
+    }
+
+    let rows_at_max_cursor = rows
+        .iter()
+        .filter(|r| {
+            max_cursor.as_deref().is_some_and(|mc| {
+                r.get(ctx.cursor_field)
+                    .and_then(|v| v.as_str())
+                    .is_some_and(|cv| normalize_v3_timestamp(cv) == mc)
+            })
+        })
+        .count() as u64;
+
+    if !rows.is_empty() && max_cursor.is_none() {
+        return Err(Error::InvalidRecordValue(format!(
+            "No '{}' field found in any returned row — cursor cannot advance; \
+             the connector would re-deliver the same rows on every poll. \
+             Ensure your query selects the cursor column.",
+            ctx.cursor_field
+        )));
+    }
+
+    Ok(RowProcessingResult {
+        messages,
+        max_cursor,
+        all_at_cursor,
+        rows_at_max_cursor,
+    })
+}
+
+pub(crate) async fn poll(
+    client: &ClientWithMiddleware,
+    config: &V3SourceConfig,
+    auth: &str,
+    state: &V3State,
+    payload_format: PayloadFormat,
+    include_metadata: bool,
+) -> Result<PollResult, Error> {
+    // Access config.initial_offset directly (not via the enum accessor) because
+    // poll() receives &V3SourceConfig — the inner struct — already matched by the
+    // caller in lib.rs. The enum accessor InfluxDbSourceConfig::initial_offset()
+    // is not available here.
+    let cursor = state
+        .last_timestamp
+        .clone()
+        .or_else(|| config.initial_offset.clone())
+        .unwrap_or_else(|| "1970-01-01T00:00:00Z".to_string());
+
+    let base_batch = config.batch_size.unwrap_or(500);
+    let effective_batch = if state.effective_batch_size == 0 {
+        base_batch
+    } else {
+        state.effective_batch_size
+    };
+
+    let response_data = run_query(
+        client,
+        config,
+        auth,
+        &cursor,
+        effective_batch,
+        state.last_timestamp_row_offset,
+    )
+    .await?;
+    let rows = parse_jsonl_rows(&response_data)?;
+
+    let cap_factor = config
+        .stuck_batch_cap_factor
+        .unwrap_or(DEFAULT_STUCK_CAP_FACTOR);
+    let ctx = RowContext {
+        cursor_field: config
+            .cursor_field
+            .as_deref()
+            .unwrap_or(DEFAULT_V3_CURSOR_FIELD),
+        current_cursor: &cursor,
+        include_metadata,
+        payload_col: config.payload_column.as_deref(),
+        payload_format,
+        now_micros: iggy_common::Utc::now().timestamp_micros() as u64,
+    };
+
+    let result = process_rows(&rows, &ctx)?;
+
+    // Stuck-timestamp detection: if every row is at the current cursor
+    // and the batch was full, inflate and request more next time.
+    let stuck = result.all_at_cursor && rows.len() >= effective_batch as usize;

Review Comment:
   Done.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
