ryerraguntla opened a new pull request, #2933:
URL: https://github.com/apache/iggy/pull/2933
Adds a native InfluxDB connector for Apache Iggy, resolving #2700.
### New crates:
- `iggy_connector_influxdb_sink` — consumes Iggy messages and writes them to
InfluxDB 2.x via line protocol (`/api/v2/write`)
- `iggy_connector_influxdb_source` — polls InfluxDB Flux query results and
produces messages to Iggy topics
### Features:
- Batch writes / cursor-based polling
- Exponential backoff + jitter on retries
- Circuit breaker after consecutive failures
- HTTP 429 Retry-After header support
- Configurable precision (`ns`, `us`, `ms`, `s`)
- Full integration tests using Testcontainers (`influxdb:2.7-alpine`)
Closes #2700
## What changed?
Implemented the following on top of Iggy's existing connector framework.
**InfluxDB Sink Connector**
Writes Iggy stream messages to InfluxDB v2 using the line protocol write API.
Data writing — Messages are serialised as line protocol with correct
escaping for measurements, tag values, and string fields. Payloads can be
written as JSON (validated and escaped), UTF-8 text, or Base64-encoded raw
bytes. Timestamps are converted from Iggy's microsecond epoch to the configured
InfluxDB precision (ns/us/ms/s, default µs). An offset-based nanosecond blend
prevents silent deduplication when multiple messages in the same batch share
the same microsecond timestamp. If a producer sets timestamp=0, the connector
falls back to SystemTime::now() to avoid Year-1970 points.
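The timestamp handling above can be sketched roughly as follows. The function and enum names are illustrative, and applying the batch-offset blend only at nanosecond precision is an interpretation of the description, not the PR's exact code:

```rust
// Sketch of the sink's timestamp handling; names are illustrative.
use std::time::{SystemTime, UNIX_EPOCH};

/// Target precision for the InfluxDB write API.
enum Precision { Ns, Us, Ms, S }

/// Convert a message's microsecond epoch timestamp to the configured
/// precision. At nanosecond precision, blend in the message's offset
/// within the batch so two messages sharing the same microsecond do not
/// collapse into a single point (InfluxDB overwrites points that share
/// series and timestamp).
fn point_timestamp(micros: u64, batch_offset: u64, precision: Precision) -> u64 {
    // Fall back to "now" when the producer left the timestamp unset,
    // avoiding points written at the 1970 epoch.
    let micros = if micros == 0 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_micros() as u64
    } else {
        micros
    };
    match precision {
        // Only ns precision has headroom to carry the disambiguating offset.
        Precision::Ns => micros * 1_000 + (batch_offset % 1_000),
        Precision::Us => micros,
        Precision::Ms => micros / 1_000,
        Precision::S => micros / 1_000_000,
    }
}
```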
Resilience — Writes are retried up to max_retries (default 3) on HTTP 429
and 5xx responses using exponential backoff with ±20% jitter. The Retry-After
header is honoured in both integer-seconds and RFC 7231 HTTP-date formats
before falling back to the connector's own backoff. A circuit breaker opens after
circuit_breaker_threshold (default 5) consecutive batch failures and holds for
circuit_breaker_cool_down (default 30s) before a half-open probe. Batch errors
are captured and propagated to the runtime after processing all remaining
sub-batches, preventing silent data loss.
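A rough sketch of the retry-delay calculation described above. The base delay and helper name are assumptions, the jitter value is taken as a caller-supplied input rather than drawn from an RNG, and only the integer-seconds form of Retry-After is shown (the real connector also parses the HTTP-date form):

```rust
// Sketch: exponential backoff with ±20% jitter, preferring a
// server-provided Retry-After (integer seconds) when present.
use std::time::Duration;

const BASE_DELAY_MS: u64 = 500; // assumed base delay; not stated in the PR

/// `jitter` is a pseudo-random value in [-1.0, 1.0] supplied by the caller.
fn retry_delay(attempt: u32, retry_after_header: Option<&str>, jitter: f64) -> Duration {
    // Honour Retry-After (integer seconds) before our own backoff.
    if let Some(v) = retry_after_header {
        if let Ok(secs) = v.trim().parse::<u64>() {
            return Duration::from_secs(secs);
        }
    }
    // Exponential backoff: base * 2^attempt (capped), then ±20% jitter.
    let exp = BASE_DELAY_MS.saturating_mul(1u64 << attempt.min(10));
    let jittered = exp as f64 * (1.0 + 0.2 * jitter.clamp(-1.0, 1.0));
    Duration::from_millis(jittered as u64)
}
```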
Startup — open() retries the InfluxDB /health endpoint up to
max_open_retries (default 10) with capped exponential backoff, so the connector
recovers from transient InfluxDB restarts without manual intervention.
Metadata — Stream name, topic, and partition can be written as InfluxDB
tags; message checksum and origin timestamp as fields. Each is individually
togglable. The measurement name is configurable (default iggy_messages).
Verbose logging mode promotes per-batch diagnostics from debug! to info!
without code changes.
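For illustration, one such point with the metadata tags could be assembled as below. The tag names (`iggy_stream`, `iggy_topic`, `iggy_partition`) are assumptions, not identifiers confirmed by the PR; the escaping follows the InfluxDB line protocol rules for tag values (commas, equals signs, and spaces):

```rust
// Sketch of emitting one line-protocol point carrying the optional
// stream / topic / partition metadata tags. Tag names are illustrative.
fn escape_tag(v: &str) -> String {
    v.replace(',', "\\,").replace('=', "\\=").replace(' ', "\\ ")
}

/// measurement,tag_set field_set timestamp
fn point(measurement: &str, stream: &str, topic: &str, partition: u32,
         field_set: &str, ts: u64) -> String {
    format!(
        "{},iggy_stream={},iggy_topic={},iggy_partition={} {} {}",
        escape_tag(measurement), escape_tag(stream), escape_tag(topic),
        partition, field_set, ts
    )
}
```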
**InfluxDB Source Connector**
Polls InfluxDB v2 via the Flux query API and publishes results to Iggy
topics as structured JSON messages.
Incremental polling — Tracks max(_time) from each Flux response as a cursor,
advanced by 1 ns before storing to prevent boundary-point re-delivery. The
cursor is templated into any $cursor and $limit placeholder in the
user-supplied Flux query, supporting arbitrary range, filter, pivot, and
aggregation chains. The cursor and row count are persisted via ConnectorState
so polling resumes exactly where it left off after a restart.
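The placeholder substitution could look roughly like this (`$cursor` and `$limit` come from the PR text; the helper itself is illustrative). The 1 ns cursor advance that happens before persisting is omitted from the sketch:

```rust
// Sketch of templating the persisted cursor and batch size into a
// user-supplied Flux query via `$cursor` / `$limit` placeholders.
fn render_query(template: &str, cursor: &str, limit: usize) -> String {
    template
        .replace("$cursor", cursor)
        .replace("$limit", &limit.to_string())
}
```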
CSV parsing — Handles InfluxDB annotated CSV output: skips #group,
#datatype, and #default annotation rows, detects header rows by field names,
and correctly handles blank-line separators between multi-table results.
Message format — Each row is published as a structured JSON envelope
{measurement, field, timestamp, value, row} by default. When payload_column is
set, the raw field value is extracted and emitted in the configured format
(JSON, Text, or Raw/Base64).
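A minimal sketch of that CSV filtering, assuming `#`-prefixed annotation rows, blank-line table separators, and one header row per table (the real parser also matches header rows by field names, which the sketch skips):

```rust
// Sketch: strip annotation rows, header rows, and blank separators from
// InfluxDB annotated-CSV output, keeping only data rows.
fn data_rows(csv: &str) -> Vec<&str> {
    let mut rows = Vec::new();
    let mut header_seen = false;
    for line in csv.lines() {
        let line = line.trim_end_matches('\r');
        if line.is_empty() {
            header_seen = false; // a blank line starts a new table
            continue;
        }
        if line.starts_with('#') {
            continue; // #group / #datatype / #default annotations
        }
        if !header_seen {
            header_seen = true; // first non-annotation row is the header
            continue;
        }
        rows.push(line);
    }
    rows
}
```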
Resilience — Same exponential backoff, jitter, Retry-After parsing, and
circuit breaker as the sink. open() retries the /health endpoint before
declaring ready. close() drops the reqwest client to release all connection
pool resources.
Configuration — cursor_field (default _time) and initial_offset allow
operators to replay from any point in the bucket. poll_interval, batch_size,
and timeout are all tunable. Verbose logging toggles per-poll diagnostics
between debug! and info!.
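Pulling those options together, a source-connector configuration might look roughly like this. The key names are those mentioned in this PR; the values and the file structure are assumptions for illustration:

```toml
# Illustrative source-connector config; keys are from the PR text,
# values and layout are assumed.
poll_interval = "10s"
batch_size = 500
timeout = "30s"
cursor_field = "_time"       # default
initial_offset = "-1h"       # replay point within the bucket
max_retries = 3              # default
circuit_breaker_threshold = 5    # default
circuit_breaker_cool_down = "30s" # default
max_open_retries = 10        # default, for the /health startup probe
```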
## Local Execution
- Tests passed locally
- Pre-commit hooks ran cleanly
## AI Usage
If AI tools were used, please answer:
1. Which tools? GitHub Copilot, Claude
2. Scope of usage? autocomplete, generated functions
3. How did you verify the generated code works correctly? Verified by running
the unit and integration tests
4. Can you explain every line of the code if asked? Yes