Copilot commented on code in PR #132:
URL: https://github.com/apache/fluss-rust/pull/132#discussion_r2678550082


##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -74,49 +74,11 @@ impl<'a> TableScan<'a> {
     /// Returns an error if `column_indices` is empty or if any column index 
is out of range.
     ///
     /// # Example
-    /// ```
-    /// # use fluss::client::FlussConnection;
-    /// # use fluss::config::Config;
-    /// # use fluss::error::Result;
-    /// # use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
-    /// # use fluss::row::InternalRow;
-    /// # use std::time::Duration;
-    ///
-    /// # pub async fn example() -> Result<()> {
-    ///     let mut config = Config::default();
-    ///     config.bootstrap_server = Some("127.0.0.1:9123".to_string());
-    ///     let conn = FlussConnection::new(config).await?;
-    ///
-    ///     let table_descriptor = TableDescriptor::builder()
-    ///         .schema(
-    ///             Schema::builder()
-    ///                 .column("col1", DataTypes::int())
-    ///                 .column("col2", DataTypes::string())
-    ///                 .column("col3", DataTypes::string())
-    ///                 .column("col3", DataTypes::string())
-    ///             .build()?,
-    ///         ).build()?;
-    ///     let table_path = TablePath::new("fluss".to_owned(), 
"rust_test_long".to_owned());
-    ///     let admin = conn.get_admin().await?;
-    ///     admin.create_table(&table_path, &table_descriptor, true)
-    ///         .await?;
-    ///     let table_info = admin.get_table(&table_path).await?;
-    ///     let table = conn.get_table(&table_path).await?;
-    ///
-    ///     // Project columns by indices
-    ///     let scanner = table.new_scan().project(&[0, 2, 
3])?.create_log_scanner()?;
-    ///     let scan_records = scanner.poll(Duration::from_secs(10)).await?;
-    ///     for record in scan_records {
-    ///         let row = record.row();
-    ///         println!(
-    ///             "{{{}, {}, {}}}@{}",
-    ///             row.get_int(0),
-    ///             row.get_string(2),
-    ///             row.get_string(3),
-    ///             record.offset()
-    ///         );
-    ///     }
-    ///     # Ok(())
+    /// ```no_run
+    /// # fn example() -> fluss::error::Result<()> {
+    /// # let table: fluss::client::FlussTable<'_> = todo!("requires a Fluss 
connection");
+    /// let scanner = table.new_scan().project(&[0, 2, 
3])?.create_log_scanner()?;

Review Comment:
   The simplified example now uses no_run which is appropriate since it 
requires actual Fluss connections. However, the example signature shows "fn 
example()" which is synchronous, but TableScan operations like 
create_log_scanner() may be async in actual usage. Consider verifying the 
example compiles correctly or adding async if needed.
   ```suggestion
       /// let scan = table.new_scan().project(&[0, 2, 3])?;
   ```



##########
crates/fluss/src/client/write/batch.rs:
##########
@@ -175,7 +218,13 @@ impl ArrowLogWriteBatch {
     }
 
     pub fn build(&self) -> Result<Vec<u8>> {
-        self.arrow_builder.build()
+        let mut cached = self.built_records.lock();
+        if let Some(bytes) = cached.as_ref() {
+            return Ok(bytes.clone());
+        }
+        let bytes = self.arrow_builder.build()?;
+        *cached = Some(bytes.clone());
+        Ok(bytes)
     }

Review Comment:
   The built_records cache can grow indefinitely as it is never cleared. Once a 
batch is built and cached, the cached Vec<u8> is cloned on subsequent calls, 
but the cache is never invalidated even after the batch completes or is 
re-enqueued. This could lead to memory accumulation, especially if batches are 
retried multiple times or if many batches are processed.
   
   Consider clearing the cache when the batch is re-enqueued (in the 
re_enqueued method) or when the batch completes, or evaluate if caching is 
necessary given that build() should only be called once per successful send 
attempt.



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -150,47 +112,14 @@ impl<'a> TableScan<'a> {
     /// Returns an error if `column_names` is empty or if any column name is 
not found in the table schema.
     ///
     /// # Example
-    /// ```
-    /// # use fluss::client::FlussConnection;
-    /// # use fluss::config::Config;
-    /// # use fluss::error::Result;
-    /// # use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
-    /// # use fluss::row::InternalRow;
-    /// # use std::time::Duration;
-    ///
-    /// # pub async fn example() -> Result<()> {
-    ///     let mut config = Config::default();
-    ///     config.bootstrap_server = Some("127.0.0.1:9123".to_string());
-    ///     let conn = FlussConnection::new(config).await?;
-    ///
-    ///     let table_descriptor = TableDescriptor::builder()
-    ///         .schema(
-    ///             Schema::builder()
-    ///                 .column("col1", DataTypes::int())
-    ///                 .column("col2", DataTypes::string())
-    ///                 .column("col3", DataTypes::string())
-    ///             .build()?,
-    ///         ).build()?;
-    ///     let table_path = TablePath::new("fluss".to_owned(), 
"rust_test_long".to_owned());
-    ///     let admin = conn.get_admin().await?;
-    ///     admin.create_table(&table_path, &table_descriptor, true)
-    ///         .await?;
-    ///     let table_info = admin.get_table(&table_path).await?;
-    ///     let table = conn.get_table(&table_path).await?;
-    ///
-    ///     // Project columns by column names
-    ///     let scanner = table.new_scan().project_by_name(&["col1", 
"col3"])?.create_log_scanner()?;
-    ///     let scan_records = scanner.poll(Duration::from_secs(10)).await?;
-    ///     for record in scan_records {
-    ///         let row = record.row();
-    ///         println!(
-    ///             "{{{}, {}}}@{}",
-    ///             row.get_int(0),
-    ///             row.get_string(1),
-    ///             record.offset()
-    ///         );
-    ///     }
-    ///     # Ok(())
+    /// ```no_run
+    /// # fn example() -> fluss::error::Result<()> {
+    /// # let table: fluss::client::FlussTable<'_> = todo!("requires a Fluss 
connection");
+    /// let scanner = table
+    ///     .new_scan()
+    ///     .project_by_name(&["col1", "col3"])?
+    ///     .create_log_scanner()?;
+    /// # Ok(())

Review Comment:
   Similar to the project() example, this simplified example uses a synchronous 
function signature but may require async context. Consider verifying the 
example compiles correctly or marking the function as async if needed.



##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -132,78 +133,502 @@ impl Sender {
         &self,
         destination: i32,
         acks: i16,
-        batches: &Vec<Arc<ReadyWriteBatch>>,
+        batches: Vec<ReadyWriteBatch>,
     ) -> Result<()> {
         if batches.is_empty() {
             return Ok(());
         }
-        let mut records_by_bucket = HashMap::new();
-        let mut write_batch_by_table = HashMap::new();
+        let mut records_by_bucket: HashMap<TableBucket, ReadyWriteBatch> = 
HashMap::new();
+        let mut write_batch_by_table: HashMap<i64, Vec<TableBucket>> = 
HashMap::new();
 
         for batch in batches {
-            records_by_bucket.insert(batch.table_bucket.clone(), 
batch.clone());
+            let table_bucket = batch.table_bucket.clone();
             write_batch_by_table
-                .entry(batch.table_bucket.table_id())
-                .or_insert_with(Vec::new)
-                .push(batch);
+                .entry(table_bucket.table_id())
+                .or_default()
+                .push(table_bucket.clone());
+            records_by_bucket.insert(table_bucket, batch);
         }
 
         let cluster = self.metadata.get_cluster();
 
-        let destination_node =
-            cluster
-                .get_tablet_server(destination)
-                .ok_or(Error::LeaderNotAvailable {
-                    message: format!("destination node not found in metadata 
cache {destination}."),
-                })?;
-        let connection = self.metadata.get_connection(destination_node).await?;
+        let destination_node = match cluster.get_tablet_server(destination) {
+            Some(node) => node,
+            None => {
+                self.handle_batches_with_error(
+                    records_by_bucket.into_values().collect(),
+                    FlussError::LeaderNotAvailableException,
+                    format!("Destination node not found in metadata cache 
{destination}."),
+                )
+                .await?;
+                return Ok(());
+            }
+        };
+        let connection = match 
self.metadata.get_connection(destination_node).await {
+            Ok(connection) => connection,
+            Err(e) => {
+                self.handle_batches_with_error(
+                    records_by_bucket.into_values().collect(),
+                    FlussError::NetworkException,
+                    format!("Failed to connect destination node {destination}: 
{e}"),
+                )
+                .await?;
+                return Ok(());
+            }
+        };
+
+        for (table_id, table_buckets) in write_batch_by_table {
+            let request_batches: Vec<&ReadyWriteBatch> = table_buckets
+                .iter()
+                .filter_map(|bucket| records_by_bucket.get(bucket))
+                .collect();
+            if request_batches.is_empty() {
+                continue;
+            }
+            let request = match ProduceLogRequest::new(
+                table_id,
+                acks,
+                self.max_request_timeout_ms,
+                request_batches.as_slice(),
+            ) {
+                Ok(request) => request,
+                Err(e) => {
+                    self.handle_batches_with_error(
+                        table_buckets
+                            .iter()
+                            .filter_map(|bucket| 
records_by_bucket.remove(bucket))
+                            .collect(),
+                        FlussError::UnknownServerError,
+                        format!("Failed to build produce request: {e}"),
+                    )
+                    .await?;
+                    continue;
+                }
+            };
 
-        for (table_id, write_batches) in write_batch_by_table {
-            let request =
-                ProduceLogRequest::new(table_id, acks, 
self.max_request_timeout_ms, write_batches)?;
-            let response = connection.request(request).await?;
-            self.handle_produce_response(table_id, &records_by_bucket, 
response)?
+            let response = match connection.request(request).await {
+                Ok(response) => response,
+                Err(e) => {
+                    self.handle_batches_with_error(
+                        table_buckets
+                            .iter()
+                            .filter_map(|bucket| 
records_by_bucket.remove(bucket))
+                            .collect(),
+                        FlussError::NetworkException,
+                        format!("Failed to send produce request: {e}"),
+                    )
+                    .await?;
+                    continue;
+                }
+            };
+
+            self.handle_produce_response(
+                table_id,
+                &table_buckets,
+                &mut records_by_bucket,
+                response,
+            )
+            .await?;
         }
 
         Ok(())
     }
 
-    fn handle_produce_response(
+    async fn handle_produce_response(
         &self,
         table_id: i64,
-        records_by_bucket: &HashMap<TableBucket, Arc<ReadyWriteBatch>>,
+        request_buckets: &[TableBucket],
+        records_by_bucket: &mut HashMap<TableBucket, ReadyWriteBatch>,
         response: ProduceLogResponse,
     ) -> Result<()> {
+        let mut invalid_metadata_tables: HashSet<TablePath> = HashSet::new();
+        let mut pending_buckets: HashSet<TableBucket> = 
request_buckets.iter().cloned().collect();
         for produce_log_response_for_bucket in response.buckets_resp.iter() {
             let tb = TableBucket::new(table_id, 
produce_log_response_for_bucket.bucket_id);
 
-            let ready_batch = records_by_bucket.get(&tb).unwrap();
+            let Some(ready_batch) = records_by_bucket.remove(&tb) else {
+                warn!("Missing ready batch for table bucket {tb}");
+                continue;
+            };

Review Comment:
   The logic for handling buckets in the produce response may have an issue. 
When a bucket appears in the response but not in the request_buckets (line 
247), a warning is logged and the code continues, removing the batch from 
records_by_bucket. However, this bucket is never in pending_buckets (which is 
initialized from request_buckets on line 245), so it won't be handled in the 
pending_buckets loop (lines 276-290) either.
   
   This means if the server returns a response for a bucket that wasn't in the 
request, that batch gets silently removed from records_by_bucket without being 
completed or failed. Consider whether this scenario should be treated as an 
error condition, or if the batch should be explicitly handled.



##########
crates/fluss/src/client/write/batch.rs:
##########
@@ -53,15 +57,36 @@ impl InnerWriteBatch {
     }
 
     fn complete(&self, write_result: BatchWriteResult) -> bool {
-        if !self.completed {
-            self.results.broadcast(write_result);
+        if self
+            .completed
+            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
+            .is_err()
+        {
+            return false;
         }
+        self.results.broadcast(write_result);
         true
     }
 
     fn drained(&mut self, now_ms: i64) {
         self.drained_ms = max(self.drained_ms, now_ms);

Review Comment:
   The drained_ms field in InnerWriteBatch is not protected by any 
synchronization mechanism (not an atomic type and accessed via &mut self). 
While the drained method currently takes &mut self which prevents concurrent 
modifications, this field could become a source of data races if the API is 
later changed or if WriteBatch is shared across threads in unexpected ways. 
Consider making drained_ms an AtomicI64 for consistency with the other shared 
state fields (completed and attempts), or add documentation that WriteBatch 
must not be shared across threads while mutable operations like drained() are 
being called.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to