This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 83fa0aa feat: Implement partitioning in Upsert / Lookup (#220)
83fa0aa is described below
commit 83fa0aa698140e613c3700d8811cd6a1258781f3
Author: Keith Lee <[email protected]>
AuthorDate: Fri Jan 30 08:08:25 2026 +0000
feat: Implement partitioning in Upsert / Lookup (#220)
---
crates/examples/Cargo.toml | 6 +-
.../examples/src/example_partitioned_kv_table.rs | 153 ++++++++++++++++
crates/examples/src/example_table.rs | 1 +
crates/fluss/src/bucketing/mod.rs | 1 -
crates/fluss/src/client/admin.rs | 8 +-
crates/fluss/src/client/connection.rs | 5 -
crates/fluss/src/client/metadata.rs | 81 ++++++--
crates/fluss/src/client/table/append.rs | 23 ++-
crates/fluss/src/client/table/log_fetch_buffer.rs | 9 +-
crates/fluss/src/client/table/lookup.rs | 95 +++++++---
crates/fluss/src/client/table/mod.rs | 2 +-
crates/fluss/src/client/table/scanner.rs | 19 +-
crates/fluss/src/client/table/upsert.rs | 39 +++-
crates/fluss/src/client/table/writer.rs | 55 ------
crates/fluss/src/client/write/accumulator.rs | 128 ++++++++-----
crates/fluss/src/client/write/batch.rs | 72 ++++----
crates/fluss/src/client/write/bucket_assigner.rs | 16 +-
crates/fluss/src/client/write/mod.rs | 33 +++-
crates/fluss/src/client/write/sender.rs | 118 +++++++++---
crates/fluss/src/client/write/writer_client.rs | 47 +++--
crates/fluss/src/cluster/cluster.rs | 204 ++++++++++++++++-----
crates/fluss/src/cluster/mod.rs | 13 +-
crates/fluss/src/error.rs | 6 +
crates/fluss/src/metadata/datatype.rs | 9 +
crates/fluss/src/metadata/table.rs | 45 +++--
crates/fluss/src/record/arrow.rs | 13 +-
crates/fluss/src/rpc/message/update_metadata.rs | 26 ++-
crates/fluss/src/test_utils.rs | 11 +-
crates/fluss/tests/integration/kv_table.rs | 175 +++++++++++++++++-
29 files changed, 1071 insertions(+), 342 deletions(-)
diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml
index 117ceb2..16629be 100644
--- a/crates/examples/Cargo.toml
+++ b/crates/examples/Cargo.toml
@@ -33,4 +33,8 @@ path = "src/example_table.rs"
[[example]]
name = "example-upsert-lookup"
-path = "src/example_kv_table.rs"
\ No newline at end of file
+path = "src/example_kv_table.rs"
+
+[[example]]
+name = "example-partitioned-upsert-lookup"
+path = "src/example_partitioned_kv_table.rs"
\ No newline at end of file
diff --git a/crates/examples/src/example_partitioned_kv_table.rs
b/crates/examples/src/example_partitioned_kv_table.rs
new file mode 100644
index 0000000..a5e76fa
--- /dev/null
+++ b/crates/examples/src/example_partitioned_kv_table.rs
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use clap::Parser;
+use fluss::client::{FlussAdmin, FlussConnection, UpsertWriter};
+use fluss::config::Config;
+use fluss::error::Result;
+use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor,
TablePath};
+use fluss::row::{GenericRow, InternalRow};
+use std::collections::HashMap;
+
+#[tokio::main]
+#[allow(dead_code)]
+pub async fn main() -> Result<()> {
+ let mut config = Config::parse();
+ config.bootstrap_server = Some("127.0.0.1:9123".to_string());
+
+ let conn = FlussConnection::new(config).await?;
+
+ let table_descriptor = TableDescriptor::builder()
+ .schema(
+ Schema::builder()
+ .column("id", DataTypes::int())
+ .column("region", DataTypes::string())
+ .column("zone", DataTypes::bigint())
+ .column("score", DataTypes::bigint())
+ .primary_key(vec![
+ "id".to_string(),
+ "region".to_string(),
+ "zone".to_string(),
+ ])
+ .build()?,
+ )
+ .partitioned_by(vec!["region".to_string(), "zone".to_string()])
+ .build()?;
+
+ let table_path = TablePath::new("fluss".to_owned(),
"partitioned_kv_example".to_owned());
+
+ let mut admin = conn.get_admin().await?;
+ admin
+ .create_table(&table_path, &table_descriptor, true)
+ .await?;
+ println!(
+ "Created KV Table:\n {}\n",
+ admin.get_table(&table_path).await?
+ );
+
+ create_partition(&table_path, &mut admin, "APAC", 1).await;
+ create_partition(&table_path, &mut admin, "EMEA", 2).await;
+ create_partition(&table_path, &mut admin, "US", 3).await;
+
+ let table = conn.get_table(&table_path).await?;
+ let table_upsert = table.new_upsert()?;
+ let mut upsert_writer = table_upsert.create_writer()?;
+
+ println!("\n=== Upserting ===");
+ for (id, region, zone, score) in [
+ (1001, "APAC", 1i64, 1234i64),
+ (1002, "EMEA", 2, 2234),
+ (1003, "US", 3, 3234),
+ ] {
+ let mut row = GenericRow::new(4);
+ row.set_field(0, id);
+ row.set_field(1, region);
+ row.set_field(2, zone);
+ row.set_field(3, score);
+ upsert_writer.upsert(&row).await?;
+ println!("Upserted: {row:?}");
+ }
+
+ println!("\n=== Looking up ===");
+ let mut lookuper = table.new_lookup()?.create_lookuper()?;
+
+ for (id, region, zone) in [(1001, "APAC", 1i64), (1002, "EMEA", 2), (1003,
"US", 3)] {
+ let result = lookuper
+ .lookup(&make_key(id, region, zone))
+ .await
+ .expect("lookup");
+ let row = result.get_single_row()?.unwrap();
+ println!(
+ "Found id={id}: region={}, zone={}, score={}",
+ row.get_string(1),
+ row.get_long(2),
+ row.get_long(3)
+ );
+ }
+
+ println!("\n=== Updating ===");
+ let mut row = GenericRow::new(4);
+ row.set_field(0, 1001);
+ row.set_field(1, "APAC");
+ row.set_field(2, 1i64);
+ row.set_field(3, 4321i64);
+ upsert_writer.upsert(&row).await?;
+ println!("Updated: {row:?}");
+
+ let result = lookuper.lookup(&make_key(1001, "APAC", 1)).await?;
+ let row = result.get_single_row()?.unwrap();
+ println!(
+ "Verified update: region={}, zone={}",
+ row.get_string(1),
+ row.get_long(2)
+ );
+
+ println!("\n=== Deleting ===");
+ let mut row = GenericRow::new(4);
+ row.set_field(0, 1002);
+ row.set_field(1, "EMEA");
+ row.set_field(2, 2i64);
+ upsert_writer.delete(&row).await?;
+ println!("Deleted: {row:?}");
+
+ let result = lookuper.lookup(&make_key(1002, "EMEA", 2)).await?;
+ if result.get_single_row()?.is_none() {
+ println!("Verified deletion");
+ }
+
+ Ok(())
+}
+
+async fn create_partition(table_path: &TablePath, admin: &mut FlussAdmin,
region: &str, zone: i64) {
+ let mut partition_values = HashMap::new();
+ partition_values.insert("region".to_string(), region.to_string());
+ partition_values.insert("zone".to_string(), zone.to_string());
+ let partition_spec = PartitionSpec::new(partition_values);
+
+ admin
+ .create_partition(table_path, &partition_spec, true)
+ .await
+ .unwrap();
+}
+
+fn make_key(id: i32, region: &str, zone: i64) -> GenericRow<'static> {
+ let mut row = GenericRow::new(4);
+ row.set_field(0, id);
+ row.set_field(1, region.to_string());
+ row.set_field(2, zone);
+ row
+}
diff --git a/crates/examples/src/example_table.rs
b/crates/examples/src/example_table.rs
index ca6b942..92055a7 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -16,6 +16,7 @@
// under the License.
mod example_kv_table;
+mod example_partitioned_kv_table;
use clap::Parser;
use fluss::client::FlussConnection;
diff --git a/crates/fluss/src/bucketing/mod.rs
b/crates/fluss/src/bucketing/mod.rs
index 2611ac7..1b43d12 100644
--- a/crates/fluss/src/bucketing/mod.rs
+++ b/crates/fluss/src/bucketing/mod.rs
@@ -24,7 +24,6 @@ pub trait BucketingFunction: Sync + Send {
fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32>;
}
-#[allow(dead_code)]
impl dyn BucketingFunction {
/// Provides the bucketing function for a given [DataLakeFormat]
///
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index 286c46c..ea1efc3 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -32,7 +32,7 @@ use crate::rpc::{RpcClient, ServerConnection};
use crate::error::{Error, Result};
use crate::proto::GetTableInfoResponse;
use crate::{BucketId, PartitionId, TableId};
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use std::slice::from_ref;
use std::sync::Arc;
use tokio::task::JoinHandle;
@@ -111,6 +111,12 @@ impl FlussAdmin {
.admin_gateway
.request(GetTableRequest::new(table_path))
.await?;
+
+ // force update to avoid stale data in cache
+ self.metadata
+ .update_tables_metadata(&HashSet::from([table_path]),
&HashSet::new(), vec![])
+ .await?;
+
let GetTableInfoResponse {
table_id,
schema_id,
diff --git a/crates/fluss/src/client/connection.rs
b/crates/fluss/src/client/connection.rs
index a19dbd2..e021011 100644
--- a/crates/fluss/src/client/connection.rs
+++ b/crates/fluss/src/client/connection.rs
@@ -93,11 +93,6 @@ impl FlussConnection {
pub async fn get_table(&self, table_path: &TablePath) ->
Result<FlussTable<'_>> {
self.metadata.update_table_metadata(table_path).await?;
let table_info =
self.metadata.get_cluster().get_table(table_path)?.clone();
- if table_info.is_partitioned() {
- return Err(crate::error::Error::UnsupportedOperation {
- message: "Partitioned tables are not supported".to_string(),
- });
- }
Ok(FlussTable::new(self, self.metadata.clone(), table_info))
}
}
diff --git a/crates/fluss/src/client/metadata.rs
b/crates/fluss/src/client/metadata.rs
index 614f6e7..52ccd62 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -58,7 +58,14 @@ impl Metadata {
ServerType::CoordinatorServer,
);
let con = connections.get_connection(&server_node).await?;
- let response = con.request(UpdateMetadataRequest::new(&[])).await?;
+
+ let response = con
+ .request(UpdateMetadataRequest::new(
+ &HashSet::default(),
+ &HashSet::new(),
+ vec![],
+ ))
+ .await?;
Cluster::from_metadata_response(response, None)
}
@@ -95,7 +102,12 @@ impl Metadata {
Ok(())
}
- pub async fn update_tables_metadata(&self, table_paths:
&HashSet<&TablePath>) -> Result<()> {
+ pub async fn update_tables_metadata(
+ &self,
+ table_paths: &HashSet<&TablePath>,
+ physical_table_paths: &HashSet<&Arc<PhysicalTablePath>>,
+ partition_ids: Vec<i64>,
+ ) -> Result<()> {
let maybe_server = {
let guard = self.cluster.read();
guard.get_one_available_server().cloned()
@@ -114,16 +126,19 @@ impl Metadata {
let conn = self.connections.get_connection(&server).await?;
- let update_table_paths: Vec<&TablePath> =
table_paths.iter().copied().collect();
let response = conn
- .request(UpdateMetadataRequest::new(update_table_paths.as_slice()))
+ .request(UpdateMetadataRequest::new(
+ table_paths,
+ physical_table_paths,
+ partition_ids,
+ ))
.await?;
self.update(response).await?;
Ok(())
}
pub async fn update_table_metadata(&self, table_path: &TablePath) ->
Result<()> {
- self.update_tables_metadata(&HashSet::from([table_path]))
+ self.update_tables_metadata(&HashSet::from([table_path]),
&HashSet::new(), vec![])
.await
}
@@ -133,8 +148,9 @@ impl Metadata {
.iter()
.filter(|table_path|
cluster_binding.opt_get_table(table_path).is_none())
.collect();
+
if !need_update_table_paths.is_empty() {
- self.update_tables_metadata(&need_update_table_paths)
+ self.update_tables_metadata(&need_update_table_paths,
&HashSet::new(), vec![])
.await?;
}
Ok(())
@@ -150,7 +166,48 @@ impl Metadata {
guard.clone()
}
- pub fn leader_for(&self, table_bucket: &TableBucket) -> Option<ServerNode>
{
+ const MAX_RETRY_TIMES: u8 = 3;
+
+ pub async fn leader_for(
+ &self,
+ table_path: &TablePath,
+ table_bucket: &TableBucket,
+ ) -> Result<Option<ServerNode>> {
+ let leader = self.get_leader_for(table_bucket);
+
+ if leader.is_some() {
+ Ok(leader)
+ } else {
+ for _ in 0..Self::MAX_RETRY_TIMES {
+ if let Some(partition_id) = table_bucket.partition_id() {
+ self.update_tables_metadata(
+ &HashSet::from([table_path]),
+ &HashSet::new(),
+ vec![partition_id],
+ )
+ .await?;
+ } else {
+ self.update_tables_metadata(
+ &HashSet::from([table_path]),
+ &HashSet::new(),
+ vec![],
+ )
+ .await?;
+ }
+
+ let cluster = self.cluster.read();
+ let leader = cluster.leader_for(table_bucket);
+
+ if leader.is_some() {
+ return Ok(leader.cloned());
+ }
+ }
+
+ Ok(None)
+ }
+ }
+
+ fn get_leader_for(&self, table_bucket: &TableBucket) -> Option<ServerNode>
{
let cluster = self.cluster.read();
cluster.leader_for(table_bucket).cloned()
}
@@ -173,14 +230,16 @@ mod tests {
use crate::metadata::{TableBucket, TablePath};
use crate::test_utils::build_cluster_arc;
- #[test]
- fn leader_for_returns_server() {
+ #[tokio::test]
+ async fn leader_for_returns_server() {
let table_path = TablePath::new("db".to_string(), "tbl".to_string());
let cluster = build_cluster_arc(&table_path, 1, 1);
let metadata = Metadata::new_for_test(cluster);
let leader = metadata
- .leader_for(&TableBucket::new(1, 0))
- .expect("leader");
+ .leader_for(&table_path, &TableBucket::new(1, 0))
+ .await
+ .expect("leader request should be Ok")
+ .expect("leader should exist");
assert_eq!(leader.id(), 1);
}
diff --git a/crates/fluss/src/client/table/append.rs
b/crates/fluss/src/client/table/append.rs
index 6d76f28..7fe2023 100644
--- a/crates/fluss/src/client/table/append.rs
+++ b/crates/fluss/src/client/table/append.rs
@@ -17,7 +17,7 @@
use crate::client::{WriteRecord, WriterClient};
use crate::error::Result;
-use crate::metadata::{TableInfo, TablePath};
+use crate::metadata::{PhysicalTablePath, TableInfo, TablePath};
use crate::row::GenericRow;
use arrow::array::RecordBatch;
use std::sync::Arc;
@@ -25,14 +25,14 @@ use std::sync::Arc;
#[allow(dead_code)]
pub struct TableAppend {
table_path: TablePath,
- table_info: TableInfo,
+ table_info: Arc<TableInfo>,
writer_client: Arc<WriterClient>,
}
impl TableAppend {
pub(super) fn new(
table_path: TablePath,
- table_info: TableInfo,
+ table_info: Arc<TableInfo>,
writer_client: Arc<WriterClient>,
) -> Self {
Self {
@@ -44,23 +44,27 @@ impl TableAppend {
pub fn create_writer(&self) -> AppendWriter {
AppendWriter {
- table_path: Arc::new(self.table_path.clone()),
+ physical_table_path:
Arc::new(PhysicalTablePath::of(Arc::new(self.table_path.clone()))),
writer_client: self.writer_client.clone(),
- table_info: Arc::new(self.table_info.clone()),
+ table_info: Arc::clone(&self.table_info),
}
}
}
pub struct AppendWriter {
- table_path: Arc<TablePath>,
+ physical_table_path: Arc<PhysicalTablePath>,
writer_client: Arc<WriterClient>,
table_info: Arc<TableInfo>,
}
impl AppendWriter {
pub async fn append(&self, row: GenericRow<'_>) -> Result<()> {
- let record =
- WriteRecord::for_append(self.table_path.clone(),
self.table_info.schema_id, row);
+ let record = WriteRecord::for_append(
+ Arc::clone(&self.table_info),
+ Arc::clone(&self.physical_table_path),
+ self.table_info.schema_id,
+ row,
+ );
let result_handle = self.writer_client.send(&record).await?;
let result = result_handle.wait().await?;
result_handle.result(result)
@@ -68,7 +72,8 @@ impl AppendWriter {
pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()> {
let record = WriteRecord::for_append_record_batch(
- self.table_path.clone(),
+ Arc::clone(&self.table_info),
+ Arc::clone(&self.physical_table_path),
self.table_info.schema_id,
batch,
);
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index 7ece34b..78ee065 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -833,9 +833,10 @@ mod tests {
use crate::compression::{
ArrowCompressionInfo, ArrowCompressionType,
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
};
- use crate::metadata::{DataField, DataTypes, RowType, TablePath};
+ use crate::metadata::{DataField, DataTypes, PhysicalTablePath, RowType,
TablePath};
use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext,
to_arrow_schema};
use crate::row::GenericRow;
+ use crate::test_utils::build_table_info;
use std::sync::Arc;
fn test_read_context() -> Result<ReadContext> {
@@ -899,7 +900,9 @@ mod tests {
DataField::new("id".to_string(), DataTypes::int(), None),
DataField::new("name".to_string(), DataTypes::string(), None),
]);
- let table_path = Arc::new(TablePath::new("db".to_string(),
"tbl".to_string()));
+ let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+ let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1));
+ let physical_table_path =
Arc::new(PhysicalTablePath::of(Arc::new(table_path)));
let mut builder = MemoryLogRecordsArrowBuilder::new(
1,
@@ -914,7 +917,7 @@ mod tests {
let mut row = GenericRow::new(2);
row.set_field(0, 1_i32);
row.set_field(1, "alice");
- let record = WriteRecord::for_append(table_path, 1, row);
+ let record = WriteRecord::for_append(table_info, physical_table_path,
1, row);
builder.append(&record)?;
let data = builder.build()?;
diff --git a/crates/fluss/src/client/table/lookup.rs
b/crates/fluss/src/client/table/lookup.rs
index 4e89176..69cb91e 100644
--- a/crates/fluss/src/client/table/lookup.rs
+++ b/crates/fluss/src/client/table/lookup.rs
@@ -18,8 +18,9 @@
use crate::bucketing::BucketingFunction;
use crate::client::connection::FlussConnection;
use crate::client::metadata::Metadata;
+use crate::client::table::partition_getter::PartitionGetter;
use crate::error::{Error, Result};
-use crate::metadata::{RowType, TableBucket, TableInfo};
+use crate::metadata::{PhysicalTablePath, RowType, TableBucket, TableInfo,
TablePath};
use crate::record::kv::SCHEMA_ID_LENGTH;
use crate::row::InternalRow;
use crate::row::compacted::CompactedRow;
@@ -133,20 +134,43 @@ impl<'a> TableLookup<'a> {
let data_lake_format =
self.table_info.get_table_config().get_datalake_format()?;
let bucketing_function = <dyn
BucketingFunction>::of(data_lake_format.as_ref());
- // Create key encoder for the primary key fields
- let pk_fields = self.table_info.get_physical_primary_keys().to_vec();
- let key_encoder = KeyEncoderFactory::of(
- self.table_info.row_type(),
- pk_fields.as_slice(),
- &data_lake_format,
- )?;
+ let row_type = self.table_info.row_type();
+ let primary_keys = self.table_info.get_primary_keys();
+ let lookup_row_type = row_type.project_with_field_names(primary_keys)?;
+
+ let physical_primary_keys =
self.table_info.get_physical_primary_keys().to_vec();
+ let primary_key_encoder =
+ KeyEncoderFactory::of(&lookup_row_type, &physical_primary_keys,
&data_lake_format)?;
+
+ let bucket_key_encoder = if self.table_info.is_default_bucket_key() {
+ None
+ } else {
+ let bucket_keys = self.table_info.get_bucket_keys().to_vec();
+ Some(KeyEncoderFactory::of(
+ &lookup_row_type,
+ &bucket_keys,
+ &data_lake_format,
+ )?)
+ };
+
+ let partition_getter = if self.table_info.is_partitioned() {
+ Some(PartitionGetter::new(
+ &lookup_row_type,
+ Arc::clone(self.table_info.get_partition_keys()),
+ )?)
+ } else {
+ None
+ };
Ok(Lookuper {
conn: self.conn,
+ table_path: Arc::new(self.table_info.table_path.clone()),
table_info: self.table_info,
metadata: self.metadata,
bucketing_function,
- key_encoder,
+ primary_key_encoder,
+ bucket_key_encoder,
+ partition_getter,
num_buckets,
})
}
@@ -163,13 +187,15 @@ impl<'a> TableLookup<'a> {
/// let row = GenericRow::new(vec![Datum::Int32(42)]); // lookup key
/// let result = lookuper.lookup(&row).await?;
/// ```
-// TODO: Support partitioned tables (extract partition from key)
pub struct Lookuper<'a> {
conn: &'a FlussConnection,
table_info: TableInfo,
+ table_path: Arc<TablePath>,
metadata: Arc<Metadata>,
bucketing_function: Box<dyn BucketingFunction>,
- key_encoder: Box<dyn KeyEncoder>,
+ primary_key_encoder: Box<dyn KeyEncoder>,
+ bucket_key_encoder: Option<Box<dyn KeyEncoder>>,
+ partition_getter: Option<PartitionGetter>,
num_buckets: i32,
}
@@ -187,26 +213,47 @@ impl<'a> Lookuper<'a> {
/// * `Err(Error)` - If the lookup fails
pub async fn lookup(&mut self, row: &dyn InternalRow) ->
Result<LookupResult<'_>> {
// todo: support batch lookup
- // Encode the key from the row
- let encoded_key = self.key_encoder.encode_key(row)?;
- let key_bytes = encoded_key.to_vec();
+ let pk_bytes = self.primary_key_encoder.encode_key(row)?;
+ let pk_bytes_vec = pk_bytes.to_vec();
+ let bk_bytes = match &mut self.bucket_key_encoder {
+ Some(encoder) => &encoder.encode_key(row)?,
+ None => &pk_bytes,
+ };
+
+ let partition_id = if let Some(ref partition_getter) =
self.partition_getter {
+ let partition_name = partition_getter.get_partition(row)?;
+ let physical_table_path = PhysicalTablePath::of_partitioned(
+ Arc::clone(&self.table_path),
+ Some(partition_name),
+ );
+ let cluster = self.metadata.get_cluster();
+ match cluster.get_partition_id(&physical_table_path) {
+ Some(id) => Some(id),
+ None => {
+ // Partition doesn't exist, return empty result (like Java)
+ return Ok(LookupResult::empty(self.table_info.row_type()));
+ }
+ }
+ } else {
+ None
+ };
- // Compute bucket from encoded key
let bucket_id = self
.bucketing_function
- .bucketing(&key_bytes, self.num_buckets)?;
+ .bucketing(bk_bytes, self.num_buckets)?;
let table_id = self.table_info.get_table_id();
- let table_bucket = TableBucket::new(table_id, bucket_id);
+ let table_bucket = TableBucket::new_with_partition(table_id,
partition_id, bucket_id);
// Find the leader for this bucket
let cluster = self.metadata.get_cluster();
- let leader =
- cluster
- .leader_for(&table_bucket)
- .ok_or_else(|| Error::LeaderNotAvailable {
- message: format!("No leader found for table bucket:
{table_bucket}"),
- })?;
+ let leader = self
+ .metadata
+ .leader_for(self.table_path.as_ref(), &table_bucket)
+ .await?
+ .ok_or_else(|| Error::LeaderNotAvailable {
+ message: format!("No leader found for table bucket:
{table_bucket}"),
+ })?;
// Get connection to the tablet server
let tablet_server =
@@ -223,7 +270,7 @@ impl<'a> Lookuper<'a> {
let connection = connections.get_connection(tablet_server).await?;
// Send lookup request
- let request = LookupRequest::new(table_id, None, bucket_id,
vec![key_bytes]);
+ let request = LookupRequest::new(table_id, partition_id, bucket_id,
vec![pk_bytes_vec]);
let response = connection.request(request).await?;
// Extract the values from response
diff --git a/crates/fluss/src/client/table/mod.rs
b/crates/fluss/src/client/table/mod.rs
index 2dc56d5..2fbbbc9 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -69,7 +69,7 @@ impl<'a> FlussTable<'a> {
pub fn new_append(&self) -> Result<TableAppend> {
Ok(TableAppend::new(
self.table_path.clone(),
- self.table_info.clone(),
+ Arc::new(self.table_info.clone()),
self.conn.get_or_create_writer_client()?,
))
}
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index 356ba1c..14d2841 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -630,7 +630,7 @@ impl LogFetcher {
if self.is_partitioned {
// Fallback to full table metadata refresh until partition-aware
updates are available.
self.metadata
- .update_tables_metadata(&HashSet::from([&self.table_path]))
+ .update_tables_metadata(&HashSet::from([&self.table_path]),
&HashSet::new(), vec![])
.await
.or_else(|e| {
if let Error::RpcError { source, .. } = &e
@@ -649,7 +649,7 @@ impl LogFetcher {
// TODO: Handle PartitionNotExist error
self.metadata
- .update_tables_metadata(&HashSet::from([&self.table_path]))
+ .update_tables_metadata(&HashSet::from([&self.table_path]),
&HashSet::new(), vec![])
.await
.or_else(|e| {
if let Error::RpcError { source, .. } = &e
@@ -799,8 +799,9 @@ impl LogFetcher {
let table_id = table_bucket.table_id();
let cluster = metadata.get_cluster();
if let Some(table_path) =
cluster.get_table_path_by_id(table_id) {
- let physical_tables =
-
HashSet::from([PhysicalTablePath::of(table_path.clone())]);
+ let physical_tables =
HashSet::from([PhysicalTablePath::of(Arc::new(
+ table_path.clone(),
+ ))]);
metadata.invalidate_physical_table_meta(&physical_tables);
} else {
warn!(
@@ -1498,7 +1499,7 @@ mod tests {
use crate::compression::{
ArrowCompressionInfo, ArrowCompressionType,
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
};
- use crate::metadata::{TableInfo, TablePath};
+ use crate::metadata::{PhysicalTablePath, TableInfo, TablePath};
use crate::record::MemoryLogRecordsArrowBuilder;
use crate::row::{Datum, GenericRow};
use crate::rpc::FlussError;
@@ -1514,8 +1515,10 @@ mod tests {
compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
},
)?;
+ let physical_table_path = Arc::new(PhysicalTablePath::of(table_path));
let record = WriteRecord::for_append(
- table_path,
+ Arc::new(table_info.clone()),
+ physical_table_path,
1,
GenericRow {
values: vec![Datum::Int32(1)],
@@ -1684,7 +1687,7 @@ mod tests {
)?;
let bucket = TableBucket::new(1, 0);
- assert!(metadata.leader_for(&bucket).is_some());
+ assert!(metadata.leader_for(&table_path, &bucket).await?.is_some());
let response = crate::proto::FetchLogResponse {
tables_resp: vec![crate::proto::PbFetchLogRespForTable {
@@ -1713,7 +1716,7 @@ mod tests {
LogFetcher::handle_fetch_response(response, response_context).await;
- assert!(metadata.leader_for(&bucket).is_none());
+ assert!(metadata.get_cluster().leader_for(&bucket).is_none());
Ok(())
}
}
diff --git a/crates/fluss/src/client/table/upsert.rs
b/crates/fluss/src/client/table/upsert.rs
index 984592d..269d525 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -19,12 +19,13 @@ use crate::client::table::writer::{DeleteResult,
TableWriter, UpsertResult, Upse
use crate::client::{RowBytes, WriteFormat, WriteRecord, WriterClient};
use crate::error::Error::IllegalArgument;
use crate::error::Result;
-use crate::metadata::{KvFormat, RowType, TableInfo, TablePath};
+use crate::metadata::{PhysicalTablePath, RowType, TableInfo, TablePath};
use crate::row::InternalRow;
use crate::row::encode::{KeyEncoder, KeyEncoderFactory, RowEncoder,
RowEncoderFactory};
use crate::row::field_getter::FieldGetter;
use std::sync::Arc;
+use crate::client::table::partition_getter::PartitionGetter;
use bitvec::prelude::bitvec;
use bytes::Bytes;
@@ -107,30 +108,25 @@ impl TableUpsert {
}
}
-#[allow(dead_code)]
struct UpsertWriterImpl<RE>
where
RE: RowEncoder,
{
table_path: Arc<TablePath>,
writer_client: Arc<WriterClient>,
- // TODO: Partitioning
- // partition_field_getter: Option<Box<dyn KeyEncoder>>,
+ partition_field_getter: Option<PartitionGetter>,
primary_key_encoder: Box<dyn KeyEncoder>,
target_columns: Option<Arc<Vec<usize>>>,
// Use primary key encoder as bucket key encoder when None
bucket_key_encoder: Option<Box<dyn KeyEncoder>>,
- kv_format: KvFormat,
write_format: WriteFormat,
row_encoder: RE,
field_getters: Box<[FieldGetter]>,
table_info: Arc<TableInfo>,
}
-#[allow(dead_code)]
struct UpsertWriterFactory;
-#[allow(dead_code)]
impl UpsertWriterFactory {
pub fn create(
table_path: Arc<TablePath>,
@@ -168,13 +164,22 @@ impl UpsertWriterFactory {
let field_getters = FieldGetter::create_field_getters(row_type);
+ let partition_field_getter = if table_info.is_partitioned() {
+ Some(PartitionGetter::new(
+ row_type,
+ Arc::clone(table_info.get_partition_keys()),
+ )?)
+ } else {
+ None
+ };
+
Ok(UpsertWriterImpl {
table_path,
+ partition_field_getter,
writer_client,
primary_key_encoder,
target_columns: partial_update_columns,
bucket_key_encoder,
- kv_format: kv_format.clone(),
write_format,
row_encoder: RowEncoderFactory::create(kv_format,
row_type.clone())?,
field_getters,
@@ -311,6 +316,18 @@ impl<RE: RowEncoder> UpsertWriterImpl<RE> {
}
self.row_encoder.finish_row()
}
+
+ fn get_physical_path<R: InternalRow>(&self, row: &R) ->
Result<PhysicalTablePath> {
+ if let Some(partition_getter) = &self.partition_field_getter {
+ let partition = partition_getter.get_partition(row);
+ Ok(PhysicalTablePath::of_partitioned(
+ Arc::clone(&self.table_path),
+ Some(partition?),
+ ))
+ } else {
+ Ok(PhysicalTablePath::of(Arc::clone(&self.table_path)))
+ }
+ }
}
impl<RE: RowEncoder> TableWriter for UpsertWriterImpl<RE> {
@@ -343,7 +360,8 @@ impl<RE: RowEncoder> UpsertWriter for UpsertWriterImpl<RE> {
};
let write_record = WriteRecord::for_upsert(
- Arc::clone(&self.table_path),
+ Arc::clone(&self.table_info),
+ Arc::new(self.get_physical_path(row)?),
self.table_info.schema_id,
key,
bucket_key,
@@ -372,7 +390,8 @@ impl<RE: RowEncoder> UpsertWriter for UpsertWriterImpl<RE> {
let (key, bucket_key) = self.get_keys(row)?;
let write_record = WriteRecord::for_upsert(
- Arc::clone(&self.table_path),
+ Arc::clone(&self.table_info),
+ Arc::new(self.get_physical_path(row)?),
self.table_info.schema_id,
key,
bucket_key,
diff --git a/crates/fluss/src/client/table/writer.rs
b/crates/fluss/src/client/table/writer.rs
index 8276545..ec26ec6 100644
--- a/crates/fluss/src/client/table/writer.rs
+++ b/crates/fluss/src/client/table/writer.rs
@@ -15,12 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-use crate::client::{WriteRecord, WriterClient};
use crate::row::{GenericRow, InternalRow};
-use std::sync::Arc;
use crate::error::Result;
-use crate::metadata::{TableInfo, TablePath};
#[allow(dead_code, async_fn_in_trait)]
pub trait TableWriter {
@@ -47,55 +44,3 @@ pub struct UpsertResult;
/// Currently this is an empty struct to allow for compatible evolution in the
future
#[derive(Default)]
pub struct DeleteResult;
-
-#[allow(dead_code)]
-pub struct AbstractTableWriter {
- table_path: Arc<TablePath>,
- writer_client: Arc<WriterClient>,
- field_count: i32,
- schema_id: i32,
-}
-
-#[allow(dead_code)]
-impl AbstractTableWriter {
- pub fn new(
- table_path: TablePath,
- table_info: &TableInfo,
- writer_client: Arc<WriterClient>,
- ) -> Self {
- // todo: partition
- Self {
- table_path: Arc::new(table_path),
- writer_client,
- field_count: table_info.row_type().fields().len() as i32,
- schema_id: table_info.schema_id,
- }
- }
-
- pub async fn send(&self, write_record: &WriteRecord<'_>) -> Result<()> {
- let result_handle = self.writer_client.send(write_record).await?;
- let result = result_handle.wait().await?;
- result_handle.result(result)
- }
-}
-
-impl TableWriter for AbstractTableWriter {
- async fn flush(&self) -> Result<()> {
- todo!()
- }
-}
-
-// Append writer implementation
-#[allow(dead_code)]
-pub struct AppendWriterImpl {
- base: AbstractTableWriter,
-}
-
-#[allow(dead_code)]
-impl AppendWriterImpl {
- pub async fn append(&self, row: GenericRow<'_>) -> Result<()> {
- let record =
- WriteRecord::for_append(self.base.table_path.clone(),
self.base.schema_id, row);
- self.base.send(&record).await
- }
-}
diff --git a/crates/fluss/src/client/write/accumulator.rs
b/crates/fluss/src/client/write/accumulator.rs
index a5b9832..2a45517 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -21,7 +21,7 @@ use crate::client::{LogWriteRecord, Record, ResultHandle,
WriteRecord};
use crate::cluster::{BucketLocation, Cluster, ServerNode};
use crate::config::Config;
use crate::error::Result;
-use crate::metadata::{TableBucket, TablePath};
+use crate::metadata::{PhysicalTablePath, TableBucket};
use crate::util::current_time_ms;
use crate::{BucketId, PartitionId, TableId};
use dashmap::DashMap;
@@ -37,7 +37,7 @@ type BucketBatches = Vec<(BucketId,
Arc<Mutex<VecDeque<WriteBatch>>>)>;
#[allow(dead_code)]
pub struct RecordAccumulator {
config: Config,
- write_batches: DashMap<TablePath, BucketAndWriteBatches>,
+ write_batches: DashMap<Arc<PhysicalTablePath>, BucketAndWriteBatches>,
// batch_id -> complete callback
incomplete_batches: RwLock<HashMap<i64, ResultHandle>>,
batch_timeout_ms: i64,
@@ -88,14 +88,14 @@ impl RecordAccumulator {
&self,
cluster: &Cluster,
record: &WriteRecord,
- bucket_id: BucketId,
dq: &mut VecDeque<WriteBatch>,
) -> Result<RecordAppendResult> {
if let Some(append_result) = self.try_append(record, dq)? {
return Ok(append_result);
}
- let table_path = &record.table_path;
+ let physical_table_path = &record.physical_table_path;
+ let table_path = physical_table_path.get_table_path();
let table_info = cluster.get_table(table_path)?;
let arrow_compression_info =
table_info.get_table_config().get_arrow_compression_info()?;
let row_type = &table_info.row_type;
@@ -105,22 +105,20 @@ impl RecordAccumulator {
let mut batch: WriteBatch = match record.record() {
Record::Log(_) => ArrowLog(ArrowLogWriteBatch::new(
self.batch_id.fetch_add(1, Ordering::Relaxed),
- table_path.as_ref().clone(),
+ Arc::clone(physical_table_path),
schema_id,
arrow_compression_info,
row_type,
- bucket_id,
current_time_ms(),
matches!(&record.record,
Record::Log(LogWriteRecord::RecordBatch(_))),
)?),
Record::Kv(kv_record) => Kv(KvWriteBatch::new(
self.batch_id.fetch_add(1, Ordering::Relaxed),
- table_path.as_ref().clone(),
+ Arc::clone(physical_table_path),
schema_id,
// TODO: Decide how to derive write limit in the absence of
java's equivalent of PreAllocatedPagedOutputView
KvWriteBatch::DEFAULT_WRITE_LIMIT,
record.write_format.to_kv_format()?,
- bucket_id,
kv_record.target_columns.clone(),
current_time_ms(),
)),
@@ -153,18 +151,25 @@ impl RecordAccumulator {
cluster: &Cluster,
abort_if_batch_full: bool,
) -> Result<RecordAppendResult> {
- let table_path = &record.table_path;
+ let physical_table_path = &record.physical_table_path;
+ let table_path = physical_table_path.get_table_path();
+ let table_info = cluster.get_table(table_path)?;
+ let is_partitioned_table = table_info.is_partitioned();
- // TODO: Implement partitioning
+ let partition_id = if is_partitioned_table {
+ cluster.get_partition_id(physical_table_path)
+ } else {
+ None
+ };
let dq = {
let mut binding = self
.write_batches
- .entry(table_path.as_ref().clone())
+ .entry(Arc::clone(physical_table_path))
.or_insert_with(|| BucketAndWriteBatches {
- table_id: 0,
- is_partitioned_table: false,
- partition_id: None,
+ table_id: table_info.table_id,
+ is_partitioned_table,
+ partition_id,
batches: Default::default(),
});
let bucket_and_batches = binding.value_mut();
@@ -185,23 +190,24 @@ impl RecordAccumulator {
true, false, true,
));
}
- self.append_new_batch(cluster, record, bucket_id, &mut dq_guard)
+ self.append_new_batch(cluster, record, &mut dq_guard)
}
pub async fn ready(&self, cluster: &Arc<Cluster>) ->
Result<ReadyCheckResult> {
// Snapshot just the Arcs we need, avoiding cloning the entire
BucketAndWriteBatches struct
- let entries: Vec<(TablePath, BucketBatches)> = self
+ let entries: Vec<(Arc<PhysicalTablePath>, Option<PartitionId>,
BucketBatches)> = self
.write_batches
.iter()
.map(|entry| {
- let table_path = entry.key().clone();
+ let physical_table_path = Arc::clone(entry.key());
+ let partition_id = entry.value().partition_id;
let bucket_batches: Vec<_> = entry
.value()
.batches
.iter()
.map(|(bucket_id, batch_arc)| (*bucket_id,
batch_arc.clone()))
.collect();
- (table_path, bucket_batches)
+ (physical_table_path, partition_id, bucket_batches)
})
.collect();
@@ -209,10 +215,12 @@ impl RecordAccumulator {
let mut next_ready_check_delay_ms = self.batch_timeout_ms;
let mut unknown_leader_tables = HashSet::new();
- for (table_path, bucket_batches) in entries {
+ for (physical_table_path, mut partition_id, bucket_batches) in entries
{
next_ready_check_delay_ms = self
.bucket_ready(
- &table_path,
+ &physical_table_path,
+ physical_table_path.get_partition_name().is_some(),
+ &mut partition_id,
bucket_batches,
&mut ready_nodes,
&mut unknown_leader_tables,
@@ -229,17 +237,42 @@ impl RecordAccumulator {
})
}
+ #[allow(clippy::too_many_arguments)]
async fn bucket_ready(
&self,
- table_path: &TablePath,
+ physical_table_path: &Arc<PhysicalTablePath>,
+ is_partitioned_table: bool,
+ partition_id: &mut Option<PartitionId>,
bucket_batches: BucketBatches,
ready_nodes: &mut HashSet<ServerNode>,
- unknown_leader_tables: &mut HashSet<TablePath>,
+ unknown_leader_tables: &mut HashSet<Arc<PhysicalTablePath>>,
cluster: &Cluster,
next_ready_check_delay_ms: i64,
) -> Result<i64> {
let mut next_delay = next_ready_check_delay_ms;
+ // First check this table has partitionId.
+ if is_partitioned_table && partition_id.is_none() {
+ let partition_id = cluster.get_partition_id(physical_table_path);
+
+ if partition_id.is_some() {
+ // Update the cached partition_id
+ if let Some(mut entry) =
self.write_batches.get_mut(physical_table_path) {
+ entry.partition_id = partition_id;
+ }
+ } else {
+ log::debug!(
+ "Partition does not exist for {}, bucket will not be set
to ready",
+ physical_table_path.as_ref()
+ );
+
+ // TODO: we shouldn't add unready partitions to
unknownLeaderTables,
+            // because it causes PartitionNotExistException later
+ unknown_leader_tables.insert(Arc::clone(physical_table_path));
+ return Ok(next_delay);
+ }
+ }
+
for (bucket_id, batch) in bucket_batches {
let batch_guard = batch.lock().await;
if batch_guard.is_empty() {
@@ -250,12 +283,12 @@ impl RecordAccumulator {
let waited_time_ms = batch.waited_time_ms(current_time_ms());
let deque_size = batch_guard.len();
let full = deque_size > 1 || batch.is_closed();
- let table_bucket = cluster.get_table_bucket(table_path,
bucket_id)?;
+ let table_bucket = cluster.get_table_bucket(physical_table_path,
bucket_id)?;
if let Some(leader) = cluster.leader_for(&table_bucket) {
next_delay =
self.batch_ready(leader, waited_time_ms, full,
ready_nodes, next_delay);
} else {
- unknown_leader_tables.insert(table_path.clone());
+ unknown_leader_tables.insert(Arc::clone(physical_table_path));
}
}
Ok(next_delay)
@@ -332,14 +365,14 @@ impl RecordAccumulator {
loop {
let bucket = &buckets[current_index];
- let table_path = bucket.table_path.clone();
+ let table_path = bucket.physical_table_path();
let table_bucket = bucket.table_bucket.clone();
last_processed_index = current_index;
current_index = (current_index + 1) % buckets.len();
let deque = self
.write_batches
- .get(&table_path)
+ .get(table_path)
.and_then(|bucket_and_write_batches| {
bucket_and_write_batches
.batches
@@ -399,20 +432,22 @@ impl RecordAccumulator {
pub async fn re_enqueue(&self, ready_write_batch: ReadyWriteBatch) {
ready_write_batch.write_batch.re_enqueued();
- let table_path = ready_write_batch.write_batch.table_path().clone();
+ let physical_table_path =
ready_write_batch.write_batch.physical_table_path();
let bucket_id = ready_write_batch.table_bucket.bucket_id();
let table_id = ready_write_batch.table_bucket.table_id();
+ let partition_id = ready_write_batch.table_bucket.partition_id();
+ let is_partitioned_table = partition_id.is_some();
let dq = {
- let mut binding =
- self.write_batches
- .entry(table_path)
- .or_insert_with(|| BucketAndWriteBatches {
- table_id,
- is_partitioned_table: false,
- partition_id: None,
- batches: Default::default(),
- });
+ let mut binding = self
+ .write_batches
+ .entry(Arc::clone(physical_table_path))
+ .or_insert_with(|| BucketAndWriteBatches {
+ table_id,
+ is_partitioned_table,
+ partition_id,
+ batches: Default::default(),
+ });
let bucket_and_batches = binding.value_mut();
bucket_and_batches
.batches
@@ -478,6 +513,12 @@ pub struct ReadyWriteBatch {
pub write_batch: WriteBatch,
}
+impl ReadyWriteBatch {
+ pub fn write_batch(&self) -> &WriteBatch {
+ &self.write_batch
+ }
+}
+
#[allow(dead_code)]
struct BucketAndWriteBatches {
table_id: TableId,
@@ -525,14 +566,14 @@ impl RecordAppendResult {
pub struct ReadyCheckResult {
pub ready_nodes: HashSet<ServerNode>,
pub next_ready_check_delay_ms: i64,
- pub unknown_leader_tables: HashSet<TablePath>,
+ pub unknown_leader_tables: HashSet<Arc<PhysicalTablePath>>,
}
impl ReadyCheckResult {
pub fn new(
ready_nodes: HashSet<ServerNode>,
next_ready_check_delay_ms: i64,
- unknown_leader_tables: HashSet<TablePath>,
+ unknown_leader_tables: HashSet<Arc<PhysicalTablePath>>,
) -> Self {
ReadyCheckResult {
ready_nodes,
@@ -547,17 +588,20 @@ mod tests {
use super::*;
use crate::metadata::TablePath;
use crate::row::{Datum, GenericRow};
- use crate::test_utils::build_cluster;
+ use crate::test_utils::{build_cluster, build_table_info};
use std::sync::Arc;
#[tokio::test]
async fn re_enqueue_increments_attempts() -> Result<()> {
let config = Config::default();
let accumulator = RecordAccumulator::new(config);
- let table_path = Arc::new(TablePath::new("db".to_string(),
"tbl".to_string()));
- let cluster = Arc::new(build_cluster(table_path.as_ref(), 1, 1));
+ let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+ let physical_table_path =
Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone())));
+ let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1));
+ let cluster = Arc::new(build_cluster(&table_path, 1, 1));
let record = WriteRecord::for_append(
- table_path.clone(),
+ table_info,
+ physical_table_path,
1,
GenericRow {
values: vec![Datum::Int32(1)],
diff --git a/crates/fluss/src/client/write/batch.rs
b/crates/fluss/src/client/write/batch.rs
index 41561d4..da30c8a 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -15,12 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use crate::BucketId;
use crate::client::broadcast::{BatchWriteResult, BroadcastOnce};
use crate::client::{Record, ResultHandle, WriteRecord};
use crate::compression::ArrowCompressionInfo;
use crate::error::{Error, Result};
-use crate::metadata::{KvFormat, RowType, TablePath};
+use crate::metadata::{KvFormat, PhysicalTablePath, RowType};
use crate::record::MemoryLogRecordsArrowBuilder;
use crate::record::kv::KvRecordBatchBuilder;
use bytes::Bytes;
@@ -28,12 +27,10 @@ use std::cmp::max;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
-#[allow(dead_code)]
pub struct InnerWriteBatch {
batch_id: i64,
- table_path: TablePath,
+ physical_table_path: Arc<PhysicalTablePath>,
create_ms: i64,
- bucket_id: BucketId,
results: BroadcastOnce<BatchWriteResult>,
completed: AtomicBool,
attempts: AtomicI32,
@@ -41,12 +38,11 @@ pub struct InnerWriteBatch {
}
impl InnerWriteBatch {
- fn new(batch_id: i64, table_path: TablePath, create_ms: i64, bucket_id:
BucketId) -> Self {
+ fn new(batch_id: i64, physical_table_path: Arc<PhysicalTablePath>,
create_ms: i64) -> Self {
InnerWriteBatch {
batch_id,
- table_path,
+ physical_table_path,
create_ms,
- bucket_id,
results: Default::default(),
completed: AtomicBool::new(false),
attempts: AtomicI32::new(0),
@@ -74,8 +70,8 @@ impl InnerWriteBatch {
self.drained_ms = max(self.drained_ms, now_ms);
}
- fn table_path(&self) -> &TablePath {
- &self.table_path
+ fn physical_table_path(&self) -> &Arc<PhysicalTablePath> {
+ &self.physical_table_path
}
fn attempts(&self) -> i32 {
@@ -165,8 +161,8 @@ impl WriteBatch {
self.inner_batch().batch_id
}
- pub fn table_path(&self) -> &TablePath {
- self.inner_batch().table_path()
+ pub fn physical_table_path(&self) -> &Arc<PhysicalTablePath> {
+ self.inner_batch().physical_table_path()
}
pub fn attempts(&self) -> i32 {
@@ -192,15 +188,14 @@ impl ArrowLogWriteBatch {
#[allow(clippy::too_many_arguments)]
pub fn new(
batch_id: i64,
- table_path: TablePath,
+ physical_table_path: Arc<PhysicalTablePath>,
schema_id: i32,
arrow_compression_info: ArrowCompressionInfo,
row_type: &RowType,
- bucket_id: BucketId,
create_ms: i64,
to_append_record_batch: bool,
) -> Result<Self> {
- let base = InnerWriteBatch::new(batch_id, table_path, create_ms,
bucket_id);
+ let base = InnerWriteBatch::new(batch_id, physical_table_path,
create_ms);
Ok(Self {
write_batch: base,
arrow_builder: MemoryLogRecordsArrowBuilder::new(
@@ -273,15 +268,14 @@ impl KvWriteBatch {
#[allow(clippy::too_many_arguments)]
pub fn new(
batch_id: i64,
- table_path: TablePath,
+ physical_table_path: Arc<PhysicalTablePath>,
schema_id: i32,
write_limit: usize,
kv_format: KvFormat,
- bucket_id: BucketId,
target_columns: Option<Arc<Vec<usize>>>,
create_ms: i64,
) -> Self {
- let base = InnerWriteBatch::new(batch_id, table_path, create_ms,
bucket_id);
+ let base = InnerWriteBatch::new(batch_id, physical_table_path,
create_ms);
Self {
write_batch: base,
kv_batch_builder: KvRecordBatchBuilder::new(schema_id,
write_limit, kv_format),
@@ -367,19 +361,22 @@ mod tests {
use super::*;
use crate::client::{RowBytes, WriteFormat};
use crate::metadata::TablePath;
+ use crate::test_utils::build_table_info;
#[test]
fn complete_only_once() {
- let batch =
- InnerWriteBatch::new(1, TablePath::new("db".to_string(),
"tbl".to_string()), 0, 0);
+ let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+ let physical_path = PhysicalTablePath::of(Arc::new(table_path));
+ let batch = InnerWriteBatch::new(1, Arc::new(physical_path), 0);
assert!(batch.complete(Ok(())));
assert!(!batch.complete(Err(crate::client::broadcast::Error::Dropped)));
}
#[test]
fn attempts_increment_on_reenqueue() {
- let batch =
- InnerWriteBatch::new(1, TablePath::new("db".to_string(),
"tbl".to_string()), 0, 0);
+ let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+ let physical_path = PhysicalTablePath::of(Arc::new(table_path));
+ let batch = InnerWriteBatch::new(1, Arc::new(physical_path), 0);
assert_eq!(batch.attempts(), 0);
batch.re_enqueued();
assert_eq!(batch.attempts(), 1);
@@ -401,12 +398,14 @@ mod tests {
DataField::new("name".to_string(), DataTypes::string(), None),
]);
let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+ let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1));
+ let physical_table_path =
Arc::new(PhysicalTablePath::of(Arc::new(table_path)));
// Test 1: RowAppendRecordBatchBuilder (to_append_record_batch=false)
{
let mut batch = ArrowLogWriteBatch::new(
1,
- table_path.clone(),
+ Arc::clone(&physical_table_path),
1,
ArrowCompressionInfo {
compression_type: ArrowCompressionType::None,
@@ -414,7 +413,6 @@ mod tests {
},
&row_type,
0,
- 0,
false,
)
.unwrap();
@@ -424,7 +422,12 @@ mod tests {
let mut row = GenericRow::new(2);
row.set_field(0, 1_i32);
row.set_field(1, "hello");
- let record =
WriteRecord::for_append(Arc::new(table_path.clone()), 1, row);
+ let record = WriteRecord::for_append(
+ Arc::clone(&table_info),
+ Arc::clone(&physical_table_path),
+ 1,
+ row,
+ );
batch.try_append(&record).unwrap();
}
@@ -446,7 +449,7 @@ mod tests {
{
let mut batch = ArrowLogWriteBatch::new(
1,
- table_path.clone(),
+ physical_table_path.clone(),
1,
ArrowCompressionInfo {
compression_type: ArrowCompressionType::None,
@@ -454,7 +457,6 @@ mod tests {
},
&row_type,
0,
- 0,
true,
)
.unwrap();
@@ -472,8 +474,12 @@ mod tests {
)
.unwrap();
- let record =
-
WriteRecord::for_append_record_batch(Arc::new(table_path.clone()), 1,
record_batch);
+ let record = WriteRecord::for_append_record_batch(
+ Arc::clone(&table_info),
+ Arc::clone(&physical_table_path),
+ 1,
+ record_batch,
+ );
batch.try_append(&record).unwrap();
let estimated_size = batch.estimated_size_in_bytes();
@@ -496,21 +502,23 @@ mod tests {
use crate::metadata::KvFormat;
let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+ let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1));
+ let physical_path =
Arc::new(PhysicalTablePath::of(Arc::new(table_path)));
let mut batch = KvWriteBatch::new(
1,
- table_path.clone(),
+ Arc::clone(&physical_path),
1,
KvWriteBatch::DEFAULT_WRITE_LIMIT,
KvFormat::COMPACTED,
- 0,
None,
0,
);
for _ in 0..200 {
let record = WriteRecord::for_upsert(
- Arc::new(table_path.clone()),
+ Arc::clone(&table_info),
+ Arc::clone(&physical_path),
1,
Bytes::from(vec![1_u8, 2_u8, 3_u8]),
None,
diff --git a/crates/fluss/src/client/write/bucket_assigner.rs
b/crates/fluss/src/client/write/bucket_assigner.rs
index 817101a..7fcd20b 100644
--- a/crates/fluss/src/client/write/bucket_assigner.rs
+++ b/crates/fluss/src/client/write/bucket_assigner.rs
@@ -19,9 +19,10 @@ use crate::bucketing::BucketingFunction;
use crate::cluster::Cluster;
use crate::error::Error::IllegalArgument;
use crate::error::Result;
-use crate::metadata::TablePath;
+use crate::metadata::PhysicalTablePath;
use bytes::Bytes;
use rand::Rng;
+use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
pub trait BucketAssigner: Sync + Send {
@@ -34,12 +35,12 @@ pub trait BucketAssigner: Sync + Send {
#[derive(Debug)]
pub struct StickyBucketAssigner {
- table_path: TablePath,
+ table_path: Arc<PhysicalTablePath>,
current_bucket_id: AtomicI32,
}
impl StickyBucketAssigner {
- pub fn new(table_path: TablePath) -> Self {
+ pub fn new(table_path: Arc<PhysicalTablePath>) -> Self {
Self {
table_path,
current_bucket_id: AtomicI32::new(-1),
@@ -55,7 +56,7 @@ impl StickyBucketAssigner {
let mut rng = rand::rng();
let mut random: i32 = rng.random();
random &= i32::MAX;
- new_bucket = random %
cluster.get_bucket_count(&self.table_path);
+ new_bucket = random %
cluster.get_bucket_count(self.table_path.get_table_path());
} else if available_buckets.len() == 1 {
new_bucket = available_buckets[0].table_bucket.bucket_id();
} else {
@@ -155,12 +156,15 @@ mod tests {
use crate::cluster::Cluster;
use crate::metadata::TablePath;
use crate::test_utils::build_cluster;
+ use std::sync::Arc;
#[test]
fn sticky_bucket_assigner_picks_available_bucket() {
let table_path = TablePath::new("db".to_string(), "tbl".to_string());
let cluster = build_cluster(&table_path, 1, 2);
- let assigner = StickyBucketAssigner::new(table_path);
+ let assigner =
StickyBucketAssigner::new(Arc::new(PhysicalTablePath::of(Arc::new(
+ table_path.clone(),
+ ))));
let bucket = assigner.assign_bucket(None, &cluster).expect("bucket");
assert!((0..2).contains(&bucket));
@@ -174,7 +178,7 @@ mod tests {
let assigner = HashBucketAssigner::new(3, <dyn
BucketingFunction>::of(None));
let cluster = Cluster::default();
let err = assigner.assign_bucket(None, &cluster).unwrap_err();
- assert!(matches!(err, crate::error::Error::IllegalArgument { .. }));
+ assert!(matches!(err, IllegalArgument { .. }));
}
#[test]
diff --git a/crates/fluss/src/client/write/mod.rs
b/crates/fluss/src/client/write/mod.rs
index dcc6795..868b582 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -20,7 +20,8 @@ mod batch;
use crate::client::broadcast::{self as client_broadcast, BatchWriteResult,
BroadcastOnceReceiver};
use crate::error::Error;
-use crate::metadata::TablePath;
+use crate::metadata::{PhysicalTablePath, TableInfo};
+
use crate::row::GenericRow;
pub use accumulator::*;
use arrow::array::RecordBatch;
@@ -40,16 +41,21 @@ pub use writer_client::WriterClient;
#[allow(dead_code)]
pub struct WriteRecord<'a> {
record: Record<'a>,
- table_path: Arc<TablePath>,
+ physical_table_path: Arc<PhysicalTablePath>,
bucket_key: Option<Bytes>,
schema_id: i32,
write_format: WriteFormat,
+ table_info: Arc<TableInfo>,
}
impl<'a> WriteRecord<'a> {
pub fn record(&self) -> &Record<'a> {
&self.record
}
+
+ pub fn physical_table_path(&self) -> &Arc<PhysicalTablePath> {
+ &self.physical_table_path
+ }
}
pub enum Record<'a> {
@@ -102,10 +108,16 @@ impl<'a> KvWriteRecord<'a> {
}
impl<'a> WriteRecord<'a> {
- pub fn for_append(table_path: Arc<TablePath>, schema_id: i32, row:
GenericRow<'a>) -> Self {
+ pub fn for_append(
+ table_info: Arc<TableInfo>,
+ physical_table_path: Arc<PhysicalTablePath>,
+ schema_id: i32,
+ row: GenericRow<'a>,
+ ) -> Self {
Self {
+ table_info,
record: Record::Log(LogWriteRecord::Generic(row)),
- table_path,
+ physical_table_path,
bucket_key: None,
schema_id,
write_format: WriteFormat::ArrowLog,
@@ -113,21 +125,25 @@ impl<'a> WriteRecord<'a> {
}
pub fn for_append_record_batch(
- table_path: Arc<TablePath>,
+ table_info: Arc<TableInfo>,
+ physical_table_path: Arc<PhysicalTablePath>,
schema_id: i32,
row: RecordBatch,
) -> Self {
Self {
+ table_info,
record: Record::Log(LogWriteRecord::RecordBatch(Arc::new(row))),
- table_path,
+ physical_table_path,
bucket_key: None,
schema_id,
write_format: WriteFormat::ArrowLog,
}
}
+ #[allow(clippy::too_many_arguments)]
pub fn for_upsert(
- table_path: Arc<TablePath>,
+ table_info: Arc<TableInfo>,
+ physical_table_path: Arc<PhysicalTablePath>,
schema_id: i32,
key: Bytes,
bucket_key: Option<Bytes>,
@@ -136,8 +152,9 @@ impl<'a> WriteRecord<'a> {
row_bytes: Option<RowBytes<'a>>,
) -> Self {
Self {
+ table_info,
record: Record::Kv(KvWriteRecord::new(key, target_columns,
row_bytes)),
- table_path,
+ physical_table_path,
bucket_key,
schema_id,
write_format,
diff --git a/crates/fluss/src/client/write/sender.rs
b/crates/fluss/src/client/write/sender.rs
index 905ef80..6a7dad0 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -15,20 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-use crate::TableId;
use crate::client::broadcast;
use crate::client::metadata::Metadata;
use crate::client::write::batch::WriteBatch;
use crate::client::{ReadyWriteBatch, RecordAccumulator};
use crate::error::Error::UnexpectedError;
use crate::error::{FlussError, Result};
-use crate::metadata::{TableBucket, TablePath};
+use crate::metadata::{PhysicalTablePath, TableBucket, TablePath};
use crate::proto::{
PbProduceLogRespForBucket, PbPutKvRespForBucket, ProduceLogResponse,
PutKvResponse,
};
use crate::rpc::ServerConnection;
use crate::rpc::message::{ProduceLogRequest, PutKvRequest};
-use log::warn;
+use crate::{PartitionId, TableId};
+use log::{debug, warn};
use parking_lot::Mutex;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
@@ -82,9 +82,39 @@ impl Sender {
// Update metadata if needed
if !ready_check_result.unknown_leader_tables.is_empty() {
- self.metadata
-
.update_tables_metadata(&ready_check_result.unknown_leader_tables.iter().collect())
- .await?;
+ let mut table_paths: HashSet<&TablePath> = HashSet::new();
+ let mut physical_table_paths: HashSet<&Arc<PhysicalTablePath>> =
HashSet::new();
+
+ for unknown_paths in
ready_check_result.unknown_leader_tables.iter() {
+ if unknown_paths.get_partition_name().is_some() {
+ physical_table_paths.insert(unknown_paths);
+ } else {
+ table_paths.insert(unknown_paths.get_table_path());
+ }
+ }
+
+ if let Err(e) = self
+ .metadata
+ .update_tables_metadata(&table_paths, &physical_table_paths,
vec![])
+ .await
+ {
+ match &e {
+ crate::error::Error::FlussAPIError { api_error }
+ if api_error.code ==
FlussError::PartitionNotExists.code() =>
+ {
+ warn!(
+ "Partition does not exist during metadata update,
continuing: {}",
+ api_error
+ );
+ }
+ _ => return Err(e),
+ }
+ }
+
+ debug!(
+ "Client update metadata due to unknown leader tables from the
batched records: {:?}",
+ ready_check_result.unknown_leader_tables
+ );
}
if ready_check_result.ready_nodes.is_empty() {
@@ -327,10 +357,15 @@ impl Sender {
response: R,
) -> Result<()> {
let mut invalid_metadata_tables: HashSet<TablePath> = HashSet::new();
+ let mut invalid_physical_table_paths: HashSet<Arc<PhysicalTablePath>>
= HashSet::new();
let mut pending_buckets: HashSet<TableBucket> =
request_buckets.iter().cloned().collect();
for bucket_resp in response.buckets_resp() {
- let tb = TableBucket::new(table_id, bucket_resp.bucket_id());
+ let tb = TableBucket::new_with_partition(
+ table_id,
+ bucket_resp.partition_id(),
+ bucket_resp.bucket_id(),
+ );
let Some(ready_batch) = records_by_bucket.remove(&tb) else {
panic!("Missing ready batch for table bucket {tb}");
};
@@ -343,11 +378,13 @@ impl Sender {
.error_message()
.cloned()
.unwrap_or_else(|| error.message().to_string());
- if let Some(table_path) = self
+ if let Some(physical_table_path) = self
.handle_write_batch_error(ready_batch, error, message)
.await?
{
- invalid_metadata_tables.insert(table_path);
+ invalid_metadata_tables
+
.insert(physical_table_path.get_table_path().clone());
+
invalid_physical_table_paths.insert(physical_table_path);
}
}
_ => self.complete_batch(ready_batch),
@@ -356,7 +393,7 @@ impl Sender {
for bucket in pending_buckets {
if let Some(ready_batch) = records_by_bucket.remove(&bucket) {
- if let Some(table_path) = self
+ if let Some(physical_table_path) = self
.handle_write_batch_error(
ready_batch,
FlussError::UnknownServerError,
@@ -364,12 +401,13 @@ impl Sender {
)
.await?
{
- invalid_metadata_tables.insert(table_path);
+
invalid_metadata_tables.insert(physical_table_path.get_table_path().clone());
+ invalid_physical_table_paths.insert(physical_table_path);
}
}
}
- self.update_metadata_if_needed(invalid_metadata_tables)
+ self.update_metadata_if_needed(invalid_metadata_tables,
invalid_physical_table_paths)
.await;
Ok(())
}
@@ -398,15 +436,18 @@ impl Sender {
message: String,
) -> Result<()> {
let mut invalid_metadata_tables: HashSet<TablePath> = HashSet::new();
+ let mut invalid_physical_table_paths: HashSet<Arc<PhysicalTablePath>>
= HashSet::new();
+
for batch in batches {
- if let Some(table_path) = self
+ if let Some(physical_table_path) = self
.handle_write_batch_error(batch, error, message.clone())
.await?
{
- invalid_metadata_tables.insert(table_path);
+
invalid_metadata_tables.insert(physical_table_path.get_table_path().clone());
+ invalid_physical_table_paths.insert(physical_table_path);
}
}
- self.update_metadata_if_needed(invalid_metadata_tables)
+ self.update_metadata_if_needed(invalid_metadata_tables,
invalid_physical_table_paths)
.await;
Ok(())
}
@@ -432,20 +473,22 @@ impl Sender {
ready_write_batch: ReadyWriteBatch,
error: FlussError,
message: String,
- ) -> Result<Option<TablePath>> {
- let table_path = ready_write_batch.write_batch.table_path().clone();
+ ) -> Result<Option<Arc<PhysicalTablePath>>> {
+ let physical_table_path =
Arc::clone(ready_write_batch.write_batch.physical_table_path());
if self.can_retry(&ready_write_batch, error) {
warn!(
- "Retrying write batch for {table_path} on bucket {} after
error {error:?}: {message}",
+ "Retrying write batch for {} on bucket {} after error
{error:?}: {message}",
+ physical_table_path.as_ref(),
ready_write_batch.table_bucket.bucket_id()
);
self.re_enqueue_batch(ready_write_batch).await;
- return
Ok(Self::is_invalid_metadata_error(error).then_some(table_path));
+ return
Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path));
}
if error == FlussError::DuplicateSequenceException {
warn!(
- "Duplicate sequence for {table_path} on bucket {}: {message}",
+ "Duplicate sequence for {} on bucket {}: {message}",
+ physical_table_path.as_ref(),
ready_write_batch.table_bucket.bucket_id()
);
self.complete_batch(ready_write_batch);
@@ -459,7 +502,7 @@ impl Sender {
message,
},
);
- Ok(Self::is_invalid_metadata_error(error).then_some(table_path))
+
Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path))
}
async fn re_enqueue_batch(&self, ready_write_batch: ReadyWriteBatch) {
@@ -484,12 +527,22 @@ impl Sender {
&& Self::is_retriable_error(error)
}
- async fn update_metadata_if_needed(&self, table_paths: HashSet<TablePath>)
{
+ async fn update_metadata_if_needed(
+ &self,
+ table_paths: HashSet<TablePath>,
+ physical_table_path: HashSet<Arc<PhysicalTablePath>>,
+ ) {
if table_paths.is_empty() {
return;
}
let table_path_refs: HashSet<&TablePath> =
table_paths.iter().collect();
- if let Err(e) =
self.metadata.update_tables_metadata(&table_path_refs).await {
+ let physical_table_path_refs: HashSet<&Arc<PhysicalTablePath>> =
+ physical_table_path.iter().collect();
+ if let Err(e) = self
+ .metadata
+ .update_tables_metadata(&table_path_refs,
&physical_table_path_refs, vec![])
+ .await
+ {
warn!("Failed to update metadata after write error: {e:?}");
}
}
@@ -536,6 +589,8 @@ trait BucketResponse {
fn bucket_id(&self) -> i32;
fn error_code(&self) -> Option<i32>;
fn error_message(&self) -> Option<&String>;
+
+ fn partition_id(&self) -> Option<PartitionId>;
}
impl BucketResponse for PbProduceLogRespForBucket {
@@ -548,6 +603,10 @@ impl BucketResponse for PbProduceLogRespForBucket {
fn error_message(&self) -> Option<&String> {
self.error_message.as_ref()
}
+
+ fn partition_id(&self) -> Option<PartitionId> {
+ self.partition_id
+ }
}
impl BucketResponse for PbPutKvRespForBucket {
@@ -560,6 +619,10 @@ impl BucketResponse for PbPutKvRespForBucket {
fn error_message(&self) -> Option<&String> {
self.error_message.as_ref()
}
+
+ fn partition_id(&self) -> Option<PartitionId> {
+ self.partition_id
+ }
}
trait WriteResponse {
@@ -587,11 +650,11 @@ mod tests {
use crate::client::WriteRecord;
use crate::cluster::Cluster;
use crate::config::Config;
- use crate::metadata::TablePath;
+ use crate::metadata::{PhysicalTablePath, TablePath};
use crate::proto::{PbProduceLogRespForBucket, ProduceLogResponse};
use crate::row::{Datum, GenericRow};
use crate::rpc::FlussError;
- use crate::test_utils::build_cluster_arc;
+ use crate::test_utils::{build_cluster_arc, build_table_info};
use std::collections::{HashMap, HashSet};
async fn build_ready_batch(
@@ -599,8 +662,11 @@ mod tests {
cluster: Arc<Cluster>,
table_path: Arc<TablePath>,
) -> Result<(ReadyWriteBatch, crate::client::ResultHandle)> {
+ let table_info =
Arc::new(build_table_info(table_path.as_ref().clone(), 1, 1));
+ let physical_table_path = Arc::new(PhysicalTablePath::of(table_path));
let record = WriteRecord::for_append(
- table_path,
+ table_info,
+ physical_table_path,
1,
GenericRow {
values: vec![Datum::Int32(1)],
diff --git a/crates/fluss/src/client/write/writer_client.rs
b/crates/fluss/src/client/write/writer_client.rs
index 65b04f5..c386adf 100644
--- a/crates/fluss/src/client/write/writer_client.rs
+++ b/crates/fluss/src/client/write/writer_client.rs
@@ -15,20 +15,23 @@
// specific language governing permissions and limitations
// under the License.
+use crate::BucketId;
+use crate::bucketing::BucketingFunction;
use crate::client::metadata::Metadata;
-use crate::client::write::bucket_assigner::{BucketAssigner,
StickyBucketAssigner};
+use crate::client::write::bucket_assigner::{
+ BucketAssigner, HashBucketAssigner, StickyBucketAssigner,
+};
use crate::client::write::sender::Sender;
use crate::client::{RecordAccumulator, ResultHandle, WriteRecord};
use crate::config::Config;
-use crate::metadata::TablePath;
+use crate::error::{Error, Result};
+use crate::metadata::{PhysicalTablePath, TableInfo};
use bytes::Bytes;
use dashmap::DashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
-use crate::error::{Error, Result};
-
#[allow(dead_code)]
pub struct WriterClient {
config: Config,
@@ -37,7 +40,7 @@ pub struct WriterClient {
shutdown_tx: mpsc::Sender<()>,
sender_join_handle: JoinHandle<()>,
metadata: Arc<Metadata>,
- bucket_assigners: DashMap<TablePath, Arc<Box<dyn BucketAssigner>>>,
+ bucket_assigners: DashMap<Arc<PhysicalTablePath>, Arc<dyn BucketAssigner>>,
}
impl WriterClient {
@@ -89,11 +92,12 @@ impl WriterClient {
}
pub async fn send(&self, record: &WriteRecord<'_>) -> Result<ResultHandle>
{
- let table_path = &record.table_path;
+ let physical_table_path = &record.physical_table_path;
let cluster = self.metadata.get_cluster();
let bucket_key = record.bucket_key.as_ref();
- let (bucket_assigner, bucket_id) = self.assign_bucket(bucket_key,
table_path)?;
+ let (bucket_assigner, bucket_id) =
+ self.assign_bucket(&record.table_info, bucket_key,
physical_table_path)?;
let mut result = self
.accumulate
@@ -118,17 +122,19 @@ impl WriterClient {
}
fn assign_bucket(
&self,
+ table_info: &Arc<TableInfo>,
bucket_key: Option<&Bytes>,
- table_path: &Arc<TablePath>,
- ) -> Result<(Arc<Box<dyn BucketAssigner>>, i32)> {
+ table_path: &Arc<PhysicalTablePath>,
+ ) -> Result<(Arc<dyn BucketAssigner>, BucketId)> {
let cluster = self.metadata.get_cluster();
let bucket_assigner = {
if let Some(assigner) = self.bucket_assigners.get(table_path) {
assigner.clone()
} else {
- let assigner =
Arc::new(Self::create_bucket_assigner(table_path.as_ref()));
+ let assigner =
+ Self::create_bucket_assigner(table_info,
Arc::clone(table_path), bucket_key)?;
self.bucket_assigners
- .insert(table_path.as_ref().clone(), assigner.clone());
+ .insert(Arc::clone(table_path),
Arc::clone(&assigner.clone()));
assigner
}
};
@@ -160,8 +166,21 @@ impl WriterClient {
Ok(())
}
- pub fn create_bucket_assigner(table_path: &TablePath) -> Box<dyn
BucketAssigner> {
- // always sticky
- Box::new(StickyBucketAssigner::new(table_path.clone()))
+ pub fn create_bucket_assigner(
+ table_info: &Arc<TableInfo>,
+ table_path: Arc<PhysicalTablePath>,
+ bucket_key: Option<&Bytes>,
+ ) -> Result<Arc<dyn BucketAssigner>> {
+ if bucket_key.is_some() {
+ let datalake_format =
table_info.get_table_config().get_datalake_format()?;
+ let function = <dyn
BucketingFunction>::of(datalake_format.as_ref());
+ Ok(Arc::new(HashBucketAssigner::new(
+ table_info.num_buckets,
+ function,
+ )))
+ } else {
+ // TODO: Wire up to use round robin/sticky according to
ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER
+ Ok(Arc::new(StickyBucketAssigner::new(table_path)))
+ }
}
}
diff --git a/crates/fluss/src/cluster/cluster.rs
b/crates/fluss/src/cluster/cluster.rs
index d6fe0ae..1f950ad 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -15,16 +15,18 @@
// specific language governing permissions and limitations
// under the License.
-use crate::BucketId;
use crate::cluster::{BucketLocation, ServerNode, ServerType};
+use crate::error::Error::PartitionNotExist;
use crate::error::{Error, Result};
use crate::metadata::{
JsonSerde, PhysicalTablePath, TableBucket, TableDescriptor, TableInfo,
TablePath,
};
-use crate::proto::MetadataResponse;
+use crate::proto::{MetadataResponse, PbBucketMetadata};
use crate::rpc::{from_pb_server_node, from_pb_table_path};
+use crate::{BucketId, PartitionId, TableId};
use rand::random_range;
use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
static EMPTY: Vec<BucketLocation> = Vec::new();
@@ -33,27 +35,35 @@ pub struct Cluster {
coordinator_server: Option<ServerNode>,
alive_tablet_servers_by_id: HashMap<i32, ServerNode>,
alive_tablet_servers: Vec<ServerNode>,
- available_locations_by_path: HashMap<TablePath, Vec<BucketLocation>>,
+ available_locations_by_path: HashMap<Arc<PhysicalTablePath>,
Vec<BucketLocation>>,
available_locations_by_bucket: HashMap<TableBucket, BucketLocation>,
- table_id_by_path: HashMap<TablePath, i64>,
- table_path_by_id: HashMap<i64, TablePath>,
+ table_id_by_path: HashMap<TablePath, TableId>,
+ table_path_by_id: HashMap<TableId, TablePath>,
table_info_by_path: HashMap<TablePath, TableInfo>,
+ partitions_id_by_path: HashMap<Arc<PhysicalTablePath>, PartitionId>,
+ partition_name_by_id: HashMap<PartitionId, String>,
}
impl Cluster {
+ #[allow(clippy::too_many_arguments)]
pub fn new(
coordinator_server: Option<ServerNode>,
alive_tablet_servers_by_id: HashMap<i32, ServerNode>,
- available_locations_by_path: HashMap<TablePath, Vec<BucketLocation>>,
+ available_locations_by_path: HashMap<Arc<PhysicalTablePath>,
Vec<BucketLocation>>,
available_locations_by_bucket: HashMap<TableBucket, BucketLocation>,
- table_id_by_path: HashMap<TablePath, i64>,
+ table_id_by_path: HashMap<TablePath, TableId>,
table_info_by_path: HashMap<TablePath, TableInfo>,
+ partitions_id_by_path: HashMap<Arc<PhysicalTablePath>, PartitionId>,
) -> Self {
let alive_tablet_servers =
alive_tablet_servers_by_id.values().cloned().collect();
let table_path_by_id = table_id_by_path
.iter()
.map(|(path, table_id)| (*table_id, path.clone()))
.collect();
+ let partition_name_by_id = partitions_id_by_path
+ .iter()
+ .filter_map(|(path, id)| path.get_partition_name().map(|name|
(*id, name.clone())))
+ .collect();
Cluster {
coordinator_server,
alive_tablet_servers_by_id,
@@ -63,10 +73,12 @@ impl Cluster {
table_id_by_path,
table_path_by_id,
table_info_by_path,
+ partitions_id_by_path,
+ partition_name_by_id,
}
}
- pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) ->
Self {
+ pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<TableId>)
-> Self {
let alive_tablet_servers_by_id = self
.alive_tablet_servers_by_id
.iter()
@@ -89,6 +101,7 @@ impl Cluster {
available_locations_by_bucket,
self.table_id_by_path.clone(),
self.table_info_by_path.clone(),
+ self.partitions_id_by_path.clone(),
)
}
@@ -110,6 +123,7 @@ impl Cluster {
available_locations_by_bucket,
self.table_id_by_path.clone(),
self.table_info_by_path.clone(),
+ self.partitions_id_by_path.clone(),
)
}
@@ -123,6 +137,8 @@ impl Cluster {
table_id_by_path,
table_path_by_id,
table_info_by_path,
+ partitions_id_by_path,
+ partition_name_by_id,
} = cluster;
self.coordinator_server = coordinator_server;
self.alive_tablet_servers_by_id = alive_tablet_servers_by_id;
@@ -132,26 +148,30 @@ impl Cluster {
self.table_id_by_path = table_id_by_path;
self.table_path_by_id = table_path_by_id;
self.table_info_by_path = table_info_by_path;
+ self.partitions_id_by_path = partitions_id_by_path;
+ self.partition_name_by_id = partition_name_by_id;
}
fn filter_bucket_locations_by_path(
&self,
table_paths: &HashSet<&TablePath>,
) -> (
- HashMap<TablePath, Vec<BucketLocation>>,
+ HashMap<Arc<PhysicalTablePath>, Vec<BucketLocation>>,
HashMap<TableBucket, BucketLocation>,
) {
let available_locations_by_path = self
.available_locations_by_path
.iter()
- .filter(|&(path, _)| !table_paths.contains(path))
+ .filter(|&(path, _)| !table_paths.contains(path.get_table_path()))
.map(|(path, locations)| (path.clone(), locations.clone()))
.collect();
let available_locations_by_bucket = self
.available_locations_by_bucket
.iter()
- .filter(|&(_bucket, location)|
!table_paths.contains(&location.table_path))
+ .filter(|&(_bucket, location)| {
+
!table_paths.contains(&location.physical_table_path.get_table_path())
+ })
.map(|(bucket, location)| (bucket.clone(), location.clone()))
.collect();
@@ -175,15 +195,19 @@ impl Cluster {
let mut table_id_by_path = HashMap::new();
let mut table_info_by_path = HashMap::new();
+ let mut partitions_id_by_path = HashMap::new();
+ let mut tmp_available_locations_by_path = HashMap::new();
+ let mut tmp_available_location_by_bucket = HashMap::new();
+
if let Some(origin) = origin_cluster {
table_info_by_path.extend(origin.get_table_info_by_path().clone());
table_id_by_path.extend(origin.get_table_id_by_path().clone());
+ partitions_id_by_path.extend(origin.partitions_id_by_path.clone());
+
tmp_available_locations_by_path.extend(origin.available_locations_by_path.clone());
+
tmp_available_location_by_bucket.extend(origin.available_locations_by_bucket.clone());
}
- // Index the bucket locations by table path, and index bucket location
by bucket
- let mut tmp_available_location_by_bucket = HashMap::new();
- let mut tmp_available_locations_by_path = HashMap::new();
-
+ // iterate all table metadata
for table_metadata in metadata_response.table_metadata {
let table_id = table_metadata.table_id;
let table_path = from_pb_table_path(&table_metadata.table_path);
@@ -207,39 +231,56 @@ impl Cluster {
table_info_by_path.insert(table_path.clone(), table_info);
table_id_by_path.insert(table_path.clone(), table_id);
- // now, get bucket matadata
- let mut found_unavailable_bucket = false;
- let mut available_bucket_for_table = vec![];
- let mut bucket_for_table = vec![];
- for bucket_metadata in table_metadata.bucket_metadata {
- let bucket_id = bucket_metadata.bucket_id;
- let bucket = TableBucket::new(table_id, bucket_id);
- let bucket_location;
- if let Some(leader_id) = bucket_metadata.leader_id
- && let Some(server_node) = servers.get(&leader_id)
- {
- bucket_location = BucketLocation::new(
- bucket.clone(),
- Some(server_node.clone()),
- table_path.clone(),
- );
- available_bucket_for_table.push(bucket_location.clone());
- tmp_available_location_by_bucket
- .insert(bucket.clone(), bucket_location.clone());
- } else {
- found_unavailable_bucket = true;
- bucket_location = BucketLocation::new(bucket.clone(),
None, table_path.clone());
- }
- bucket_for_table.push(bucket_location.clone());
+ let bucket_metadata = table_metadata.bucket_metadata;
+ let physical_table_path =
Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone())));
+
+ let bucket_locations = get_bucket_locations(
+ &mut servers,
+ bucket_metadata.as_slice(),
+ table_id,
+ None,
+ &physical_table_path,
+ );
+ tmp_available_locations_by_path.insert(physical_table_path,
bucket_locations);
+ }
+
+ // iterate all partition metadata
+ for partition_metadata in metadata_response.partition_metadata {
+ let table_id = partition_metadata.table_id;
+
+ if let Some(cluster) = origin_cluster {
+ let partition_name = partition_metadata.partition_name;
+ let table_path =
cluster.get_table_path_by_id(table_id).unwrap();
+ let partition_id = partition_metadata.partition_id;
+
+ let physical_table_path =
Arc::new(PhysicalTablePath::of_partitioned(
+ Arc::new(table_path.clone()),
+ Some(partition_name),
+ ));
+
+ partitions_id_by_path.insert(Arc::clone(&physical_table_path),
partition_id);
+
+ let bucket_locations = get_bucket_locations(
+ &mut servers,
+ partition_metadata.bucket_metadata.as_slice(),
+ table_id,
+ Some(partition_id),
+ &physical_table_path,
+ );
+
+ tmp_available_locations_by_path.insert(physical_table_path,
bucket_locations);
}
+ }
- if found_unavailable_bucket {
- tmp_available_locations_by_path
- .insert(table_path.clone(),
available_bucket_for_table.clone());
- } else {
- tmp_available_locations_by_path.insert(table_path.clone(),
bucket_for_table);
+ for bucket_locations in &mut tmp_available_locations_by_path.values() {
+ for location in bucket_locations {
+ if location.leader().is_some() {
+ tmp_available_location_by_bucket
+ .insert(location.table_bucket.clone(),
location.clone());
+ }
}
}
+
Ok(Cluster::new(
coordinator_server,
servers,
@@ -247,6 +288,7 @@ impl Cluster {
tmp_available_location_by_bucket,
table_id_by_path,
table_info_by_path,
+ partitions_id_by_path,
))
}
@@ -269,14 +311,43 @@ impl Cluster {
pub fn get_table_bucket(
&self,
- table_path: &TablePath,
+ physical_table_path: &PhysicalTablePath,
bucket_id: BucketId,
) -> Result<TableBucket> {
- let table_info = self.get_table(table_path)?;
- Ok(TableBucket::new(table_info.table_id, bucket_id))
+ let table_info = self.get_table(physical_table_path.get_table_path())?;
+ let partition_id = self.get_partition_id(physical_table_path);
+
+ if physical_table_path.get_partition_name().is_some() &&
partition_id.is_none() {
+ return Err(PartitionNotExist {
+ message: format!(
+ "The partition {} is not found in cluster",
+ physical_table_path.get_partition_name().unwrap()
+ ),
+ });
+ }
+
+ Ok(TableBucket::new_with_partition(
+ table_info.table_id,
+ partition_id,
+ bucket_id,
+ ))
}
- pub fn get_bucket_locations_by_path(&self) -> &HashMap<TablePath,
Vec<BucketLocation>> {
+ pub fn get_partition_id(&self, physical_table_path: &PhysicalTablePath) ->
Option<PartitionId> {
+ self.partitions_id_by_path.get(physical_table_path).copied()
+ }
+
+ pub fn get_partition_name(&self, partition_id: PartitionId) ->
Option<&String> {
+ self.partition_name_by_id.get(&partition_id)
+ }
+
+ pub fn get_table_id(&self, table_path: &TablePath) -> Option<i64> {
+ self.table_id_by_path.get(table_path).copied()
+ }
+
+ pub fn get_bucket_locations_by_path(
+ &self,
+ ) -> &HashMap<Arc<PhysicalTablePath>, Vec<BucketLocation>> {
&self.available_locations_by_path
}
@@ -288,13 +359,13 @@ impl Cluster {
&self.table_id_by_path
}
- pub fn get_table_path_by_id(&self, table_id: i64) -> Option<&TablePath> {
+ pub fn get_table_path_by_id(&self, table_id: TableId) ->
Option<&TablePath> {
self.table_path_by_id.get(&table_id)
}
pub fn get_available_buckets_for_table_path(
&self,
- table_path: &TablePath,
+ table_path: &PhysicalTablePath,
) -> &Vec<BucketLocation> {
self.available_locations_by_path
.get(table_path)
@@ -327,4 +398,37 @@ impl Cluster {
pub fn opt_get_table(&self, table_path: &TablePath) -> Option<&TableInfo> {
self.table_info_by_path.get(table_path)
}
+
+ pub fn get_partition_id_by_path(&self) -> &HashMap<Arc<PhysicalTablePath>,
PartitionId> {
+ &self.partitions_id_by_path
+ }
+}
+
+fn get_bucket_locations(
+ servers: &mut HashMap<i32, ServerNode>,
+ bucket_metadata: &[PbBucketMetadata],
+ table_id: i64,
+ partition_id: Option<PartitionId>,
+ physical_table_path: &Arc<PhysicalTablePath>,
+) -> Vec<BucketLocation> {
+ let mut bucket_locations = Vec::new();
+ for metadata in bucket_metadata {
+ let bucket_id = metadata.bucket_id;
+ let bucket = TableBucket::new_with_partition(table_id, partition_id,
bucket_id);
+
+ let server = if let Some(leader_id) = metadata.leader_id
+ && let Some(server_node) = servers.get(&leader_id)
+ {
+ Some(server_node.clone())
+ } else {
+ None
+ };
+
+ bucket_locations.push(BucketLocation::new(
+ bucket.clone(),
+ server,
+ Arc::clone(physical_table_path),
+ ));
+ }
+ bucket_locations
}
diff --git a/crates/fluss/src/cluster/mod.rs b/crates/fluss/src/cluster/mod.rs
index f9d42e4..58e80c0 100644
--- a/crates/fluss/src/cluster/mod.rs
+++ b/crates/fluss/src/cluster/mod.rs
@@ -16,7 +16,8 @@
// under the License.
use crate::BucketId;
-use crate::metadata::{TableBucket, TablePath};
+use crate::metadata::{PhysicalTablePath, TableBucket};
+use std::sync::Arc;
#[allow(clippy::module_inception)]
mod cluster;
@@ -69,19 +70,19 @@ pub enum ServerType {
pub struct BucketLocation {
pub table_bucket: TableBucket,
leader: Option<ServerNode>,
- pub table_path: TablePath,
+ physical_table_path: Arc<PhysicalTablePath>,
}
impl BucketLocation {
pub fn new(
table_bucket: TableBucket,
leader: Option<ServerNode>,
- table_path: TablePath,
+ physical_table_path: Arc<PhysicalTablePath>,
) -> BucketLocation {
BucketLocation {
table_bucket,
leader,
- table_path,
+ physical_table_path,
}
}
@@ -96,4 +97,8 @@ impl BucketLocation {
pub fn bucket_id(&self) -> BucketId {
self.table_bucket.bucket_id()
}
+
+ pub fn physical_table_path(&self) -> &Arc<PhysicalTablePath> {
+ &self.physical_table_path
+ }
}
diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs
index 68426d7..ef86530 100644
--- a/crates/fluss/src/error.rs
+++ b/crates/fluss/src/error.rs
@@ -99,6 +99,12 @@ pub enum Error {
)]
InvalidPartition { message: String },
+ #[snafu(
+ visibility(pub(crate)),
+ display("Fluss hitting partition not exist error {}.", message)
+ )]
+ PartitionNotExist { message: String },
+
#[snafu(
visibility(pub(crate)),
display("Fluss hitting IO not supported error {}.", message)
diff --git a/crates/fluss/src/metadata/datatype.rs
b/crates/fluss/src/metadata/datatype.rs
index e365237..6431d3a 100644
--- a/crates/fluss/src/metadata/datatype.rs
+++ b/crates/fluss/src/metadata/datatype.rs
@@ -930,6 +930,15 @@ impl RowType {
self.fields.iter().map(|f| f.name.as_str()).collect()
}
+ pub fn project_with_field_names(&self, field_names: &[String]) ->
Result<RowType> {
+ let indices: Vec<usize> = field_names
+ .iter()
+ .filter_map(|pk| self.get_field_index(pk))
+ .collect();
+
+ self.project(indices.as_slice())
+ }
+
pub fn project(&self, project_field_positions: &[usize]) ->
Result<RowType> {
Ok(RowType::with_nullable(
self.nullable,
diff --git a/crates/fluss/src/metadata/table.rs
b/crates/fluss/src/metadata/table.rs
index 7b93aca..ce362c4 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -25,6 +25,7 @@ use core::fmt;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
+use std::sync::Arc;
use strum_macros::EnumString;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -172,7 +173,7 @@ impl SchemaBuilder {
self
}
_ => {
- panic!("data type msut be row type")
+ panic!("data type must be row type")
}
}
}
@@ -325,7 +326,7 @@ pub struct TableDescriptorBuilder {
schema: Option<Schema>,
properties: HashMap<String, String>,
custom_properties: HashMap<String, String>,
- partition_keys: Vec<String>,
+ partition_keys: Arc<[String]>,
comment: Option<String>,
table_distribution: Option<TableDistribution>,
}
@@ -374,7 +375,7 @@ impl TableDescriptorBuilder {
}
pub fn partitioned_by(mut self, partition_keys: Vec<String>) -> Self {
- self.partition_keys = partition_keys;
+ self.partition_keys = Arc::from(partition_keys);
self
}
@@ -413,7 +414,7 @@ impl TableDescriptorBuilder {
pub struct TableDescriptor {
schema: Schema,
comment: Option<String>,
- partition_keys: Vec<String>,
+ partition_keys: Arc<[String]>,
table_distribution: Option<TableDistribution>,
properties: HashMap<String, String>,
custom_properties: HashMap<String, String>,
@@ -749,19 +750,19 @@ impl TablePath {
/// `partition_name` will be `Some(...)`; otherwise, it will be `None`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PhysicalTablePath {
- table_path: TablePath,
+ table_path: Arc<TablePath>,
partition_name: Option<String>,
}
impl PhysicalTablePath {
- pub fn of(table_path: TablePath) -> Self {
+ pub fn of(table_path: Arc<TablePath>) -> Self {
Self {
table_path,
partition_name: None,
}
}
- pub fn of_partitioned(table_path: TablePath, partition_name:
Option<String>) -> Self {
+ pub fn of_partitioned(table_path: Arc<TablePath>, partition_name:
Option<String>) -> Self {
Self {
table_path,
partition_name,
@@ -774,7 +775,7 @@ impl PhysicalTablePath {
partition_name: Option<String>,
) -> Self {
Self {
- table_path: TablePath::new(database_name, table_name),
+ table_path: Arc::new(TablePath::new(database_name, table_name)),
partition_name,
}
}
@@ -815,7 +816,7 @@ pub struct TableInfo {
pub primary_keys: Vec<String>,
pub physical_primary_keys: Vec<String>,
pub bucket_keys: Vec<String>,
- pub partition_keys: Vec<String>,
+ pub partition_keys: Arc<[String]>,
pub num_buckets: i32,
pub properties: HashMap<String, String>,
pub table_config: TableConfig,
@@ -982,7 +983,7 @@ impl TableInfo {
schema_id: i32,
schema: Schema,
bucket_keys: Vec<String>,
- partition_keys: Vec<String>,
+ partition_keys: Arc<[String]>,
num_buckets: i32,
properties: HashMap<String, String>,
custom_properties: HashMap<String, String>,
@@ -1080,7 +1081,7 @@ impl TableInfo {
.is_auto_partition_enabled()
}
- pub fn get_partition_keys(&self) -> &[String] {
+ pub fn get_partition_keys(&self) -> &Arc<[String]> {
&self.partition_keys
}
@@ -1115,7 +1116,7 @@ impl TableInfo {
pub fn to_table_descriptor(&self) -> Result<TableDescriptor> {
let mut builder = TableDescriptor::builder()
.schema(self.schema.clone())
- .partitioned_by(self.partition_keys.clone())
+ .partitioned_by(self.partition_keys.to_vec())
.distributed_by(Some(self.num_buckets), self.bucket_keys.clone())
.properties(self.properties.clone())
.custom_properties(self.custom_properties.clone());
@@ -1177,6 +1178,18 @@ impl TableBucket {
}
}
+ pub fn new_with_partition(
+ table_id: TableId,
+ partition_id: Option<PartitionId>,
+ bucket: BucketId,
+ ) -> Self {
+ TableBucket {
+ table_id,
+ partition_id,
+ bucket,
+ }
+ }
+
pub fn table_id(&self) -> TableId {
self.table_id
}
@@ -1308,7 +1321,7 @@ mod tests {
1,
schema.clone(),
vec!["id".to_string()],
- vec![], // No partition keys
+ Arc::from(vec![]), // No partition keys
1,
properties.clone(),
HashMap::new(),
@@ -1329,7 +1342,7 @@ mod tests {
1,
schema.clone(),
vec!["id".to_string()],
- vec![], // No partition keys
+ Arc::from(vec![]), // No partition keys
1,
properties.clone(),
HashMap::new(),
@@ -1350,7 +1363,7 @@ mod tests {
1,
schema.clone(),
vec!["id".to_string()],
- vec!["name".to_string()], // Partition keys
+ Arc::from(vec!["name".to_string()]), // Partition keys
1,
properties.clone(),
HashMap::new(),
@@ -1371,7 +1384,7 @@ mod tests {
1,
schema.clone(),
vec!["id".to_string()],
- vec!["name".to_string()], // Partition keys
+ Arc::from(vec!["name".to_string()]), // Partition keys
1,
properties.clone(),
HashMap::new(),
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 6340dc8..726106b 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -1450,6 +1450,7 @@ pub struct MyVec<T>(pub StreamReader<T>);
mod tests {
use super::*;
use crate::metadata::{DataField, DataTypes, RowType};
+ use crate::test_utils::build_table_info;
#[test]
fn test_to_array_type() {
@@ -1932,7 +1933,7 @@ mod tests {
use crate::compression::{
ArrowCompressionInfo, ArrowCompressionType,
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
};
- use crate::metadata::TablePath;
+ use crate::metadata::{PhysicalTablePath, TablePath};
use crate::row::GenericRow;
use tempfile::NamedTempFile;
@@ -1941,7 +1942,9 @@ mod tests {
DataField::new("id".to_string(), DataTypes::int(), None),
DataField::new("name".to_string(), DataTypes::string(), None),
]);
- let table_path = Arc::new(TablePath::new("db".to_string(),
"tbl".to_string()));
+ let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+ let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1));
+ let physical_table_path =
Arc::new(PhysicalTablePath::of(Arc::new(table_path)));
let mut builder = MemoryLogRecordsArrowBuilder::new(
1,
@@ -1956,13 +1959,15 @@ mod tests {
let mut row = GenericRow::new(2);
row.set_field(0, 1_i32);
row.set_field(1, "alice");
- let record = WriteRecord::for_append(table_path.clone(), 1, row);
+ let record =
+ WriteRecord::for_append(Arc::clone(&table_info),
physical_table_path.clone(), 1, row);
builder.append(&record)?;
let mut row2 = GenericRow::new(2);
row2.set_field(0, 2_i32);
row2.set_field(1, "bob");
- let record2 = WriteRecord::for_append(table_path, 2, row2);
+ let record2 =
+ WriteRecord::for_append(Arc::clone(&table_info),
physical_table_path, 2, row2);
builder.append(&record2)?;
let data = builder.build()?;
diff --git a/crates/fluss/src/rpc/message/update_metadata.rs
b/crates/fluss/src/rpc/message/update_metadata.rs
index a6e6288..1f0d88c 100644
--- a/crates/fluss/src/rpc/message/update_metadata.rs
+++ b/crates/fluss/src/rpc/message/update_metadata.rs
@@ -15,14 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-use crate::proto::{MetadataResponse, PbTablePath};
+use crate::metadata::{PhysicalTablePath, TablePath};
+use crate::proto::{MetadataResponse, PbPhysicalTablePath, PbTablePath};
use crate::rpc::api_key::ApiKey;
use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::ReadError;
use crate::rpc::frame::WriteError;
use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
-
-use crate::metadata::TablePath;
-use crate::rpc::frame::ReadError;
+use std::collections::HashSet;
+use std::sync::Arc;
use crate::{impl_read_version_type, impl_write_version_type, proto};
use bytes::{Buf, BufMut};
@@ -33,7 +34,11 @@ pub struct UpdateMetadataRequest {
}
impl UpdateMetadataRequest {
- pub fn new(table_paths: &[&TablePath]) -> Self {
+ pub fn new(
+ table_paths: &HashSet<&TablePath>,
+ physical_table_paths: &HashSet<&Arc<PhysicalTablePath>>,
+ partition_ids: Vec<i64>,
+ ) -> Self {
UpdateMetadataRequest {
inner_request: proto::MetadataRequest {
table_path: table_paths
@@ -43,8 +48,15 @@ impl UpdateMetadataRequest {
table_name: path.table().to_string(),
})
.collect(),
- partitions_path: vec![],
- partitions_id: vec![],
+ partitions_path: physical_table_paths
+ .iter()
+ .map(|path| PbPhysicalTablePath {
+ database_name: path.get_database_name().to_string(),
+ table_name: path.get_table_name().to_string(),
+ partition_name: path.get_partition_name().map(|pn|
pn.to_string()),
+ })
+ .collect(),
+ partitions_id: partition_ids,
},
}
}
diff --git a/crates/fluss/src/test_utils.rs b/crates/fluss/src/test_utils.rs
index d1cd3ec..8e8fbe4 100644
--- a/crates/fluss/src/test_utils.rs
+++ b/crates/fluss/src/test_utils.rs
@@ -17,7 +17,8 @@
use crate::cluster::{BucketLocation, Cluster, ServerNode, ServerType};
use crate::metadata::{
- DataField, DataTypes, Schema, TableBucket, TableDescriptor, TableInfo,
TablePath,
+ DataField, DataTypes, PhysicalTablePath, Schema, TableBucket,
TableDescriptor, TableInfo,
+ TablePath,
};
use std::collections::HashMap;
use std::sync::Arc;
@@ -53,12 +54,15 @@ pub(crate) fn build_cluster(table_path: &TablePath,
table_id: i64, buckets: i32)
let bucket_location = BucketLocation::new(
table_bucket.clone(),
Some(server.clone()),
- table_path.clone(),
+ Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone()))),
);
bucket_locations.push(bucket_location.clone());
locations_by_bucket.insert(table_bucket, bucket_location);
}
- locations_by_path.insert(table_path.clone(), bucket_locations);
+ locations_by_path.insert(
+ Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone()))),
+ bucket_locations,
+ );
let mut table_id_by_path = HashMap::new();
table_id_by_path.insert(table_path.clone(), table_id);
@@ -76,6 +80,7 @@ pub(crate) fn build_cluster(table_path: &TablePath, table_id:
i64, buckets: i32)
locations_by_bucket,
table_id_by_path,
table_info_by_path,
+ HashMap::new(),
)
}
diff --git a/crates/fluss/tests/integration/kv_table.rs
b/crates/fluss/tests/integration/kv_table.rs
index a4f2961..b2263c2 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -36,7 +36,7 @@ mod kv_table_test {
use crate::integration::fluss_cluster::FlussTestingCluster;
use crate::integration::utils::{create_table, get_cluster, start_cluster,
stop_cluster};
use fluss::client::UpsertWriter;
- use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+ use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor,
TablePath};
use fluss::row::{GenericRow, InternalRow};
use std::sync::Arc;
@@ -438,6 +438,179 @@ mod kv_table_test {
.expect("Failed to drop table");
}
+ #[tokio::test]
+ async fn partitioned_table_upsert_and_lookup() {
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+
+ let admin = connection.get_admin().await.expect("Failed to get admin");
+
+ let table_path =
+ TablePath::new("fluss".to_string(),
"test_partitioned_kv_table".to_string());
+
+ // Create a partitioned KV table with region as partition key
+ let table_descriptor = TableDescriptor::builder()
+ .schema(
+ Schema::builder()
+ .column("region", DataTypes::string())
+ .column("user_id", DataTypes::int())
+ .column("name", DataTypes::string())
+ .column("score", DataTypes::bigint())
+ .primary_key(vec!["region".to_string(),
"user_id".to_string()])
+ .build()
+ .expect("Failed to build schema"),
+ )
+ .partitioned_by(vec!["region".to_string()])
+ .build()
+ .expect("Failed to build table");
+
+ create_table(&admin, &table_path, &table_descriptor).await;
+
+ // Create partitions for each region before inserting data
+ for region in &["US", "EU", "APAC"] {
+ let mut partition_map = std::collections::HashMap::new();
+ partition_map.insert("region".to_string(), region.to_string());
+ let partition_spec = PartitionSpec::new(partition_map);
+ admin
+ .create_partition(&table_path, &partition_spec, false)
+ .await
+ .expect("Failed to create partition");
+ }
+
+ let connection = cluster.get_fluss_connection().await;
+
+ let table = connection
+ .get_table(&table_path)
+ .await
+ .expect("Failed to get table");
+
+ let table_upsert = table.new_upsert().expect("Failed to create
upsert");
+
+ let mut upsert_writer = table_upsert
+ .create_writer()
+ .expect("Failed to create writer");
+
+ // Insert records with different partitions
+ let test_data = [
+ ("US", 1, "Gustave", 100i64),
+ ("US", 2, "Lune", 200i64),
+ ("EU", 1, "Sciel", 150i64),
+ ("EU", 2, "Maelle", 250i64),
+ ("APAC", 1, "Noco", 300i64),
+ ];
+
+ for (region, user_id, name, score) in &test_data {
+ let mut row = GenericRow::new(4);
+ row.set_field(0, *region);
+ row.set_field(1, *user_id);
+ row.set_field(2, *name);
+ row.set_field(3, *score);
+ upsert_writer.upsert(&row).await.expect("Failed to upsert");
+ }
+
+ // Create lookuper
+ let mut lookuper = table
+ .new_lookup()
+ .expect("Failed to create lookup")
+ .create_lookuper()
+ .expect("Failed to create lookuper");
+
+ // Lookup records - the lookup key includes partition key columns
+ for (region, user_id, expected_name, expected_score) in &test_data {
+ let mut key = GenericRow::new(4);
+ key.set_field(0, *region);
+ key.set_field(1, *user_id);
+
+ let result = lookuper.lookup(&key).await.expect("Failed to
lookup");
+ let row = result
+ .get_single_row()
+ .expect("Failed to get row")
+ .expect("Row should exist");
+
+ assert_eq!(row.get_string(0), *region, "region mismatch");
+ assert_eq!(row.get_int(1), *user_id, "user_id mismatch");
+ assert_eq!(row.get_string(2), *expected_name, "name mismatch");
+ assert_eq!(row.get_long(3), *expected_score, "score mismatch");
+ }
+
+ // Test update within a partition
+ let mut updated_row = GenericRow::new(4);
+ updated_row.set_field(0, "US");
+ updated_row.set_field(1, 1);
+ updated_row.set_field(2, "Gustave Updated");
+ updated_row.set_field(3, 999i64);
+ upsert_writer
+ .upsert(&updated_row)
+ .await
+ .expect("Failed to upsert updated row");
+
+ // Verify the update
+ let mut key = GenericRow::new(4);
+ key.set_field(0, "US");
+ key.set_field(1, 1);
+ let result = lookuper.lookup(&key).await.expect("Failed to lookup");
+ let row = result
+ .get_single_row()
+ .expect("Failed to get row")
+ .expect("Row should exist");
+ assert_eq!(row.get_string(2), "Gustave Updated");
+ assert_eq!(row.get_long(3), 999);
+
+ // Lookup in non-existent partition should return empty result
+ let mut non_existent_key = GenericRow::new(4);
+ non_existent_key.set_field(0, "UNKNOWN_REGION");
+ non_existent_key.set_field(1, 1);
+ let result = lookuper
+ .lookup(&non_existent_key)
+ .await
+ .expect("Failed to lookup non-existent partition");
+ assert!(
+ result
+ .get_single_row()
+ .expect("Failed to get row")
+ .is_none(),
+ "Lookup in non-existent partition should return None"
+ );
+
+ // Delete a record within a partition
+ let mut delete_key = GenericRow::new(4);
+ delete_key.set_field(0, "EU");
+ delete_key.set_field(1, 1);
+ upsert_writer
+ .delete(&delete_key)
+ .await
+ .expect("Failed to delete");
+
+ // Verify deletion
+ let mut key = GenericRow::new(4);
+ key.set_field(0, "EU");
+ key.set_field(1, 1);
+ let result = lookuper.lookup(&key).await.expect("Failed to lookup");
+ assert!(
+ result
+ .get_single_row()
+ .expect("Failed to get row")
+ .is_none(),
+ "Deleted record should not exist"
+ );
+
+ // Verify other records in the same partition still exist
+ let mut key = GenericRow::new(4);
+ key.set_field(0, "EU");
+ key.set_field(1, 2);
+ let result = lookuper.lookup(&key).await.expect("Failed to lookup");
+ let row = result
+ .get_single_row()
+ .expect("Failed to get row")
+ .expect("Row should exist");
+ assert_eq!(row.get_string(2), "Maelle");
+
+ admin
+ .drop_table(&table_path, false)
+ .await
+ .expect("Failed to drop table");
+ }
+
/// Integration test covering put and get operations for all supported
datatypes.
#[tokio::test]
async fn all_supported_datatypes() {