fresh-borzoni commented on code in PR #220:
URL: https://github.com/apache/fluss-rust/pull/220#discussion_r2740261996


##########
crates/fluss/src/client/write/accumulator.rs:
##########
@@ -185,34 +190,73 @@ impl RecordAccumulator {
                 true, false, true,
             ));
         }
-        self.append_new_batch(cluster, record, bucket_id, &mut dq_guard)
+        self.append_new_batch(cluster, record, &mut dq_guard)
     }
 
     pub async fn ready(&self, cluster: &Arc<Cluster>) -> Result<ReadyCheckResult> {
         // Snapshot just the Arcs we need, avoiding cloning the entire BucketAndWriteBatches struct
-        let entries: Vec<(TablePath, BucketBatches)> = self
+        let entries: Vec<(
+            Arc<PhysicalTablePath>,
+            bool,
+            Option<PartitionId>,
+            BucketBatches,
+        )> = self
             .write_batches
             .iter()
             .map(|entry| {
-                let table_path = entry.key().clone();
+                let physical_table_path = Arc::clone(entry.key());
+                let is_partitioned_table = entry.value().is_partitioned_table;
+                let partition_id = entry.value().partition_id;
                 let bucket_batches: Vec<_> = entry
                     .value()
                     .batches
                     .iter()
                     .map(|(bucket_id, batch_arc)| (*bucket_id, batch_arc.clone()))
                     .collect();
-                (table_path, bucket_batches)
+                (
+                    physical_table_path,
+                    is_partitioned_table,
+                    partition_id,
+                    bucket_batches,
+                )
             })
             .collect();
 
         let mut ready_nodes = HashSet::new();
         let mut next_ready_check_delay_ms = self.batch_timeout_ms;
         let mut unknown_leader_tables = HashSet::new();
 
-        for (table_path, bucket_batches) in entries {
+        for (physical_table_path, is_partitioned_table, mut partition_id, bucket_batches) in entries
+        {
+            // First check whether this table has a partition id.
+            if is_partitioned_table && partition_id.is_none() {
+                partition_id = cluster.get_partition_id(&physical_table_path);
+
+                if partition_id.is_some() {
+                    // Update the cached partition_id
+                    if let Some(mut entry) = self.write_batches.get_mut(&physical_table_path) {
+                        entry.partition_id = partition_id;
+                    }
+                } else {
+                    log::debug!(
+                        "Partition does not exist for {}, bucket will not be set to ready",
+                        physical_table_path.as_ref()
+                    );
+
+                    // TODO: we shouldn't add unready partitions to unknownLeaderTables,
+                    // because it causes PartitionNotExistException later
+                    unknown_leader_tables.insert(physical_table_path);
+                    return Ok(ReadyCheckResult::new(

Review Comment:
   Why `return` here? Shouldn't we `continue` with the next table so the remaining batches still get checked for readiness?
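   
   Roughly what I had in mind for that `else` branch (just a sketch against the names in this diff, not a drop-in patch; the `ReadyCheckResult` being built is cut off in the hunk):
   
   ```rust
   } else {
       log::debug!(
           "Partition does not exist for {}, bucket will not be set to ready",
           physical_table_path.as_ref()
       );
       unknown_leader_tables.insert(physical_table_path);
       // Skip only this table and keep checking the remaining ones,
       // instead of aborting the whole readiness check early.
       continue;
   }
   ```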



##########
crates/fluss/src/client/table/lookup.rs:
##########
@@ -187,26 +213,46 @@ impl<'a> Lookuper<'a> {
     /// * `Err(Error)` - If the lookup fails
     pub async fn lookup(&mut self, row: &dyn InternalRow) -> Result<LookupResult<'_>> {
         // todo: support batch lookup
-        // Encode the key from the row
-        let encoded_key = self.key_encoder.encode_key(row)?;
-        let key_bytes = encoded_key.to_vec();
+        let pk_bytes = self.primary_key_encoder.encode_key(row)?.to_vec();
+        let bk_bytes = match &mut self.bucket_key_encoder {
+            Some(encoder) => &encoder.encode_key(row)?.to_vec(),

Review Comment:
   Wouldn't this temporary Vec be dropped at the end of the match arm, meaning we'd be returning a reference to memory that has already been freed?
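   
   One way to sidestep that, sketched under the assumption that the bucket key is only read as `&[u8]` downstream (the `None` arm here is hypothetical, since the hunk is cut off before it):
   
   ```rust
   use std::borrow::Cow;
   
   // Own the encoded bucket key (or borrow the already-owned pk_bytes)
   // so nothing keeps a reference to a temporary from inside the match arm.
   let bk_bytes: Cow<'_, [u8]> = match &mut self.bucket_key_encoder {
       Some(encoder) => Cow::Owned(encoder.encode_key(row)?.to_vec()),
       None => Cow::Borrowed(pk_bytes.as_slice()),
   };
   ```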


