fresh-borzoni commented on code in PR #184:
URL: https://github.com/apache/fluss-rust/pull/184#discussion_r2709584004
##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -211,87 +215,159 @@ impl Sender {
records_by_bucket.insert(request_batch.table_bucket.clone(),
request_batch);
}
- let response = match connection.request(request).await {
- Ok(response) => response,
- Err(e) => {
- self.handle_batches_with_error(
- table_buckets
- .iter()
- .filter_map(|bucket|
records_by_bucket.remove(bucket))
- .collect(),
- FlussError::NetworkException,
- format!("Failed to send produce request: {e}"),
- )
- .await?;
- continue;
- }
- };
-
- self.handle_produce_response(
+ self.send_and_handle_response(
+ &connection,
+ write_request,
table_id,
&table_buckets,
&mut records_by_bucket,
- response,
)
.await?;
}
Ok(())
}
- async fn handle_produce_response(
+ fn build_write_request(
+ table_id: i64,
+ acks: i16,
+ timeout_ms: i32,
+ request_batches: &mut [ReadyWriteBatch],
+ ) -> Result<WriteRequest> {
+ let first_batch = &request_batches.first().unwrap().write_batch;
+
+ let request = match first_batch {
Review Comment:
Should we symmetrically validate ArrowLog as well?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]