This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 7d794f7 feat: add table partition scanning (#222)
7d794f7 is described below
commit 7d794f710fe80ee9a43e274c2e538112309406af
Author: Miao <[email protected]>
AuthorDate: Wed Feb 4 12:06:55 2026 +0800
feat: add table partition scanning (#222)
---
bindings/python/src/metadata.rs | 6 +-
crates/fluss/src/client/admin.rs | 6 +-
crates/fluss/src/client/metadata.rs | 4 +-
crates/fluss/src/client/table/scanner.rs | 153 ++++++++++++++++++++--------
crates/fluss/src/metadata/table.rs | 2 +-
crates/fluss/tests/integration/log_table.rs | 66 +++++++++++-
6 files changed, 185 insertions(+), 52 deletions(-)
diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs
index 235df56..f422696 100644
--- a/bindings/python/src/metadata.rs
+++ b/bindings/python/src/metadata.rs
@@ -530,7 +530,11 @@ impl TableBucket {
/// Convert to core TableBucket (internal use)
pub fn to_core(&self) -> fcore::metadata::TableBucket {
- fcore::metadata::TableBucket::new(self.table_id, self.bucket)
+ fcore::metadata::TableBucket::new_with_partition(
+ self.table_id,
+ self.partition_id,
+ self.bucket,
+ )
}
}
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index 9061169..737ead3 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -273,7 +273,11 @@ impl FlussAdmin {
// Convert proto response to LakeSnapshot
let mut table_buckets_offset = HashMap::new();
for bucket_snapshot in response.bucket_snapshots {
- let table_bucket = TableBucket::new(response.table_id, bucket_snapshot.bucket_id);
+ let table_bucket = TableBucket::new_with_partition(
+ response.table_id,
+ bucket_snapshot.partition_id,
+ bucket_snapshot.bucket_id,
+ );
if let Some(log_offset) = bucket_snapshot.log_offset {
table_buckets_offset.insert(table_bucket, log_offset);
}
diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs
index c6244cd..ce00ced 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -259,8 +259,8 @@ mod tests {
let leader = metadata
.leader_for(&table_path, &TableBucket::new(1, 0))
.await
- .expect("leader request should be Ok")
- .expect("leader should exist");
+ .unwrap()
+ .expect("leader");
assert_eq!(leader.id(), 1);
}
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index d30c5d5..a88964e 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -26,7 +26,6 @@ use std::{
};
use tempfile::TempDir;
-use crate::TableId;
use crate::client::connection::FlussConnection;
use crate::client::credentials::SecurityTokenManager;
use crate::client::metadata::Metadata;
@@ -43,6 +42,7 @@ use crate::record::{
};
use crate::rpc::{RpcClient, RpcError, message};
use crate::util::FairBucketStatusMap;
+use crate::{PartitionId, TableId};
const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
#[allow(dead_code)]
@@ -88,7 +88,7 @@ impl<'a> TableScan<'a> {
/// # pub async fn example() -> Result<()> {
/// let mut config = Config::default();
/// config.bootstrap_server = "127.0.0.1:9123".to_string();
- /// let conn = FlussConnection::new(config).await;
+ /// let conn = FlussConnection::new(config).await?;
///
/// let table_descriptor = TableDescriptor::builder()
/// .schema(
@@ -164,7 +164,7 @@ impl<'a> TableScan<'a> {
/// # pub async fn example() -> Result<()> {
/// let mut config = Config::default();
/// config.bootstrap_server = "127.0.0.1:9123".to_string();
- /// let conn = FlussConnection::new(config).await;
+ /// let conn = FlussConnection::new(config).await?;
///
/// let table_descriptor = TableDescriptor::builder()
/// .schema(
@@ -270,6 +270,7 @@ struct LogScannerInner {
metadata: Arc<Metadata>,
log_scanner_status: Arc<LogScannerStatus>,
log_fetcher: LogFetcher,
+ is_partitioned_table: bool,
}
impl LogScannerInner {
@@ -284,6 +285,7 @@ impl LogScannerInner {
Ok(Self {
table_path: table_info.table_path.clone(),
table_id: table_info.table_id,
+ is_partitioned_table: table_info.is_partitioned(),
metadata: metadata.clone(),
log_scanner_status: log_scanner_status.clone(),
log_fetcher: LogFetcher::new(
@@ -337,6 +339,13 @@ impl LogScannerInner {
}
async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
+ if self.is_partitioned_table {
+ return Err(Error::UnsupportedOperation {
+ message: "The table is a partitioned table, please use \"subscribe_partition\" to \
+ subscribe a partitioned bucket instead."
+ .to_string(),
+ });
+ }
let table_bucket = TableBucket::new(self.table_id, bucket);
self.metadata
.check_and_update_table_metadata(from_ref(&self.table_path))
@@ -347,6 +356,13 @@ impl LogScannerInner {
}
async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
+ if self.is_partitioned_table {
+ return Err(Error::UnsupportedOperation {
+ message:
+ "The table is a partitioned table, subscribe_batch is not supported currently."
+ .to_string(),
+ });
+ }
self.metadata
.check_and_update_table_metadata(from_ref(&self.table_path))
.await?;
@@ -368,6 +384,29 @@ impl LogScannerInner {
Ok(())
}
+ async fn subscribe_partition(
+ &self,
+ partition_id: PartitionId,
+ bucket: i32,
+ offset: i64,
+ ) -> Result<()> {
+ if !self.is_partitioned_table {
+ return Err(Error::UnsupportedOperation {
+ message: "The table is not a partitioned table, please use \"subscribe\" to \
+ subscribe a non-partitioned bucket instead."
+ .to_string(),
+ });
+ }
+ let table_bucket =
+ TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket);
+ self.metadata
+ .check_and_update_table_metadata(from_ref(&self.table_path))
+ .await?;
+ self.log_scanner_status
+ .assign_scan_bucket(table_bucket, offset);
+ Ok(())
+ }
+
async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
let result = self.log_fetcher.collect_fetches()?;
if !result.is_empty() {
@@ -435,6 +474,17 @@ impl LogScanner {
pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
self.inner.subscribe_batch(bucket_offsets).await
}
+
+ pub async fn subscribe_partition(
+ &self,
+ partition_id: PartitionId,
+ bucket: i32,
+ offset: i64,
+ ) -> Result<()> {
+ self.inner
+ .subscribe_partition(partition_id, bucket, offset)
+ .await
+ }
}
// Implementation for RecordBatchLogScanner (batches mode)
@@ -451,6 +501,17 @@ impl RecordBatchLogScanner {
pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
self.inner.subscribe_batch(bucket_offsets).await
}
+
+ pub async fn subscribe_partition(
+ &self,
+ partition_id: PartitionId,
+ bucket: i32,
+ offset: i64,
+ ) -> Result<()> {
+ self.inner
+ .subscribe_partition(partition_id, bucket, offset)
+ .await
+ }
}
struct LogFetcher {
@@ -617,55 +678,55 @@ impl LogFetcher {
)
}
- async fn check_and_update_metadata(&self) -> Result<()> {
- let need_update = self
- .fetchable_buckets()
- .iter()
- .any(|bucket| self.get_table_bucket_leader(bucket).is_none());
+ async fn check_and_update_metadata(&self, table_buckets: &[TableBucket]) -> Result<()> {
+ let mut partition_ids = Vec::new();
+ let mut need_update = false;
- if !need_update {
- return Ok(());
+ for tb in table_buckets {
+ if self.get_table_bucket_leader(tb).is_some() {
+ continue;
+ }
+
+ if self.is_partitioned {
+ partition_ids.push(tb.partition_id().unwrap());
+ } else {
+ need_update = true;
+ break;
+ }
}
- if self.is_partitioned {
- // Fallback to full table metadata refresh until partition-aware updates are available.
+ let update_result = if self.is_partitioned && !partition_ids.is_empty() {
self.metadata
- .update_tables_metadata(&HashSet::from([&self.table_path]), &HashSet::new(), vec![])
+ .update_tables_metadata(
+ &HashSet::from([&self.table_path]),
+ &HashSet::new(),
+ partition_ids,
+ )
.await
- .or_else(|e| {
- if let Error::RpcError { source, .. } = &e
- && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_))
- {
- warn!(
- "Retrying after encountering error while updating table metadata: {e}"
- );
- Ok(())
- } else {
- Err(e)
- }
- })?;
- return Ok(());
- }
+ } else if need_update {
+ self.metadata.update_table_metadata(&self.table_path).await
+ } else {
+ Ok(())
+ };
- // TODO: Handle PartitionNotExist error
- self.metadata
- .update_tables_metadata(&HashSet::from([&self.table_path]), &HashSet::new(), vec![])
- .await
- .or_else(|e| {
- if let Error::RpcError { source, .. } = &e
- && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_))
- {
- warn!("Retrying after encountering error while updating table metadata: {e}");
- Ok(())
- } else {
- Err(e)
- }
- })
+ // TODO: Handle PartitionNotExist error like java side
+ update_result.or_else(|e| {
+ if let Error::RpcError { source, .. } = &e
+ && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_))
+ {
+ warn!("Retrying after encountering error while updating table metadata: {e}");
+ Ok(())
+ } else {
+ Err(e)
+ }
+ })?;
+ Ok(())
}
/// Send fetch requests asynchronously without waiting for responses
async fn send_fetches(&self) -> Result<()> {
- self.check_and_update_metadata().await?;
+ self.check_and_update_metadata(self.fetchable_buckets().as_slice())
+ .await?;
let fetch_request = self.prepare_fetch_log_requests().await;
for (leader, fetch_request) in fetch_request {
@@ -774,7 +835,11 @@ impl LogFetcher {
for fetch_log_for_bucket in fetch_log_for_buckets {
let bucket: i32 = fetch_log_for_bucket.bucket_id;
- let table_bucket = TableBucket::new(table_id, bucket);
+ let table_bucket = TableBucket::new_with_partition(
+ table_id,
+ fetch_log_for_bucket.partition_id,
+ bucket,
+ );
// todo: check fetch result code for per-bucket
let Some(fetch_offset) = log_scanner_status.get_bucket_offset(&table_bucket) else {
@@ -1302,7 +1367,7 @@ impl LogFetcher {
)
} else {
let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
- partition_id: None,
+ partition_id: bucket.partition_id(),
bucket_id: bucket.bucket_id(),
fetch_offset: offset,
// 1M
diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs
index 908f446..4e0a525 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -1206,7 +1206,7 @@ pub struct TableBucket {
impl TableBucket {
pub fn new(table_id: TableId, bucket: BucketId) -> Self {
- TableBucket {
+ Self {
table_id,
partition_id: None,
bucket,
diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs
index 514df82..cbfcbe5 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -974,7 +974,7 @@ mod table_test {
}
#[tokio::test]
- async fn partitioned_table_append() {
+ async fn partitioned_table_append_scan() {
let cluster = get_fluss_cluster();
let connection = cluster.get_fluss_connection().await;
@@ -1098,11 +1098,71 @@ mod table_test {
"Table partition 'fluss.test_partitioned_log_append(p=NOT Exists)' does not exist."
));
+ let log_scanner = table
+ .new_scan()
+ .create_log_scanner()
+ .expect("Failed to create log scanner");
+ let partition_info = admin
+ .list_partition_infos(&table_path)
+ .await
+ .expect("Failed to list partition infos");
+ for partition_info in partition_info {
+ log_scanner
+ .subscribe_partition(partition_info.get_partition_id(), 0, 0)
+ .await
+ .expect("Failed to subscribe to partition");
+ }
+
+ let expected_records = vec![
+ (1, "US", 100i64),
+ (2, "US", 200i64),
+ (3, "EU", 300i64),
+ (4, "EU", 400),
+ (5, "US", 500i64),
+ (6, "US", 600i64),
+ (7, "EU", 700i64),
+ (8, "EU", 800i64),
+ ];
+ let expected_records: Vec<(i32, String, i64)> = expected_records
+ .into_iter()
+ .map(|(id, region, val)| (id, region.to_string(), val))
+ .collect();
+
+ let mut collected_records: Vec<(i32, String, i64)> = Vec::new();
+ let start_time = std::time::Instant::now();
+ while collected_records.len() < expected_records.len()
+ && start_time.elapsed() < Duration::from_secs(10)
+ {
+ let records = log_scanner
+ .poll(Duration::from_millis(500))
+ .await
+ .expect("Failed to poll log scanner");
+ for rec in records {
+ let row = rec.row();
+ collected_records.push((
+ row.get_int(0),
+ row.get_string(1).to_string(),
+ row.get_long(2),
+ ));
+ }
+ }
+
+ assert_eq!(
+ collected_records.len(),
+ expected_records.len(),
+ "Did not receive all records in time, expect receive {} records, but got {} records",
+ expected_records.len(),
+ collected_records.len()
+ );
+ collected_records.sort_by_key(|r| r.0);
+ assert_eq!(
+ collected_records, expected_records,
+ "Data mismatch between sent and received"
+ );
+
admin
.drop_table(&table_path, false)
.await
.expect("Failed to drop table");
-
- // todo: add scan test in 203
}
}