fresh-borzoni commented on code in PR #184:
URL: https://github.com/apache/fluss-rust/pull/184#discussion_r2709622733
##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -211,87 +215,159 @@ impl Sender {
records_by_bucket.insert(request_batch.table_bucket.clone(),
request_batch);
}
- let response = match connection.request(request).await {
- Ok(response) => response,
- Err(e) => {
- self.handle_batches_with_error(
- table_buckets
- .iter()
- .filter_map(|bucket|
records_by_bucket.remove(bucket))
- .collect(),
- FlussError::NetworkException,
- format!("Failed to send produce request: {e}"),
- )
- .await?;
- continue;
- }
- };
-
- self.handle_produce_response(
+ self.send_and_handle_response(
+ &connection,
+ write_request,
table_id,
&table_buckets,
&mut records_by_bucket,
- response,
)
.await?;
}
Ok(())
}
- async fn handle_produce_response(
+ fn build_write_request(
+ table_id: i64,
+ acks: i16,
+ timeout_ms: i32,
+ request_batches: &mut [ReadyWriteBatch],
+ ) -> Result<WriteRequest> {
+ let first_batch = &request_batches.first().unwrap().write_batch;
+
+ let request = match first_batch {
Review Comment:
Ah, I see, It's a bit hidden invariant at work, I think we may assume that
all the batches by table_id are of the same type, so first batch is fine here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]