fresh-borzoni commented on code in PR #184:
URL: https://github.com/apache/fluss-rust/pull/184#discussion_r2709135410


##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -211,87 +212,161 @@ impl Sender {
                 records_by_bucket.insert(request_batch.table_bucket.clone(), 
request_batch);
             }
 
-            let response = match connection.request(request).await {
-                Ok(response) => response,
-                Err(e) => {
-                    self.handle_batches_with_error(
-                        table_buckets
-                            .iter()
-                            .filter_map(|bucket| 
records_by_bucket.remove(bucket))
-                            .collect(),
-                        FlussError::NetworkException,
-                        format!("Failed to send produce request: {e}"),
-                    )
-                    .await?;
-                    continue;
-                }
-            };
-
-            self.handle_produce_response(
+            self.send_and_handle_response(
+                &connection,
+                write_request,
                 table_id,
                 &table_buckets,
                 &mut records_by_bucket,
-                response,
             )
             .await?;
         }
 
         Ok(())
     }
 
-    async fn handle_produce_response(
+    fn build_write_request(
+        table_id: i64,
+        acks: i16,
+        timeout_ms: i32,
+        request_batches: &mut [ReadyWriteBatch],
+    ) -> Result<Option<WriteRequest>> {
+        let first_batch = match request_batches.first() {
+            None => return Ok(None),
+            Some(b) => &b.write_batch,
+        };
+
+        let request = match first_batch {
+            WriteBatch::ArrowLog(_) => {
+                let req = ProduceLogRequest::new(table_id, acks, timeout_ms, 
request_batches)?;
+                WriteRequest::ProduceLog(req)
+            }
+            WriteBatch::Kv(kv_write_batch) => {
+                let target_columns = kv_write_batch.target_columns.clone();
+                if let Some(batch) = request_batches.iter().next() {

Review Comment:
   why do we compare first batch with itself?
   Should it be cycle here and skip first batch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to