This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d50eb2  feat: support list offset for partitioned table (#229)
4d50eb2 is described below

commit 4d50eb29c9c3f073317cce22302ed6f78b3528d6
Author: yuxia Luo <[email protected]>
AuthorDate: Sun Feb 1 19:59:37 2026 +0800

    feat: support list offset for partitioned table (#229)
---
 crates/fluss/src/client/admin.rs            | 63 ++++++++++++++++++++++++-----
 crates/fluss/src/client/metadata.rs         | 21 ++++++++++
 crates/fluss/tests/integration/log_table.rs | 32 +++++++++++++++
 3 files changed, 107 insertions(+), 9 deletions(-)

diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index ea1efc3..9061169 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -18,7 +18,7 @@
 use crate::client::metadata::Metadata;
 use crate::metadata::{
    DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, PartitionInfo, PartitionSpec,
-    TableBucket, TableDescriptor, TableInfo, TablePath,
+    PhysicalTablePath, TableBucket, TableDescriptor, TableInfo, TablePath,
 };
 use crate::rpc::message::{
    CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest, DatabaseExistsRequest,
@@ -33,7 +33,6 @@ use crate::error::{Error, Result};
 use crate::proto::GetTableInfoResponse;
 use crate::{BucketId, PartitionId, TableId};
 use std::collections::{HashMap, HashSet};
-use std::slice::from_ref;
 use std::sync::Arc;
 use tokio::task::JoinHandle;
 
@@ -294,23 +293,69 @@ impl FlussAdmin {
         buckets_id: &[BucketId],
         offset_spec: OffsetSpec,
     ) -> Result<HashMap<i32, i64>> {
-        self.metadata
-            .check_and_update_table_metadata(from_ref(table_path))
-            .await?;
+        self.do_list_offsets(table_path, None, buckets_id, offset_spec)
+            .await
+    }
+
+    /// List offset for the specified buckets in a partition. This operation enables to find
+    /// the beginning offset, end offset as well as the offset matching a timestamp in buckets.
+    pub async fn list_partition_offsets(
+        &self,
+        table_path: &TablePath,
+        partition_name: &str,
+        buckets_id: &[BucketId],
+        offset_spec: OffsetSpec,
+    ) -> Result<HashMap<i32, i64>> {
+        self.do_list_offsets(table_path, Some(partition_name), buckets_id, offset_spec)
+            .await
+    }
 
+    async fn do_list_offsets(
+        &self,
+        table_path: &TablePath,
+        partition_name: Option<&str>,
+        buckets_id: &[BucketId],
+        offset_spec: OffsetSpec,
+    ) -> Result<HashMap<i32, i64>> {
         if buckets_id.is_empty() {
-            return Err(Error::UnexpectedError {
+            return Err(Error::IllegalArgument {
                 message: "Buckets are empty.".to_string(),
-                source: None,
             });
         }
 
+        // force to update table metadata like java side
+        self.metadata.update_table_metadata(table_path).await?;
+
         let cluster = self.metadata.get_cluster();
         let table_id = cluster.get_table(table_path)?.table_id;
 
+        // Resolve partition_id from partition_name if provided
+        let partition_id = if let Some(name) = partition_name {
+            let physical_table_path = Arc::new(PhysicalTablePath::of_partitioned(
+                Arc::new(table_path.clone()),
+                Some(name.to_string()),
+            ));
+
+            // Update partition metadata like java side
+            self.metadata
+                .update_physical_table_metadata(std::slice::from_ref(&physical_table_path))
+                .await?;
+
+            let cluster = self.metadata.get_cluster();
+            Some(
+                cluster
+                    .get_partition_id(&physical_table_path)
+                    .ok_or_else(|| Error::PartitionNotExist {
+                        message: format!("Partition '{name}' not found for table '{table_path}'"),
+                    })?,
+            )
+        } else {
+            None
+        };
+
         // Prepare requests
         let requests_by_server =
-            self.prepare_list_offsets_requests(table_id, None, buckets_id, offset_spec)?;
+            self.prepare_list_offsets_requests(table_id, partition_id, buckets_id, offset_spec)?;
 
         // Send Requests
         let response_futures = self.send_list_offsets_request(requests_by_server).await?;
@@ -338,7 +383,7 @@ impl FlussAdmin {
         let mut node_for_bucket_list: HashMap<i32, Vec<BucketId>> = HashMap::new();
 
         for bucket_id in buckets {
-            let table_bucket = TableBucket::new(table_id, *bucket_id);
+            let table_bucket = TableBucket::new_with_partition(table_id, partition_id, *bucket_id);
             let leader = cluster.leader_for(&table_bucket).ok_or_else(|| {
                 // todo: consider retry?
                 Error::UnexpectedError {
diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs
index 52ccd62..c6244cd 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -142,6 +142,27 @@ impl Metadata {
             .await
     }
 
+    pub async fn update_physical_table_metadata(
+        &self,
+        physical_table_paths: &[Arc<PhysicalTablePath>],
+    ) -> Result<()> {
+        let mut update_table_paths = HashSet::new();
+        let mut update_partition_paths = HashSet::new();
+        for physical_table_path in physical_table_paths {
+            match physical_table_path.get_partition_name() {
+                Some(_) => {
+                    update_partition_paths.insert(physical_table_path);
+                }
+                None => {
+                    update_table_paths.insert(physical_table_path.get_table_path());
+                }
+            }
+        }
+
+        self.update_tables_metadata(&update_table_paths, &update_partition_paths, vec![])
+            .await
+    }
+
     pub async fn check_and_update_table_metadata(&self, table_paths: &[TablePath]) -> Result<()> {
         let cluster_binding = self.cluster.read().clone();
         let need_update_table_paths: HashSet<&TablePath> = table_paths
diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs
index 64e6289..27b4d83 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -1072,6 +1072,38 @@ mod table_test {
             .await
             .expect("Failed to flush batches");
 
+        // Test list_offsets_for_partition
+        // US partition has 4 records: 2 from row append + 2 from batch append
+        let us_offsets = admin
+            .list_partition_offsets(&table_path, "US", &[0], OffsetSpec::Latest)
+            .await
+            .expect("Failed to list offsets for US partition");
+        assert_eq!(
+            us_offsets.get(&0),
+            Some(&4),
+            "US partition should have 4 records"
+        );
+
+        // EU partition has 4 records: 2 from row append + 2 from batch append
+        let eu_offsets = admin
+            .list_partition_offsets(&table_path, "EU", &[0], OffsetSpec::Latest)
+            .await
+            .expect("Failed to list offsets for EU partition");
+        assert_eq!(
+            eu_offsets.get(&0),
+            Some(&4),
+            "EU partition should have 4 records"
+        );
+
+        // test list a not exist partition should return error
+        let result = admin
+            .list_partition_offsets(&table_path, "NOT Exists", &[0], OffsetSpec::Latest)
+            .await;
+        assert!(result.is_err());
+        assert!(result.unwrap_err().to_string().contains(
+            "Table partition 'fluss.test_partitioned_log_append(p=NOT Exists)' does not exist."
+        ));
+
         admin
             .drop_table(&table_path, false)
             .await

Reply via email to