atharvalade opened a new issue, #2956:
URL: https://github.com/apache/iggy/issues/2956

   ## Amazon S3 Sink Connector
   
   ### Summary
   
   A native Rust sink connector that writes messages from Iggy streams to 
Amazon S3 and S3-compatible stores (MinIO, DigitalOcean Spaces, Backblaze B2, 
Cloudflare R2). Listed as P0 in the connector ecosystem tracker (#2756). No 
existing implementation or active PR.
   
   ### Why This Matters
   
   Without a native S3 sink, users who want to archive stream data, feed data 
lake pipelines, or stage for cloud warehouses (Snowflake, Redshift, Athena) 
must write custom consumers. The Iceberg sink (#2191) writes to S3 internally 
but requires a catalog server and targets the Iceberg table format, not raw 
file export. This connector covers the straightforward "stream messages to S3 files" use case, for which every competing platform (Kafka Connect, Pulsar IO, Flink) already ships a connector.
   
   ### Scope
   
   Sink only (Iggy to S3). A source connector (S3 to Iggy) can follow in a 
separate issue.
   
   ### Proposed Config
   
   ```toml
   type = "sink"
   key = "s3"
   enabled = true
   version = 0
   name = "S3 sink"
   path = "../../target/release/libiggy_connector_s3_sink"
   verbose = false
   
   [[streams]]
   stream = "application_logs"
   topics = ["api_requests", "errors"]
   schema = "json"
   batch_length = 1000
   poll_interval = "100ms"
   consumer_group = "s3_sink"
   
   [plugin_config]
   bucket = "my-data-lake"
   prefix = "iggy/raw"
   region = "us-east-1"
   # endpoint = "http://localhost:9000"      # for MinIO / S3-compatible
   # access_key_id = "AKIA..."               # omit to use instance profile / env vars
   # secret_access_key = "..."               # omit to use instance profile / env vars
   
   # File layout: {stream}, {topic}, {partition}, {date}, {hour}, {timestamp}
   path_template = "{stream}/{topic}/{date}/{hour}"
   
   # Rotation: write a new file when this threshold is hit
   file_rotation = "size"                     # "size" | "time" | "messages"
   max_file_size = "8MiB"
   # max_file_age = "5m"
   # max_messages_per_file = 10000
   
   output_format = "json_lines"               # "json_lines" | "json_array" | "raw"
   # compression = "none"                     # "none" | "gzip" | "zstd"
   include_metadata = true
   include_headers = true
   max_retries = 3
   retry_delay = "1s"
   ```
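   Rotation sizes like `8MiB` above imply a small size-string parser. A std-only sketch; the helper name and accepted suffixes are assumptions, and the real connector may well reuse an existing crate instead:

   ```rust
   /// Parse a human-readable size such as "8MiB" into bytes.
   /// Hypothetical helper, not existing Iggy API.
   fn parse_size(input: &str) -> Option<u64> {
       let s = input.trim();
       // Split at the first non-digit character into number and suffix.
       let split = s.find(|c: char| !c.is_ascii_digit())?;
       let (num, suffix) = s.split_at(split);
       let num: u64 = num.parse().ok()?;
       let factor = match suffix.trim() {
           "B" => 1,
           "KiB" => 1024,
           "MiB" => 1024 * 1024,
           "GiB" => 1024 * 1024 * 1024,
           _ => return None, // unknown suffix: reject rather than guess
       };
       Some(num * factor)
   }

   fn main() {
       assert_eq!(parse_size("8MiB"), Some(8 * 1024 * 1024));
       assert_eq!(parse_size("bogus"), None);
   }
   ```

   Rejecting unknown suffixes keeps a misconfigured threshold from silently becoming a huge or tiny file size.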
   
   ### Example Output
   
   Writing `api_requests` messages on 2026-03-16 at 14:00 UTC produces:
   
   ```
   s3://my-data-lake/iggy/raw/application_logs/api_requests/2026-03-16/14/000001.jsonl
   ```
   
   Each line:
   
   ```json
   {"offset":42,"timestamp":"2026-03-16T14:02:31Z","stream":"application_logs","topic":"api_requests","partition_id":1,"payload":{"method":"GET","path":"/api/users","status":200}}
   ```
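   The `path_template` rendering behind that key can be plain placeholder substitution. A std-only sketch; the function name and call shape are illustrative, not existing Iggy API:

   ```rust
   /// Render a template like "{stream}/{topic}/{date}/{hour}" by
   /// substituting each "{key}" with its value. Illustrative sketch.
   fn render_path(template: &str, vars: &[(&str, &str)]) -> String {
       let mut out = template.to_string();
       for (key, value) in vars {
           out = out.replace(&format!("{{{key}}}"), value);
       }
       out
   }

   fn main() {
       let path = render_path(
           "{stream}/{topic}/{date}/{hour}",
           &[
               ("stream", "application_logs"),
               ("topic", "api_requests"),
               ("date", "2026-03-16"),
               ("hour", "14"),
           ],
       );
       assert_eq!(path, "application_logs/api_requests/2026-03-16/14");
   }
   ```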
   
   ### Design Questions
   
   **1. `aws-sdk-s3` vs Apache OpenDAL?**
   
   `aws-sdk-s3` is lighter and covers S3 + all S3-compatible stores via custom 
endpoint. Apache OpenDAL (#1419, with @Xuanwo already engaged) would give 
native GCS/Azure/HDFS support through one abstraction. Tiered storage (#1419) 
is a separate feature (Iggy internal persistence) but sharing the same storage 
layer across both could be a good long-term call. My leaning: start with 
`aws-sdk-s3`, migrate to OpenDAL later if adopted project-wide.
   
   **2. Buffering: memory vs temp file?**
   
   Messages arrive in small batches. We buffer until rotation triggers, then 
upload. At 8MiB default file size, in-memory buffering seems fine. Want to 
confirm this fits the connector runtime's expectations.
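   A minimal sketch of that in-memory approach, assuming rotation by size or message count (struct and field names are illustrative, not part of the connector SDK):

   ```rust
   /// Illustrative in-memory buffer that signals when rotation should occur.
   struct FileBuffer {
       bytes: Vec<u8>,
       messages: usize,
       max_bytes: usize,
       max_messages: usize,
   }

   impl FileBuffer {
       fn new(max_bytes: usize, max_messages: usize) -> Self {
           Self { bytes: Vec::new(), messages: 0, max_bytes, max_messages }
       }

       /// Append one serialized message (e.g. a JSON line); returns true when
       /// a rotation threshold is reached and the buffer should be uploaded.
       fn push(&mut self, line: &[u8]) -> bool {
           self.bytes.extend_from_slice(line);
           self.bytes.push(b'\n');
           self.messages += 1;
           self.bytes.len() >= self.max_bytes || self.messages >= self.max_messages
       }

       /// Take the buffered contents for upload and reset the buffer.
       fn take(&mut self) -> Vec<u8> {
           self.messages = 0;
           std::mem::take(&mut self.bytes)
       }
   }

   fn main() {
       let mut buf = FileBuffer::new(1024, 3);
       assert!(!buf.push(b"{\"a\":1}"));
       assert!(!buf.push(b"{\"a\":2}"));
       assert!(buf.push(b"{\"a\":3}")); // message-count threshold hit
       let data = buf.take();
       assert_eq!(data.iter().filter(|&&b| b == b'\n').count(), 3);
   }
   ```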
   
   **3. Deduplication on crash recovery?**
   
   If the connector crashes before offset commit, messages get re-delivered. 
Proposal: use deterministic S3 keys based on offset ranges (e.g. 
`.../{offset_start}-{offset_end}.jsonl`) so re-uploads are idempotent 
overwrites. No extra state needed.
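   Zero-padding the offsets also keeps lexicographic S3 listing in offset order, which makes the idempotent-overwrite scheme easy to inspect. A sketch (helper name and padding width are assumptions):

   ```rust
   /// Build an idempotent object key from the first/last offsets in a file,
   /// zero-padded so lexicographic order equals offset order. Sketch only.
   fn object_key(prefix: &str, rendered_path: &str, start: u64, end: u64) -> String {
       format!("{prefix}/{rendered_path}/{start:020}-{end:020}.jsonl")
   }

   fn main() {
       let key = object_key(
           "iggy/raw",
           "application_logs/api_requests/2026-03-16/14",
           42,
           1041,
       );
       // A crashed-and-restarted connector re-deriving the same offset range
       // produces the same key, so the re-upload overwrites rather than duplicates.
       assert_eq!(
           key,
           format!(
               "iggy/raw/application_logs/api_requests/2026-03-16/14/{:020}-{:020}.jsonl",
               42u64, 1041u64
           )
       );
   }
   ```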
   
   ### Implementation Plan
   
   1. Scaffold `core/connectors/sinks/s3_sink/` following the postgres_sink 
pattern
   2. `S3SinkConfig` struct, `S3Sink` struct implementing `Sink` trait
   3. `open()`: create S3 client, HeadBucket connectivity check
   4. `consume()`: buffer messages, rotate and upload when threshold hit
   5. `close()`: flush remaining buffer
   6. Path template rendering, compression, output formatting
   7. Unit tests (config, path templates, rotation, formatting)
   8. Integration test with MinIO in Docker
   9. README + example config
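   The lifecycle in steps 3-5 can be sketched against a stand-in trait; the real `Sink` trait lives in `core/connectors/sdk/src/lib.rs` and its exact signatures (likely async, with richer message types) will differ, so every name below is illustrative:

   ```rust
   /// Stand-in for the connector SDK's Sink trait; the real one differs.
   trait Sink {
       fn open(&mut self) -> Result<(), String>;
       fn consume(&mut self, messages: &[Vec<u8>]) -> Result<(), String>;
       fn close(&mut self) -> Result<(), String>;
   }

   /// Illustrative skeleton following the numbered steps above.
   struct S3Sink {
       buffer: Vec<u8>,
       max_file_size: usize,
       uploads: usize, // stands in for actual S3 PutObject calls
   }

   impl Sink for S3Sink {
       fn open(&mut self) -> Result<(), String> {
           // Step 3: create the S3 client and run a HeadBucket check here.
           Ok(())
       }

       fn consume(&mut self, messages: &[Vec<u8>]) -> Result<(), String> {
           // Step 4: buffer messages; rotate and upload when the threshold hits.
           for msg in messages {
               self.buffer.extend_from_slice(msg);
               self.buffer.push(b'\n');
               if self.buffer.len() >= self.max_file_size {
                   self.uploads += 1; // real code: upload, then commit offsets
                   self.buffer.clear();
               }
           }
           Ok(())
       }

       fn close(&mut self) -> Result<(), String> {
           // Step 5: flush whatever remains in the buffer.
           if !self.buffer.is_empty() {
               self.uploads += 1;
               self.buffer.clear();
           }
           Ok(())
       }
   }

   fn main() {
       let mut sink = S3Sink { buffer: Vec::new(), max_file_size: 16, uploads: 0 };
       sink.open().unwrap();
       sink.consume(&[b"0123456789".to_vec(), b"0123456789".to_vec()]).unwrap();
       sink.close().unwrap();
       assert_eq!(sink.uploads, 1); // one rotation upload; buffer empty at close
   }
   ```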
   
   ### References
   
   - Connector Ecosystem Tracker: #2756 (P0 priority)
   - Tiered Storage / OpenDAL: #1419
   - Existing sinks for reference: `core/connectors/sinks/postgres_sink/`, 
`core/connectors/sinks/iceberg_sink/`
   - Sink trait: `core/connectors/sdk/src/lib.rs`
   
   Please share your thoughts, @hubcio @kparisa.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
