GitHub user hubcio added a comment to the discussion: Proposal: HTTP Source
Connector (Webhook Gateway)
hey @mlevkov, proposal looks good. i ran multiple agents against the connectors
runtime, the sdk, and your proposal; the findings below are the ones i verified
against the code myself.
tl;dr: the design is sound, but there are runtime bugs you'd inherit that need
fixing first, and a few design decisions that need adjustment before code lands.
## existing runtime bugs
### shutdown ordering drops messages
`manager/source.rs:151` calls `cleanup_sender(plugin_id)` which kills the flume
channel **before** `iggy_source_close` at line 159. the forwarding loop exits
(channel closed), then the plugin gets told to stop. anything in the plugin's
internal buffer or flume channel is gone.
this affects all source connectors. for a webhook gateway it's especially bad
-- you already returned http 200 to the caller, and webhook senders don't retry
on 200.
fix is straightforward: call `iggy_source_close` first, let the forwarding loop
drain, then cleanup the channel.
### unbounded flume channel breaks your backpressure story
`runtime/src/source.rs:430` uses `flume::unbounded()`. your beautiful
ArrayQueue + http 429 backpressure design? it won't work. `poll()` drains the
ArrayQueue into the bottomless flume channel, so the ArrayQueue never fills, so
http 429 never fires. if iggy goes slow, you just accumulate messages until oom.
needs to be `flume::bounded(capacity)`. then the chain works: iggy slow ->
forwarding loop blocks -> flume fills -> ffi callback blocks -> your ArrayQueue
fills -> http 429. this is the missing link.
### FileStateProvider is not crash-safe
`state.rs:90-108` does truncate -> seek -> write -> sync. crash between
truncate and write = empty state file. should be write-to-temp + fsync +
rename. impact is actually narrower than it looks since DirectConfig
`producer.send()` is synchronous (durable to iggy) -- so you get duplicates on
restart, not data loss. still needs fixing though.
### arc::try_unwrap race in source container close
`sdk/source.rs:130` does `Arc::try_unwrap` which can fail if the spawned task
hasn't fully dropped its clone despite `close()` sending the shutdown signal
and awaiting the task handle. returns -1, never calls `source.close()`. for you
that means a leaked bound port on restart. the risk is narrow but real -- if
`poll()` blocks long enough that the task doesn't exit cleanly within the await
window, the arc has multiple owners.
---
## design changes needed
### drop the own-IggyClient for config
i know the event-sourced config topic is elegant, but creating a separate
IggyClient to consume it introduces split-brain risk. you now have two
independent tcp connections with independent failure modes. if the config
connection drops while the main one is healthy, revoked endpoints stay active
indefinitely. that's a security hole.
use the runtime's `ConnectorsConfigProvider` instead. it already handles
reconnection and polling. if you need real-time reactivity, your own axum
server can expose management endpoints (`POST /endpoints`, `DELETE
/endpoints/{id}`). cleaner separation of control plane and data plane.
### fail-closed on stale config is mandatory
if your config source is unreachable beyond a timeout, return 503. not
"continue with last known good." a revoked endpoint staying active because your
config consumer hiccupped is not an acceptable failure mode. document the
staleness bound as an sla.
### delivery semantics need to be front and center
the "async acknowledgment" section kinda buries the lede. http 200 means
"queued in volatile connector memory." messages can be lost on crash, shutdown
race, or oom. webhook providers (github, stripe) don't retry on 200. users need
to understand this upfront, not discover it in a subsection.
suggest making this a top-level section in the readme with a clear statement:
"delivery guarantee: at-most-once. http 200 means accepted for processing, not
durably stored."
### split the code into modules
the projected 3600-4650 lines should be split across `lib.rs`, `server.rs`,
`registry.rs`, `config.rs`, `auth.rs`, and `types.rs`. the http sink at 2130
lines in one file is already pushing it.
---
## on the three blockers
### blocker 1 (multi-topic routing): go with option (c) for v1
single-topic-per-instance. the runtime already supports multiple plugin
instances per loaded container (`main.rs:433-436`). share one axum server
across instances -- first `open()` binds, others register routes with a shared
routing table. each instance's `poll()` drains its own buffer through the
normal sdk pipeline.
zero sdk changes. zero abi breaks. works today.
the limitation is real though -- the forwarding loop at `source.rs:355` logs
`producer.send()` failures and continues with no feedback to the http handler.
at-most-once is structural here, not a bug. you can't fix it without changing
the architecture.
for v2, we should open a proposal discussion for a `DirectSource`/`PushSource`
trait where the runtime passes `IggyProducer` to the connector and it calls
`producer.send().await` before returning http 200. that gives at-least-once by
construction. the ffi boundary (`IggyProducer` can't cross `extern "C"`) is the
engineering challenge -- options are c abi produce/ack callbacks, shared-memory
ring buffer, or a native rust plugin mode.
don't block v1 on this. ship option (c) with documented limitations. open the
proposal discussion. migrate in v2.
### blocker 2 (shutdown data loss): yes, fix it
confirmed the bug. the fix should be a separate pr since it affects all source
connectors. happy to review.
### blocker 3 (config consumer failure): use ConnectorsConfigProvider
covered above. the event-sourced iggy topic approach is clever but
operationally fragile. runtime's existing config mechanism + management
endpoints on your axum server covers the same use cases without the split-brain
risk.
---
## nice-to-haves (file separately)
- add `ServerError(String)` to the sdk error enum -- you'll need it for bind
failures and tls errors
- document delivery semantics on the `Source::poll()` docstring itself
- clarify `Source::close()` contract -- who's responsible for draining
in-flight messages? (spoiler: the runtime)
---
## summary
| priority | what | scope |
|----------|------|-------|
| p0 | fix shutdown ordering in `manager/source.rs` | all source connectors |
| p0 | bounded flume channel at `source.rs:430` | all source connectors |
| p0 | use ConnectorsConfigProvider, not own IggyClient | this connector |
| p0 | fail-closed on stale config | this connector |
| p0 | document at-most-once delivery prominently | this connector |
| p1 | FileStateProvider atomic rename | all connectors |
| p1 | fix arc::try_unwrap race | all source connectors |
| p1 | multi-module structure | this connector |
| p2 | proposal discussion: DirectSource/PushSource trait | sdk + runtime |
overall: strong proposal, solid engineering. the prerequisite runtime fixes are
the main blocker. once those land, option (c) gets you a shippable v1 with
clear documented limitations, and the PushSource proposal discussion charts the
path to at-least-once in v2.
looking forward to the pr.
GitHub link:
https://github.com/apache/iggy/discussions/3039#discussioncomment-16526083