charlesdong1991 commented on code in PR #404:
URL: https://github.com/apache/fluss-rust/pull/404#discussion_r2890031851
##########
crates/fluss/src/rpc/api_key.rs:
##########
@@ -68,6 +69,7 @@ impl From<i16> for ApiKey {
1035 => ApiKey::GetDatabaseInfo,
1036 => ApiKey::CreatePartition,
1037 => ApiKey::DropPartition,
+ 1026 => ApiKey::InitWriter,
Review Comment:
nit: maybe move this up so it keeps the numeric order 😄 same below
##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -561,8 +776,96 @@ impl Sender {
)
}
- pub async fn close(&mut self) {
- self.running = false;
+ /// Event-loop sender: drain batches and fire RPCs into a `FuturesUnordered`,
+ /// then process responses as they arrive. This interleaves drain cycles with
+ /// response handling — when a fast leader responds, we immediately drain and
+ /// send more batches for its buckets while slow leaders are still in-flight.
+ ///
+ /// Better than Java's fire-and-forget + Netty callback threads: same I/O
+ /// overlap, but single-task cooperative multitasking — no cross-thread
+ /// synchronization needed for response handling.
+ pub async fn run_with_shutdown(&self, mut shutdown_rx: mpsc::Receiver<()>) -> Result<()> {
+ let mut pending: FuturesUnordered<SendFuture<'_>> = FuturesUnordered::new();
+
+ loop {
+ if pending.is_empty() {
+ // Nothing in-flight: run a full drain cycle. This may briefly
+ // block on writer ID init or metadata refresh — acceptable here
+ // because there are no pending responses to starve.
+ tokio::select! {
+ result = self.prepare_sends() => {
+ match result {
+ Ok((futures, delay)) => {
+ if futures.is_empty() {
+ // Nothing to drain. Sleep for the ready-check
+ // delay to avoid busy-spinning.
+ // TODO: add a Notify that append() signals so we
+ // wake immediately on new batches instead of
+ // polling on a timer (same as Kafka's wakeup()).
+ let sleep_ms = delay.unwrap_or(1);
+ tokio::select! {
+ _ = shutdown_rx.recv() => break,
+ _ = tokio::time::sleep(Duration::from_millis(sleep_ms)) => continue,
+ }
+ }
+ for f in futures {
+ pending.push(f);
+ }
+ }
+ Err(e) => {
+ warn!("Uncaught error in sender drain, continuing: {e}");
+ tokio::select! {
+ _ = shutdown_rx.recv() => break,
+ _ = tokio::time::sleep(Duration::from_millis(1)) => continue,
+ }
+ }
+ }
+ }
+ _ = shutdown_rx.recv() => break,
+ }
+ } else {
+ // Sends are in-flight: process responses as they arrive,
+ // then try to drain and send more batches. The prepare_sends
+ // call may briefly await on writer ID init or metadata refresh
+ // in rare cases (writer ID reset, leader change); this is
+ // acceptable because the alternative (skipping the drain)
+ // would delay recovery.
Review Comment:
I am curious: what if we wrap this inner call in its own select? Would that help avoid this issue?
something like:
```
Some(result) = pending.next() => {
    // <warning path we have>
    tokio::select! {
        result = self.prepare_sends() => { // -> no wait
            if let Ok((futures, _)) = result {
                for f in futures {
                    pending.push(f);
                }
            }
        }
        // <break path>
    }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]