mlevkov opened a new issue, #2927:
URL: https://github.com/apache/iggy/issues/2927
## Description
The connector runtime calls `consume()` on each sink plugin via FFI but
discards the return value. If a sink's `consume()` returns an error (e.g., HTTP
timeout, database connection failure, serialization error), the runtime ignores
it — no logging, no retry, no metric. It proceeds to the next poll cycle as if
delivery succeeded.
## Location
`core/connectors/runtime/src/sink.rs`, lines 585-593:
```rust
(consume)(
plugin_id,
topic_meta.as_ptr(),
topic_meta.len(),
messages_meta.as_ptr(),
messages_meta.len(),
messages.as_ptr(),
messages.len(),
);
```
The FFI function returns an `i32` (0 = success, 1 = error), but the return
value is never checked.
## Impact
This affects **all 7 sink connectors** (Elasticsearch, Iceberg, MongoDB,
PostgreSQL, Quickwit, Stdout, and the new HTTP sink). Every `Err(...)` returned
by any sink's `consume()` implementation is silently swallowed by the runtime.
As a workaround, sink implementations must log errors internally before
returning `Err`. The HTTP sink (#2925) documents this limitation and logs all
errors at `error!` level inside `consume()`.
## Suggested Fix
Check the return value and log at minimum:
```rust
let result = (consume)(
plugin_id,
topic_meta.as_ptr(),
topic_meta.len(),
messages_meta.as_ptr(),
messages_meta.len(),
messages.as_ptr(),
messages.len(),
);
if result != 0 {
error!(
"Sink plugin {} consume() returned error code {}",
plugin_id, result
);
// Optionally: increment error metric, trigger retry, etc.
}
```
## Related
- Discussed in #2901 (HTTP sink connector proposal)
- Related to #2926 (offset commit ordering)
- HTTP sink PR: #2925
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]