This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d794f7  feat: add table partition scanning (#222)
7d794f7 is described below

commit 7d794f710fe80ee9a43e274c2e538112309406af
Author: Miao <[email protected]>
AuthorDate: Wed Feb 4 12:06:55 2026 +0800

    feat: add table partition scanning (#222)
---
 bindings/python/src/metadata.rs             |   6 +-
 crates/fluss/src/client/admin.rs            |   6 +-
 crates/fluss/src/client/metadata.rs         |   4 +-
 crates/fluss/src/client/table/scanner.rs    | 153 ++++++++++++++++++++--------
 crates/fluss/src/metadata/table.rs          |   2 +-
 crates/fluss/tests/integration/log_table.rs |  66 +++++++++++-
 6 files changed, 185 insertions(+), 52 deletions(-)

diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs
index 235df56..f422696 100644
--- a/bindings/python/src/metadata.rs
+++ b/bindings/python/src/metadata.rs
@@ -530,7 +530,11 @@ impl TableBucket {
 
     /// Convert to core TableBucket (internal use)
     pub fn to_core(&self) -> fcore::metadata::TableBucket {
-        fcore::metadata::TableBucket::new(self.table_id, self.bucket)
+        fcore::metadata::TableBucket::new_with_partition(
+            self.table_id,
+            self.partition_id,
+            self.bucket,
+        )
     }
 }
 
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index 9061169..737ead3 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -273,7 +273,11 @@ impl FlussAdmin {
         // Convert proto response to LakeSnapshot
         let mut table_buckets_offset = HashMap::new();
         for bucket_snapshot in response.bucket_snapshots {
-            let table_bucket = TableBucket::new(response.table_id, bucket_snapshot.bucket_id);
+            let table_bucket = TableBucket::new_with_partition(
+                response.table_id,
+                bucket_snapshot.partition_id,
+                bucket_snapshot.bucket_id,
+            );
             if let Some(log_offset) = bucket_snapshot.log_offset {
                 table_buckets_offset.insert(table_bucket, log_offset);
             }
diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs
index c6244cd..ce00ced 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -259,8 +259,8 @@ mod tests {
         let leader = metadata
             .leader_for(&table_path, &TableBucket::new(1, 0))
             .await
-            .expect("leader request should be Ok")
-            .expect("leader should exist");
+            .unwrap()
+            .expect("leader");
         assert_eq!(leader.id(), 1);
     }
 
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index d30c5d5..a88964e 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -26,7 +26,6 @@ use std::{
 };
 use tempfile::TempDir;
 
-use crate::TableId;
 use crate::client::connection::FlussConnection;
 use crate::client::credentials::SecurityTokenManager;
 use crate::client::metadata::Metadata;
@@ -43,6 +42,7 @@ use crate::record::{
 };
 use crate::rpc::{RpcClient, RpcError, message};
 use crate::util::FairBucketStatusMap;
+use crate::{PartitionId, TableId};
 
 const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
 #[allow(dead_code)]
@@ -88,7 +88,7 @@ impl<'a> TableScan<'a> {
     /// # pub async fn example() -> Result<()> {
     ///     let mut config = Config::default();
     ///     config.bootstrap_server = "127.0.0.1:9123".to_string();
-    ///     let conn = FlussConnection::new(config).await;
+    ///     let conn = FlussConnection::new(config).await?;
     ///
     ///     let table_descriptor = TableDescriptor::builder()
     ///         .schema(
@@ -164,7 +164,7 @@ impl<'a> TableScan<'a> {
     /// # pub async fn example() -> Result<()> {
     ///     let mut config = Config::default();
     ///     config.bootstrap_server = "127.0.0.1:9123".to_string();
-    ///     let conn = FlussConnection::new(config).await;
+    ///     let conn = FlussConnection::new(config).await?;
     ///
     ///     let table_descriptor = TableDescriptor::builder()
     ///         .schema(
@@ -270,6 +270,7 @@ struct LogScannerInner {
     metadata: Arc<Metadata>,
     log_scanner_status: Arc<LogScannerStatus>,
     log_fetcher: LogFetcher,
+    is_partitioned_table: bool,
 }
 
 impl LogScannerInner {
@@ -284,6 +285,7 @@ impl LogScannerInner {
         Ok(Self {
             table_path: table_info.table_path.clone(),
             table_id: table_info.table_id,
+            is_partitioned_table: table_info.is_partitioned(),
             metadata: metadata.clone(),
             log_scanner_status: log_scanner_status.clone(),
             log_fetcher: LogFetcher::new(
@@ -337,6 +339,13 @@ impl LogScannerInner {
     }
 
     async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
+        if self.is_partitioned_table {
+            return Err(Error::UnsupportedOperation {
+                message: "The table is a partitioned table, please use \"subscribe_partition\" to \
+                subscribe a partitioned bucket instead."
+                    .to_string(),
+            });
+        }
         let table_bucket = TableBucket::new(self.table_id, bucket);
         self.metadata
             .check_and_update_table_metadata(from_ref(&self.table_path))
@@ -347,6 +356,13 @@ impl LogScannerInner {
     }
 
     async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
+        if self.is_partitioned_table {
+            return Err(Error::UnsupportedOperation {
+                message:
+                    "The table is a partitioned table, subscribe_batch is not supported currently."
+                        .to_string(),
+            });
+        }
         self.metadata
             .check_and_update_table_metadata(from_ref(&self.table_path))
             .await?;
@@ -368,6 +384,29 @@ impl LogScannerInner {
         Ok(())
     }
 
+    async fn subscribe_partition(
+        &self,
+        partition_id: PartitionId,
+        bucket: i32,
+        offset: i64,
+    ) -> Result<()> {
+        if !self.is_partitioned_table {
+            return Err(Error::UnsupportedOperation {
+                message: "The table is not a partitioned table, please use \"subscribe\" to \
+                subscribe a non-partitioned bucket instead."
+                    .to_string(),
+            });
+        }
+        let table_bucket =
+            TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket);
+        self.metadata
+            .check_and_update_table_metadata(from_ref(&self.table_path))
+            .await?;
+        self.log_scanner_status
+            .assign_scan_bucket(table_bucket, offset);
+        Ok(())
+    }
+
     async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
         let result = self.log_fetcher.collect_fetches()?;
         if !result.is_empty() {
@@ -435,6 +474,17 @@ impl LogScanner {
     pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
         self.inner.subscribe_batch(bucket_offsets).await
     }
+
+    pub async fn subscribe_partition(
+        &self,
+        partition_id: PartitionId,
+        bucket: i32,
+        offset: i64,
+    ) -> Result<()> {
+        self.inner
+            .subscribe_partition(partition_id, bucket, offset)
+            .await
+    }
 }
 
 // Implementation for RecordBatchLogScanner (batches mode)
@@ -451,6 +501,17 @@ impl RecordBatchLogScanner {
     pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
         self.inner.subscribe_batch(bucket_offsets).await
     }
+
+    pub async fn subscribe_partition(
+        &self,
+        partition_id: PartitionId,
+        bucket: i32,
+        offset: i64,
+    ) -> Result<()> {
+        self.inner
+            .subscribe_partition(partition_id, bucket, offset)
+            .await
+    }
 }
 
 struct LogFetcher {
@@ -617,55 +678,55 @@ impl LogFetcher {
         )
     }
 
-    async fn check_and_update_metadata(&self) -> Result<()> {
-        let need_update = self
-            .fetchable_buckets()
-            .iter()
-            .any(|bucket| self.get_table_bucket_leader(bucket).is_none());
+    async fn check_and_update_metadata(&self, table_buckets: &[TableBucket]) -> Result<()> {
+        let mut partition_ids = Vec::new();
+        let mut need_update = false;
 
-        if !need_update {
-            return Ok(());
+        for tb in table_buckets {
+            if self.get_table_bucket_leader(tb).is_some() {
+                continue;
+            }
+
+            if self.is_partitioned {
+                partition_ids.push(tb.partition_id().unwrap());
+            } else {
+                need_update = true;
+                break;
+            }
         }
 
-        if self.is_partitioned {
-            // Fallback to full table metadata refresh until partition-aware updates are available.
+        let update_result = if self.is_partitioned && !partition_ids.is_empty() {
             self.metadata
-                .update_tables_metadata(&HashSet::from([&self.table_path]), &HashSet::new(), vec![])
+                .update_tables_metadata(
+                    &HashSet::from([&self.table_path]),
+                    &HashSet::new(),
+                    partition_ids,
+                )
                 .await
-                .or_else(|e| {
-                    if let Error::RpcError { source, .. } = &e
-                        && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_))
-                    {
-                        warn!(
-                            "Retrying after encountering error while updating 
table metadata: {e}"
-                        );
-                        Ok(())
-                    } else {
-                        Err(e)
-                    }
-                })?;
-            return Ok(());
-        }
+        } else if need_update {
+            self.metadata.update_table_metadata(&self.table_path).await
+        } else {
+            Ok(())
+        };
 
-        // TODO: Handle PartitionNotExist error
-        self.metadata
-            .update_tables_metadata(&HashSet::from([&self.table_path]), &HashSet::new(), vec![])
-            .await
-            .or_else(|e| {
-                if let Error::RpcError { source, .. } = &e
-                    && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_))
-                {
-                warn!("Retrying after encountering error while updating table metadata: {e}");
-                    Ok(())
-                } else {
-                    Err(e)
-                }
-            })
+        // TODO: Handle PartitionNotExist error like java side
+        update_result.or_else(|e| {
+            if let Error::RpcError { source, .. } = &e
+                && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_))
+            {
+                warn!("Retrying after encountering error while updating table metadata: {e}");
+                Ok(())
+            } else {
+                Err(e)
+            }
+        })?;
+        Ok(())
     }
 
     /// Send fetch requests asynchronously without waiting for responses
     async fn send_fetches(&self) -> Result<()> {
-        self.check_and_update_metadata().await?;
+        self.check_and_update_metadata(self.fetchable_buckets().as_slice())
+            .await?;
         let fetch_request = self.prepare_fetch_log_requests().await;
 
         for (leader, fetch_request) in fetch_request {
@@ -774,7 +835,11 @@ impl LogFetcher {
 
             for fetch_log_for_bucket in fetch_log_for_buckets {
                 let bucket: i32 = fetch_log_for_bucket.bucket_id;
-                let table_bucket = TableBucket::new(table_id, bucket);
+                let table_bucket = TableBucket::new_with_partition(
+                    table_id,
+                    fetch_log_for_bucket.partition_id,
+                    bucket,
+                );
 
                 // todo: check fetch result code for per-bucket
                 let Some(fetch_offset) = log_scanner_status.get_bucket_offset(&table_bucket) else {
@@ -1302,7 +1367,7 @@ impl LogFetcher {
                         )
                     } else {
                         let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
-                            partition_id: None,
+                            partition_id: bucket.partition_id(),
                             bucket_id: bucket.bucket_id(),
                             fetch_offset: offset,
                             // 1M
diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs
index 908f446..4e0a525 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -1206,7 +1206,7 @@ pub struct TableBucket {
 
 impl TableBucket {
     pub fn new(table_id: TableId, bucket: BucketId) -> Self {
-        TableBucket {
+        Self {
             table_id,
             partition_id: None,
             bucket,
diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs
index 514df82..cbfcbe5 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -974,7 +974,7 @@ mod table_test {
     }
 
     #[tokio::test]
-    async fn partitioned_table_append() {
+    async fn partitioned_table_append_scan() {
         let cluster = get_fluss_cluster();
         let connection = cluster.get_fluss_connection().await;
 
@@ -1098,11 +1098,71 @@ mod table_test {
             "Table partition 'fluss.test_partitioned_log_append(p=NOT Exists)' does not exist."
         ));
 
+        let log_scanner = table
+            .new_scan()
+            .create_log_scanner()
+            .expect("Failed to create log scanner");
+        let partition_info = admin
+            .list_partition_infos(&table_path)
+            .await
+            .expect("Failed to list partition infos");
+        for partition_info in partition_info {
+            log_scanner
+                .subscribe_partition(partition_info.get_partition_id(), 0, 0)
+                .await
+                .expect("Failed to subscribe to partition");
+        }
+
+        let expected_records = vec![
+            (1, "US", 100i64),
+            (2, "US", 200i64),
+            (3, "EU", 300i64),
+            (4, "EU", 400),
+            (5, "US", 500i64),
+            (6, "US", 600i64),
+            (7, "EU", 700i64),
+            (8, "EU", 800i64),
+        ];
+        let expected_records: Vec<(i32, String, i64)> = expected_records
+            .into_iter()
+            .map(|(id, region, val)| (id, region.to_string(), val))
+            .collect();
+
+        let mut collected_records: Vec<(i32, String, i64)> = Vec::new();
+        let start_time = std::time::Instant::now();
+        while collected_records.len() < expected_records.len()
+            && start_time.elapsed() < Duration::from_secs(10)
+        {
+            let records = log_scanner
+                .poll(Duration::from_millis(500))
+                .await
+                .expect("Failed to poll log scanner");
+            for rec in records {
+                let row = rec.row();
+                collected_records.push((
+                    row.get_int(0),
+                    row.get_string(1).to_string(),
+                    row.get_long(2),
+                ));
+            }
+        }
+
+        assert_eq!(
+            collected_records.len(),
+            expected_records.len(),
+            "Did not receive all records in time, expect receive {} records, but got {} records",
+            expected_records.len(),
+            collected_records.len()
+        );
+        collected_records.sort_by_key(|r| r.0);
+        assert_eq!(
+            collected_records, expected_records,
+            "Data mismatch between sent and received"
+        );
+
         admin
             .drop_table(&table_path, false)
             .await
             .expect("Failed to drop table");
-
-        // todo: add scan test in 203
     }
 }

Reply via email to