zhaohaidao commented on code in PR #132:
URL: https://github.com/apache/fluss-rust/pull/132#discussion_r2678769032


##########
crates/fluss/src/client/write/batch.rs:
##########
@@ -175,7 +218,13 @@ impl ArrowLogWriteBatch {
     }
 
     pub fn build(&self) -> Result<Vec<u8>> {
-        self.arrow_builder.build()
+        let mut cached = self.built_records.lock();
+        if let Some(bytes) = cached.as_ref() {
+            return Ok(bytes.clone());
+        }
+        let bytes = self.arrow_builder.build()?;
+        *cached = Some(bytes.clone());

Review Comment:
   Makes sense.



##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -132,78 +133,502 @@ impl Sender {
         &self,
         destination: i32,
         acks: i16,
-        batches: &Vec<Arc<ReadyWriteBatch>>,
+        batches: Vec<ReadyWriteBatch>,
     ) -> Result<()> {
         if batches.is_empty() {
             return Ok(());
         }
-        let mut records_by_bucket = HashMap::new();
-        let mut write_batch_by_table = HashMap::new();
+        let mut records_by_bucket: HashMap<TableBucket, ReadyWriteBatch> = 
HashMap::new();
+        let mut write_batch_by_table: HashMap<i64, Vec<TableBucket>> = 
HashMap::new();
 
         for batch in batches {
-            records_by_bucket.insert(batch.table_bucket.clone(), 
batch.clone());
+            let table_bucket = batch.table_bucket.clone();
             write_batch_by_table
-                .entry(batch.table_bucket.table_id())
-                .or_insert_with(Vec::new)
-                .push(batch);
+                .entry(table_bucket.table_id())
+                .or_default()
+                .push(table_bucket.clone());
+            records_by_bucket.insert(table_bucket, batch);
         }
 
         let cluster = self.metadata.get_cluster();
 
-        let destination_node =
-            cluster
-                .get_tablet_server(destination)
-                .ok_or(Error::LeaderNotAvailable {
-                    message: format!("destination node not found in metadata 
cache {destination}."),
-                })?;
-        let connection = self.metadata.get_connection(destination_node).await?;
+        let destination_node = match cluster.get_tablet_server(destination) {
+            Some(node) => node,
+            None => {
+                self.handle_batches_with_error(
+                    records_by_bucket.into_values().collect(),
+                    FlussError::LeaderNotAvailableException,
+                    format!("Destination node not found in metadata cache 
{destination}."),
+                )
+                .await?;
+                return Ok(());
+            }
+        };
+        let connection = match 
self.metadata.get_connection(destination_node).await {
+            Ok(connection) => connection,
+            Err(e) => {
+                self.handle_batches_with_error(
+                    records_by_bucket.into_values().collect(),
+                    FlussError::NetworkException,
+                    format!("Failed to connect destination node {destination}: 
{e}"),
+                )
+                .await?;
+                return Ok(());
+            }
+        };
+
+        for (table_id, table_buckets) in write_batch_by_table {
+            let request_batches: Vec<&ReadyWriteBatch> = table_buckets
+                .iter()
+                .filter_map(|bucket| records_by_bucket.get(bucket))
+                .collect();
+            if request_batches.is_empty() {
+                continue;
+            }
+            let request = match ProduceLogRequest::new(
+                table_id,
+                acks,
+                self.max_request_timeout_ms,
+                request_batches.as_slice(),
+            ) {
+                Ok(request) => request,
+                Err(e) => {
+                    self.handle_batches_with_error(
+                        table_buckets
+                            .iter()
+                            .filter_map(|bucket| 
records_by_bucket.remove(bucket))
+                            .collect(),
+                        FlussError::UnknownServerError,

Review Comment:
   Thanks for the explanation. Fixed.



##########
crates/fluss/src/client/write/batch.rs:
##########
@@ -175,7 +218,13 @@ impl ArrowLogWriteBatch {
     }
 
     pub fn build(&self) -> Result<Vec<u8>> {
-        self.arrow_builder.build()
+        let mut cached = self.built_records.lock();
+        if let Some(bytes) = cached.as_ref() {
+            return Ok(bytes.clone());
+        }
+        let bytes = self.arrow_builder.build()?;
+        *cached = Some(bytes.clone());

Review Comment:
   Makes sense.



##########
crates/fluss/src/client/write/accumulator.rs:
##########
@@ -342,6 +342,29 @@ impl RecordAccumulator {
         self.incomplete_batches.write().remove(&batch_id);
     }
 
+    pub async fn re_enqueue(&self, ready_write_batch: ReadyWriteBatch) {
+        ready_write_batch.write_batch.re_enqueued();
+        let table_path = ready_write_batch.write_batch.table_path().clone();
+        let bucket_id = ready_write_batch.table_bucket.bucket_id();
+        let table_id = 
u64::try_from(ready_write_batch.table_bucket.table_id()).unwrap_or(0);
+        let mut binding =
+            self.write_batches
+                .entry(table_path)
+                .or_insert_with(|| BucketAndWriteBatches {
+                    table_id,
+                    is_partitioned_table: false,
+                    partition_id: None,
+                    batches: Default::default(),
+                });
+        let bucket_and_batches = binding.value_mut();
+        let dq = bucket_and_batches
+            .batches
+            .entry(bucket_id)
+            .or_insert_with(|| Mutex::new(VecDeque::new()));
+        let mut dq_guard = dq.lock().await;

Review Comment:
   Good catch, thanks. I will fix it in another PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to