This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new b18cffb  chore: refactor to builder pattern to create log scanner for 
CPP (#249)
b18cffb is described below

commit b18cffb30387ea9e091adac55d2b3c1db09e4d67
Author: Anton Borisov <[email protected]>
AuthorDate: Thu Feb 5 23:09:36 2026 +0000

    chore: refactor to builder pattern to create log scanner for CPP (#249)
---
 bindings/cpp/examples/example.cpp | 18 +++++------
 bindings/cpp/include/fluss.hpp    | 28 ++++++++++++++---
 bindings/cpp/src/table.cpp        | 66 +++++++++++++++++----------------------
 3 files changed, 61 insertions(+), 51 deletions(-)

diff --git a/bindings/cpp/examples/example.cpp 
b/bindings/cpp/examples/example.cpp
index 45f7f9e..f35f37e 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -104,7 +104,7 @@ int main() {
 
     // 6) Scan
     fluss::LogScanner scanner;
-    check("new_log_scanner", table.NewLogScanner(scanner));
+    check("new_log_scanner", table.NewScan().CreateLogScanner(scanner));
 
     auto info = table.GetTableInfo();
     int buckets = info.num_buckets;
@@ -126,8 +126,8 @@ int main() {
     // 7) Project only id (0) and name (1) columns
     std::vector<size_t> projected_columns = {0, 1};
     fluss::LogScanner projected_scanner;
-    check("new_log_scanner_with_projection", 
-          table.NewLogScannerWithProjection(projected_columns, 
projected_scanner));
+    check("new_log_scanner_with_projection",
+          
table.NewScan().Project(projected_columns).CreateLogScanner(projected_scanner));
     
     for (int b = 0; b < buckets; ++b) {
         check("subscribe_projected", projected_scanner.Subscribe(b, 0));
@@ -226,7 +226,7 @@ int main() {
     // 8.4) Use batch subscribe with offsets from list_offsets
     std::cout << "\n=== Batch Subscribe Example ===" << std::endl;
     fluss::LogScanner batch_scanner;
-    check("new_log_scanner_for_batch", table.NewLogScanner(batch_scanner));
+    check("new_log_scanner_for_batch", 
table.NewScan().CreateLogScanner(batch_scanner));
     
     std::vector<fluss::BucketSubscription> subscriptions;
     for (const auto& [bucket_id, offset] : earliest_offsets) {
@@ -255,9 +255,9 @@ int main() {
 
     // 9) Test the new Arrow record batch polling functionality
     std::cout << "\n=== Testing Arrow Record Batch Polling ===" << std::endl;
-    
+
     fluss::LogScanner arrow_scanner;
-    check("new_record_batch_log_scanner", 
table.NewRecordBatchLogScanner(arrow_scanner));
+    check("new_record_batch_log_scanner", 
table.NewScan().CreateRecordBatchScanner(arrow_scanner));
     
     // Subscribe to all buckets starting from offset 0
     for (int b = 0; b < buckets; ++b) {
@@ -279,10 +279,10 @@ int main() {
     
     // 10) Test the new Arrow record batch polling with projection
     std::cout << "\n=== Testing Arrow Record Batch Polling with Projection 
===" << std::endl;
-    
+
     fluss::LogScanner projected_arrow_scanner;
-    check("new_record_batch_log_scanner_with_projection", 
-          table.NewRecordBatchLogScannerWithProjection(projected_columns, 
projected_arrow_scanner));
+    check("new_record_batch_log_scanner_with_projection",
+          
table.NewScan().Project(projected_columns).CreateRecordBatchScanner(projected_arrow_scanner));
     
     // Subscribe to all buckets starting from offset 0
     for (int b = 0; b < buckets; ++b) {
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index d35ece2..901b90c 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -407,6 +407,7 @@ class AppendWriter;
 class LogScanner;
 class Admin;
 class Table;
+class TableScan;
 
 class Connection {
 public:
@@ -490,10 +491,7 @@ public:
     bool Available() const;
 
     Result NewAppendWriter(AppendWriter& out);
-    Result NewLogScanner(LogScanner& out);
-    Result NewLogScannerWithProjection(const std::vector<size_t>& 
column_indices, LogScanner& out);
-    Result NewRecordBatchLogScanner(LogScanner& out);
-    Result NewRecordBatchLogScannerWithProjection(const std::vector<size_t>& 
column_indices, LogScanner& out);
+    TableScan NewScan();
 
     TableInfo GetTableInfo() const;
     TablePath GetTablePath() const;
@@ -501,12 +499,33 @@ public:
 
 private:
     friend class Connection;
+    friend class TableScan;
     Table(ffi::Table* table) noexcept;
 
     void Destroy() noexcept;
     ffi::Table* table_{nullptr};
 };
 
+class TableScan {
+public:
+    TableScan(const TableScan&) = delete;
+    TableScan& operator=(const TableScan&) = delete;
+    TableScan(TableScan&&) noexcept = default;
+    TableScan& operator=(TableScan&&) noexcept = default;
+
+    TableScan& Project(std::vector<size_t> column_indices);
+
+    Result CreateLogScanner(LogScanner& out);
+    Result CreateRecordBatchScanner(LogScanner& out);
+
+private:
+    friend class Table;
+    explicit TableScan(ffi::Table* table) noexcept;
+
+    ffi::Table* table_{nullptr};
+    std::vector<size_t> projection_;
+};
+
 class AppendWriter {
 public:
     AppendWriter() noexcept;
@@ -550,6 +569,7 @@ public:
 
 private:
     friend class Table;
+    friend class TableScan;
     LogScanner(ffi::LogScanner* scanner) noexcept;
 
     void Destroy() noexcept;
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index b327dba..f943790 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -71,47 +71,33 @@ Result Table::NewAppendWriter(AppendWriter& out) {
     }
 }
 
-Result Table::NewLogScanner(LogScanner& out) {
-    if (!Available()) {
-        return utils::make_error(1, "Table not available");
-    }
-
-    try {
-        out.scanner_ = table_->new_log_scanner();
-        return utils::make_ok();
-    } catch (const rust::Error& e) {
-        return utils::make_error(1, e.what());
-    } catch (const std::exception& e) {
-        return utils::make_error(1, e.what());
-    }
+TableScan Table::NewScan() {
+    return TableScan(table_);
 }
 
-Result Table::NewLogScannerWithProjection(const std::vector<size_t>& 
column_indices, LogScanner& out) {
-    if (!Available()) {
-        return utils::make_error(1, "Table not available");
-    }
+// TableScan implementation
+TableScan::TableScan(ffi::Table* table) noexcept : table_(table) {}
 
-    try {
-        rust::Vec<size_t> rust_indices;
-        for (size_t idx : column_indices) {
-            rust_indices.push_back(idx);
-        }
-        out.scanner_ = 
table_->new_log_scanner_with_projection(std::move(rust_indices));
-        return utils::make_ok();
-    } catch (const rust::Error& e) {
-        return utils::make_error(1, e.what());
-    } catch (const std::exception& e) {
-        return utils::make_error(1, e.what());
-    }
+TableScan& TableScan::Project(std::vector<size_t> column_indices) {
+    projection_ = std::move(column_indices);
+    return *this;
 }
 
-Result Table::NewRecordBatchLogScanner(LogScanner& out) {
-    if (!Available()) {
+Result TableScan::CreateLogScanner(LogScanner& out) {
+    if (table_ == nullptr) {
         return utils::make_error(1, "Table not available");
     }
 
     try {
-        out.scanner_ = table_->new_record_batch_log_scanner();
+        if (projection_.empty()) {
+            out.scanner_ = table_->new_log_scanner();
+        } else {
+            rust::Vec<size_t> rust_indices;
+            for (size_t idx : projection_) {
+                rust_indices.push_back(idx);
+            }
+            out.scanner_ = 
table_->new_log_scanner_with_projection(std::move(rust_indices));
+        }
         return utils::make_ok();
     } catch (const rust::Error& e) {
         return utils::make_error(1, e.what());
@@ -120,17 +106,21 @@ Result Table::NewRecordBatchLogScanner(LogScanner& out) {
     }
 }
 
-Result Table::NewRecordBatchLogScannerWithProjection(const 
std::vector<size_t>& column_indices, LogScanner& out) {
-    if (!Available()) {
+Result TableScan::CreateRecordBatchScanner(LogScanner& out) {
+    if (table_ == nullptr) {
         return utils::make_error(1, "Table not available");
     }
 
     try {
-        rust::Vec<size_t> rust_indices;
-        for (size_t idx : column_indices) {
-            rust_indices.push_back(idx);
+        if (projection_.empty()) {
+            out.scanner_ = table_->new_record_batch_log_scanner();
+        } else {
+            rust::Vec<size_t> rust_indices;
+            for (size_t idx : projection_) {
+                rust_indices.push_back(idx);
+            }
+            out.scanner_ = 
table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices));
         }
-        out.scanner_ = 
table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices));
         return utils::make_ok();
     } catch (const rust::Error& e) {
         return utils::make_error(1, e.what());

Reply via email to