AndreaBozzo commented on code in PR #184:
URL: https://github.com/apache/fluss-rust/pull/184#discussion_r2710121664


##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -211,87 +215,159 @@ impl Sender {
                 records_by_bucket.insert(request_batch.table_bucket.clone(), 
request_batch);
             }
 
-            let response = match connection.request(request).await {
-                Ok(response) => response,
-                Err(e) => {
-                    self.handle_batches_with_error(
-                        table_buckets
-                            .iter()
-                            .filter_map(|bucket| 
records_by_bucket.remove(bucket))
-                            .collect(),
-                        FlussError::NetworkException,
-                        format!("Failed to send produce request: {e}"),
-                    )
-                    .await?;
-                    continue;
-                }
-            };
-
-            self.handle_produce_response(
+            self.send_and_handle_response(
+                &connection,
+                write_request,
                 table_id,
                 &table_buckets,
                 &mut records_by_bucket,
-                response,
             )
             .await?;
         }
 
         Ok(())
     }
 
-    async fn handle_produce_response(
+    fn build_write_request(
+        table_id: i64,
+        acks: i16,
+        timeout_ms: i32,
+        request_batches: &mut [ReadyWriteBatch],
+    ) -> Result<WriteRequest> {
+        let first_batch = &request_batches.first().unwrap().write_batch;
+
+        let request = match first_batch {
+            WriteBatch::ArrowLog(_) => {
+                let req = ProduceLogRequest::new(table_id, acks, timeout_ms, 
request_batches)?;
+                WriteRequest::ProduceLog(req)
+            }
+            WriteBatch::Kv(kv_write_batch) => {
+                let target_columns = kv_write_batch.target_columns();
+                for batch in request_batches.iter().skip(1) {
+                    match &batch.write_batch {
+                        WriteBatch::ArrowLog(_) => {
+                            return Err(UnexpectedError {
+                                message: "Expecting KvWriteBatch but found 
ArrowLogWriteBatch"
+                                    .to_string(),
+                                source: None,
+                            });
+                        }
+                        WriteBatch::Kv(kvb) => {
+                            if target_columns != kvb.target_columns() {
+                                return Err(UnexpectedError {
+                                    message: format!(
+                                        "All the write batches to make put kv 
request should have the same target columns, but got {:?} and {:?}.",
+                                        target_columns,
+                                        kvb.target_columns()
+                                    ),
+                                    source: None,
+                                });
+                            }
+                        }
+                    }
+                }
+                let cols = target_columns
+                    .map(|arc| arc.iter().map(|&c| c as i32).collect())
+                    .unwrap_or_default();
+                let req = PutKvRequest::new(table_id, acks, timeout_ms, cols, 
request_batches)?;
+                WriteRequest::PutKv(req)
+            }
+        };
+
+        Ok(request)
+    }
+
+    async fn send_and_handle_response(
+        &self,
+        connection: &ServerConnection,
+        write_request: WriteRequest,
+        table_id: i64,
+        table_buckets: &[TableBucket],
+        records_by_bucket: &mut HashMap<TableBucket, ReadyWriteBatch>,
+    ) -> Result<()> {
+        macro_rules! send {
+            ($request:expr) => {
+                match connection.request($request).await {
+                    Ok(response) => {
+                        self.handle_write_response(
+                            table_id,
+                            table_buckets,
+                            records_by_bucket,
+                            response,

Review Comment:
   cool macro, but i wonder if this can become an async trait



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to