zhaohaidao commented on code in PR #132:
URL: https://github.com/apache/fluss-rust/pull/132#discussion_r2678765759


##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -132,78 +133,502 @@ impl Sender {
         &self,
         destination: i32,
         acks: i16,
-        batches: &Vec<Arc<ReadyWriteBatch>>,
+        batches: Vec<ReadyWriteBatch>,
     ) -> Result<()> {
         if batches.is_empty() {
             return Ok(());
         }
-        let mut records_by_bucket = HashMap::new();
-        let mut write_batch_by_table = HashMap::new();
+        let mut records_by_bucket: HashMap<TableBucket, ReadyWriteBatch> = HashMap::new();
+        let mut write_batch_by_table: HashMap<i64, Vec<TableBucket>> = HashMap::new();
 
         for batch in batches {
-            records_by_bucket.insert(batch.table_bucket.clone(), batch.clone());
+            let table_bucket = batch.table_bucket.clone();
             write_batch_by_table
-                .entry(batch.table_bucket.table_id())
-                .or_insert_with(Vec::new)
-                .push(batch);
+                .entry(table_bucket.table_id())
+                .or_default()
+                .push(table_bucket.clone());
+            records_by_bucket.insert(table_bucket, batch);
         }
 
         let cluster = self.metadata.get_cluster();
 
-        let destination_node =
-            cluster
-                .get_tablet_server(destination)
-                .ok_or(Error::LeaderNotAvailable {
-                    message: format!("destination node not found in metadata cache {destination}."),
-                })?;
-        let connection = self.metadata.get_connection(destination_node).await?;
+        let destination_node = match cluster.get_tablet_server(destination) {
+            Some(node) => node,
+            None => {
+                self.handle_batches_with_error(
+                    records_by_bucket.into_values().collect(),
+                    FlussError::LeaderNotAvailableException,
+                    format!("Destination node not found in metadata cache {destination}."),
+                )
+                .await?;
+                return Ok(());
+            }
+        };
+        let connection = match self.metadata.get_connection(destination_node).await {
+            Ok(connection) => connection,
+            Err(e) => {
+                self.handle_batches_with_error(
+                    records_by_bucket.into_values().collect(),
+                    FlussError::NetworkException,
+                    format!("Failed to connect destination node {destination}: {e}"),
+                )
+                .await?;
+                return Ok(());
+            }
+        };
+
+        for (table_id, table_buckets) in write_batch_by_table {
+            let request_batches: Vec<&ReadyWriteBatch> = table_buckets
+                .iter()
+                .filter_map(|bucket| records_by_bucket.get(bucket))
+                .collect();
+            if request_batches.is_empty() {
+                continue;
+            }
+            let request = match ProduceLogRequest::new(
+                table_id,
+                acks,
+                self.max_request_timeout_ms,
+                request_batches.as_slice(),
+            ) {
+                Ok(request) => request,
+                Err(e) => {
+                    self.handle_batches_with_error(
+                        table_buckets
+                            .iter()
+                            .filter_map(|bucket| records_by_bucket.remove(bucket))
+                            .collect(),
+                        FlussError::UnknownServerError,
+                        format!("Failed to build produce request: {e}"),
+                    )
+                    .await?;
+                    continue;
+                }
+            };
 
-        for (table_id, write_batches) in write_batch_by_table {
-            let request =
-                ProduceLogRequest::new(table_id, acks, self.max_request_timeout_ms, write_batches)?;
-            let response = connection.request(request).await?;
-            self.handle_produce_response(table_id, &records_by_bucket, response)?
+            let response = match connection.request(request).await {
+                Ok(response) => response,
+                Err(e) => {
+                    self.handle_batches_with_error(
+                        table_buckets
+                            .iter()
+                            .filter_map(|bucket| records_by_bucket.remove(bucket))
+                            .collect(),
+                        FlussError::NetworkException,
+                        format!("Failed to send produce request: {e}"),
+                    )
+                    .await?;
+                    continue;
+                }
+            };
+
+            self.handle_produce_response(
+                table_id,
+                &table_buckets,
+                &mut records_by_bucket,
+                response,
+            )
+            .await?;
         }
 
         Ok(())
     }
 
-    fn handle_produce_response(
+    async fn handle_produce_response(
         &self,
         table_id: i64,
-        records_by_bucket: &HashMap<TableBucket, Arc<ReadyWriteBatch>>,
+        request_buckets: &[TableBucket],
+        records_by_bucket: &mut HashMap<TableBucket, ReadyWriteBatch>,
         response: ProduceLogResponse,
     ) -> Result<()> {
+        let mut invalid_metadata_tables: HashSet<TablePath> = HashSet::new();
+        let mut pending_buckets: HashSet<TableBucket> = request_buckets.iter().cloned().collect();
         for produce_log_response_for_bucket in response.buckets_resp.iter() {
             let tb = TableBucket::new(table_id, produce_log_response_for_bucket.bucket_id);
 
-            let ready_batch = records_by_bucket.get(&tb).unwrap();
+            let Some(ready_batch) = records_by_bucket.remove(&tb) else {
+                warn!("Missing ready batch for table bucket {tb}");
+                continue;
+            };
+            pending_buckets.remove(&tb);
+
             if let Some(error_code) = produce_log_response_for_bucket.error_code {
-                todo!("handle_produce_response error: {}", error_code)
+                if error_code == FlussError::None.code() {
+                    self.complete_batch(ready_batch);
+                    continue;
+                }
+
+                let error = FlussError::for_code(error_code);
+                let message = produce_log_response_for_bucket
+                    .error_message
+                    .clone()
+                    .unwrap_or_else(|| error.message().to_string());
+                if let Some(table_path) = self
+                    .handle_write_batch_error(ready_batch, error, message)
+                    .await?
+                {
+                    invalid_metadata_tables.insert(table_path);
+                }
             } else {
                 self.complete_batch(ready_batch)
             }
         }
+        if !pending_buckets.is_empty() {
+            for bucket in pending_buckets {
+                if let Some(ready_batch) = records_by_bucket.remove(&bucket) {
+                    let message =
+                        format!("Missing response for table bucket {bucket} in produce response.");

Review Comment:
   makes sense.

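For context on the pattern in the diff above: instead of propagating a node-lookup or connection failure with `?` (which would abort the whole send loop), the change routes the failure into `handle_batches_with_error` so every affected batch is resolved with an explicit error. A minimal standalone sketch of that idea is below; the types and functions (`Batch`, `BatchOutcome`, `lookup_node`, `send_batches`) are hypothetical simplifications for illustration, not the fluss-rust API.

```rust
// Sketch of "fail the batches instead of failing the call".
// All names here are hypothetical, not taken from fluss-rust.

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct TableBucket {
    table_id: i64,
    bucket_id: i32,
}

#[derive(Debug)]
struct Batch {
    bucket: TableBucket,
}

#[derive(Debug)]
enum BatchOutcome {
    Completed(TableBucket),
    Failed { bucket: TableBucket, reason: String },
}

// Pretend only node 0 exists in the metadata cache.
fn lookup_node(destination: i32) -> Option<String> {
    (destination == 0).then(|| format!("node-{destination}"))
}

// Every batch comes back as either Completed or Failed; a missing
// destination never turns into an early `Err` that drops the rest.
fn send_batches(destination: i32, batches: Vec<Batch>) -> Vec<BatchOutcome> {
    let Some(_node) = lookup_node(destination) else {
        // Route the failure to every batch instead of returning an error.
        return batches
            .into_iter()
            .map(|b| BatchOutcome::Failed {
                bucket: b.bucket,
                reason: format!("destination node {destination} not found in metadata cache"),
            })
            .collect();
    };

    // Happy path: pretend the produce request succeeded for every bucket.
    batches
        .into_iter()
        .map(|b| BatchOutcome::Completed(b.bucket))
        .collect()
}

fn main() {
    let batches = vec![
        Batch { bucket: TableBucket { table_id: 1, bucket_id: 0 } },
        Batch { bucket: TableBucket { table_id: 1, bucket_id: 1 } },
    ];
    // Unknown destination: both batches are reported as Failed, not lost.
    for outcome in send_batches(7, batches) {
        println!("{outcome:?}");
    }
}
```

The point of the pattern is that the send path always resolves every batch, either as completed or as failed with a reason, so callers waiting on per-batch results are not left hanging when one destination is unreachable.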

