luoyuxia commented on code in PR #220:
URL: https://github.com/apache/fluss-rust/pull/220#discussion_r2742462736
##########
crates/fluss/src/cluster/cluster.rs:
##########
@@ -215,38 +238,94 @@ impl Cluster {
let bucket_id = bucket_metadata.bucket_id;
let bucket = TableBucket::new(table_id, bucket_id);
let bucket_location;
+ let physical_table_path =
+
Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone())));
+
if let Some(leader_id) = bucket_metadata.leader_id
&& let Some(server_node) = servers.get(&leader_id)
{
bucket_location = BucketLocation::new(
bucket.clone(),
Some(server_node.clone()),
- table_path.clone(),
+ physical_table_path,
);
available_bucket_for_table.push(bucket_location.clone());
tmp_available_location_by_bucket
.insert(bucket.clone(), bucket_location.clone());
} else {
found_unavailable_bucket = true;
- bucket_location = BucketLocation::new(bucket.clone(),
None, table_path.clone());
+ bucket_location =
+ BucketLocation::new(bucket.clone(), None,
physical_table_path);
}
bucket_for_table.push(bucket_location.clone());
}
if found_unavailable_bucket {
- tmp_available_locations_by_path
- .insert(table_path.clone(),
available_bucket_for_table.clone());
+ tmp_available_locations_by_path.insert(
+ Arc::new(PhysicalTablePath::of(Arc::new(table_path))),
+ available_bucket_for_table.clone(),
+ );
} else {
- tmp_available_locations_by_path.insert(table_path.clone(),
bucket_for_table);
+ tmp_available_locations_by_path.insert(
+ Arc::new(PhysicalTablePath::of(Arc::new(table_path))),
+ bucket_for_table,
+ );
+ }
+ }
+
+ // Process partition metadata
+ for partition_metadata in metadata_response.partition_metadata {
+ let table_id = partition_metadata.table_id;
+ let partition_name = partition_metadata.partition_name;
+ let partition_id = partition_metadata.partition_id as PartitionId;
+
+ if let Some(table_path) = table_id_by_path
+ .iter()
+ .find(|&(_, &id)| id == table_id)
+ .map(|(path, _)| path.clone())
+ {
+ let physical_table_path =
Arc::new(PhysicalTablePath::of_partitioned(
+ Arc::new(table_path.clone()),
+ Some(partition_name),
+ ));
+ partitions_id_by_path.insert(Arc::clone(&physical_table_path),
partition_id);
+
+ // Process bucket metadata for partitioned tables
+ for bucket_metadata in partition_metadata.bucket_metadata {
Review Comment:
This looks like duplicated logic; we can follow the Java side to reduce the
duplication.
##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -343,11 +378,12 @@ impl Sender {
.error_message()
.cloned()
.unwrap_or_else(|| error.message().to_string());
- if let Some(table_path) = self
+ if let Some((table_path, physical_table_path)) = self
Review Comment:
`physical_table_path` already contains `table_path`, so is the tuple
required? Can we just return `physical_table_path`?
##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -82,9 +82,39 @@ impl Sender {
// Update metadata if needed
if !ready_check_result.unknown_leader_tables.is_empty() {
- self.metadata
-
.update_tables_metadata(&ready_check_result.unknown_leader_tables.iter().collect())
- .await?;
+ let mut table_paths: HashSet<&TablePath> = HashSet::new();
+ let mut physical_table_paths: HashSet<&Arc<PhysicalTablePath>> =
HashSet::new();
+
+ for unknown_paths in
ready_check_result.unknown_leader_tables.iter() {
+ if unknown_paths.get_partition_name().is_some() {
+ physical_table_paths.insert(unknown_paths);
+ } else {
+ table_paths.insert(unknown_paths.get_table_path());
+ }
+ }
+
+ if let Err(e) = self
+ .metadata
+ .update_tables_metadata(&table_paths, &physical_table_paths,
vec![])
+ .await
+ {
+ match &e {
+ crate::error::Error::FlussAPIError { api_error }
+ if api_error.code ==
FlussError::PartitionNotExists.code() =>
+ {
+ debug!(
Review Comment:
Change this to `warn!`.
We're also going to change the log level on the Java side.
##########
crates/fluss/src/client/write/accumulator.rs:
##########
@@ -185,34 +190,73 @@ impl RecordAccumulator {
true, false, true,
));
}
- self.append_new_batch(cluster, record, bucket_id, &mut dq_guard)
+ self.append_new_batch(cluster, record, &mut dq_guard)
}
pub async fn ready(&self, cluster: &Arc<Cluster>) ->
Result<ReadyCheckResult> {
// Snapshot just the Arcs we need, avoiding cloning the entire
BucketAndWriteBatches struct
- let entries: Vec<(TablePath, BucketBatches)> = self
+ let entries: Vec<(
+ Arc<PhysicalTablePath>,
+ bool,
+ Option<PartitionId>,
+ BucketBatches,
+ )> = self
.write_batches
.iter()
.map(|entry| {
- let table_path = entry.key().clone();
+ let physical_table_path = Arc::clone(entry.key());
+ let is_partitioned_table = entry.value().is_partitioned_table;
+ let partition_id = entry.value().partition_id;
let bucket_batches: Vec<_> = entry
.value()
.batches
.iter()
.map(|(bucket_id, batch_arc)| (*bucket_id,
batch_arc.clone()))
.collect();
- (table_path, bucket_batches)
+ (
+ physical_table_path,
+ is_partitioned_table,
+ partition_id,
+ bucket_batches,
+ )
})
.collect();
let mut ready_nodes = HashSet::new();
let mut next_ready_check_delay_ms = self.batch_timeout_ms;
let mut unknown_leader_tables = HashSet::new();
- for (table_path, bucket_batches) in entries {
+ for (physical_table_path, is_partitioned_table, mut partition_id,
bucket_batches) in entries
Review Comment:
If `partition_name` in `physical_table_path` is not `None`, it's a partitioned
table. So `is_partitioned_table` can be removed, right?
##########
crates/fluss/src/metadata/table.rs:
##########
@@ -749,19 +750,19 @@ impl TablePath {
/// `partition_name` will be `Some(...)`; otherwise, it will be `None`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PhysicalTablePath {
- table_path: TablePath,
+ table_path: Arc<TablePath>,
Review Comment:
Why make it an `Arc`? I think a plain `TablePath` should be fine.
##########
crates/fluss/src/metadata/table.rs:
##########
@@ -373,7 +374,7 @@ impl TableDescriptorBuilder {
self
}
- pub fn partitioned_by(mut self, partition_keys: Vec<String>) -> Self {
+ pub fn partitioned_by(mut self, partition_keys: Arc<[String]>) -> Self {
Review Comment:
Why change this? From the user's side, passing a `Vec<String>` should be simpler
than passing an `Arc`, right?
##########
crates/fluss/src/client/table/lookup.rs:
##########
@@ -187,26 +213,47 @@ impl<'a> Lookuper<'a> {
/// * `Err(Error)` - If the lookup fails
pub async fn lookup(&mut self, row: &dyn InternalRow) ->
Result<LookupResult<'_>> {
// todo: support batch lookup
- // Encode the key from the row
- let encoded_key = self.key_encoder.encode_key(row)?;
- let key_bytes = encoded_key.to_vec();
+ let pk_bytes = self.primary_key_encoder.encode_key(row)?;
+ let pk_bytes_vec = pk_bytes.to_vec();
+ let bk_bytes = match &mut self.bucket_key_encoder {
+ Some(encoder) => &encoder.encode_key(row)?,
+ None => &pk_bytes,
+ };
+
+ let partition_id = if let Some(ref partition_getter) =
self.partition_getter {
+ let partition_name = partition_getter.get_partition(row)?;
+ let physical_table_path = PhysicalTablePath::of_partitioned(
+ Arc::clone(&self.table_path),
+ Some(partition_name),
+ );
+ let cluster = self.metadata.get_cluster();
+ match cluster.get_partition_id(&physical_table_path) {
+ Some(id) => Some(id),
+ None => {
+ // Partition doesn't exist, return empty result (like Java)
+ return Ok(LookupResult::empty(self.table_info.row_type()));
+ }
+ }
+ } else {
+ None
+ };
- // Compute bucket from encoded key
let bucket_id = self
.bucketing_function
- .bucketing(&key_bytes, self.num_buckets)?;
+ .bucketing(bk_bytes, self.num_buckets)?;
let table_id = self.table_info.get_table_id();
- let table_bucket = TableBucket::new(table_id, bucket_id);
+ let table_bucket = TableBucket::new_with_partition(table_id,
partition_id, bucket_id);
// Find the leader for this bucket
let cluster = self.metadata.get_cluster();
- let leader =
- cluster
- .leader_for(&table_bucket)
- .ok_or_else(|| Error::LeaderNotAvailable {
- message: format!("No leader found for table bucket:
{table_bucket}"),
- })?;
+ let leader = self
+ .metadata
+ .leader_for(self.table_info.get_table_path(), &table_bucket)
Review Comment:
nit:
```suggestion
.leader_for(self.table_path.as_ref(), &table_bucket)
```
##########
crates/fluss/src/cluster/cluster.rs:
##########
@@ -269,14 +348,43 @@ impl Cluster {
pub fn get_table_bucket(
&self,
- table_path: &TablePath,
+ physical_table_path: &PhysicalTablePath,
bucket_id: BucketId,
) -> Result<TableBucket> {
- let table_info = self.get_table(table_path)?;
- Ok(TableBucket::new(table_info.table_id, bucket_id))
+ let table_info = self.get_table(physical_table_path.get_table_path())?;
+ let partition_id = self.get_partition_id(physical_table_path);
+
+ if physical_table_path.get_partition_name().is_some() &&
partition_id.is_none() {
+ return Err(InvalidPartition {
Review Comment:
Would `PartitionNotExistException` be better?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]