This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new f32411e  [chore] Move list offsets to admin (#35)
f32411e is described below

commit f32411e9fef40bb42b3237fffd029dd7def84681
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Oct 20 14:14:28 2025 +0800

    [chore] Move list offsets to admin (#35)
---
 bindings/python/src/table.rs             |  68 +++++++++++---------
 bindings/python/src/utils.rs             |   4 +-
 crates/fluss/src/client/admin.rs         | 104 ++++++++++++++++++++++++++++++-
 crates/fluss/src/client/metadata.rs      |   3 +-
 crates/fluss/src/client/table/mod.rs     |   2 +
 crates/fluss/src/client/table/scanner.rs | 101 ------------------------------
 crates/fluss/src/client/write/sender.rs  |   2 +-
 crates/fluss/src/record/mod.rs           |   4 ++
 crates/fluss/src/rpc/mod.rs              |   2 -
 9 files changed, 149 insertions(+), 141 deletions(-)

diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 98943b9..c255fa6 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -17,12 +17,11 @@
 
 use crate::TOKIO_RUNTIME;
 use crate::*;
+use fluss::client::EARLIEST_OFFSET;
+use fluss::rpc::message::OffsetSpec;
 use pyo3_async_runtimes::tokio::future_into_py;
-use std::collections::HashSet;
 use std::sync::Arc;
 
-const EARLIEST_OFFSET: i64 = -2;
-
 /// Represents a Fluss table for data operations
 #[pyclass]
 pub struct FlussTable {
@@ -70,8 +69,12 @@ impl FlussTable {
 
             let rust_scanner = table_scan.create_log_scanner();
 
-            let py_scanner = LogScanner::from_core(rust_scanner, 
table_info.clone());
+            let admin = conn
+                .get_admin()
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
 
+            let py_scanner = LogScanner::from_core(rust_scanner, admin, 
table_info.clone());
             Python::with_gil(|py| Py::new(py, py_scanner))
         })
     }
@@ -275,6 +278,7 @@ impl AppendWriter {
 #[pyclass]
 pub struct LogScanner {
     inner: fcore::client::LogScanner,
+    admin: fcore::client::FlussAdmin,
     table_info: fcore::metadata::TableInfo,
     #[allow(dead_code)]
     start_timestamp: Option<i64>,
@@ -327,50 +331,50 @@ impl LogScanner {
         let bucket_ids: Vec<i32> = (0..num_buckets).collect();
 
         // todo: after supporting list_offsets with timestamp, we can use 
start_timestamp and end_timestamp here
-        let target_offsets: HashMap<i32, i64> = TOKIO_RUNTIME
-            .block_on(async { self.inner.list_offsets_latest(bucket_ids).await 
})
+        let mut stopping_offsets: HashMap<i32, i64> = TOKIO_RUNTIME
+            .block_on(async {
+                self.admin
+                    .list_offsets(
+                        &self.table_info.table_path,
+                        bucket_ids.as_slice(),
+                        OffsetSpec::Latest,
+                    )
+                    .await
+            })
             .map_err(|e| FlussError::new_err(e.to_string()))?;
 
-        let mut current_offsets: HashMap<i32, i64> = HashMap::new();
-        let mut completed_buckets: HashSet<i32> = HashSet::new();
-
-        if !target_offsets.is_empty() {
+        if !stopping_offsets.is_empty() {
             loop {
                 let batch_result = TOKIO_RUNTIME
                     .block_on(async { 
self.inner.poll(Duration::from_millis(500)).await });
 
                 match batch_result {
                     Ok(scan_records) => {
-                        let mut filtered_records: HashMap<
-                            fcore::metadata::TableBucket,
-                            Vec<fcore::record::ScanRecord>,
-                        > = HashMap::new();
-                        for (bucket, records) in 
scan_records.records_by_buckets() {
-                            let bucket_id = bucket.bucket_id();
-                            if completed_buckets.contains(&bucket_id) {
+                        let mut result_records: Vec<fcore::record::ScanRecord> 
= vec![];
+                        for (bucket, records) in 
scan_records.into_records_by_buckets() {
+                            let stopping_offset = 
stopping_offsets.get(&bucket.bucket_id());
+
+                            if stopping_offset.is_none() {
+                                // do not include this bucket; skip records 
for this bucket
+                                // since we have already reached the end offset for this 
bucket
                                 continue;
                             }
                             if let Some(last_record) = records.last() {
                                 let offset = last_record.offset();
-                                current_offsets.insert(bucket_id, offset);
-                                filtered_records.insert(bucket.clone(), 
records.clone());
-                                if offset >= target_offsets[&bucket_id] - 1 {
-                                    completed_buckets.insert(bucket_id);
+                                result_records.extend(records);
+                                if offset >= stopping_offset.unwrap() - 1 {
+                                    
stopping_offsets.remove(&bucket.bucket_id());
                                 }
                             }
                         }
 
-                        if !filtered_records.is_empty() {
-                            let filtered_scan_records =
-                                
fcore::record::ScanRecords::new(filtered_records);
-                            let arrow_batch =
-                                
Utils::convert_scan_records_to_arrow(filtered_scan_records);
+                        if !result_records.is_empty() {
+                            let arrow_batch = 
Utils::convert_scan_records_to_arrow(result_records);
                             all_batches.extend(arrow_batch);
                         }
 
-                        // completed bucket is equal to all target buckets,
-                        // we can break scan records
-                        if completed_buckets.len() == target_offsets.len() {
+                        // we have reached the end offsets of all buckets
+                        if stopping_offsets.is_empty() {
                             break;
                         }
                     }
@@ -399,11 +403,13 @@ impl LogScanner {
 impl LogScanner {
     /// Create LogScanner from core LogScanner
     pub fn from_core(
-        inner: fcore::client::LogScanner,
+        inner_scanner: fcore::client::LogScanner,
+        admin: fcore::client::FlussAdmin,
         table_info: fcore::metadata::TableInfo,
     ) -> Self {
         Self {
-            inner,
+            inner: inner_scanner,
+            admin,
             table_info,
             start_timestamp: None,
             end_timestamp: None,
diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs
index 9642e9d..93933b3 100644
--- a/bindings/python/src/utils.rs
+++ b/bindings/python/src/utils.rs
@@ -152,9 +152,9 @@ impl Utils {
             .map_err(|e| FlussError::new_err(format!("Invalid kv format 
'{format_str}': {e}")))
     }
 
-    /// Convert ScanRecords to Arrow RecordBatch
+    /// Convert Vec<ScanRecord> to Arrow RecordBatch
     pub fn convert_scan_records_to_arrow(
-        _scan_records: fcore::record::ScanRecords,
+        _scan_records: Vec<fcore::record::ScanRecord>,
     ) -> Vec<Arc<arrow::record_batch::RecordBatch>> {
         let mut result = Vec::new();
         for record in _scan_records {
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index fd0f316..fefab43 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -25,13 +25,16 @@ use crate::rpc::message::{
     DropTableRequest, GetDatabaseInfoRequest, GetLatestLakeSnapshotRequest, 
GetTableRequest,
     ListDatabasesRequest, ListTablesRequest, TableExistsRequest,
 };
+use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
 use crate::rpc::{RpcClient, ServerConnection};
 
-use std::collections::HashMap;
-use std::sync::Arc;
-
+use crate::BucketId;
 use crate::error::Result;
 use crate::proto::GetTableInfoResponse;
+use std::collections::HashMap;
+use std::slice::from_ref;
+use std::sync::Arc;
+use tokio::task::JoinHandle;
 
 pub struct FlussAdmin {
     admin_gateway: ServerConnection,
@@ -216,4 +219,99 @@ impl FlussAdmin {
             table_buckets_offset,
         ))
     }
+
+    /// Lists offsets for the specified buckets. This operation makes it possible to find 
the beginning offset,
+    /// the end offset, or the offset matching a timestamp in each bucket.
+    pub async fn list_offsets(
+        &self,
+        table_path: &TablePath,
+        buckets_id: &[BucketId],
+        offset_spec: OffsetSpec,
+    ) -> Result<HashMap<i32, i64>> {
+        self.metadata
+            .check_and_update_table_metadata(from_ref(table_path))
+            .await?;
+
+        let cluster = self.metadata.get_cluster();
+        let table_id = cluster.get_table(table_path).table_id;
+
+        // Prepare requests
+        let requests_by_server =
+            self.prepare_list_offsets_requests(table_id, None, buckets_id, 
offset_spec)?;
+
+        // Send Requests
+        let response_futures = 
self.send_list_offsets_request(requests_by_server).await?;
+
+        let mut results = HashMap::new();
+
+        for response_future in response_futures {
+            let offsets = response_future.await.map_err(
+                // todo: consider using a more suitable error
+                |e| crate::error::Error::WriteError(format!("Fail to get 
result: {e}")),
+            )?;
+            results.extend(offsets?);
+        }
+        Ok(results)
+    }
+
+    fn prepare_list_offsets_requests(
+        &self,
+        table_id: i64,
+        partition_id: Option<i64>,
+        buckets: &[BucketId],
+        offset_spec: OffsetSpec,
+    ) -> Result<HashMap<i32, ListOffsetsRequest>> {
+        let cluster = self.metadata.get_cluster();
+        let mut node_for_bucket_list: HashMap<i32, Vec<i32>> = HashMap::new();
+
+        for bucket_id in buckets {
+            let table_bucket = TableBucket::new(table_id, *bucket_id);
+            let leader = cluster.leader_for(&table_bucket).ok_or_else(|| {
+                // todo: consider using another, more suitable error
+                crate::error::Error::InvalidTableError(format!(
+                    "No leader found for table bucket: table_id={table_id}, 
bucket_id={bucket_id}"
+                ))
+            })?;
+
+            node_for_bucket_list
+                .entry(leader.id())
+                .or_default()
+                .push(*bucket_id);
+        }
+
+        let mut list_offsets_requests = HashMap::new();
+        for (leader_id, bucket_ids) in node_for_bucket_list {
+            let request =
+                ListOffsetsRequest::new(table_id, partition_id, bucket_ids, 
offset_spec.clone());
+            list_offsets_requests.insert(leader_id, request);
+        }
+        Ok(list_offsets_requests)
+    }
+
+    async fn send_list_offsets_request(
+        &self,
+        request_map: HashMap<i32, ListOffsetsRequest>,
+    ) -> Result<Vec<JoinHandle<Result<HashMap<i32, i64>>>>> {
+        let mut tasks = Vec::new();
+
+        for (leader_id, request) in request_map {
+            let rpc_client = self.rpc_client.clone();
+            let metadata = self.metadata.clone();
+
+            let task = tokio::spawn(async move {
+                let cluster = metadata.get_cluster();
+                let tablet_server = 
cluster.get_tablet_server(leader_id).ok_or_else(|| {
+                    // todo: consider using a more suitable error
+                    crate::error::Error::InvalidTableError(format!(
+                        "Tablet server {leader_id} not found"
+                    ))
+                })?;
+                let connection = 
rpc_client.get_connection(tablet_server).await?;
+                let list_offsets_response = connection.request(request).await?;
+                list_offsets_response.offsets()
+            });
+            tasks.push(task);
+        }
+        Ok(tasks)
+    }
 }
diff --git a/crates/fluss/src/client/metadata.rs 
b/crates/fluss/src/client/metadata.rs
index ebfb959..3c3ba4b 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -17,7 +17,8 @@
 
 use crate::cluster::{Cluster, ServerNode, ServerType};
 use crate::metadata::{TableBucket, TablePath};
-use crate::rpc::{RpcClient, ServerConnection, UpdateMetadataRequest};
+use crate::rpc::message::UpdateMetadataRequest;
+use crate::rpc::{RpcClient, ServerConnection};
 use parking_lot::RwLock;
 use std::collections::HashSet;
 use std::net::SocketAddr;
diff --git a/crates/fluss/src/client/table/mod.rs 
b/crates/fluss/src/client/table/mod.rs
index 07e6494..52ae700 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -22,6 +22,8 @@ use std::sync::Arc;
 
 use crate::error::Result;
 
+pub const EARLIEST_OFFSET: i64 = -2;
+
 mod append;
 
 mod scanner;
diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index cbe7248..e1ab59f 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -22,14 +22,12 @@ use crate::metadata::{TableBucket, TableInfo, TablePath};
 use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, 
PbFetchLogReqForTable};
 use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord, ScanRecords, 
to_arrow_schema};
 use crate::rpc::RpcClient;
-use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
 use crate::util::FairBucketStatusMap;
 use parking_lot::RwLock;
 use std::collections::HashMap;
 use std::slice::from_ref;
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::task::JoinHandle;
 
 const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
 #[allow(dead_code)]
@@ -67,7 +65,6 @@ pub struct LogScanner {
     metadata: Arc<Metadata>,
     log_scanner_status: Arc<LogScannerStatus>,
     log_fetcher: LogFetcher,
-    conns: Arc<RpcClient>,
 }
 
 impl LogScanner {
@@ -88,7 +85,6 @@ impl LogScanner {
                 metadata.clone(),
                 log_scanner_status.clone(),
             ),
-            conns: connections.clone(),
         }
     }
 
@@ -106,103 +102,6 @@ impl LogScanner {
         Ok(())
     }
 
-    pub async fn list_offsets_latest(&self, buckets: Vec<i32>) -> 
Result<HashMap<i32, i64>> {
-        // TODO: support partition_id
-        let partition_id = None;
-        let offset_spec = OffsetSpec::Latest;
-
-        self.metadata
-            .check_and_update_table_metadata(from_ref(&self.table_path))
-            .await?;
-
-        let cluster = self.metadata.get_cluster();
-        let table_id = cluster.get_table(&self.table_path).table_id;
-
-        // Prepare requests
-        let requests_by_server = self.prepare_list_offsets_requests(
-            table_id,
-            partition_id,
-            buckets.clone(),
-            offset_spec,
-        )?;
-
-        // Send Requests
-        let response_futures = 
self.send_list_offsets_request(requests_by_server).await?;
-
-        let mut results = HashMap::new();
-
-        for response_future in response_futures {
-            let offsets = response_future.await.map_err(
-                // todo: consider use suitable error
-                |e| crate::error::Error::WriteError(format!("Fail to get 
result: {e}")),
-            )?;
-            results.extend(offsets?);
-        }
-        Ok(results)
-    }
-
-    fn prepare_list_offsets_requests(
-        &self,
-        table_id: i64,
-        partition_id: Option<i64>,
-        buckets: Vec<i32>,
-        offset_spec: OffsetSpec,
-    ) -> Result<HashMap<i32, ListOffsetsRequest>> {
-        let cluster = self.metadata.get_cluster();
-        let mut node_for_bucket_list: HashMap<i32, Vec<i32>> = HashMap::new();
-
-        for bucket_id in buckets {
-            let table_bucket = TableBucket::new(table_id, bucket_id);
-            let leader = cluster.leader_for(&table_bucket).ok_or_else(|| {
-                // todo: consider use another suitable error
-                crate::error::Error::InvalidTableError(format!(
-                    "No leader found for table bucket: table_id={table_id}, 
bucket_id={bucket_id}"
-                ))
-            })?;
-
-            node_for_bucket_list
-                .entry(leader.id())
-                .or_default()
-                .push(bucket_id);
-        }
-
-        let mut list_offsets_requests = HashMap::new();
-        for (leader_id, bucket_ids) in node_for_bucket_list {
-            let request =
-                ListOffsetsRequest::new(table_id, partition_id, bucket_ids, 
offset_spec.clone());
-            list_offsets_requests.insert(leader_id, request);
-        }
-        Ok(list_offsets_requests)
-    }
-
-    async fn send_list_offsets_request(
-        &self,
-        request_map: HashMap<i32, ListOffsetsRequest>,
-    ) -> Result<Vec<JoinHandle<Result<HashMap<i32, i64>>>>> {
-        let mut tasks = Vec::new();
-
-        for (leader_id, request) in request_map {
-            let rpc_client = self.conns.clone();
-            let metadata = self.metadata.clone();
-
-            let task = tokio::spawn(async move {
-                let cluster = metadata.get_cluster();
-                let tablet_server = 
cluster.get_tablet_server(leader_id).ok_or_else(|| {
-                    // todo: consider use more suitable error
-                    crate::error::Error::InvalidTableError(format!(
-                        "Tablet server {leader_id} not found"
-                    ))
-                })?;
-                let connection = 
rpc_client.get_connection(tablet_server).await?;
-                let list_offsets_response = connection.request(request).await?;
-                list_offsets_response.offsets()
-            });
-            tasks.push(task);
-        }
-
-        Ok(tasks)
-    }
-
     async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, 
Vec<ScanRecord>>> {
         self.log_fetcher.send_fetches_and_collect().await
     }
diff --git a/crates/fluss/src/client/write/sender.rs 
b/crates/fluss/src/client/write/sender.rs
index 381e10c..e25e2ba 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -21,7 +21,7 @@ use crate::error::Error::WriteError;
 use crate::error::Result;
 use crate::metadata::TableBucket;
 use crate::proto::ProduceLogResponse;
-use crate::rpc::ProduceLogRequest;
+use crate::rpc::message::ProduceLogRequest;
 use parking_lot::Mutex;
 use std::collections::HashMap;
 use std::sync::Arc;
diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs
index 07fbe08..35928ea 100644
--- a/crates/fluss/src/record/mod.rs
+++ b/crates/fluss/src/record/mod.rs
@@ -163,6 +163,10 @@ impl ScanRecords {
     pub fn records_by_buckets(&self) -> &HashMap<TableBucket, Vec<ScanRecord>> 
{
         &self.records
     }
+
+    pub fn into_records_by_buckets(self) -> HashMap<TableBucket, 
Vec<ScanRecord>> {
+        self.records
+    }
 }
 
 impl IntoIterator for ScanRecords {
diff --git a/crates/fluss/src/rpc/mod.rs b/crates/fluss/src/rpc/mod.rs
index 496c015..b8705a3 100644
--- a/crates/fluss/src/rpc/mod.rs
+++ b/crates/fluss/src/rpc/mod.rs
@@ -26,6 +26,4 @@ pub use server_connection::*;
 mod convert;
 mod transport;
 
-pub use message::*;
-
 pub use convert::*;

Reply via email to