This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 79f16ac feat: support projection by names in CPP (#294)
79f16ac is described below
commit 79f16ace0ed4e4cbf6cc42c959ddf5f8d73514c9
Author: Anton Borisov <[email protected]>
AuthorDate: Tue Feb 10 11:43:37 2026 +0000
feat: support projection by names in CPP (#294)
---
bindings/cpp/examples/example.cpp | 44 +++++++++++++++++++++++++++++++----
bindings/cpp/include/fluss.hpp | 6 ++++-
bindings/cpp/src/table.cpp | 48 +++++++++++++++++++++++++++++++--------
3 files changed, 84 insertions(+), 14 deletions(-)
diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp
index f568422..47087e5 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -192,12 +192,12 @@ int main() {
std::exit(1);
}
- // 7) Projected scan — project [id, updated_at(TimestampLtz)] to verify
- // NTZ/LTZ disambiguation works with column index remapping
+ // 7a) Projected scan by index — project [id, updated_at(TimestampLtz)] to verify
+ // NTZ/LTZ disambiguation works with column index remapping
std::vector<size_t> projected_columns = {0, 7};
fluss::LogScanner projected_scanner;
check("new_log_scanner_with_projection",
- table.NewScan().Project(projected_columns).CreateLogScanner(projected_scanner));
+ table.NewScan().ProjectByIndex(projected_columns).CreateLogScanner(projected_scanner));
for (int b = 0; b < buckets; ++b) {
check("subscribe_projected", projected_scanner.Subscribe(b, 0));
@@ -229,6 +229,42 @@ int main() {
<< ts.nano_of_millisecond << "ns" << std::endl;
}
+ // 7b) Projected scan by column names — same columns as above but using names
+ fluss::LogScanner name_projected_scanner;
+ check("project_by_name_scanner", table.NewScan()
+ .ProjectByName({"id", "updated_at"})
+ .CreateLogScanner(name_projected_scanner));
+
+ for (int b = 0; b < buckets; ++b) {
+ check("subscribe_name_projected", name_projected_scanner.Subscribe(b, 0));
+ }
+
+ fluss::ScanRecords name_projected_records;
+ check("poll_name_projected", name_projected_scanner.Poll(5000, name_projected_records));
+
+ std::cout << "Name-projected records: " << name_projected_records.Size() << std::endl;
+ for (const auto& rec : name_projected_records.records) {
+ if (rec.row.FieldCount() != 2) {
+ std::cerr << "ERROR: expected 2 fields, got " << rec.row.FieldCount() << std::endl;
+ scan_ok = false;
+ continue;
+ }
+ if (rec.row.GetType(0) != fluss::DatumType::Int32) {
+ std::cerr << "ERROR: name-projected field 0 expected Int32, got "
+ << static_cast<int>(rec.row.GetType(0)) << std::endl;
+ scan_ok = false;
+ }
+ if (rec.row.GetType(1) != fluss::DatumType::TimestampLtz) {
+ std::cerr << "ERROR: name-projected field 1 expected TimestampLtz, got "
+ << static_cast<int>(rec.row.GetType(1)) << std::endl;
+ scan_ok = false;
+ }
+
+ auto ts = rec.row.GetTimestamp(1);
+ std::cout << " id=" << rec.row.GetInt32(0) << " updated_at=" << ts.epoch_millis << "+"
+ << ts.nano_of_millisecond << "ns" << std::endl;
+ }
+
if (scan_ok) {
std::cout << "Scan verification passed!" << std::endl;
} else {
@@ -336,7 +372,7 @@ int main() {
fluss::LogScanner projected_arrow_scanner;
check("new_record_batch_log_scanner_with_projection",
table.NewScan()
- .Project(projected_columns)
+ .ProjectByIndex(projected_columns)
.CreateRecordBatchScanner(projected_arrow_scanner));
for (int b = 0; b < buckets; ++b) {
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index c635c81..41aae67 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -939,7 +939,8 @@ class TableScan {
TableScan(TableScan&&) noexcept = default;
TableScan& operator=(TableScan&&) noexcept = default;
- TableScan& Project(std::vector<size_t> column_indices);
+ TableScan& ProjectByIndex(std::vector<size_t> column_indices);
+ TableScan& ProjectByName(std::vector<std::string> column_names);
Result CreateLogScanner(LogScanner& out);
Result CreateRecordBatchScanner(LogScanner& out);
@@ -948,8 +949,11 @@ class TableScan {
friend class Table;
explicit TableScan(ffi::Table* table) noexcept;
+ std::vector<size_t> ResolveNameProjection() const;
+
ffi::Table* table_{nullptr};
std::vector<size_t> projection_;
+ std::vector<std::string> name_projection_;
};
class WriteResult {
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index 5b2f66c..a266363 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -129,25 +129,54 @@ TableScan Table::NewScan() { return TableScan(table_); }
// TableScan implementation
TableScan::TableScan(ffi::Table* table) noexcept : table_(table) {}
-TableScan& TableScan::Project(std::vector<size_t> column_indices) {
+TableScan& TableScan::ProjectByIndex(std::vector<size_t> column_indices) {
projection_ = std::move(column_indices);
+ name_projection_.clear();
return *this;
}
+TableScan& TableScan::ProjectByName(std::vector<std::string> column_names) {
+ name_projection_ = std::move(column_names);
+ projection_.clear();
+ return *this;
+}
+
+std::vector<size_t> TableScan::ResolveNameProjection() const {
+ auto ffi_info = table_->get_table_info_from_table();
+ const auto& columns = ffi_info.schema.columns;
+
+ std::vector<size_t> indices;
+ for (const auto& name : name_projection_) {
+ bool found = false;
+ for (size_t i = 0; i < columns.size(); ++i) {
+ if (std::string(columns[i].name) == name) {
+ indices.push_back(i);
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ throw std::runtime_error("Column '" + name + "' not found");
+ }
+ }
+ return indices;
+}
+
Result TableScan::CreateLogScanner(LogScanner& out) {
if (table_ == nullptr) {
return utils::make_error(1, "Table not available");
}
try {
- if (projection_.empty()) {
- out.scanner_ = table_->new_log_scanner();
- } else {
+ auto resolved_indices = !name_projection_.empty() ? ResolveNameProjection() : projection_;
+ if (!resolved_indices.empty()) {
rust::Vec<size_t> rust_indices;
- for (size_t idx : projection_) {
+ for (size_t idx : resolved_indices) {
rust_indices.push_back(idx);
}
out.scanner_ =
table_->new_log_scanner_with_projection(std::move(rust_indices));
+ } else {
+ out.scanner_ = table_->new_log_scanner();
}
return utils::make_ok();
} catch (const rust::Error& e) {
@@ -163,15 +192,16 @@ Result TableScan::CreateRecordBatchScanner(LogScanner& out) {
}
try {
- if (projection_.empty()) {
- out.scanner_ = table_->new_record_batch_log_scanner();
- } else {
+ auto resolved_indices = !name_projection_.empty() ? ResolveNameProjection() : projection_;
+ if (!resolved_indices.empty()) {
rust::Vec<size_t> rust_indices;
- for (size_t idx : projection_) {
+ for (size_t idx : resolved_indices) {
rust_indices.push_back(idx);
}
out.scanner_ =
table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices));
+ } else {
+ out.scanner_ = table_->new_record_batch_log_scanner();
}
return utils::make_ok();
} catch (const rust::Error& e) {