This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new b18cffb chore: refator to builder pattern to create log scanner for
CPP (#249)
b18cffb is described below
commit b18cffb30387ea9e091adac55d2b3c1db09e4d67
Author: Anton Borisov <[email protected]>
AuthorDate: Thu Feb 5 23:09:36 2026 +0000
chore: refator to builder pattern to create log scanner for CPP (#249)
---
bindings/cpp/examples/example.cpp | 18 +++++------
bindings/cpp/include/fluss.hpp | 28 ++++++++++++++---
bindings/cpp/src/table.cpp | 66 +++++++++++++++++----------------------
3 files changed, 61 insertions(+), 51 deletions(-)
diff --git a/bindings/cpp/examples/example.cpp
b/bindings/cpp/examples/example.cpp
index 45f7f9e..f35f37e 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -104,7 +104,7 @@ int main() {
// 6) Scan
fluss::LogScanner scanner;
- check("new_log_scanner", table.NewLogScanner(scanner));
+ check("new_log_scanner", table.NewScan().CreateLogScanner(scanner));
auto info = table.GetTableInfo();
int buckets = info.num_buckets;
@@ -126,8 +126,8 @@ int main() {
// 7) Project only id (0) and name (1) columns
std::vector<size_t> projected_columns = {0, 1};
fluss::LogScanner projected_scanner;
- check("new_log_scanner_with_projection",
- table.NewLogScannerWithProjection(projected_columns,
projected_scanner));
+ check("new_log_scanner_with_projection",
+
table.NewScan().Project(projected_columns).CreateLogScanner(projected_scanner));
for (int b = 0; b < buckets; ++b) {
check("subscribe_projected", projected_scanner.Subscribe(b, 0));
@@ -226,7 +226,7 @@ int main() {
// 8.4) Use batch subscribe with offsets from list_offsets
std::cout << "\n=== Batch Subscribe Example ===" << std::endl;
fluss::LogScanner batch_scanner;
- check("new_log_scanner_for_batch", table.NewLogScanner(batch_scanner));
+ check("new_log_scanner_for_batch",
table.NewScan().CreateLogScanner(batch_scanner));
std::vector<fluss::BucketSubscription> subscriptions;
for (const auto& [bucket_id, offset] : earliest_offsets) {
@@ -255,9 +255,9 @@ int main() {
// 9) Test the new Arrow record batch polling functionality
std::cout << "\n=== Testing Arrow Record Batch Polling ===" << std::endl;
-
+
fluss::LogScanner arrow_scanner;
- check("new_record_batch_log_scanner",
table.NewRecordBatchLogScanner(arrow_scanner));
+ check("new_record_batch_log_scanner",
table.NewScan().CreateRecordBatchScanner(arrow_scanner));
// Subscribe to all buckets starting from offset 0
for (int b = 0; b < buckets; ++b) {
@@ -279,10 +279,10 @@ int main() {
// 10) Test the new Arrow record batch polling with projection
std::cout << "\n=== Testing Arrow Record Batch Polling with Projection
===" << std::endl;
-
+
fluss::LogScanner projected_arrow_scanner;
- check("new_record_batch_log_scanner_with_projection",
- table.NewRecordBatchLogScannerWithProjection(projected_columns,
projected_arrow_scanner));
+ check("new_record_batch_log_scanner_with_projection",
+
table.NewScan().Project(projected_columns).CreateRecordBatchScanner(projected_arrow_scanner));
// Subscribe to all buckets starting from offset 0
for (int b = 0; b < buckets; ++b) {
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index d35ece2..901b90c 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -407,6 +407,7 @@ class AppendWriter;
class LogScanner;
class Admin;
class Table;
+class TableScan;
class Connection {
public:
@@ -490,10 +491,7 @@ public:
bool Available() const;
Result NewAppendWriter(AppendWriter& out);
- Result NewLogScanner(LogScanner& out);
- Result NewLogScannerWithProjection(const std::vector<size_t>&
column_indices, LogScanner& out);
- Result NewRecordBatchLogScanner(LogScanner& out);
- Result NewRecordBatchLogScannerWithProjection(const std::vector<size_t>&
column_indices, LogScanner& out);
+ TableScan NewScan();
TableInfo GetTableInfo() const;
TablePath GetTablePath() const;
@@ -501,12 +499,33 @@ public:
private:
friend class Connection;
+ friend class TableScan;
Table(ffi::Table* table) noexcept;
void Destroy() noexcept;
ffi::Table* table_{nullptr};
};
+class TableScan {
+public:
+ TableScan(const TableScan&) = delete;
+ TableScan& operator=(const TableScan&) = delete;
+ TableScan(TableScan&&) noexcept = default;
+ TableScan& operator=(TableScan&&) noexcept = default;
+
+ TableScan& Project(std::vector<size_t> column_indices);
+
+ Result CreateLogScanner(LogScanner& out);
+ Result CreateRecordBatchScanner(LogScanner& out);
+
+private:
+ friend class Table;
+ explicit TableScan(ffi::Table* table) noexcept;
+
+ ffi::Table* table_{nullptr};
+ std::vector<size_t> projection_;
+};
+
class AppendWriter {
public:
AppendWriter() noexcept;
@@ -550,6 +569,7 @@ public:
private:
friend class Table;
+ friend class TableScan;
LogScanner(ffi::LogScanner* scanner) noexcept;
void Destroy() noexcept;
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index b327dba..f943790 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -71,47 +71,33 @@ Result Table::NewAppendWriter(AppendWriter& out) {
}
}
-Result Table::NewLogScanner(LogScanner& out) {
- if (!Available()) {
- return utils::make_error(1, "Table not available");
- }
-
- try {
- out.scanner_ = table_->new_log_scanner();
- return utils::make_ok();
- } catch (const rust::Error& e) {
- return utils::make_error(1, e.what());
- } catch (const std::exception& e) {
- return utils::make_error(1, e.what());
- }
+TableScan Table::NewScan() {
+ return TableScan(table_);
}
-Result Table::NewLogScannerWithProjection(const std::vector<size_t>&
column_indices, LogScanner& out) {
- if (!Available()) {
- return utils::make_error(1, "Table not available");
- }
+// TableScan implementation
+TableScan::TableScan(ffi::Table* table) noexcept : table_(table) {}
- try {
- rust::Vec<size_t> rust_indices;
- for (size_t idx : column_indices) {
- rust_indices.push_back(idx);
- }
- out.scanner_ =
table_->new_log_scanner_with_projection(std::move(rust_indices));
- return utils::make_ok();
- } catch (const rust::Error& e) {
- return utils::make_error(1, e.what());
- } catch (const std::exception& e) {
- return utils::make_error(1, e.what());
- }
+TableScan& TableScan::Project(std::vector<size_t> column_indices) {
+ projection_ = std::move(column_indices);
+ return *this;
}
-Result Table::NewRecordBatchLogScanner(LogScanner& out) {
- if (!Available()) {
+Result TableScan::CreateLogScanner(LogScanner& out) {
+ if (table_ == nullptr) {
return utils::make_error(1, "Table not available");
}
try {
- out.scanner_ = table_->new_record_batch_log_scanner();
+ if (projection_.empty()) {
+ out.scanner_ = table_->new_log_scanner();
+ } else {
+ rust::Vec<size_t> rust_indices;
+ for (size_t idx : projection_) {
+ rust_indices.push_back(idx);
+ }
+ out.scanner_ =
table_->new_log_scanner_with_projection(std::move(rust_indices));
+ }
return utils::make_ok();
} catch (const rust::Error& e) {
return utils::make_error(1, e.what());
@@ -120,17 +106,21 @@ Result Table::NewRecordBatchLogScanner(LogScanner& out) {
}
}
-Result Table::NewRecordBatchLogScannerWithProjection(const
std::vector<size_t>& column_indices, LogScanner& out) {
- if (!Available()) {
+Result TableScan::CreateRecordBatchScanner(LogScanner& out) {
+ if (table_ == nullptr) {
return utils::make_error(1, "Table not available");
}
try {
- rust::Vec<size_t> rust_indices;
- for (size_t idx : column_indices) {
- rust_indices.push_back(idx);
+ if (projection_.empty()) {
+ out.scanner_ = table_->new_record_batch_log_scanner();
+ } else {
+ rust::Vec<size_t> rust_indices;
+ for (size_t idx : projection_) {
+ rust_indices.push_back(idx);
+ }
+ out.scanner_ =
table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices));
}
- out.scanner_ =
table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices));
return utils::make_ok();
} catch (const rust::Error& e) {
return utils::make_error(1, e.what());