mlevkov opened a new issue, #2928:
URL: https://github.com/apache/iggy/issues/2928

   ## Description
   
   The `AutoCommitWhen::PollingMessages` strategy commits consumer group 
offsets **before** `consume()` is called on the sink plugin. This means if 
`consume()` fails (and the failure is already silently discarded per #2927), 
the messages are permanently lost — the consumer group has already advanced 
past them.
   
   ## Sequence
   
   ```
   1. consumer.next()     → messages received
   2. Offsets committed   → consumer group advances  ← PROBLEM
   3. process_messages()  → calls sink consume() via FFI
   4. consume() fails     → messages already committed, lost forever
   ```
   
   ## Location
   
   `core/connectors/runtime/src/sink.rs`:
   
   - **Line 421**: Consumer configured with `AutoCommitWhen::PollingMessages`
     ```rust
     .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
     ```
   
   - **Lines 266-344** (`consume_messages` loop): `consumer.next()` at line 272 
triggers the auto-commit, but `process_messages()` doesn't execute until line 
311.
   
   ## Impact
   
   Combined with #2927 (consume return value discarded), **at-least-once 
delivery is not achievable** with the current runtime for any sink connector. 
If `consume()` fails and the process restarts, those messages will never be 
retried because the offsets have already been committed.
   
   ## Suggested Fix
   
   Change auto-commit strategy to `AutoCommitWhen::ConsumerStopped` or 
`AutoCommit::Disabled`, and commit offsets **after** successful `consume()`:
   
   ```rust
   // Option A: Commit after processing
   .auto_commit(AutoCommit::Disabled)
   // ... in the consume loop, after successful consume():
   consumer.store_offset(offset).await?;
   
   // Option B: Use AfterPollingMessages if available
   .auto_commit(AutoCommit::When(AutoCommitWhen::AfterProcessing))
   ```
   
   The exact API depends on the Iggy SDK's consumer group offset management 
capabilities.
   
   ## Related
   
   - #2927 (consume return value discarded)
   - Discussed in #2901 (HTTP sink connector proposal)
   - HTTP sink PR: #2925


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to