Copilot commented on code in PR #216:
URL: https://github.com/apache/fluss-rust/pull/216#discussion_r2739865690


##########
crates/fluss/src/client/lookup/lookup_query.rs:
##########
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Lookup query representation for batching lookup operations.
+
+use crate::metadata::{TableBucket, TablePath};
+use std::sync::atomic::{AtomicI32, Ordering};
+use tokio::sync::oneshot;
+
+/// Represents a single lookup query that will be batched and sent to the 
server.
+pub struct LookupQuery {
+    /// The table path for this lookup
+    table_path: TablePath,
+    /// The table bucket for this lookup
+    table_bucket: TableBucket,
+    /// The encoded primary key bytes
+    key: Vec<u8>,
+    /// Channel to send the result back to the caller
+    result_tx: Option<oneshot::Sender<Result<Option<Vec<u8>>, 
crate::error::Error>>>,
+    /// Number of retry attempts
+    retries: AtomicI32,
+}
+
+impl LookupQuery {
+    /// Creates a new lookup query.
+    pub fn new(
+        table_path: TablePath,
+        table_bucket: TableBucket,
+        key: Vec<u8>,
+        result_tx: oneshot::Sender<Result<Option<Vec<u8>>, 
crate::error::Error>>,
+    ) -> Self {
+        Self {
+            table_path,
+            table_bucket,
+            key,
+            result_tx: Some(result_tx),
+            retries: AtomicI32::new(0),
+        }
+    }
+
+    /// Returns the table path.
+    #[allow(dead_code)]
+    pub fn table_path(&self) -> &TablePath {
+        &self.table_path
+    }
+
+    /// Returns the table bucket.
+    pub fn table_bucket(&self) -> &TableBucket {
+        &self.table_bucket
+    }
+
+    /// Returns the encoded key bytes.
+    #[allow(dead_code)]
+    pub fn key(&self) -> &[u8] {
+        &self.key
+    }
+
+    /// Takes ownership of the key bytes.
+    pub fn take_key(&mut self) -> Vec<u8> {
+        std::mem::take(&mut self.key)

Review Comment:
   The `take_key()` method moves the key out and replaces it with an empty Vec. 
When a lookup fails and is retried via `re_enqueue_lookup()`, the LookupQuery 
will have an empty key. This means retry attempts will send empty keys to the 
server instead of the actual lookup keys, causing all retries to fail.
   
   The key should be preserved across retries. Consider cloning the key when 
adding to the batch instead of taking ownership, or storing a separate copy of 
the key that persists across retries.
   ```suggestion
           self.key.clone()
   ```



##########
crates/fluss/tests/integration/kv_table.rs:
##########
@@ -35,7 +35,7 @@ mod kv_table_test {
     use super::SHARED_FLUSS_CLUSTER;
     use crate::integration::fluss_cluster::FlussTestingCluster;
     use crate::integration::utils::{create_table, get_cluster, start_cluster, 
stop_cluster};
-    use fluss::client::UpsertWriter;
+    use fluss::client::{TableWriter, UpsertWriter};

Review Comment:
   The `TableWriter` import is unused in this test file. None of the test 
functions reference this trait. Consider removing it to keep imports clean.
   ```suggestion
       use fluss::client::UpsertWriter;
   ```



##########
crates/fluss/src/client/lookup/lookup_queue.rs:
##########
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Lookup queue for buffering pending lookup operations.
+//!
+//! This queue buffers lookup operations and provides batched draining
+//! to improve throughput by reducing network round trips.
+
+use super::LookupQuery;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use tokio::sync::mpsc;
+use tokio::time::timeout;
+
+/// A queue that buffers pending lookup operations and provides batched 
draining.
+///
+/// The queue supports two types of entries:
+/// - New lookups from client calls
+/// - Re-enqueued lookups from retry logic
+///
+/// Re-enqueued lookups are prioritized over new lookups to ensure fair 
processing.
+pub struct LookupQueue {
+    /// Whether the queue is closed
+    closed: AtomicBool,
+    /// Channel for receiving lookup requests
+    lookup_rx: mpsc::Receiver<LookupQuery>,
+    /// Channel for receiving re-enqueued lookups
+    re_enqueue_rx: mpsc::UnboundedReceiver<LookupQuery>,
+    /// Maximum batch size for draining
+    max_batch_size: usize,
+    /// Timeout for batch collection
+    batch_timeout: Duration,
+}
+
+impl LookupQueue {
+    /// Creates a new lookup queue with the specified configuration.
+    pub fn new(
+        queue_size: usize,
+        max_batch_size: usize,
+        batch_timeout_ms: u64,
+    ) -> (
+        Self,
+        mpsc::Sender<LookupQuery>,
+        mpsc::UnboundedSender<LookupQuery>,
+    ) {
+        let (lookup_tx, lookup_rx) = mpsc::channel(queue_size);
+        let (re_enqueue_tx, re_enqueue_rx) = mpsc::unbounded_channel();
+
+        let queue = Self {
+            closed: AtomicBool::new(false),
+            lookup_rx,
+            re_enqueue_rx,
+            max_batch_size,
+            batch_timeout: Duration::from_millis(batch_timeout_ms),
+        };
+
+        (queue, lookup_tx, re_enqueue_tx)
+    }
+
+    /// Drains a batch of lookup queries from the queue.
+    pub async fn drain(&mut self) -> Vec<LookupQuery> {
+        let mut lookups = Vec::with_capacity(self.max_batch_size);
+        let deadline = tokio::time::Instant::now() + self.batch_timeout;
+
+        // First, drain re-enqueued lookups (prioritized)
+        while lookups.len() < self.max_batch_size {
+            match self.re_enqueue_rx.try_recv() {
+                Ok(lookup) => lookups.push(lookup),
+                Err(_) => break,
+            }
+        }
+
+        // Then drain from main queue
+        while lookups.len() < self.max_batch_size {
+            let remaining = 
deadline.saturating_duration_since(tokio::time::Instant::now());
+            if remaining.is_zero() {
+                break;
+            }
+
+            match timeout(remaining, self.lookup_rx.recv()).await {
+                Ok(Some(lookup)) => {
+                    lookups.push(lookup);
+                    // Try to drain more without waiting
+                    while lookups.len() < self.max_batch_size {
+                        match self.lookup_rx.try_recv() {
+                            Ok(lookup) => lookups.push(lookup),
+                            Err(_) => break,
+                        }
+                    }
+                }
+                Ok(None) => break, // Channel closed
+                Err(_) => break,   // Timeout
+            }
+        }
+
+        lookups
+    }
+
+    /// Drains all remaining lookups from the queue.
+    pub fn drain_all(&mut self) -> Vec<LookupQuery> {
+        let mut lookups = Vec::new();
+
+        // Drain re-enqueued lookups
+        while let Ok(lookup) = self.re_enqueue_rx.try_recv() {
+            lookups.push(lookup);
+        }
+
+        // Drain main queue
+        while let Ok(lookup) = self.lookup_rx.try_recv() {
+            lookups.push(lookup);
+        }
+
+        lookups
+    }
+
+    /// Returns true if there are undrained lookups in the queue.
+    pub fn has_undrained(&self) -> bool {
+        !self.lookup_rx.is_empty() || !self.re_enqueue_rx.is_empty()
+    }
+
+    /// Closes the queue, preventing new lookups from being added.
+    pub fn close(&self) {
+        self.closed.store(true, Ordering::Release);
+    }

Review Comment:
   The `closed` field in `LookupQueue` is set by the `close()` method but never 
checked. The `drain()` method will continue processing lookups even after the 
queue is closed. This could lead to new lookups being queued and processed 
during shutdown, when they should be rejected.
   
   Check the `closed` flag in the `drain()` method or in 
`LookupClient::lookup()` to prevent new lookups from being queued after 
shutdown has been initiated.



##########
crates/fluss/src/client/connection.rs:
##########
@@ -90,6 +93,18 @@ impl FlussConnection {
         Ok(new_client)
     }
 
+    /// Gets or creates a lookup client for batched lookup operations.
+    pub fn get_or_create_lookup_client(&self) -> Result<Arc<LookupClient>> {
+        if let Some(client) = self.lookup_client.read().as_ref() {
+            return Ok(client.clone());
+        }
+
+        // If not exists, create new one
+        let client = Arc::new(LookupClient::new(&self.args, 
self.metadata.clone()));
+        *self.lookup_client.write() = Some(client.clone());

Review Comment:
   The `get_or_create_lookup_client` method is missing the double-check locking 
pattern that `get_or_create_writer_client` correctly implements. After 
acquiring the write lock, there's no check to see if another thread created the 
client while waiting. This could lead to multiple LookupClient instances being 
created, each spawning their own background sender task, resulting in resource 
leaks and unpredictable behavior.
   
   Add a double-check after acquiring the write lock, similar to the pattern 
used in `get_or_create_writer_client` at lines 80-86.
   ```suggestion
           // 1. Fast path: Attempt to acquire a read lock to check if the 
client already exists.
           if let Some(client) = self.lookup_client.read().as_ref() {
               return Ok(client.clone());
           }
   
           // 2. Slow path: Acquire the write lock.
           let mut lookup_guard = self.lookup_client.write();
   
           // 3. Double-check: Another thread might have initialized the client
           // while this thread was waiting for the write lock.
           if let Some(client) = lookup_guard.as_ref() {
               return Ok(client.clone());
           }
   
           // 4. Initialize the client since we are certain it doesn't exist 
yet.
           let client = Arc::new(LookupClient::new(&self.args, 
self.metadata.clone()));
   
           // 5. Store and return the newly created client.
           *lookup_guard = Some(client.clone());
   ```



##########
crates/fluss/src/client/lookup/lookup_sender.rs:
##########
@@ -0,0 +1,423 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Lookup sender that processes batched lookup requests.
+//!
+//! The sender runs as a background task, draining lookups from the queue,
+//! grouping them by destination server, and sending batched requests.
+
+use super::{LookupQuery, LookupQueue};
+use crate::client::metadata::Metadata;
+use crate::error::{Error, FlussError, Result};
+use crate::metadata::TableBucket;
+use crate::proto::LookupResponse;
+use crate::rpc::message::LookupRequest;
+use log::{debug, error, warn};
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use tokio::sync::{Semaphore, mpsc};
+
+/// Lookup sender that batches and sends lookup requests.
+pub struct LookupSender {
+    /// Metadata for leader lookup
+    metadata: Arc<Metadata>,
+    /// The lookup queue to drain from
+    queue: LookupQueue,
+    /// Channel to re-enqueue failed lookups
+    re_enqueue_tx: mpsc::UnboundedSender<LookupQuery>,
+    /// Semaphore to limit in-flight requests
+    inflight_semaphore: Arc<Semaphore>,
+    /// Maximum number of retries
+    max_retries: i32,
+    /// Whether the sender is running
+    running: AtomicBool,
+    /// Whether to force close (abandon pending lookups)
+    force_close: AtomicBool,
+}
+
+/// A batch of lookups going to the same table bucket.
+struct LookupBatch {
+    table_bucket: TableBucket,
+    lookups: Vec<LookupQuery>,
+    keys: Vec<Vec<u8>>,
+}
+
+impl LookupBatch {
+    fn new(table_bucket: TableBucket) -> Self {
+        Self {
+            table_bucket,
+            lookups: Vec::new(),
+            keys: Vec::new(),
+        }
+    }
+
+    fn add_lookup(&mut self, mut lookup: LookupQuery) {
+        self.keys.push(lookup.take_key());
+        self.lookups.push(lookup);
+    }
+
+    fn complete(&mut self, values: Vec<Option<Vec<u8>>>) {
+        if values.len() != self.lookups.len() {
+            let err_msg = format!(
+                "The number of return values ({}) does not match the number of 
lookups ({})",
+                values.len(),
+                self.lookups.len()
+            );
+            for lookup in &mut self.lookups {
+                lookup.complete(Err(Error::UnexpectedError {
+                    message: err_msg.clone(),
+                    source: None,
+                }));
+            }
+            return;
+        }
+
+        for (lookup, value) in self.lookups.iter_mut().zip(values.into_iter()) 
{
+            lookup.complete(Ok(value));
+        }
+    }
+
+    fn complete_exceptionally(&mut self, error_msg: &str) {
+        for lookup in &mut self.lookups {
+            lookup.complete(Err(Error::UnexpectedError {
+                message: error_msg.to_string(),
+                source: None,
+            }));
+        }
+    }
+}
+
+impl LookupSender {
+    /// Creates a new lookup sender.
+    pub fn new(
+        metadata: Arc<Metadata>,
+        queue: LookupQueue,
+        re_enqueue_tx: mpsc::UnboundedSender<LookupQuery>,
+        max_inflight_requests: usize,
+        max_retries: i32,
+    ) -> Self {
+        Self {
+            metadata,
+            queue,
+            re_enqueue_tx,
+            inflight_semaphore: 
Arc::new(Semaphore::new(max_inflight_requests)),
+            max_retries,
+            running: AtomicBool::new(true),
+            force_close: AtomicBool::new(false),
+        }
+    }
+
+    /// Runs the sender loop.
+    pub async fn run(&mut self) {
+        debug!("Starting Fluss lookup sender");
+
+        while self.running.load(Ordering::Acquire) {
+            if let Err(e) = self.run_once(false).await {
+                error!("Error in lookup sender: {}", e);
+            }
+        }
+
+        debug!("Beginning shutdown of lookup sender, sending remaining 
lookups");
+
+        // Process remaining lookups during shutdown
+        if !self.force_close.load(Ordering::Acquire) && 
self.queue.has_undrained() {
+            if let Err(e) = self.run_once(true).await {
+                error!("Error during lookup sender shutdown: {}", e);
+            }
+        }
+
+        debug!("Lookup sender shutdown complete");
+    }
+
+    /// Runs a single iteration of the sender loop.
+    async fn run_once(&mut self, drain_all: bool) -> Result<()> {
+        let lookups = if drain_all {
+            self.queue.drain_all()
+        } else {
+            self.queue.drain().await
+        };
+
+        self.send_lookups(lookups).await
+    }
+
+    /// Groups and sends lookups to appropriate servers.
+    async fn send_lookups(&self, lookups: Vec<LookupQuery>) -> Result<()> {
+        if lookups.is_empty() {
+            return Ok(());
+        }
+
+        // Group by leader
+        let lookup_batches = self.group_by_leader(lookups);
+
+        if lookup_batches.is_empty() && !self.queue.has_undrained() {
+            // No lookups to send and queue is empty, sleep to avoid busy loop
+            tokio::time::sleep(Duration::from_millis(100)).await;
+            return Ok(());
+        }
+
+        // Send batches to each destination
+        for (destination, batches) in lookup_batches {
+            self.send_lookup_request(destination, batches).await;
+        }
+
+        Ok(())
+    }
+
+    /// Groups lookups by leader server.
+    fn group_by_leader(
+        &self,
+        lookups: Vec<LookupQuery>,
+    ) -> HashMap<i32, HashMap<TableBucket, LookupBatch>> {
+        let cluster = self.metadata.get_cluster();
+        let mut batches_by_leader: HashMap<i32, HashMap<TableBucket, 
LookupBatch>> = HashMap::new();
+
+        for lookup in lookups {
+            let table_bucket = lookup.table_bucket().clone();
+
+            // Find leader for this bucket
+            let leader = match cluster.leader_for(&table_bucket) {
+                Some(leader) => leader.id(),
+                None => {
+                    warn!(
+                        "No leader found for table bucket {} during lookup",
+                        table_bucket
+                    );
+                    self.re_enqueue_lookup(lookup);
+                    continue;
+                }
+            };
+
+            batches_by_leader
+                .entry(leader)
+                .or_default()
+                .entry(table_bucket.clone())
+                .or_insert_with(|| LookupBatch::new(table_bucket))
+                .add_lookup(lookup);
+        }
+
+        batches_by_leader
+    }
+
+    /// Sends lookup requests to a specific destination server.
+    async fn send_lookup_request(
+        &self,
+        destination: i32,
+        batches_by_bucket: HashMap<TableBucket, LookupBatch>,
+    ) {
+        // Group by table_id for request batching
+        let mut batches_by_table: HashMap<i64, Vec<LookupBatch>> = 
HashMap::new();
+        for (table_bucket, batch) in batches_by_bucket {
+            batches_by_table
+                .entry(table_bucket.table_id())
+                .or_default()
+                .push(batch);
+        }
+
+        let cluster = self.metadata.get_cluster();
+        let tablet_server = match cluster.get_tablet_server(destination) {
+            Some(server) => server.clone(),
+            None => {
+                let err_msg = format!("Server {} is not found in metadata 
cache", destination);
+                for batches in batches_by_table.into_values() {
+                    for mut batch in batches {
+                        self.handle_lookup_error(&err_msg, true, &mut batch);
+                    }
+                }
+                return;
+            }
+        };
+
+        let connection = match 
self.metadata.get_connection(&tablet_server).await {
+            Ok(conn) => conn,
+            Err(e) => {
+                let err_msg = format!("Failed to get connection to server {}: 
{}", destination, e);
+                for batches in batches_by_table.into_values() {
+                    for mut batch in batches {
+                        self.handle_lookup_error(&err_msg, true, &mut batch);
+                    }
+                }
+                return;
+            }
+        };
+
+        // Send requests for each table
+        for (table_id, mut batches) in batches_by_table {
+            // Build the request with all buckets for this table
+            let mut all_keys_by_bucket: Vec<(i32, Option<i64>, Vec<Vec<u8>>)> 
= Vec::new();
+            for batch in &batches {
+                all_keys_by_bucket.push((
+                    batch.table_bucket.bucket_id(),
+                    batch.table_bucket.partition_id(),
+                    batch.keys.clone(),

Review Comment:
   The keys from each batch are cloned when building the lookup request. Since 
`batch.keys` is a `Vec<Vec<u8>>`, this performs a deep copy of all the key 
bytes for every bucket in the request. For large batches, this could be a 
significant performance overhead.
   
   Consider restructuring the code to avoid cloning the keys. For example, you 
could move the keys out of the batches instead of cloning them, or use a 
reference-based approach if possible.
   ```suggestion
               for batch in &mut batches {
                   all_keys_by_bucket.push((
                       batch.table_bucket.bucket_id(),
                       batch.table_bucket.partition_id(),
                       std::mem::take(&mut batch.keys),
   ```



##########
crates/fluss/tests/integration/kv_table.rs:
##########
@@ -685,4 +685,223 @@ mod kv_table_test {
             .await
             .expect("Failed to drop table");
     }
+
+    /// Integration test for concurrent batched lookups.
+    /// This test verifies that multiple concurrent lookup requests are 
properly batched
+    /// and processed efficiently.
+    #[tokio::test]
+    async fn batched_concurrent_lookups() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss".to_string(), 
"test_batched_lookups".to_string());
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("id", DataTypes::int())
+                    .column("name", DataTypes::string())
+                    .column("value", DataTypes::bigint())
+                    .primary_key(vec!["id".to_string()])
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        let table_upsert = table.new_upsert().expect("Failed to create 
upsert");
+        let mut upsert_writer = table_upsert
+            .create_writer()
+            .expect("Failed to create writer");
+
+        // Insert 100 records
+        let num_records = 100i32;
+        for i in 0..num_records {
+            let mut row = GenericRow::new(3);
+            row.set_field(0, i);
+            row.set_field(1, format!("name_{}", i));
+            row.set_field(2, (i * 100) as i64);
+            upsert_writer.upsert(&row).await.expect("Failed to upsert");
+        }
+
+        // Create lookuper
+        let mut lookuper = table
+            .new_lookup()
+            .expect("Failed to create lookup")
+            .create_lookuper()
+            .expect("Failed to create lookuper");
+
+        // Perform 50 lookups sequentially (they will be batched internally)
+        let num_lookups = 50i32;
+        for i in 0..num_lookups {
+            let key = make_key(i);
+            let result = lookuper.lookup(&key).await.expect("Failed to 
lookup");
+            let row = result
+                .get_single_row()
+                .expect("Failed to get row")
+                .expect("Row should exist");
+
+            assert_eq!(row.get_int(0), i, "id mismatch");
+            assert_eq!(row.get_string(1), format!("name_{}", i), "name 
mismatch");
+            assert_eq!(row.get_long(2), (i * 100) as i64, "value mismatch");
+        }

Review Comment:
   The test comment claims lookups "will be batched internally", but the code 
performs sequential lookups with `.await` on each one. This means each lookup 
completes before the next one starts, preventing any batching from occurring. 
The test name "batched_concurrent_lookups" is also misleading since there's no 
concurrency.
   
   To actually test batching, spawn multiple lookup futures concurrently using 
`tokio::spawn` or `futures::join_all`, then await all results together. This 
would allow multiple lookups to be queued simultaneously and batched by the 
sender.



##########
crates/fluss/src/rpc/fluss_api_error.rs:
##########
@@ -283,6 +283,25 @@ impl FlussError {
         }
     }
 
+    /// Returns true if this error is retriable.
+    /// Based on Java client's RetriableException hierarchy.
+    pub fn is_retriable(&self) -> bool {
+        matches!(
+            self,
+            FlussError::NetworkException
+                | FlussError::NotLeaderOrFollower
+                | FlussError::UnknownTableOrBucketException
+                | FlussError::LeaderNotAvailableException
+                | FlussError::CorruptMessage
+                | FlussError::CorruptRecordException
+                | FlussError::RequestTimeOut
+                | FlussError::StorageException
+                | FlussError::NotEnoughReplicasAfterAppendException
+                | FlussError::NotEnoughReplicasException
+                | FlussError::SchemaNotExist
+        )

Review Comment:
   The `SchemaNotExist` error is marked as retriable for lookup operations. 
However, schema errors are typically persistent issues that won't resolve on 
retry - if the schema doesn't exist, retrying the same operation won't make it 
suddenly exist. This could lead to unnecessary retry loops that waste resources 
and delay error reporting to the caller.
   
   Verify whether the Java client treats `SchemaNotExist` as retriable for 
lookup operations. If not, remove it from the retriable errors list to avoid 
unnecessary retries.



##########
crates/fluss/tests/integration/kv_table.rs:
##########
@@ -685,4 +685,223 @@ mod kv_table_test {
             .await
             .expect("Failed to drop table");
     }
+
+    /// Integration test for concurrent batched lookups.
+    /// This test verifies that multiple concurrent lookup requests are 
properly batched
+    /// and processed efficiently.
+    #[tokio::test]
+    async fn batched_concurrent_lookups() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss".to_string(), 
"test_batched_lookups".to_string());
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("id", DataTypes::int())
+                    .column("name", DataTypes::string())
+                    .column("value", DataTypes::bigint())
+                    .primary_key(vec!["id".to_string()])
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        let table_upsert = table.new_upsert().expect("Failed to create 
upsert");
+        let mut upsert_writer = table_upsert
+            .create_writer()
+            .expect("Failed to create writer");
+
+        // Insert 100 records
+        let num_records = 100i32;
+        for i in 0..num_records {
+            let mut row = GenericRow::new(3);
+            row.set_field(0, i);
+            row.set_field(1, format!("name_{}", i));
+            row.set_field(2, (i * 100) as i64);
+            upsert_writer.upsert(&row).await.expect("Failed to upsert");
+        }
+
+        // Create lookuper
+        let mut lookuper = table
+            .new_lookup()
+            .expect("Failed to create lookup")
+            .create_lookuper()
+            .expect("Failed to create lookuper");
+
+        // Perform 50 lookups sequentially (they will be batched internally)
+        let num_lookups = 50i32;
+        for i in 0..num_lookups {
+            let key = make_key(i);
+            let result = lookuper.lookup(&key).await.expect("Failed to 
lookup");
+            let row = result
+                .get_single_row()
+                .expect("Failed to get row")
+                .expect("Row should exist");
+
+            assert_eq!(row.get_int(0), i, "id mismatch");
+            assert_eq!(row.get_string(1), format!("name_{}", i), "name 
mismatch");
+            assert_eq!(row.get_long(2), (i * 100) as i64, "value mismatch");
+        }
+
+        admin
+            .drop_table(&table_path, false)
+            .await
+            .expect("Failed to drop table");
+    }
+
+    /// Integration test for batched lookups with mixed existing and 
non-existing keys.
+    #[tokio::test]
+    async fn batched_lookups_mixed_keys() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss".to_string(), 
"test_batched_mixed_keys".to_string());
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("id", DataTypes::int())
+                    .column("data", DataTypes::string())
+                    .primary_key(vec!["id".to_string()])
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        let table_upsert = table.new_upsert().expect("Failed to create 
upsert");
+        let mut upsert_writer = table_upsert
+            .create_writer()
+            .expect("Failed to create writer");
+
+        // Insert only even-numbered records (0, 2, 4, 6, ...)
+        for i in (0..20).step_by(2) {
+            let mut row = GenericRow::new(2);
+            row.set_field(0, i as i32);
+            row.set_field(1, format!("data_{}", i));
+            upsert_writer.upsert(&row).await.expect("Failed to upsert");
+        }
+
+        let mut lookuper = table
+            .new_lookup()
+            .expect("Failed to create lookup")
+            .create_lookuper()
+            .expect("Failed to create lookuper");
+
+        // Lookup all keys 0-19 (half exist, half don't)
+        for i in 0..20 {
+            let mut key = GenericRow::new(2);
+            key.set_field(0, i as i32);
+            let result = lookuper.lookup(&key).await.expect("Failed to 
lookup");
+            let row_opt = result.get_single_row().expect("Failed to get row");
+
+            if i % 2 == 0 {
+                // Even keys should exist
+                let row = row_opt.expect(&format!("Row {} should exist", i));

Review Comment:
   The `format!` macro is used inside `expect()` which will always allocate a 
String, even when the row exists and the panic never occurs. This is 
inefficient for the success case.
   
   Use a closure with `unwrap_or_else()` or `expect()` with a static string 
instead. For example: `row_opt.unwrap_or_else(|| panic!("Row {} should exist", 
i))`
   ```suggestion
                   let row = row_opt.unwrap_or_else(|| panic!("Row {} should 
exist", i));
   ```



##########
crates/fluss/src/client/lookup/lookup_sender.rs:
##########
@@ -0,0 +1,423 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Lookup sender that processes batched lookup requests.
+//!
+//! The sender runs as a background task, draining lookups from the queue,
+//! grouping them by destination server, and sending batched requests.
+
+use super::{LookupQuery, LookupQueue};
+use crate::client::metadata::Metadata;
+use crate::error::{Error, FlussError, Result};
+use crate::metadata::TableBucket;
+use crate::proto::LookupResponse;
+use crate::rpc::message::LookupRequest;
+use log::{debug, error, warn};
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use tokio::sync::{Semaphore, mpsc};
+
+/// Lookup sender that batches and sends lookup requests.
+pub struct LookupSender {
+    /// Metadata for leader lookup
+    metadata: Arc<Metadata>,
+    /// The lookup queue to drain from
+    queue: LookupQueue,
+    /// Channel to re-enqueue failed lookups
+    re_enqueue_tx: mpsc::UnboundedSender<LookupQuery>,
+    /// Semaphore to limit in-flight requests
+    inflight_semaphore: Arc<Semaphore>,
+    /// Maximum number of retries
+    max_retries: i32,
+    /// Whether the sender is running
+    running: AtomicBool,
+    /// Whether to force close (abandon pending lookups)
+    force_close: AtomicBool,
+}
+
+/// A batch of lookups going to the same table bucket.
+struct LookupBatch {
+    table_bucket: TableBucket,
+    lookups: Vec<LookupQuery>,
+    keys: Vec<Vec<u8>>,
+}
+
+impl LookupBatch {
+    fn new(table_bucket: TableBucket) -> Self {
+        Self {
+            table_bucket,
+            lookups: Vec::new(),
+            keys: Vec::new(),
+        }
+    }
+
+    fn add_lookup(&mut self, mut lookup: LookupQuery) {
+        self.keys.push(lookup.take_key());
+        self.lookups.push(lookup);
+    }
+
+    fn complete(&mut self, values: Vec<Option<Vec<u8>>>) {
+        if values.len() != self.lookups.len() {
+            let err_msg = format!(
+                "The number of return values ({}) does not match the number of 
lookups ({})",
+                values.len(),
+                self.lookups.len()
+            );
+            for lookup in &mut self.lookups {
+                lookup.complete(Err(Error::UnexpectedError {
+                    message: err_msg.clone(),
+                    source: None,
+                }));
+            }
+            return;
+        }
+
+        for (lookup, value) in self.lookups.iter_mut().zip(values.into_iter()) 
{
+            lookup.complete(Ok(value));
+        }
+    }
+
+    fn complete_exceptionally(&mut self, error_msg: &str) {
+        for lookup in &mut self.lookups {
+            lookup.complete(Err(Error::UnexpectedError {
+                message: error_msg.to_string(),
+                source: None,
+            }));
+        }
+    }
+}
+
+impl LookupSender {
+    /// Creates a new lookup sender.
+    pub fn new(
+        metadata: Arc<Metadata>,
+        queue: LookupQueue,
+        re_enqueue_tx: mpsc::UnboundedSender<LookupQuery>,
+        max_inflight_requests: usize,
+        max_retries: i32,
+    ) -> Self {
+        Self {
+            metadata,
+            queue,
+            re_enqueue_tx,
+            inflight_semaphore: 
Arc::new(Semaphore::new(max_inflight_requests)),
+            max_retries,
+            running: AtomicBool::new(true),
+            force_close: AtomicBool::new(false),
+        }
+    }
+
+    /// Runs the sender loop.
+    pub async fn run(&mut self) {
+        debug!("Starting Fluss lookup sender");
+
+        while self.running.load(Ordering::Acquire) {
+            if let Err(e) = self.run_once(false).await {
+                error!("Error in lookup sender: {}", e);
+            }
+        }
+
+        debug!("Beginning shutdown of lookup sender, sending remaining 
lookups");
+
+        // Process remaining lookups during shutdown
+        if !self.force_close.load(Ordering::Acquire) && 
self.queue.has_undrained() {
+            if let Err(e) = self.run_once(true).await {
+                error!("Error during lookup sender shutdown: {}", e);
+            }
+        }
+
+        debug!("Lookup sender shutdown complete");
+    }
+
+    /// Runs a single iteration of the sender loop.
+    async fn run_once(&mut self, drain_all: bool) -> Result<()> {
+        let lookups = if drain_all {
+            self.queue.drain_all()
+        } else {
+            self.queue.drain().await
+        };
+
+        self.send_lookups(lookups).await
+    }
+
+    /// Groups and sends lookups to appropriate servers.
+    async fn send_lookups(&self, lookups: Vec<LookupQuery>) -> Result<()> {
+        if lookups.is_empty() {
+            return Ok(());
+        }
+
+        // Group by leader
+        let lookup_batches = self.group_by_leader(lookups);
+
+        if lookup_batches.is_empty() && !self.queue.has_undrained() {
+            // No lookups to send and queue is empty, sleep to avoid busy loop
+            tokio::time::sleep(Duration::from_millis(100)).await;
+            return Ok(());
+        }
+
+        // Send batches to each destination
+        for (destination, batches) in lookup_batches {
+            self.send_lookup_request(destination, batches).await;
+        }
+
+        Ok(())
+    }
+
+    /// Groups lookups by leader server.
+    fn group_by_leader(
+        &self,
+        lookups: Vec<LookupQuery>,
+    ) -> HashMap<i32, HashMap<TableBucket, LookupBatch>> {
+        let cluster = self.metadata.get_cluster();
+        let mut batches_by_leader: HashMap<i32, HashMap<TableBucket, 
LookupBatch>> = HashMap::new();
+
+        for lookup in lookups {
+            let table_bucket = lookup.table_bucket().clone();
+
+            // Find leader for this bucket
+            let leader = match cluster.leader_for(&table_bucket) {
+                Some(leader) => leader.id(),
+                None => {
+                    warn!(
+                        "No leader found for table bucket {} during lookup",
+                        table_bucket
+                    );
+                    self.re_enqueue_lookup(lookup);
+                    continue;
+                }
+            };
+
+            batches_by_leader
+                .entry(leader)
+                .or_default()
+                .entry(table_bucket.clone())
+                .or_insert_with(|| LookupBatch::new(table_bucket))
+                .add_lookup(lookup);
+        }
+
+        batches_by_leader
+    }
+
+    /// Sends lookup requests to a specific destination server.
+    async fn send_lookup_request(
+        &self,
+        destination: i32,
+        batches_by_bucket: HashMap<TableBucket, LookupBatch>,
+    ) {
+        // Group by table_id for request batching
+        let mut batches_by_table: HashMap<i64, Vec<LookupBatch>> = 
HashMap::new();
+        for (table_bucket, batch) in batches_by_bucket {
+            batches_by_table
+                .entry(table_bucket.table_id())
+                .or_default()
+                .push(batch);
+        }
+
+        let cluster = self.metadata.get_cluster();
+        let tablet_server = match cluster.get_tablet_server(destination) {
+            Some(server) => server.clone(),
+            None => {
+                let err_msg = format!("Server {} is not found in metadata 
cache", destination);
+                for batches in batches_by_table.into_values() {
+                    for mut batch in batches {
+                        self.handle_lookup_error(&err_msg, true, &mut batch);
+                    }
+                }
+                return;
+            }
+        };
+
+        let connection = match 
self.metadata.get_connection(&tablet_server).await {
+            Ok(conn) => conn,
+            Err(e) => {
+                let err_msg = format!("Failed to get connection to server {}: 
{}", destination, e);
+                for batches in batches_by_table.into_values() {
+                    for mut batch in batches {
+                        self.handle_lookup_error(&err_msg, true, &mut batch);
+                    }
+                }
+                return;
+            }
+        };
+
+        // Send requests for each table
+        for (table_id, mut batches) in batches_by_table {
+            // Build the request with all buckets for this table
+            let mut all_keys_by_bucket: Vec<(i32, Option<i64>, Vec<Vec<u8>>)> 
= Vec::new();
+            for batch in &batches {
+                all_keys_by_bucket.push((
+                    batch.table_bucket.bucket_id(),
+                    batch.table_bucket.partition_id(),
+                    batch.keys.clone(),
+                ));
+            }
+
+            // Create lookup request for all buckets in this table
+            let request = LookupRequest::new_batched(table_id, 
all_keys_by_bucket);
+
+            // Acquire semaphore permit
+            let _permit = match 
self.inflight_semaphore.clone().acquire_owned().await {
+                Ok(permit) => permit,
+                Err(_) => {
+                    error!("Semaphore closed during lookup");
+                    for batch in &mut batches {
+                        batch.complete_exceptionally("Lookup sender shutdown");
+                    }
+                    return;
+                }
+            };
+
+            // Send request and handle response
+            match connection.request(request).await {
+                Ok(response) => {
+                    self.handle_lookup_response(destination, response, &mut 
batches);
+                }
+                Err(e) => {
+                    let err_msg = format!("Lookup request failed: {}", e);
+                    let is_retriable = Self::is_retriable_error(&e);
+                    for batch in &mut batches {
+                        self.handle_lookup_error(&err_msg, is_retriable, 
batch);
+                    }
+                }
+            }
+        }
+    }
+
+    /// Checks if an error is retriable.
+    fn is_retriable_error(error: &Error) -> bool {
+        match error {
+            Error::LeaderNotAvailable { .. } => true,
+            Error::FlussAPIError { api_error } => {
+                let fluss_error = FlussError::for_code(api_error.code);
+                fluss_error.is_retriable()
+            }
+            _ => false,
+        }
+    }
+
+    /// Handles the lookup response.
+    fn handle_lookup_response(
+        &self,
+        destination: i32,
+        response: LookupResponse,
+        batches: &mut [LookupBatch],
+    ) {
+        // Create a map from bucket_id to batch index for quick lookup
+        let bucket_to_index: HashMap<i32, usize> = batches
+            .iter()
+            .enumerate()
+            .map(|(idx, batch)| (batch.table_bucket.bucket_id(), idx))
+            .collect();
+
+        for bucket_resp in response.buckets_resp {
+            let bucket_id = bucket_resp.bucket_id;
+            if let Some(&batch_idx) = bucket_to_index.get(&bucket_id) {
+                let batch = &mut batches[batch_idx];
+
+                // Check for errors
+                if let Some(error_code) = bucket_resp.error_code {
+                    let fluss_error = FlussError::for_code(error_code);
+                    if fluss_error != FlussError::None {
+                        let err_msg = format!(
+                            "Lookup error for bucket {}: code={}, message={}",
+                            bucket_id,
+                            error_code,
+                            bucket_resp.error_message.unwrap_or_default()
+                        );
+                        let is_retriable = fluss_error.is_retriable();
+                        self.handle_lookup_error(&err_msg, is_retriable, 
batch);
+                        continue;
+                    }
+                }
+
+                // Extract values
+                let values: Vec<Option<Vec<u8>>> = bucket_resp
+                    .values
+                    .into_iter()
+                    .map(|pb_value| pb_value.values)
+                    .collect();
+
+                batch.complete(values);
+            } else {
+                error!(
+                    "Received response for unknown bucket {} from server {}",
+                    bucket_id, destination
+                );
+            }
+        }
+    }

Review Comment:
   The `handle_lookup_response` method only processes buckets that are present 
in the response. If the server returns a successful response but omits some 
bucket responses (for example, if a bucket had no data or encountered an 
issue), those batches will remain unprocessed and their LookupQuery oneshot 
channels will never be completed, causing the waiting callers to hang 
indefinitely.
   
   After processing all bucket responses, check if any batches were not 
processed and handle them appropriately (either retry or complete with an 
error).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to