This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 4d50eb2 feat: support list offset for partitioned table (#229)
4d50eb2 is described below
commit 4d50eb29c9c3f073317cce22302ed6f78b3528d6
Author: yuxia Luo <[email protected]>
AuthorDate: Sun Feb 1 19:59:37 2026 +0800
feat: support list offset for partitioned table (#229)
---
crates/fluss/src/client/admin.rs | 63 ++++++++++++++++++++++++-----
crates/fluss/src/client/metadata.rs | 21 ++++++++++
crates/fluss/tests/integration/log_table.rs | 32 +++++++++++++++
3 files changed, 107 insertions(+), 9 deletions(-)
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index ea1efc3..9061169 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -18,7 +18,7 @@
use crate::client::metadata::Metadata;
use crate::metadata::{
DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, PartitionInfo,
PartitionSpec,
- TableBucket, TableDescriptor, TableInfo, TablePath,
+ PhysicalTablePath, TableBucket, TableDescriptor, TableInfo, TablePath,
};
use crate::rpc::message::{
CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest,
DatabaseExistsRequest,
@@ -33,7 +33,6 @@ use crate::error::{Error, Result};
use crate::proto::GetTableInfoResponse;
use crate::{BucketId, PartitionId, TableId};
use std::collections::{HashMap, HashSet};
-use std::slice::from_ref;
use std::sync::Arc;
use tokio::task::JoinHandle;
@@ -294,23 +293,69 @@ impl FlussAdmin {
buckets_id: &[BucketId],
offset_spec: OffsetSpec,
) -> Result<HashMap<i32, i64>> {
- self.metadata
- .check_and_update_table_metadata(from_ref(table_path))
- .await?;
+ self.do_list_offsets(table_path, None, buckets_id, offset_spec)
+ .await
+ }
+
+ /// Lists offsets for the specified buckets in a partition. This operation
+ /// makes it possible to find the beginning offset, the end offset, and the
+ /// offset matching a given timestamp in those buckets.
+ pub async fn list_partition_offsets(
+ &self,
+ table_path: &TablePath,
+ partition_name: &str,
+ buckets_id: &[BucketId],
+ offset_spec: OffsetSpec,
+ ) -> Result<HashMap<i32, i64>> {
+ self.do_list_offsets(table_path, Some(partition_name), buckets_id,
offset_spec)
+ .await
+ }
+ async fn do_list_offsets(
+ &self,
+ table_path: &TablePath,
+ partition_name: Option<&str>,
+ buckets_id: &[BucketId],
+ offset_spec: OffsetSpec,
+ ) -> Result<HashMap<i32, i64>> {
if buckets_id.is_empty() {
- return Err(Error::UnexpectedError {
+ return Err(Error::IllegalArgument {
message: "Buckets are empty.".to_string(),
- source: None,
});
}
+ // Force a table-metadata update, mirroring the Java client's behavior.
+ self.metadata.update_table_metadata(table_path).await?;
+
let cluster = self.metadata.get_cluster();
let table_id = cluster.get_table(table_path)?.table_id;
+ // Resolve partition_id from partition_name if provided
+ let partition_id = if let Some(name) = partition_name {
+ let physical_table_path =
Arc::new(PhysicalTablePath::of_partitioned(
+ Arc::new(table_path.clone()),
+ Some(name.to_string()),
+ ));
+
+ // Update partition metadata, mirroring the Java client's behavior.
+ self.metadata
+
.update_physical_table_metadata(std::slice::from_ref(&physical_table_path))
+ .await?;
+
+ let cluster = self.metadata.get_cluster();
+ Some(
+ cluster
+ .get_partition_id(&physical_table_path)
+ .ok_or_else(|| Error::PartitionNotExist {
+ message: format!("Partition '{name}' not found for
table '{table_path}'"),
+ })?,
+ )
+ } else {
+ None
+ };
+
// Prepare requests
let requests_by_server =
- self.prepare_list_offsets_requests(table_id, None, buckets_id,
offset_spec)?;
+ self.prepare_list_offsets_requests(table_id, partition_id,
buckets_id, offset_spec)?;
// Send Requests
let response_futures =
self.send_list_offsets_request(requests_by_server).await?;
@@ -338,7 +383,7 @@ impl FlussAdmin {
let mut node_for_bucket_list: HashMap<i32, Vec<BucketId>> =
HashMap::new();
for bucket_id in buckets {
- let table_bucket = TableBucket::new(table_id, *bucket_id);
+ let table_bucket = TableBucket::new_with_partition(table_id,
partition_id, *bucket_id);
let leader = cluster.leader_for(&table_bucket).ok_or_else(|| {
// todo: consider retry?
Error::UnexpectedError {
diff --git a/crates/fluss/src/client/metadata.rs
b/crates/fluss/src/client/metadata.rs
index 52ccd62..c6244cd 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -142,6 +142,27 @@ impl Metadata {
.await
}
+ pub async fn update_physical_table_metadata(
+ &self,
+ physical_table_paths: &[Arc<PhysicalTablePath>],
+ ) -> Result<()> {
+ let mut update_table_paths = HashSet::new();
+ let mut update_partition_paths = HashSet::new();
+ for physical_table_path in physical_table_paths {
+ match physical_table_path.get_partition_name() {
+ Some(_) => {
+ update_partition_paths.insert(physical_table_path);
+ }
+ None => {
+
update_table_paths.insert(physical_table_path.get_table_path());
+ }
+ }
+ }
+
+ self.update_tables_metadata(&update_table_paths,
&update_partition_paths, vec![])
+ .await
+ }
+
pub async fn check_and_update_table_metadata(&self, table_paths:
&[TablePath]) -> Result<()> {
let cluster_binding = self.cluster.read().clone();
let need_update_table_paths: HashSet<&TablePath> = table_paths
diff --git a/crates/fluss/tests/integration/log_table.rs
b/crates/fluss/tests/integration/log_table.rs
index 64e6289..27b4d83 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -1072,6 +1072,38 @@ mod table_test {
.await
.expect("Failed to flush batches");
+ // Test list_partition_offsets
+ // US partition has 4 records: 2 from row append + 2 from batch append
+ let us_offsets = admin
+ .list_partition_offsets(&table_path, "US", &[0],
OffsetSpec::Latest)
+ .await
+ .expect("Failed to list offsets for US partition");
+ assert_eq!(
+ us_offsets.get(&0),
+ Some(&4),
+ "US partition should have 4 records"
+ );
+
+ // EU partition has 4 records: 2 from row append + 2 from batch append
+ let eu_offsets = admin
+ .list_partition_offsets(&table_path, "EU", &[0],
OffsetSpec::Latest)
+ .await
+ .expect("Failed to list offsets for EU partition");
+ assert_eq!(
+ eu_offsets.get(&0),
+ Some(&4),
+ "EU partition should have 4 records"
+ );
+
+ // Test that listing a non-existent partition returns an error.
+ let result = admin
+ .list_partition_offsets(&table_path, "NOT Exists", &[0],
OffsetSpec::Latest)
+ .await;
+ assert!(result.is_err());
+ assert!(result.unwrap_err().to_string().contains(
+ "Table partition 'fluss.test_partitioned_log_append(p=NOT Exists)'
does not exist."
+ ));
+
admin
.drop_table(&table_path, false)
.await