GitHub user hubcio added a comment to the discussion: Proposal: HTTP Source 
Connector (Webhook Gateway)

hey @mlevkov, proposal looks good. i ran multiple agents against the connectors 
runtime, the sdk, and your proposal, and they turned up several findings. i 
verified them all against the code.

tl;dr: the design is sound, but there are runtime bugs you'd inherit that need 
fixing first, and a few design decisions that need adjustment before code lands.

## existing runtime bugs

### shutdown ordering drops messages

`manager/source.rs:151` calls `cleanup_sender(plugin_id)` which kills the flume 
channel **before** `iggy_source_close` at line 159. the forwarding loop exits 
(channel closed), then the plugin gets told to stop. anything in the plugin's 
internal buffer or flume channel is gone.

this affects all source connectors. for a webhook gateway it's especially bad 
-- you already returned http 200 to the caller, and webhook senders don't retry 
on 200.

fix is straightforward: call `iggy_source_close` first, let the forwarding loop 
drain, then cleanup the channel.
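a minimal sketch of the corrected ordering, with std's mpsc standing in for flume and a closure standing in for the real `iggy_source_close` ffi call (names here are illustrative, not the runtime's actual api):

```rust
use std::sync::mpsc;

// Corrected shutdown ordering: tell the plugin to stop *before* tearing
// down the channel, then drain whatever it already buffered.
fn shutdown_in_order<T>(
    close_plugin: impl FnOnce(),  // stand-in for the iggy_source_close ffi call
    rx: mpsc::Receiver<T>,        // stand-in for the flume receiver
) -> Vec<T> {
    // 1. stop the producer side first -- no new messages after this returns
    close_plugin();
    // 2. drain everything still sitting in the channel
    let drained: Vec<T> = rx.try_iter().collect();
    // 3. only now does the channel get dropped (end of scope)
    drained
}
```

the current code inverts steps 1 and 3, which is exactly why in-flight messages vanish.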

### unbounded flume channel breaks your backpressure story

`runtime/src/source.rs:430` uses `flume::unbounded()`. your beautiful 
ArrayQueue + http 429 backpressure design? it won't work. `poll()` drains the 
ArrayQueue into the bottomless flume channel, so the ArrayQueue never fills, so 
http 429 never fires. if iggy goes slow, you just accumulate messages until oom.

needs to be `flume::bounded(capacity)`. then the chain works: iggy slow -> 
forwarding loop blocks -> flume fills -> ffi callback blocks -> your ArrayQueue 
fills -> http 429. this is the missing link.
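to make the last link concrete, here's a sketch using std's bounded `sync_channel` in place of `flume::bounded` -- when the forwarding loop stalls, the channel fills and `try_send` starts failing, and that failure is what the http handler turns into a 429:

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

// Map the bounded channel's state to an http status. With an unbounded
// channel, the Full arm is unreachable and 429 never fires.
fn accept_webhook(tx: &SyncSender<Vec<u8>>, body: Vec<u8>) -> u16 {
    match tx.try_send(body) {
        Ok(()) => 200,                             // queued for the forwarding loop
        Err(TrySendError::Full(_)) => 429,         // backpressure: tell sender to retry
        Err(TrySendError::Disconnected(_)) => 503, // shutting down
    }
}
```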

### FileStateProvider is not crash-safe

`state.rs:90-108` does truncate -> seek -> write -> sync. crash between 
truncate and write = empty state file. should be write-to-temp + fsync + 
rename. impact is actually narrower than it looks since DirectConfig 
`producer.send()` is synchronous (durable to iggy) -- so you get duplicates on 
restart, not data loss. still needs fixing though.
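a sketch of the crash-safe pattern (std only, function name illustrative): write the new state to a temp file, fsync it, then atomically rename over the old file. a crash at any point leaves either the old state or the new state on disk, never an empty file.

```rust
use std::fs::{self, File};
use std::io::Write;
use std::path::Path;

// Crash-safe replacement for truncate -> seek -> write -> sync.
fn write_state_atomic(path: &Path, state: &[u8]) -> std::io::Result<()> {
    let tmp = path.with_extension("tmp");
    {
        let mut f = File::create(&tmp)?;
        f.write_all(state)?;
        f.sync_all()?; // fsync: data is durable before we point the name at it
    }
    fs::rename(&tmp, path)?; // atomic on POSIX when src and dst share a filesystem
    Ok(())
}
```

strictly you'd also fsync the parent directory so the rename itself survives a crash, but the file contents are safe either way.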

### arc::try_unwrap race in source container close

`sdk/source.rs:130` does `Arc::try_unwrap`, which can fail if the spawned task 
hasn't fully dropped its clone despite `close()` sending the shutdown signal 
and awaiting the task handle. when it fails, the function returns -1 and never 
calls `source.close()`. for you that means a leaked bound port on restart. the 
risk is narrow but real -- if `poll()` blocks long enough that the task doesn't 
exit cleanly within the await window, the arc still has multiple owners.
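one tolerant fix (sketch, not the sdk's actual api): retry the unwrap briefly while the task finishes dropping its clone, instead of bailing out with -1 on the first failure:

```rust
use std::sync::Arc;
use std::time::Duration;

// Retry Arc::try_unwrap a few times, yielding between attempts, so a
// slightly-late task drop doesn't turn into a leaked source.
fn try_unwrap_with_retry<T>(mut arc: Arc<T>, attempts: u32) -> Result<T, Arc<T>> {
    for _ in 0..attempts {
        match Arc::try_unwrap(arc) {
            Ok(inner) => return Ok(inner),
            Err(shared) => {
                arc = shared; // still shared; wait and try again
                std::thread::sleep(Duration::from_millis(10));
            }
        }
    }
    Err(arc) // genuinely stuck -- now -1 is justified
}
```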

---

## design changes needed

### drop the own-IggyClient for config

i know the event-sourced config topic is elegant, but creating a separate 
IggyClient to consume it introduces split-brain risk. you now have two 
independent tcp connections with independent failure modes. if the config 
connection drops while the main one is healthy, revoked endpoints stay active 
indefinitely. that's a security hole.

use the runtime's `ConnectorsConfigProvider` instead. it already handles 
reconnection and polling. if you need real-time reactivity, your own axum 
server can expose management endpoints (`POST /endpoints`, `DELETE 
/endpoints/{id}`). cleaner separation of control plane and data plane.
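the data structure behind those management endpoints is just a shared registry the `POST`/`DELETE` handlers mutate -- a std-only sketch with illustrative names (the real thing would sit behind your axum routes):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Control-plane registry: the management endpoints add/remove entries,
// the data-plane handler checks is_active before accepting a webhook.
#[derive(Clone)]
struct EndpointRegistry {
    inner: Arc<Mutex<HashMap<String, String>>>, // endpoint id -> target topic
}

impl EndpointRegistry {
    fn new() -> Self {
        Self { inner: Arc::new(Mutex::new(HashMap::new())) }
    }
    fn add(&self, id: &str, topic: &str) {
        self.inner.lock().unwrap().insert(id.into(), topic.into());
    }
    fn remove(&self, id: &str) -> bool {
        self.inner.lock().unwrap().remove(id).is_some()
    }
    fn is_active(&self, id: &str) -> bool {
        self.inner.lock().unwrap().contains_key(id)
    }
}
```

revocation is then a single in-process mutation with no second tcp connection to fail.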

### fail-closed on stale config is mandatory

if your config source is unreachable beyond a timeout, return 503. not 
"continue with last known good." a revoked endpoint staying active because your 
config consumer hiccupped is not an acceptable failure mode. document the 
staleness bound as an sla.
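the check itself is tiny -- a sketch (names assumed) where every successful config poll stamps a timestamp and request handling consults it:

```rust
use std::time::{Duration, Instant};

// Fail-closed staleness gate: the max_staleness bound is the sla to document.
struct ConfigFreshness {
    last_refresh: Instant,    // stamped on every successful config poll
    max_staleness: Duration,  // documented staleness bound
}

impl ConfigFreshness {
    fn status_for_request(&self, now: Instant) -> u16 {
        if now.duration_since(self.last_refresh) > self.max_staleness {
            503 // fail closed: refuse rather than serve possibly-revoked endpoints
        } else {
            200
        }
    }
}
```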

### delivery semantics need to be front and center

the "async acknowledgment" section kinda buries the lede. http 200 means 
"queued in volatile connector memory." messages can be lost on crash, shutdown 
race, or oom. webhook providers (github, stripe) don't retry on 200. users need 
to understand this upfront, not discover it in a subsection.

suggest making this a top-level section in the readme with a clear statement: 
"delivery guarantee: at-most-once. http 200 means accepted for processing, not 
durably stored."

### split the code into modules

the proposed 3600-4650 lines should be split into `lib.rs`, `server.rs`, 
`registry.rs`, `config.rs`, `auth.rs`, and `types.rs`. the http sink, at 2130 
lines in one file, is already pushing it.

---

## on the three blockers

### blocker 1 (multi-topic routing): go with option (c) for v1

single-topic-per-instance. the runtime already supports multiple plugin 
instances per loaded container (`main.rs:433-436`). share one axum server 
across instances -- first `open()` binds, others register routes with a shared 
routing table. each instance's `poll()` drains its own buffer through the 
normal sdk pipeline.

zero sdk changes. zero abi breaks. works today.
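the shared-server piece boils down to a process-wide routing table that the first `open()` initializes (alongside binding the actual axum server) and later instances register into -- a std-only sketch with illustrative types:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

// path -> plugin instance id; the http handler looks up which instance's
// buffer a request should land in.
type RoutingTable = Arc<Mutex<HashMap<String, u32>>>;

// First caller initializes the table; everyone else gets the same one.
fn shared_table() -> RoutingTable {
    static TABLE: OnceLock<RoutingTable> = OnceLock::new();
    TABLE.get_or_init(|| Arc::new(Mutex::new(HashMap::new()))).clone()
}

fn register_endpoint(path: &str, plugin_id: u32) {
    shared_table().lock().unwrap().insert(path.to_string(), plugin_id);
}

fn route(path: &str) -> Option<u32> {
    shared_table().lock().unwrap().get(path).copied()
}
```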

the limitation is real though -- the forwarding loop at `source.rs:355` logs 
`producer.send()` failures and continues with no feedback to the http handler. 
at-most-once is structural here, not a bug. you can't fix it without changing 
the architecture.

for v2, we should open a proposal discussion for a `DirectSource`/`PushSource` 
trait where the runtime passes `IggyProducer` to the connector and it calls 
`producer.send().await` before returning http 200. that gives at-least-once by 
construction. the ffi boundary (`IggyProducer` can't cross `extern "C"`) is the 
engineering challenge -- options are c abi produce/ack callbacks, shared-memory 
ring buffer, or a native rust plugin mode.
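to seed that discussion, a sketch of what the trait could look like -- hypothetical names, not an existing sdk api, and shown sync for brevity (the real thing would be async, with the producer crossing or avoiding the ffi boundary):

```rust
// Hypothetical v2 push-style api: the runtime hands the connector a
// producer handle; the connector sends *before* acknowledging, which is
// what makes the guarantee at-least-once by construction.
trait PushProducer {
    fn send(&mut self, payload: Vec<u8>) -> Result<(), String>;
}

trait PushSource {
    // called per inbound request; return Ok (-> http 200) only after the
    // producer confirmed the send
    fn push(&mut self, producer: &mut dyn PushProducer, payload: Vec<u8>)
        -> Result<(), String>;
}

struct WebhookSource;

impl PushSource for WebhookSource {
    fn push(&mut self, producer: &mut dyn PushProducer, payload: Vec<u8>)
        -> Result<(), String>
    {
        producer.send(payload) // durable-or-error before the 200 goes out
    }
}

// Toy in-memory producer, just to exercise the trait.
struct MemProducer(Vec<Vec<u8>>);

impl PushProducer for MemProducer {
    fn send(&mut self, payload: Vec<u8>) -> Result<(), String> {
        self.0.push(payload);
        Ok(())
    }
}
```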

don't block v1 on this. ship option (c) with documented limitations. open the 
proposal discussion. migrate in v2.

### blocker 2 (shutdown data loss): yes, fix it

confirmed the bug. the fix should be a separate pr since it affects all source 
connectors. happy to review.

### blocker 3 (config consumer failure): use ConnectorsConfigProvider

covered above. the event-sourced iggy topic approach is clever but 
operationally fragile. runtime's existing config mechanism + management 
endpoints on your axum server covers the same use cases without the split-brain 
risk.

---

## nice-to-haves (file separately)

- add `ServerError(String)` to the sdk error enum -- you'll need it for bind 
failures and tls errors
- document delivery semantics on the `Source::poll()` docstring itself
- clarify `Source::close()` contract -- who's responsible for draining 
in-flight messages? (spoiler: the runtime)
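for the first item, roughly what i have in mind -- variant name assumed, and the real sdk enum's other variants elided here:

```rust
use std::fmt;

// Suggested sdk addition: a catch-all for server-side lifecycle failures
// (bind errors, tls setup) that the current enum has no home for.
#[derive(Debug)]
enum ConnectorError {
    ServerError(String),
    // ...existing variants elided
}

impl fmt::Display for ConnectorError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ConnectorError::ServerError(msg) => write!(f, "server error: {msg}"),
        }
    }
}
```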

---

## summary

| priority | what | scope |
|----------|------|-------|
| p0 | fix shutdown ordering in `manager/source.rs` | all source connectors |
| p0 | bounded flume channel at `source.rs:430` | all source connectors |
| p0 | use ConnectorsConfigProvider, not own IggyClient | this connector |
| p0 | fail-closed on stale config | this connector |
| p0 | document at-most-once delivery prominently | this connector |
| p1 | FileStateProvider atomic rename | all connectors |
| p1 | fix arc::try_unwrap race | all source connectors |
| p1 | multi-module structure | this connector |
| p2 | proposal discussion: DirectSource/PushSource trait | sdk + runtime |

overall: strong proposal, solid engineering. the prerequisite runtime fixes are 
the main blocker. once those land, option (c) gets you a shippable v1 with 
clear documented limitations, and the PushSource proposal discussion charts the 
path to at-least-once in v2.

looking forward to the pr.

GitHub link: 
https://github.com/apache/iggy/discussions/3039#discussioncomment-16526083
