fresh-borzoni commented on code in PR #404:
URL: https://github.com/apache/fluss-rust/pull/404#discussion_r2869130499
##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -561,8 +776,96 @@ impl Sender {
)
}
- pub async fn close(&mut self) {
- self.running = false;
+ /// Event-loop sender: drain batches and fire RPCs into a
`FuturesUnordered`,
+ /// then process responses as they arrive. This interleaves drain cycles
with
+ /// response handling — when a fast leader responds, we immediately drain
and
+ /// send more batches for its buckets while slow leaders are still
in-flight.
+ ///
+ /// Better than Java's fire-and-forget + Netty callback threads: same I/O
+ /// overlap, but single-task cooperative multitasking — no cross-thread
+ /// synchronization needed for response handling.
+ pub async fn run_with_shutdown(&self, mut shutdown_rx: mpsc::Receiver<()>)
-> Result<()> {
+ let mut pending: FuturesUnordered<SendFuture<'_>> =
FuturesUnordered::new();
+
+ loop {
+ if pending.is_empty() {
+ // Nothing in-flight: run a full drain cycle. This may briefly
+ // block on writer ID init or metadata refresh — acceptable
here
+ // because there are no pending responses to starve.
+ tokio::select! {
+ result = self.prepare_sends() => {
+ match result {
+ Ok((futures, delay)) => {
+ if futures.is_empty() {
+ // Nothing to drain. Sleep for the
ready-check
+ // delay to avoid busy-spinning.
+ // TODO: add a Notify that append()
signals so we
Review Comment:
this busy spin matches Java logic, but in Rust it's easier to fix tbh, so
I'll handle this next as PR is already getting big. But should be just
`tokio::sync::Notify`
Seems that tokio::spawn + FuturesUnordered change is making our life easier
##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -561,8 +776,96 @@ impl Sender {
)
}
- pub async fn close(&mut self) {
- self.running = false;
+ /// Event-loop sender: drain batches and fire RPCs into a
`FuturesUnordered`,
+ /// then process responses as they arrive. This interleaves drain cycles
with
+ /// response handling — when a fast leader responds, we immediately drain
and
+ /// send more batches for its buckets while slow leaders are still
in-flight.
+ ///
+ /// Better than Java's fire-and-forget + Netty callback threads: same I/O
+ /// overlap, but single-task cooperative multitasking — no cross-thread
+ /// synchronization needed for response handling.
+ pub async fn run_with_shutdown(&self, mut shutdown_rx: mpsc::Receiver<()>)
-> Result<()> {
+ let mut pending: FuturesUnordered<SendFuture<'_>> =
FuturesUnordered::new();
+
+ loop {
+ if pending.is_empty() {
+ // Nothing in-flight: run a full drain cycle. This may briefly
+ // block on writer ID init or metadata refresh — acceptable
here
+ // because there are no pending responses to starve.
+ tokio::select! {
+ result = self.prepare_sends() => {
+ match result {
+ Ok((futures, delay)) => {
+ if futures.is_empty() {
+ // Nothing to drain. Sleep for the
ready-check
+ // delay to avoid busy-spinning.
+ // TODO: add a Notify that append()
signals so we
Review Comment:
this busy spin matches Java logic, but in Rust it's easier to fix tbh, so
I'll handle this next as PR is already getting big. But should be just
`tokio::sync::Notify`
Seems that `tokio::spawn + FuturesUnordered` change is making our life easier
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]