leekeiabstraction commented on code in PR #220:
URL: https://github.com/apache/fluss-rust/pull/220#discussion_r2740749128
##########
crates/fluss/src/client/write/accumulator.rs:
##########
@@ -185,34 +190,73 @@ impl RecordAccumulator {
true, false, true,
));
}
- self.append_new_batch(cluster, record, bucket_id, &mut dq_guard)
+ self.append_new_batch(cluster, record, &mut dq_guard)
}
pub async fn ready(&self, cluster: &Arc<Cluster>) ->
Result<ReadyCheckResult> {
// Snapshot just the Arcs we need, avoiding cloning the entire
BucketAndWriteBatches struct
- let entries: Vec<(TablePath, BucketBatches)> = self
+ let entries: Vec<(
+ Arc<PhysicalTablePath>,
+ bool,
+ Option<PartitionId>,
+ BucketBatches,
+ )> = self
.write_batches
.iter()
.map(|entry| {
- let table_path = entry.key().clone();
+ let physical_table_path = Arc::clone(entry.key());
+ let is_partitioned_table = entry.value().is_partitioned_table;
+ let partition_id = entry.value().partition_id;
let bucket_batches: Vec<_> = entry
.value()
.batches
.iter()
.map(|(bucket_id, batch_arc)| (*bucket_id,
batch_arc.clone()))
.collect();
- (table_path, bucket_batches)
+ (
+ physical_table_path,
+ is_partitioned_table,
+ partition_id,
+ bucket_batches,
+ )
})
.collect();
let mut ready_nodes = HashSet::new();
let mut next_ready_check_delay_ms = self.batch_timeout_ms;
let mut unknown_leader_tables = HashSet::new();
- for (table_path, bucket_batches) in entries {
+ for (physical_table_path, is_partitioned_table, mut partition_id,
bucket_batches) in entries
+ {
+ // First check this table has partitionId.
+ if is_partitioned_table && partition_id.is_none() {
+ partition_id = cluster.get_partition_id(&physical_table_path);
+
+ if partition_id.is_some() {
+ // Update the cached partition_id
+ if let Some(mut entry) =
self.write_batches.get_mut(&physical_table_path) {
+ entry.partition_id = partition_id;
+ }
+ } else {
+ log::debug!(
+ "Partition does not exist for {}, bucket will not be
set to ready",
+ physical_table_path.as_ref()
+ );
+
+ // TODO: we shouldn't add unready partitions to
unknownLeaderTables,
+ // because it cases PartitionNotExistException later
+ unknown_leader_tables.insert(physical_table_path);
+ return Ok(ReadyCheckResult::new(
Review Comment:
This mirrors the Java flow:
https://github.com/apache/fluss/blob/96faf08cbd84d1d43a2ef610c189829fa9c34e76/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java#L463-L469
Returning early here is fine: `sender.run_once()` resolves the leader for the unknown
table, and the next run goes through this flow again.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]