fresh-borzoni commented on code in PR #220:
URL: https://github.com/apache/fluss-rust/pull/220#discussion_r2742426576


##########
crates/fluss/src/client/write/accumulator.rs:
##########
@@ -185,34 +190,73 @@ impl RecordAccumulator {
                 true, false, true,
             ));
         }
-        self.append_new_batch(cluster, record, bucket_id, &mut dq_guard)
+        self.append_new_batch(cluster, record, &mut dq_guard)
     }
 
     pub async fn ready(&self, cluster: &Arc<Cluster>) -> 
Result<ReadyCheckResult> {
         // Snapshot just the Arcs we need, avoiding cloning the entire 
BucketAndWriteBatches struct
-        let entries: Vec<(TablePath, BucketBatches)> = self
+        let entries: Vec<(
+            Arc<PhysicalTablePath>,
+            bool,
+            Option<PartitionId>,
+            BucketBatches,
+        )> = self
             .write_batches
             .iter()
             .map(|entry| {
-                let table_path = entry.key().clone();
+                let physical_table_path = Arc::clone(entry.key());
+                let is_partitioned_table = entry.value().is_partitioned_table;
+                let partition_id = entry.value().partition_id;
                 let bucket_batches: Vec<_> = entry
                     .value()
                     .batches
                     .iter()
                     .map(|(bucket_id, batch_arc)| (*bucket_id, 
batch_arc.clone()))
                     .collect();
-                (table_path, bucket_batches)
+                (
+                    physical_table_path,
+                    is_partitioned_table,
+                    partition_id,
+                    bucket_batches,
+                )
             })
             .collect();
 
         let mut ready_nodes = HashSet::new();
         let mut next_ready_check_delay_ms = self.batch_timeout_ms;
         let mut unknown_leader_tables = HashSet::new();
 
-        for (table_path, bucket_batches) in entries {
+        for (physical_table_path, is_partitioned_table, mut partition_id, 
bucket_batches) in entries
+        {
+            // First check this table has partitionId.
+            if is_partitioned_table && partition_id.is_none() {
+                partition_id = cluster.get_partition_id(&physical_table_path);
+
+                if partition_id.is_some() {
+                    // Update the cached partition_id
+                    if let Some(mut entry) = 
self.write_batches.get_mut(&physical_table_path) {
+                        entry.partition_id = partition_id;
+                    }
+                } else {
+                    log::debug!(
+                        "Partition does not exist for {}, bucket will not be 
set to ready",
+                        physical_table_path.as_ref()
+                    );
+
+                    // TODO: we shouldn't add unready partitions to 
unknownLeaderTables,
+                    // because it causes PartitionNotExistException later
+                    unknown_leader_tables.insert(physical_table_path);
+                    return Ok(ReadyCheckResult::new(

Review Comment:
   The Java implementation loops and produces a separate BucketReady result per
iteration, whereas the Rust code loops and returns early if the condition isn't met, so the two behave quite differently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to