luoyuxia commented on code in PR #404:
URL: https://github.com/apache/fluss-rust/pull/404#discussion_r2934468836


##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -54,70 +72,160 @@ impl Sender {
         max_request_timeout_ms: i32,
         ack: i16,
         retries: i32,
+        idempotence_manager: Arc<IdempotenceManager>,
     ) -> Self {
         Self {
-            running: true,
+            running: AtomicBool::new(true),
             metadata,
             accumulator,
             in_flight_batches: Default::default(),
             max_request_size,
             ack,
             max_request_timeout_ms,
             retries,
+            idempotence_manager,
         }
     }
 
+    #[allow(dead_code)]
     pub async fn run(&self) -> Result<()> {

Review Comment:
   nit:
   can this method be removed?



##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -467,6 +653,39 @@ impl Sender {
                 physical_table_path.as_ref(),
                 ready_write_batch.table_bucket.bucket_id()
             );
+
+            // If idempotence is enabled, only retry if the current writer ID still matches
+            // the batch's writer ID. If the writer ID was reset (e.g., by another bucket's
+            // error), fail the batch instead of retrying with stale state.
+            if self.idempotence_manager.is_enabled() {
+                let batch_writer_id = ready_write_batch.write_batch.writer_id();
+                if batch_writer_id != NO_WRITER_ID

Review Comment:
   Here is the comment in the Java code:
   ```
   // If idempotence is enabled only retry the request if the current writer id is
                   // the same as the writer id of the batch.
   ```
   But the Java code doesn't seem to check that the current writer ID is the same as the writer ID of the batch, right?
   I think this may be an omission on the Java side, and it may be worth creating an issue in the main repo.
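
   For reference, a minimal sketch of the check that comment describes. `may_retry` is a hypothetical helper and the value of `NO_WRITER_ID` is an assumption for this sketch; neither is taken from the Java code or from this PR. Allowing batches without a writer ID to retry is also an assumption about the intended behavior.

   ```rust
   /// Sentinel for "no writer ID assigned". The value -1 is assumed here.
   const NO_WRITER_ID: i64 = -1;

   /// Hypothetical helper: decides whether a failed batch may be retried.
   fn may_retry(idempotence_enabled: bool, batch_writer_id: i64, current_writer_id: i64) -> bool {
       // Without idempotence there is no writer state that could go stale.
       if !idempotence_enabled {
           return true;
       }
       // Assumption: a batch that never received a writer ID is not restricted.
       if batch_writer_id == NO_WRITER_ID {
           return true;
       }
       // Retry only while the batch still belongs to the current writer.
       batch_writer_id == current_writer_id
   }
   ```

   With this shape, a batch stamped with writer ID 7 would not be retried once the current writer ID has moved to 9, which is the behavior the comment promises.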



##########
crates/fluss/src/client/write/idempotence.rs:
##########
@@ -0,0 +1,712 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::TableBucket;
+use crate::record::{NO_BATCH_SEQUENCE, NO_WRITER_ID};
+use crate::rpc::FlussError;
+use log::debug;
+use parking_lot::Mutex;
+use std::collections::{HashMap, HashSet};
+use std::sync::atomic::{AtomicI64, Ordering};
+
+struct InFlightBatch {
+    batch_sequence: i32,
+    batch_id: i64,
+}
+
+struct BucketEntry {
+    writer_id: i64,
+    next_sequence: i32,
+    last_acked_sequence: i32,
+    in_flight: Vec<InFlightBatch>,
+    reset_batch_ids: HashSet<i64>,
+}
+
+impl BucketEntry {
+    fn new() -> Self {
+        Self {
+            writer_id: NO_WRITER_ID,
+            next_sequence: 0,
+            last_acked_sequence: -1,
+            in_flight: Vec::new(),
+            reset_batch_ids: HashSet::new(),
+        }
+    }
+}
+
+pub struct IdempotenceManager {
+    writer_id: AtomicI64,
+    bucket_entries: Mutex<HashMap<TableBucket, BucketEntry>>,
+    enabled: bool,
+    max_in_flight_requests_per_bucket: usize,
+}
+
+impl IdempotenceManager {
+    pub fn new(enabled: bool, max_in_flight_requests_per_bucket: usize) -> Self {
+        Self {
+            writer_id: AtomicI64::new(NO_WRITER_ID),
+            bucket_entries: Mutex::new(HashMap::new()),
+            enabled,
+            max_in_flight_requests_per_bucket,
+        }
+    }
+
+    pub fn is_enabled(&self) -> bool {
+        self.enabled
+    }
+
+    pub fn writer_id(&self) -> i64 {
+        self.writer_id.load(Ordering::Acquire)
+    }
+
+    pub fn has_writer_id(&self) -> bool {
+        self.writer_id() != NO_WRITER_ID
+    }
+
+    pub fn is_writer_id_valid(&self) -> bool {
+        self.has_writer_id()
+    }
+
+    pub fn in_flight_count(&self, bucket: &TableBucket) -> usize {
+        let entries = self.bucket_entries.lock();
+        entries.get(bucket).map_or(0, |e| e.in_flight.len())
+    }
+
+    pub fn can_send_more_requests(&self, bucket: &TableBucket) -> bool {
+        self.in_flight_count(bucket) < self.max_in_flight_requests_per_bucket
+    }
+
+    pub fn set_writer_id(&self, id: i64) {
+        self.writer_id.store(id, Ordering::Release);
+    }
+
+    pub fn reset_writer_id(&self) {
+        self.writer_id.store(NO_WRITER_ID, Ordering::Release);
+        self.bucket_entries.lock().clear();
+    }
+
+    pub fn next_sequence_and_increment(&self, bucket: &TableBucket) -> i32 {
+        let mut entries = self.bucket_entries.lock();
+        let entry = entries
+            .entry(bucket.clone())
+            .or_insert_with(BucketEntry::new);
+        let seq = entry.next_sequence;
+        entry.next_sequence += 1;
+        seq
+    }
+
+    pub fn add_in_flight_batch(&self, bucket: &TableBucket, batch_sequence: i32, batch_id: i64) {
+        debug_assert!(
+            batch_sequence != NO_BATCH_SEQUENCE,
+            "Can't track batch for bucket {bucket} when batch sequence is not set"
+        );
+        let mut entries = self.bucket_entries.lock();
+        let entry = entries
+            .entry(bucket.clone())
+            .or_insert_with(BucketEntry::new);
+        // Insert sorted by batch_sequence
+        let pos = entry
+            .in_flight
+            .binary_search_by_key(&batch_sequence, |b| b.batch_sequence)
+            .unwrap_or_else(|e| e);
+        entry.in_flight.insert(
+            pos,
+            InFlightBatch {
+                batch_sequence,
+                batch_id,
+            },
+        );
+    }
+
+    pub fn handle_completed_batch(
+        &self,
+        bucket: &TableBucket,
+        batch_id: i64,
+        batch_writer_id: i64,
+    ) {
+        if batch_writer_id != self.writer_id() {
+            debug!(
+                "Ignoring completed batch for bucket {bucket} with stale writer_id {batch_writer_id} (current: {})",
+                self.writer_id()
+            );
+            return;
+        }
+        let mut entries = self.bucket_entries.lock();
+        if let Some(entry) = entries.get_mut(bucket) {
+            // Find by batch_id to handle the case where the in-flight entry's sequence
+            // was adjusted by a prior handle_failed_batch call.
+            if let Some(pos) = entry.in_flight.iter().position(|b| b.batch_id == batch_id) {
+                let adjusted_seq = entry.in_flight[pos].batch_sequence;
+                entry.in_flight.remove(pos);
+                entry.reset_batch_ids.remove(&batch_id);
+                if adjusted_seq > entry.last_acked_sequence {
+                    entry.last_acked_sequence = adjusted_seq;
+                }
+            }
+        }
+    }
+
+    /// Handle a failed batch. Matches Java's `IdempotenceManager.handleFailedBatch`.
+    ///
+    /// For `OutOfOrderSequenceException` or `UnknownWriterIdException`, resets ALL
+    /// writer state (matching Java: "we cannot make any guarantees about the previously
+    /// committed message").
+    ///
+    /// For other errors, removes the specific in-flight entry by `batch_id` and
+    /// optionally adjusts downstream sequences. `adjust_sequences` should only be true
+    /// when the batch has NOT exhausted its retries.
+    pub fn handle_failed_batch(
+        &self,
+        bucket: &TableBucket,
+        batch_id: i64,
+        batch_writer_id: i64,
+        error: Option<FlussError>,
+        adjust_sequences: bool,
+    ) {
+        if batch_writer_id != self.writer_id() {
+            debug!(
+                "Ignoring failed batch for bucket {bucket} with stale writer_id {batch_writer_id} (current: {})",
+                self.writer_id()
+            );
+            return;
+        }
+
+        let mut entries = self.bucket_entries.lock();
+
+        // Matches Java: OutOfOrderSequence or UnknownWriterId → reset all writer state.
+        // Java's synchronized handleFailedBatch can call synchronized resetWriterId
+        // because Java monitors are reentrant. We inline the reset here to stay in
+        // the same lock scope.
+        if let Some(e) = error {
+            if e == FlussError::OutOfOrderSequenceException
+                || e == FlussError::UnknownWriterIdException
+            {
+                debug!(
+                    "Resetting writer ID due to {e:?} for bucket {bucket} \
+                     (writer_id={batch_writer_id}, batch_id={batch_id})"
+                );
+                self.writer_id.store(NO_WRITER_ID, Ordering::Release);
+                entries.clear();
+                return;
+            }
+        }
+        if let Some(entry) = entries.get_mut(bucket) {
+            // Find and remove by batch_id, capturing the (possibly adjusted) sequence
+            let failed_sequence = entry
+                .in_flight
+                .iter()
+                .position(|b| b.batch_id == batch_id)
+                .map(|pos| {
+                    let seq = entry.in_flight[pos].batch_sequence;
+                    entry.in_flight.remove(pos);
+                    seq
+                });
+            entry.reset_batch_ids.remove(&batch_id);
+            if adjust_sequences {
+                if let Some(failed_seq) = failed_sequence {
+                    // Decrement sequences of in-flight batches that have higher sequences
+                    for b in &mut entry.in_flight {
+                        if b.batch_sequence > failed_seq {
+                            b.batch_sequence -= 1;
+                            debug_assert!(
+                                b.batch_sequence >= 0,
+                                "Batch sequence for batch_id={} went negative: {}",
+                                b.batch_id,
+                                b.batch_sequence
+                            );
+                            entry.reset_batch_ids.insert(b.batch_id);
+                        }
+                    }
+                    // Roll back next_sequence
+                    if entry.next_sequence > failed_seq {
+                        entry.next_sequence -= 1;
+                        debug_assert!(
+                            entry.next_sequence >= 0,
+                            "Next sequence went negative: {}",
+                            entry.next_sequence
+                        );
+                    }
+                }
+            }
+        }
+    }
+
+    #[cfg(test)]
+    pub fn remove_in_flight_batch(&self, bucket: &TableBucket, batch_id: i64) {
+        let mut entries = self.bucket_entries.lock();
+        if let Some(entry) = entries.get_mut(bucket) {
+            entry.in_flight.retain(|b| b.batch_id != batch_id);
+        }
+    }
+
+    /// If the bucket's stored writer_id doesn't match the current writer_id
+    /// and there are no in-flight batches, reset the bucket entry to start
+    /// sequences from 0. Matches Java's `IdempotenceManager.maybeUpdateWriterId`.
+    pub fn maybe_update_writer_id(&self, bucket: &TableBucket) {
+        let current_writer_id = self.writer_id();
+        let mut entries = self.bucket_entries.lock();
+        let entry = entries
+            .entry(bucket.clone())
+            .or_insert_with(BucketEntry::new);
+        if entry.writer_id != current_writer_id && entry.in_flight.is_empty() {
+            entry.writer_id = current_writer_id;
+            entry.next_sequence = 0;
+            entry.last_acked_sequence = -1;
+            debug!(
+                "Writer id of bucket {bucket} set to {current_writer_id}. Reinitialize batch sequence at beginning."
+            );
+        }
+    }
+
+    /// Returns true if the given batch (identified by `batch_id`) is the first
+    /// in-flight batch for its bucket. Uses batch_id rather than batch_sequence
+    /// because sequence adjustment (`handle_failed_batch` with `adjust_sequences`)
+    /// modifies InFlightBatch sequences without updating the actual WriteBatch,
+    /// so batch_sequence on the WriteBatch may be stale.
+    pub fn is_first_in_flight_batch(&self, bucket: &TableBucket, batch_id: i64) -> bool {
+        let entries = self.bucket_entries.lock();
+        entries
+            .get(bucket)
+            .and_then(|e| e.in_flight.first())
+            .is_some_and(|b| b.batch_id == batch_id)
+    }
+
+    /// Returns the current (possibly adjusted) in-flight sequence for a batch
+    /// and clears its reset flag in a single lock scope. Used by `re_enqueue`
+    /// to sync the WriteBatch's sequence with the adjusted InFlightBatch
+    /// sequence. Clearing the reset flag matches Java where `close()` resets
+    /// `reopened = false` — the hint is consumed after one re-enqueue cycle.
+    pub fn take_adjusted_sequence(&self, bucket: &TableBucket, batch_id: i64) -> Option<i32> {
+        let mut entries = self.bucket_entries.lock();
+        let entry = entries.get_mut(bucket)?;
+        entry.reset_batch_ids.remove(&batch_id);

Review Comment:
   I think take_adjusted_sequence() clears the “sequence was reset” marker too early. handle_failed_batch(..., adjust_sequences = true) records rewritten in-flight batches in reset_batch_ids, and can_retry_for_error() later relies on that marker for both UnknownWriterIdException and the OutOfOrderSequenceException special case.
   
   However, RecordAccumulator::re_enqueue() calls take_adjusted_sequence(), and that method immediately removes batch_id from reset_batch_ids before the batch is retried. From that point on, a rewritten batch is indistinguishable from a normal batch in can_retry_for_error().
   
   So the following flow changes semantics vs. Java: an earlier batch fails -> a later batch gets its sequence adjusted -> that later batch is re-enqueued -> it then hits UnknownWriterIdException (or OutOfOrderSequenceException after becoming last_acked + 1). Java still treats it as retryable because the batch had been reopened/reset; Rust drops that hint during re-enqueue, so the later retry decision can no longer see it.
   
   It seems the reset marker should survive re-enqueue and only be cleared on completion / final removal, not when merely reading the adjusted sequence.
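
   A rough sketch of that shape, using a simplified stand-in for `BucketEntry`. `BucketState`, `adjusted_sequence`, `was_reset`, and `complete` are hypothetical names for illustration, not the PR's API: reading the adjusted sequence is side-effect free, and the marker is dropped only when the batch leaves the in-flight list.

   ```rust
   use std::collections::HashSet;

   /// Simplified stand-in for one bucket's idempotence state (hypothetical).
   struct BucketState {
       /// (batch_id, batch_sequence) pairs for in-flight batches.
       in_flight: Vec<(i64, i32)>,
       /// Batches whose sequence was rewritten after an earlier failure.
       reset_batch_ids: HashSet<i64>,
   }

   impl BucketState {
       /// Read-only lookup of the (possibly adjusted) sequence.
       /// Unlike `take_adjusted_sequence`, this leaves the reset marker alone.
       fn adjusted_sequence(&self, batch_id: i64) -> Option<i32> {
           self.in_flight
               .iter()
               .find(|(id, _)| *id == batch_id)
               .map(|(_, seq)| *seq)
       }

       /// What a retry decision would consult after re-enqueue.
       fn was_reset(&self, batch_id: i64) -> bool {
           self.reset_batch_ids.contains(&batch_id)
       }

       /// Completion / final removal: the only place the marker is cleared.
       fn complete(&mut self, batch_id: i64) {
           self.in_flight.retain(|(id, _)| *id != batch_id);
           self.reset_batch_ids.remove(&batch_id);
       }
   }
   ```

   Under this split, re_enqueue() could sync the WriteBatch sequence through the read-only lookup, and the retry decision would still see the marker on a later failure.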


