fresh-borzoni commented on code in PR #404:
URL: https://github.com/apache/fluss-rust/pull/404#discussion_r2891554524


##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -561,8 +776,96 @@ impl Sender {
         )
     }
 
-    pub async fn close(&mut self) {
-        self.running = false;
+    /// Event-loop sender: drain batches and fire RPCs into a `FuturesUnordered`,
+    /// then process responses as they arrive. This interleaves drain cycles with
+    /// response handling: when a fast leader responds, we immediately drain and
+    /// send more batches for its buckets while slow leaders are still in-flight.
+    ///
+    /// Better than Java's fire-and-forget + Netty callback threads: same I/O
+    /// overlap, but single-task cooperative multitasking — no cross-thread
+    /// synchronization needed for response handling.
+    pub async fn run_with_shutdown(&self, mut shutdown_rx: mpsc::Receiver<()>) -> Result<()> {
+        let mut pending: FuturesUnordered<SendFuture<'_>> = FuturesUnordered::new();
+
+        loop {
+            if pending.is_empty() {
+                // Nothing in-flight: run a full drain cycle. This may briefly
+                // block on writer ID init or metadata refresh; acceptable here
+                // because there are no pending responses to starve.
+                tokio::select! {
+                    result = self.prepare_sends() => {
+                        match result {
+                            Ok((futures, delay)) => {
+                                if futures.is_empty() {
+                                    // Nothing to drain. Sleep for the ready-check
+                                    // delay to avoid busy-spinning.
+                                    // TODO: add a Notify that append() signals so we
+                                    // wake immediately on new batches instead of
+                                    // polling on a timer (same as Kafka's wakeup()).
+                                    let sleep_ms = delay.unwrap_or(1);
+                                    tokio::select! {
+                                        _ = shutdown_rx.recv() => break,
+                                        _ = tokio::time::sleep(Duration::from_millis(sleep_ms)) => continue,
+                                    }
+                                }
+                                for f in futures {
+                                    pending.push(f);
+                                }
+                            }
+                            Err(e) => {
+                                warn!("Uncaught error in sender drain, continuing: {e}");
+                                tokio::select! {
+                                    _ = shutdown_rx.recv() => break,
+                                    _ = tokio::time::sleep(Duration::from_millis(1)) => continue,
+                                }
+                            }
+                        }
+                    }
+                    _ = shutdown_rx.recv() => break,
+                }
+            } else {
+                // Sends are in-flight: process responses as they arrive,
+                // then try to drain and send more batches. The prepare_sends
+                // call may briefly await on writer ID init or metadata refresh
+                // in rare cases (writer ID reset, leader change); this is
+                // acceptable because the alternative (skipping the drain)
+                // would delay recovery.

Review Comment:
   Ty for the suggestion, but with this we're still inside the `pending.next()` arm and still awaiting `prepare_sends()`, so completed responses can still pile up: the same starvation problem, really.
   
   We can fix it with a concurrent init task; I'll try to find time to give it a try today.
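   To illustrate the idea, here's a minimal standalone sketch (not this PR's actual code; `slow_init` and `response` are hypothetical stand-ins for the blocking init work and in-flight send futures, and it assumes the `tokio` and `futures` crates): the init work is spawned as its own task and polled via `&mut handle` in the `select!`, so the loop keeps draining completed responses from the `FuturesUnordered` while init is still in flight, instead of awaiting init inline and letting responses pile up.
   
   ```rust
   use futures::stream::{FuturesUnordered, StreamExt};
   use std::time::Duration;
   
   // Hypothetical stand-in for writer ID init / metadata refresh (slow).
   async fn slow_init() -> &'static str {
       tokio::time::sleep(Duration::from_millis(50)).await;
       "init done"
   }
   
   // Hypothetical stand-in for an in-flight send's response (fast).
   async fn response(i: u32) -> u32 {
       tokio::time::sleep(Duration::from_millis(10)).await;
       i
   }
   
   #[tokio::main]
   async fn main() {
       let mut pending = FuturesUnordered::new();
       for i in 0..3 {
           pending.push(response(i));
       }
   
       // Spawn init as a concurrent task instead of awaiting it in a select arm,
       // so response handling is never blocked behind it.
       let mut init = tokio::spawn(slow_init());
   
       let mut drained = Vec::new();
       loop {
           tokio::select! {
               // When `pending` is empty, next() yields None, the pattern fails,
               // and this branch is disabled for the rest of the select.
               Some(r) = pending.next() => drained.push(r),
               res = &mut init => {
                   println!("{}", res.unwrap());
                   break;
               }
           }
       }
       // All fast responses were handled before the slow init completed.
       println!("drained {} responses", drained.len());
   }
   ```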



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
