leekeiabstraction commented on code in PR #184:
URL: https://github.com/apache/fluss-rust/pull/184#discussion_r2709450885


##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -211,87 +212,161 @@ impl Sender {
                 records_by_bucket.insert(request_batch.table_bucket.clone(), 
request_batch);
             }
 
-            let response = match connection.request(request).await {
-                Ok(response) => response,
-                Err(e) => {
-                    self.handle_batches_with_error(
-                        table_buckets
-                            .iter()
-                            .filter_map(|bucket| 
records_by_bucket.remove(bucket))
-                            .collect(),
-                        FlussError::NetworkException,
-                        format!("Failed to send produce request: {e}"),
-                    )
-                    .await?;
-                    continue;
-                }
-            };
-
-            self.handle_produce_response(
+            self.send_and_handle_response(
+                &connection,
+                write_request,
                 table_id,
                 &table_buckets,
                 &mut records_by_bucket,
-                response,
             )
             .await?;
         }
 
         Ok(())
     }
 
-    async fn handle_produce_response(
+    fn build_write_request(
+        table_id: i64,
+        acks: i16,
+        timeout_ms: i32,
+        request_batches: &mut [ReadyWriteBatch],
+    ) -> Result<Option<WriteRequest>> {
+        let first_batch = match request_batches.first() {
+            None => return Ok(None),
+            Some(b) => &b.write_batch,
+        };
+
+        let request = match first_batch {
+            WriteBatch::ArrowLog(_) => {
+                let req = ProduceLogRequest::new(table_id, acks, timeout_ms, 
request_batches)?;
+                WriteRequest::ProduceLog(req)
+            }
+            WriteBatch::Kv(kv_write_batch) => {
+                let target_columns = kv_write_batch.target_columns.clone();
+                if let Some(batch) = request_batches.iter().next() {

Review Comment:
   Very well spotted! 
   
   Eye/brain fatigue from looking at code all day :D



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to