This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 10637b7 chore: Builder pattern, cleanup, consistent API (#296)
10637b7 is described below
commit 10637b7f13a3c18550e8c27d7ac8e49e36e34ab7
Author: Anton Borisov <[email protected]>
AuthorDate: Wed Feb 11 04:03:10 2026 +0000
chore: Builder pattern, cleanup, consistent API (#296)
---
bindings/cpp/examples/example.cpp | 6 +-
bindings/cpp/examples/kv_example.cpp | 109 ++++++-------
bindings/cpp/include/fluss.hpp | 72 ++++++++-
bindings/cpp/src/admin.cpp | 49 +++---
bindings/cpp/src/connection.cpp | 6 +-
bindings/cpp/src/lib.rs | 295 ++++++++++-------------------------
bindings/cpp/src/table.cpp | 190 +++++++++++-----------
7 files changed, 328 insertions(+), 399 deletions(-)
diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp
index 47087e5..59f1ed0 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -79,7 +79,7 @@ int main() {
// 5) Write rows with scalar and temporal values
fluss::AppendWriter writer;
- check("new_append_writer", table.NewAppendWriter(writer));
+ check("new_append_writer", table.NewAppend().CreateWriter(writer));
struct RowData {
int id;
@@ -423,7 +423,7 @@ int main() {
check("get_decimal_table", conn.GetTable(decimal_table_path,
decimal_table));
fluss::AppendWriter decimal_writer;
- check("new_decimal_writer", decimal_table.NewAppendWriter(decimal_writer));
+ check("new_decimal_writer",
decimal_table.NewAppend().CreateWriter(decimal_writer));
// Just provide the value — Rust resolves (p,s) from schema
{
@@ -512,7 +512,7 @@ int main() {
check("get_partitioned_table", conn.GetTable(partitioned_table_path,
partitioned_table));
fluss::AppendWriter partitioned_writer;
- check("new_partitioned_writer",
partitioned_table.NewAppendWriter(partitioned_writer));
+ check("new_partitioned_writer",
partitioned_table.NewAppend().CreateWriter(partitioned_writer));
struct PartitionedRow {
int id;
diff --git a/bindings/cpp/examples/kv_example.cpp b/bindings/cpp/examples/kv_example.cpp
index daebfb2..2a40db3 100644
--- a/bindings/cpp/examples/kv_example.cpp
+++ b/bindings/cpp/examples/kv_example.cpp
@@ -76,7 +76,7 @@ int main() {
// - Set("last_seen", ts) auto-routes to SetTimestampLtz (schema-aware)
std::cout << "\n--- Upsert Rows ---" << std::endl;
fluss::UpsertWriter upsert_writer;
- check("new_upsert_writer", kv_table.NewUpsertWriter(upsert_writer));
+ check("new_upsert_writer",
kv_table.NewUpsert().CreateWriter(upsert_writer));
// Fire-and-forget upserts
{
@@ -130,7 +130,7 @@ int main() {
// 4) Lookup by primary key — verify all types round-trip
std::cout << "\n--- Lookup by Primary Key ---" << std::endl;
fluss::Lookuper lookuper;
- check("new_lookuper", kv_table.NewLookuper(lookuper));
+ check("new_lookuper", kv_table.NewLookup().CreateLookuper(lookuper));
// Lookup existing key
{
@@ -242,9 +242,9 @@ int main() {
// 7) Partial update by column names
std::cout << "\n--- Partial Update by Column Names ---" << std::endl;
fluss::UpsertWriter partial_writer;
- check("new_partial_upsert_writer",
- kv_table.NewUpsertWriter(partial_writer,
- std::vector<std::string>{"user_id",
"balance", "last_seen"}));
+ check("new_partial_upsert_writer", kv_table.NewUpsert()
+ .PartialUpdateByName({"user_id",
"balance", "last_seen"})
+ .CreateWriter(partial_writer));
{
auto row = kv_table.NewRow();
@@ -282,7 +282,7 @@ int main() {
fluss::UpsertWriter partial_writer_idx;
// Columns: 0=user_id (PK), 1=name — update name only
check("new_partial_upsert_writer_idx",
- kv_table.NewUpsertWriter(partial_writer_idx, std::vector<size_t>{0,
1}));
+ kv_table.NewUpsert().PartialUpdateByIndex({0,
1}).CreateWriter(partial_writer_idx));
{
// Index-based setters: lighter than name-based, useful for hot paths
@@ -321,37 +321,39 @@ int main() {
// 9) Partitioned KV table
std::cout << "\n--- Partitioned KV Table ---" << std::endl;
- fluss::TablePath part_kv_path("fluss", "partitioned_kv_cpp_v1");
- admin.DropTable(part_kv_path, true);
-
- auto part_kv_schema = fluss::Schema::NewBuilder()
- .AddColumn("region", fluss::DataType::String())
- .AddColumn("user_id", fluss::DataType::Int())
- .AddColumn("name", fluss::DataType::String())
- .AddColumn("score", fluss::DataType::BigInt())
- .SetPrimaryKeys({"region", "user_id"})
- .Build();
-
- auto part_kv_descriptor = fluss::TableDescriptor::NewBuilder()
- .SetSchema(part_kv_schema)
- .SetPartitionKeys({"region"})
- .SetComment("partitioned kv table example")
- .Build();
-
- check("create_part_kv", admin.CreateTable(part_kv_path,
part_kv_descriptor, false));
+ fluss::TablePath partitioned_kv_path("fluss", "partitioned_kv_cpp_v1");
+ admin.DropTable(partitioned_kv_path, true);
+
+ auto partitioned_kv_schema = fluss::Schema::NewBuilder()
+ .AddColumn("region",
fluss::DataType::String())
+ .AddColumn("user_id",
fluss::DataType::Int())
+ .AddColumn("name",
fluss::DataType::String())
+ .AddColumn("score",
fluss::DataType::BigInt())
+ .SetPrimaryKeys({"region", "user_id"})
+ .Build();
+
+ auto partitioned_kv_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(partitioned_kv_schema)
+ .SetPartitionKeys({"region"})
+ .SetComment("partitioned kv table
example")
+ .Build();
+
+ check("create_partitioned_kv",
+ admin.CreateTable(partitioned_kv_path, partitioned_kv_descriptor,
false));
std::cout << "Created partitioned KV table" << std::endl;
// Create partitions
- check("create_US", admin.CreatePartition(part_kv_path, {{"region",
"US"}}));
- check("create_EU", admin.CreatePartition(part_kv_path, {{"region",
"EU"}}));
- check("create_APAC", admin.CreatePartition(part_kv_path, {{"region",
"APAC"}}));
+ check("create_US", admin.CreatePartition(partitioned_kv_path, {{"region",
"US"}}));
+ check("create_EU", admin.CreatePartition(partitioned_kv_path, {{"region",
"EU"}}));
+ check("create_APAC", admin.CreatePartition(partitioned_kv_path,
{{"region", "APAC"}}));
std::cout << "Created partitions: US, EU, APAC" << std::endl;
- fluss::Table part_kv_table;
- check("get_part_kv_table", conn.GetTable(part_kv_path, part_kv_table));
+ fluss::Table partitioned_kv_table;
+ check("get_partitioned_kv_table", conn.GetTable(partitioned_kv_path,
partitioned_kv_table));
- fluss::UpsertWriter part_writer;
- check("new_part_writer", part_kv_table.NewUpsertWriter(part_writer));
+ fluss::UpsertWriter partitioned_writer;
+ check("new_partitioned_writer",
+ partitioned_kv_table.NewUpsert().CreateWriter(partitioned_writer));
// Upsert rows across partitions
struct TestRow {
@@ -366,28 +368,29 @@ int main() {
};
for (const auto& td : test_data) {
- auto row = part_kv_table.NewRow();
+ auto row = partitioned_kv_table.NewRow();
row.Set("region", td.region);
row.Set("user_id", td.user_id);
row.Set("name", td.name);
row.Set("score", td.score);
- check("part_upsert", part_writer.Upsert(row));
+ check("partitioned_upsert", partitioned_writer.Upsert(row));
}
- check("part_flush", part_writer.Flush());
+ check("partitioned_flush", partitioned_writer.Flush());
std::cout << "Upserted 5 rows across 3 partitions" << std::endl;
// Lookup all rows
- fluss::Lookuper part_lookuper;
- check("new_part_lookuper", part_kv_table.NewLookuper(part_lookuper));
+ fluss::Lookuper partitioned_lookuper;
+ check("new_partitioned_lookuper",
+
partitioned_kv_table.NewLookup().CreateLookuper(partitioned_lookuper));
for (const auto& td : test_data) {
- auto pk = part_kv_table.NewRow();
+ auto pk = partitioned_kv_table.NewRow();
pk.Set("region", td.region);
pk.Set("user_id", td.user_id);
bool found = false;
fluss::GenericRow result;
- check("part_lookup", part_lookuper.Lookup(pk, found, result));
+ check("partitioned_lookup", partitioned_lookuper.Lookup(pk, found,
result));
if (!found) {
std::cerr << "ERROR: Expected to find region=" << td.region << "
user_id=" << td.user_id
<< std::endl;
@@ -403,22 +406,22 @@ int main() {
// Update within a partition
{
- auto row = part_kv_table.NewRow();
+ auto row = partitioned_kv_table.NewRow();
row.Set("region", "US");
row.Set("user_id", 1);
row.Set("name", "Gustave Updated");
row.Set("score", static_cast<int64_t>(999));
fluss::WriteResult wr;
- check("part_update", part_writer.Upsert(row, wr));
- check("part_update_wait", wr.Wait());
+ check("partitioned_update", partitioned_writer.Upsert(row, wr));
+ check("partitioned_update_wait", wr.Wait());
}
{
- auto pk = part_kv_table.NewRow();
+ auto pk = partitioned_kv_table.NewRow();
pk.Set("region", "US");
pk.Set("user_id", 1);
bool found = false;
fluss::GenericRow result;
- check("part_lookup_updated", part_lookuper.Lookup(pk, found, result));
+ check("partitioned_lookup_updated", partitioned_lookuper.Lookup(pk,
found, result));
if (!found || result.GetString(2) != "Gustave Updated" ||
result.GetInt64(3) != 999) {
std::cerr << "ERROR: Partition update verification failed" <<
std::endl;
std::exit(1);
@@ -429,12 +432,12 @@ int main() {
// Lookup in non-existent partition
{
- auto pk = part_kv_table.NewRow();
+ auto pk = partitioned_kv_table.NewRow();
pk.Set("region", "UNKNOWN");
pk.Set("user_id", 1);
bool found = false;
fluss::GenericRow result;
- check("part_lookup_unknown", part_lookuper.Lookup(pk, found, result));
+ check("partitioned_lookup_unknown", partitioned_lookuper.Lookup(pk,
found, result));
if (found) {
std::cerr << "ERROR: Expected UNKNOWN partition lookup to return
not found"
<< std::endl;
@@ -445,20 +448,20 @@ int main() {
// Delete within a partition
{
- auto pk = part_kv_table.NewRow();
+ auto pk = partitioned_kv_table.NewRow();
pk.Set("region", "EU");
pk.Set("user_id", 1);
fluss::WriteResult wr;
- check("part_delete", part_writer.Delete(pk, wr));
- check("part_delete_wait", wr.Wait());
+ check("partitioned_delete", partitioned_writer.Delete(pk, wr));
+ check("partitioned_delete_wait", wr.Wait());
}
{
- auto pk = part_kv_table.NewRow();
+ auto pk = partitioned_kv_table.NewRow();
pk.Set("region", "EU");
pk.Set("user_id", 1);
bool found = false;
fluss::GenericRow result;
- check("part_lookup_deleted", part_lookuper.Lookup(pk, found, result));
+ check("partitioned_lookup_deleted", partitioned_lookuper.Lookup(pk,
found, result));
if (found) {
std::cerr << "ERROR: Expected EU/1 to be deleted" << std::endl;
std::exit(1);
@@ -468,12 +471,12 @@ int main() {
// Verify other record in same partition still exists
{
- auto pk = part_kv_table.NewRow();
+ auto pk = partitioned_kv_table.NewRow();
pk.Set("region", "EU");
pk.Set("user_id", 2);
bool found = false;
fluss::GenericRow result;
- check("part_lookup_eu2", part_lookuper.Lookup(pk, found, result));
+ check("partitioned_lookup_eu2", partitioned_lookuper.Lookup(pk, found,
result));
if (!found || result.GetString(2) != "Maelle") {
std::cerr << "ERROR: Expected EU/2 (Maelle) to still exist" <<
std::endl;
std::exit(1);
@@ -481,7 +484,7 @@ int main() {
std::cout << "EU/2 still exists: name=" << result.GetString(2) <<
std::endl;
}
- check("drop_part_kv", admin.DropTable(part_kv_path, true));
+ check("drop_partitioned_kv", admin.DropTable(partitioned_kv_path, true));
std::cout << "\nKV table example completed successfully!" << std::endl;
return 0;
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 41aae67..1806616 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -804,6 +804,9 @@ class WriteResult;
class LogScanner;
class Admin;
class Table;
+class TableAppend;
+class TableUpsert;
+class TableLookup;
class TableScan;
class Connection {
@@ -909,11 +912,9 @@ class Table {
GenericRow NewRow() const;
- Result NewAppendWriter(AppendWriter& out);
- Result NewUpsertWriter(UpsertWriter& out);
- Result NewUpsertWriter(UpsertWriter& out, const std::vector<std::string>&
column_names);
- Result NewUpsertWriter(UpsertWriter& out, const std::vector<size_t>&
column_indices);
- Result NewLookuper(Lookuper& out);
+ TableAppend NewAppend();
+ TableUpsert NewUpsert();
+ TableLookup NewLookup();
TableScan NewScan();
TableInfo GetTableInfo() const;
@@ -922,6 +923,9 @@ class Table {
private:
friend class Connection;
+ friend class TableAppend;
+ friend class TableUpsert;
+ friend class TableLookup;
friend class TableScan;
Table(ffi::Table* table) noexcept;
@@ -932,6 +936,61 @@ class Table {
mutable std::shared_ptr<GenericRow::ColumnMap> column_map_;
};
+class TableAppend {
+ public:
+ TableAppend(const TableAppend&) = delete;
+ TableAppend& operator=(const TableAppend&) = delete;
+ TableAppend(TableAppend&&) noexcept = default;
+ TableAppend& operator=(TableAppend&&) noexcept = default;
+
+ Result CreateWriter(AppendWriter& out);
+
+ private:
+ friend class Table;
+ explicit TableAppend(ffi::Table* table) noexcept;
+
+ ffi::Table* table_{nullptr};
+};
+
+class TableUpsert {
+ public:
+ TableUpsert(const TableUpsert&) = delete;
+ TableUpsert& operator=(const TableUpsert&) = delete;
+ TableUpsert(TableUpsert&&) noexcept = default;
+ TableUpsert& operator=(TableUpsert&&) noexcept = default;
+
+ TableUpsert& PartialUpdateByIndex(std::vector<size_t> column_indices);
+ TableUpsert& PartialUpdateByName(std::vector<std::string> column_names);
+
+ Result CreateWriter(UpsertWriter& out);
+
+ private:
+ friend class Table;
+ explicit TableUpsert(ffi::Table* table) noexcept;
+
+ std::vector<size_t> ResolveNameProjection() const;
+
+ ffi::Table* table_{nullptr};
+ std::vector<size_t> column_indices_;
+ std::vector<std::string> column_names_;
+};
+
+class TableLookup {
+ public:
+ TableLookup(const TableLookup&) = delete;
+ TableLookup& operator=(const TableLookup&) = delete;
+ TableLookup(TableLookup&&) noexcept = default;
+ TableLookup& operator=(TableLookup&&) noexcept = default;
+
+ Result CreateLookuper(Lookuper& out);
+
+ private:
+ friend class Table;
+ explicit TableLookup(ffi::Table* table) noexcept;
+
+ ffi::Table* table_{nullptr};
+};
+
class TableScan {
public:
TableScan(const TableScan&) = delete;
@@ -999,6 +1058,7 @@ class AppendWriter {
private:
friend class Table;
+ friend class TableAppend;
AppendWriter(ffi::AppendWriter* writer) noexcept;
void Destroy() noexcept;
@@ -1025,6 +1085,7 @@ class UpsertWriter {
private:
friend class Table;
+ friend class TableUpsert;
UpsertWriter(ffi::UpsertWriter* writer) noexcept;
void Destroy() noexcept;
ffi::UpsertWriter* writer_{nullptr};
@@ -1046,6 +1107,7 @@ class Lookuper {
private:
friend class Table;
+ friend class TableLookup;
Lookuper(ffi::Lookuper* lookuper) noexcept;
void Destroy() noexcept;
ffi::Lookuper* lookuper_{nullptr};
diff --git a/bindings/cpp/src/admin.cpp b/bindings/cpp/src/admin.cpp
index 77c95d3..7925256 100644
--- a/bindings/cpp/src/admin.cpp
+++ b/bindings/cpp/src/admin.cpp
@@ -17,9 +17,9 @@
* under the License.
*/
+#include "ffi_converter.hpp"
#include "fluss.hpp"
#include "lib.rs.h"
-#include "ffi_converter.hpp"
#include "rust/cxx.h"
namespace fluss {
@@ -37,9 +37,7 @@ void Admin::Destroy() noexcept {
}
}
-Admin::Admin(Admin&& other) noexcept : admin_(other.admin_) {
- other.admin_ = nullptr;
-}
+Admin::Admin(Admin&& other) noexcept : admin_(other.admin_) { other.admin_ =
nullptr; }
Admin& Admin::operator=(Admin&& other) noexcept {
if (this != &other) {
@@ -52,8 +50,7 @@ Admin& Admin::operator=(Admin&& other) noexcept {
bool Admin::Available() const { return admin_ != nullptr; }
-Result Admin::CreateTable(const TablePath& table_path,
- const TableDescriptor& descriptor,
+Result Admin::CreateTable(const TablePath& table_path, const TableDescriptor&
descriptor,
bool ignore_if_exists) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
@@ -109,17 +106,16 @@ Result Admin::GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& o
}
// function for common list offsets functionality
-Result Admin::DoListOffsets(const TablePath& table_path,
- const std::vector<int32_t>& bucket_ids,
- const OffsetQuery& offset_query,
- std::unordered_map<int32_t, int64_t>& out,
- const std::string* partition_name) {
+Result Admin::DoListOffsets(const TablePath& table_path, const
std::vector<int32_t>& bucket_ids,
+ const OffsetQuery& offset_query,
+ std::unordered_map<int32_t, int64_t>& out,
+ const std::string* partition_name) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}
auto ffi_path = utils::to_ffi_table_path(table_path);
-
+
rust::Vec<int32_t> rust_bucket_ids;
for (int32_t id : bucket_ids) {
rust_bucket_ids.push_back(id);
@@ -131,11 +127,12 @@ Result Admin::DoListOffsets(const TablePath& table_path,
ffi::FfiListOffsetsResult ffi_result;
if (partition_name != nullptr) {
- ffi_result = admin_->list_partition_offsets(ffi_path,
rust::String(*partition_name), std::move(rust_bucket_ids), ffi_query);
+ ffi_result = admin_->list_partition_offsets(ffi_path,
rust::String(*partition_name),
+
std::move(rust_bucket_ids), ffi_query);
} else {
ffi_result = admin_->list_offsets(ffi_path,
std::move(rust_bucket_ids), ffi_query);
}
-
+
auto result = utils::from_ffi_result(ffi_result.result);
if (result.Ok()) {
out.clear();
@@ -147,23 +144,20 @@ Result Admin::DoListOffsets(const TablePath& table_path,
return result;
}
-Result Admin::ListOffsets(const TablePath& table_path,
- const std::vector<int32_t>& bucket_ids,
+Result Admin::ListOffsets(const TablePath& table_path, const
std::vector<int32_t>& bucket_ids,
const OffsetQuery& offset_query,
std::unordered_map<int32_t, int64_t>& out) {
return DoListOffsets(table_path, bucket_ids, offset_query, out);
}
-Result Admin::ListPartitionOffsets(const TablePath& table_path,
- const std::string& partition_name,
- const std::vector<int32_t>& bucket_ids,
- const OffsetQuery& offset_query,
- std::unordered_map<int32_t, int64_t>& out) {
+Result Admin::ListPartitionOffsets(const TablePath& table_path, const
std::string& partition_name,
+ const std::vector<int32_t>& bucket_ids,
+ const OffsetQuery& offset_query,
+ std::unordered_map<int32_t, int64_t>& out) {
return DoListOffsets(table_path, bucket_ids, offset_query, out,
&partition_name);
}
-Result Admin::ListPartitionInfos(const TablePath& table_path,
- std::vector<PartitionInfo>& out) {
+Result Admin::ListPartitionInfos(const TablePath& table_path,
std::vector<PartitionInfo>& out) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}
@@ -221,21 +215,18 @@ Result Admin::DropPartition(const TablePath& table_path,
rust_spec.push_back(std::move(kv));
}
- auto ffi_result =
- admin_->drop_partition(ffi_path, std::move(rust_spec),
ignore_if_not_exists);
+ auto ffi_result = admin_->drop_partition(ffi_path, std::move(rust_spec),
ignore_if_not_exists);
return utils::from_ffi_result(ffi_result);
}
-Result Admin::CreateDatabase(const std::string& database_name,
- const DatabaseDescriptor& descriptor,
+Result Admin::CreateDatabase(const std::string& database_name, const
DatabaseDescriptor& descriptor,
bool ignore_if_exists) {
if (!Available()) {
return utils::make_error(1, "Admin not available");
}
auto ffi_desc = utils::to_ffi_database_descriptor(descriptor);
- auto ffi_result =
- admin_->create_database(rust::Str(database_name), ffi_desc,
ignore_if_exists);
+ auto ffi_result = admin_->create_database(rust::Str(database_name),
ffi_desc, ignore_if_exists);
return utils::from_ffi_result(ffi_result);
}
diff --git a/bindings/cpp/src/connection.cpp b/bindings/cpp/src/connection.cpp
index ea884cd..4fbfafb 100644
--- a/bindings/cpp/src/connection.cpp
+++ b/bindings/cpp/src/connection.cpp
@@ -17,9 +17,9 @@
* under the License.
*/
+#include "ffi_converter.hpp"
#include "fluss.hpp"
#include "lib.rs.h"
-#include "ffi_converter.hpp"
#include "rust/cxx.h"
namespace fluss {
@@ -35,9 +35,7 @@ void Connection::Destroy() noexcept {
}
}
-Connection::Connection(Connection&& other) noexcept : conn_(other.conn_) {
- other.conn_ = nullptr;
-}
+Connection::Connection(Connection&& other) noexcept : conn_(other.conn_) {
other.conn_ = nullptr; }
Connection& Connection::operator=(Connection&& other) noexcept {
if (this != &other) {
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index ee7f1d8..9b1b5ef 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -325,25 +325,15 @@ mod ffi {
// Table
unsafe fn delete_table(table: *mut Table);
fn new_append_writer(self: &Table) -> Result<*mut AppendWriter>;
- fn new_log_scanner(self: &Table) -> Result<*mut LogScanner>;
- fn new_log_scanner_with_projection(
- self: &Table,
- column_indices: Vec<usize>,
- ) -> Result<*mut LogScanner>;
- fn new_record_batch_log_scanner(self: &Table) -> Result<*mut
LogScanner>;
- fn new_record_batch_log_scanner_with_projection(
+ fn create_scanner(
self: &Table,
column_indices: Vec<usize>,
+ batch: bool,
) -> Result<*mut LogScanner>;
fn get_table_info_from_table(self: &Table) -> FfiTableInfo;
fn get_table_path(self: &Table) -> FfiTablePath;
fn has_primary_key(self: &Table) -> bool;
- fn new_upsert_writer(self: &Table) -> Result<*mut UpsertWriter>;
- fn new_upsert_writer_with_column_names(
- self: &Table,
- column_names: Vec<String>,
- ) -> Result<*mut UpsertWriter>;
- fn new_upsert_writer_with_column_indices(
+ fn create_upsert_writer(
self: &Table,
column_indices: Vec<usize>,
) -> Result<*mut UpsertWriter>;
@@ -919,153 +909,86 @@ unsafe fn delete_table(table: *mut Table) {
}
impl Table {
- fn new_append_writer(&self) -> Result<*mut AppendWriter, String> {
- let _enter = RUNTIME.enter();
-
- let fluss_table = fcore::client::FlussTable::new(
+ fn fluss_table(&self) -> fcore::client::FlussTable<'_> {
+ fcore::client::FlussTable::new(
&self.connection,
self.metadata.clone(),
self.table_info.clone(),
- );
-
- let table_append = match fluss_table.new_append() {
- Ok(a) => a,
- Err(e) => return Err(format!("Failed to create append: {e}")),
- };
-
- let writer = match table_append.create_writer() {
- Ok(w) => w,
- Err(e) => return Err(format!("Failed to create writer: {e}")),
- };
- let writer = Box::into_raw(Box::new(AppendWriter {
- inner: writer,
- table_info: self.table_info.clone(),
- }));
- Ok(writer)
+ )
}
- fn new_log_scanner(&self) -> Result<*mut LogScanner, String> {
- RUNTIME.block_on(async {
- let fluss_table = fcore::client::FlussTable::new(
- &self.connection,
- self.metadata.clone(),
- self.table_info.clone(),
- );
-
- let scanner = fluss_table
- .new_scan()
- .create_log_scanner()
- .map_err(|e| format!("Failed to create log scanner: {e}"))?;
-
- let scanner_ptr = Box::into_raw(Box::new(LogScanner {
- inner: Some(scanner),
- inner_batch: None,
- projected_columns:
self.table_info.get_schema().columns().to_vec(),
- }));
-
- Ok(scanner_ptr)
- })
- }
-
- fn new_log_scanner_with_projection(
+ fn resolve_projected_columns(
&self,
- column_indices: Vec<usize>,
- ) -> Result<*mut LogScanner, String> {
- RUNTIME.block_on(async {
- let fluss_table = fcore::client::FlussTable::new(
- &self.connection,
- self.metadata.clone(),
- self.table_info.clone(),
- );
-
- let all_columns = self.table_info.get_schema().columns();
- let projected_columns: Vec<_> = column_indices
- .iter()
- .map(|&i| {
- all_columns.get(i).cloned().ok_or_else(|| {
- format!(
- "Invalid column index {i}: schema has {} columns",
- all_columns.len()
- )
- })
+ indices: &[usize],
+ ) -> Result<Vec<fcore::metadata::Column>, String> {
+ let all_columns = self.table_info.get_schema().columns();
+ indices
+ .iter()
+ .map(|&i| {
+ all_columns.get(i).cloned().ok_or_else(|| {
+ format!(
+ "Invalid column index {i}: schema has {} columns",
+ all_columns.len()
+ )
})
- .collect::<Result<_, String>>()?;
-
- let log_scanner = fluss_table
- .new_scan()
- .project(&column_indices)
- .map_err(|e| format!("Failed to project columns: {e}"))?
- .create_log_scanner()
- .map_err(|e| format!("Failed to create log scanner: {e}"))?;
-
- let scanner = Box::into_raw(Box::new(LogScanner {
- inner: Some(log_scanner),
- inner_batch: None,
- projected_columns,
- }));
- Ok(scanner)
- })
+ })
+ .collect()
}
- fn new_record_batch_log_scanner(&self) -> Result<*mut LogScanner, String> {
- RUNTIME.block_on(async {
- let fluss_table = fcore::client::FlussTable::new(
- &self.connection,
- self.metadata.clone(),
- self.table_info.clone(),
- );
-
- let batch_scanner = fluss_table
- .new_scan()
- .create_record_batch_log_scanner()
- .map_err(|e| format!("Failed to create record batch log
scanner: {e}"))?;
-
- let scanner = Box::into_raw(Box::new(LogScanner {
- inner: None,
- inner_batch: Some(batch_scanner),
- projected_columns:
self.table_info.get_schema().columns().to_vec(),
- }));
- Ok(scanner)
- })
+ fn new_append_writer(&self) -> Result<*mut AppendWriter, String> {
+ let _enter = RUNTIME.enter();
+
+ let table_append = self
+ .fluss_table()
+ .new_append()
+ .map_err(|e| format!("Failed to create append: {e}"))?;
+
+ let writer = table_append
+ .create_writer()
+ .map_err(|e| format!("Failed to create writer: {e}"))?;
+
+ Ok(Box::into_raw(Box::new(AppendWriter {
+ inner: writer,
+ table_info: self.table_info.clone(),
+ })))
}
- fn new_record_batch_log_scanner_with_projection(
+ fn create_scanner(
&self,
column_indices: Vec<usize>,
+ batch: bool,
) -> Result<*mut LogScanner, String> {
RUNTIME.block_on(async {
- let fluss_table = fcore::client::FlussTable::new(
- &self.connection,
- self.metadata.clone(),
- self.table_info.clone(),
- );
-
- let all_columns = self.table_info.get_schema().columns();
- let projected_columns: Vec<_> = column_indices
- .iter()
- .map(|&i| {
- all_columns.get(i).cloned().ok_or_else(|| {
- format!(
- "Invalid column index {i}: schema has {} columns",
- all_columns.len()
- )
- })
- })
- .collect::<Result<_, String>>()?;
-
- let batch_scanner = fluss_table
- .new_scan()
- .project(&column_indices)
- .map_err(|e| format!("Failed to project columns: {e}"))?
- .create_record_batch_log_scanner()
- .map_err(|e| format!("Failed to create record batch log
scanner: {e}"))?;
-
- let scanner = Box::into_raw(Box::new(LogScanner {
- inner: None,
- inner_batch: Some(batch_scanner),
+ let fluss_table = self.fluss_table();
+ let scan = fluss_table.new_scan();
+
+ let (projected_columns, scan) = if column_indices.is_empty() {
+ (self.table_info.get_schema().columns().to_vec(), scan)
+ } else {
+ let cols = self.resolve_projected_columns(&column_indices)?;
+ let scan = scan
+ .project(&column_indices)
+ .map_err(|e| format!("Failed to project columns: {e}"))?;
+ (cols, scan)
+ };
+
+ let (inner, inner_batch) = if batch {
+ let batch_scanner = scan
+ .create_record_batch_log_scanner()
+ .map_err(|e| format!("Failed to create record batch log
scanner: {e}"))?;
+ (None, Some(batch_scanner))
+ } else {
+ let log_scanner = scan
+ .create_log_scanner()
+ .map_err(|e| format!("Failed to create log scanner:
{e}"))?;
+ (Some(log_scanner), None)
+ };
+
+ Ok(Box::into_raw(Box::new(LogScanner {
+ inner,
+ inner_batch,
projected_columns,
- }));
- Ok(scanner)
+ })))
})
}
@@ -1084,79 +1007,24 @@ impl Table {
self.has_pk
}
- fn new_upsert_writer(&self) -> Result<*mut UpsertWriter, String> {
- let _enter = RUNTIME.enter();
-
- let fluss_table = fcore::client::FlussTable::new(
- &self.connection,
- self.metadata.clone(),
- self.table_info.clone(),
- );
-
- let table_upsert = fluss_table
- .new_upsert()
- .map_err(|e| format!("Failed to create upsert: {e}"))?;
-
- let writer = table_upsert
- .create_writer()
- .map_err(|e| format!("Failed to create upsert writer: {e}"))?;
-
- Ok(Box::into_raw(Box::new(UpsertWriter {
- inner: writer,
- table_info: self.table_info.clone(),
- })))
- }
-
- fn new_upsert_writer_with_column_names(
- &self,
- column_names: Vec<String>,
- ) -> Result<*mut UpsertWriter, String> {
- let _enter = RUNTIME.enter();
-
- let fluss_table = fcore::client::FlussTable::new(
- &self.connection,
- self.metadata.clone(),
- self.table_info.clone(),
- );
-
- let table_upsert = fluss_table
- .new_upsert()
- .map_err(|e| format!("Failed to create upsert: {e}"))?;
-
- let col_refs: Vec<&str> = column_names.iter().map(|s|
s.as_str()).collect();
- let table_upsert = table_upsert
- .partial_update_with_column_names(&col_refs)
- .map_err(|e| format!("Failed to set partial update columns:
{e}"))?;
-
- let writer = table_upsert
- .create_writer()
- .map_err(|e| format!("Failed to create upsert writer: {e}"))?;
-
- Ok(Box::into_raw(Box::new(UpsertWriter {
- inner: writer,
- table_info: self.table_info.clone(),
- })))
- }
-
- fn new_upsert_writer_with_column_indices(
+ fn create_upsert_writer(
&self,
column_indices: Vec<usize>,
) -> Result<*mut UpsertWriter, String> {
let _enter = RUNTIME.enter();
- let fluss_table = fcore::client::FlussTable::new(
- &self.connection,
- self.metadata.clone(),
- self.table_info.clone(),
- );
-
- let table_upsert = fluss_table
+ let table_upsert = self
+ .fluss_table()
.new_upsert()
.map_err(|e| format!("Failed to create upsert: {e}"))?;
- let table_upsert = table_upsert
- .partial_update(Some(column_indices))
- .map_err(|e| format!("Failed to set partial update columns:
{e}"))?;
+ let table_upsert = if column_indices.is_empty() {
+ table_upsert
+ } else {
+ table_upsert
+ .partial_update(Some(column_indices))
+ .map_err(|e| format!("Failed to set partial update columns:
{e}"))?
+ };
let writer = table_upsert
.create_writer()
@@ -1171,13 +1039,8 @@ impl Table {
fn new_lookuper(&self) -> Result<*mut Lookuper, String> {
let _enter = RUNTIME.enter();
- let fluss_table = fcore::client::FlussTable::new(
- &self.connection,
- self.metadata.clone(),
- self.table_info.clone(),
- );
-
- let table_lookup = fluss_table
+ let table_lookup = self
+ .fluss_table()
.new_lookup()
.map_err(|e| format!("Failed to create lookup: {e}"))?;
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index a266363..da4dc30 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -109,8 +109,19 @@ Table& Table::operator=(Table&& other) noexcept {
bool Table::Available() const { return table_ != nullptr; }
-Result Table::NewAppendWriter(AppendWriter& out) {
- if (!Available()) {
+TableAppend Table::NewAppend() { return TableAppend(table_); }
+
+TableUpsert Table::NewUpsert() { return TableUpsert(table_); }
+
+TableLookup Table::NewLookup() { return TableLookup(table_); }
+
+TableScan Table::NewScan() { return TableScan(table_); }
+
+// TableAppend implementation
+TableAppend::TableAppend(ffi::Table* table) noexcept : table_(table) {}
+
+Result TableAppend::CreateWriter(AppendWriter& out) {
+ if (table_ == nullptr) {
return utils::make_error(1, "Table not available");
}
@@ -124,7 +135,86 @@ Result Table::NewAppendWriter(AppendWriter& out) {
}
}
-TableScan Table::NewScan() { return TableScan(table_); }
+// TableUpsert implementation
+TableUpsert::TableUpsert(ffi::Table* table) noexcept : table_(table) {}
+
+TableUpsert& TableUpsert::PartialUpdateByIndex(std::vector<size_t>
column_indices) {
+ if (column_indices.empty()) {
+ throw std::invalid_argument("PartialUpdateByIndex requires at least
one column");
+ }
+ column_indices_ = std::move(column_indices);
+ column_names_.clear();
+ return *this;
+}
+
+TableUpsert& TableUpsert::PartialUpdateByName(std::vector<std::string>
column_names) {
+ if (column_names.empty()) {
+ throw std::invalid_argument("PartialUpdateByName requires at least one
column");
+ }
+ column_names_ = std::move(column_names);
+ column_indices_.clear();
+ return *this;
+}
+
+std::vector<size_t> TableUpsert::ResolveNameProjection() const {
+ auto ffi_info = table_->get_table_info_from_table();
+ const auto& columns = ffi_info.schema.columns;
+
+ std::vector<size_t> indices;
+ for (const auto& name : column_names_) {
+ bool found = false;
+ for (size_t i = 0; i < columns.size(); ++i) {
+ if (std::string(columns[i].name) == name) {
+ indices.push_back(i);
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ throw std::runtime_error("Column '" + name + "' not found");
+ }
+ }
+ return indices;
+}
+
+Result TableUpsert::CreateWriter(UpsertWriter& out) {
+ if (table_ == nullptr) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ auto resolved_indices = !column_names_.empty() ?
ResolveNameProjection() : column_indices_;
+
+ rust::Vec<size_t> rust_indices;
+ for (size_t idx : resolved_indices) {
+ rust_indices.push_back(idx);
+ }
+ out =
UpsertWriter(table_->create_upsert_writer(std::move(rust_indices)));
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+// TableLookup implementation
+TableLookup::TableLookup(ffi::Table* table) noexcept : table_(table) {}
+
+Result TableLookup::CreateLookuper(Lookuper& out) {
+ if (table_ == nullptr) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ out = Lookuper(table_->new_lookuper());
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
// TableScan implementation
TableScan::TableScan(ffi::Table* table) noexcept : table_(table) {}
@@ -169,15 +259,11 @@ Result TableScan::CreateLogScanner(LogScanner& out) {
try {
auto resolved_indices = !name_projection_.empty() ?
ResolveNameProjection() : projection_;
- if (!resolved_indices.empty()) {
- rust::Vec<size_t> rust_indices;
- for (size_t idx : resolved_indices) {
- rust_indices.push_back(idx);
- }
- out.scanner_ =
table_->new_log_scanner_with_projection(std::move(rust_indices));
- } else {
- out.scanner_ = table_->new_log_scanner();
+ rust::Vec<size_t> rust_indices;
+ for (size_t idx : resolved_indices) {
+ rust_indices.push_back(idx);
}
+ out.scanner_ = table_->create_scanner(std::move(rust_indices), false);
return utils::make_ok();
} catch (const rust::Error& e) {
return utils::make_error(1, e.what());
@@ -193,16 +279,11 @@ Result TableScan::CreateRecordBatchScanner(LogScanner& out) {
try {
auto resolved_indices = !name_projection_.empty() ?
ResolveNameProjection() : projection_;
- if (!resolved_indices.empty()) {
- rust::Vec<size_t> rust_indices;
- for (size_t idx : resolved_indices) {
- rust_indices.push_back(idx);
- }
- out.scanner_ =
-
table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices));
- } else {
- out.scanner_ = table_->new_record_batch_log_scanner();
+ rust::Vec<size_t> rust_indices;
+ for (size_t idx : resolved_indices) {
+ rust_indices.push_back(idx);
}
+ out.scanner_ = table_->create_scanner(std::move(rust_indices), true);
return utils::make_ok();
} catch (const rust::Error& e) {
return utils::make_error(1, e.what());
@@ -489,75 +570,6 @@ Result Lookuper::Lookup(const GenericRow& pk_row, bool& found, GenericRow& out)
}
}
-// Table KV methods
-Result Table::NewUpsertWriter(UpsertWriter& out) {
- if (!Available()) {
- return utils::make_error(1, "Table not available");
- }
-
- try {
- out = UpsertWriter(table_->new_upsert_writer());
- return utils::make_ok();
- } catch (const rust::Error& e) {
- return utils::make_error(1, e.what());
- } catch (const std::exception& e) {
- return utils::make_error(1, e.what());
- }
-}
-
-Result Table::NewUpsertWriter(UpsertWriter& out, const
std::vector<std::string>& column_names) {
- if (!Available()) {
- return utils::make_error(1, "Table not available");
- }
-
- try {
- rust::Vec<rust::String> rust_names;
- for (const auto& name : column_names) {
- rust_names.push_back(rust::String(name));
- }
- out =
UpsertWriter(table_->new_upsert_writer_with_column_names(std::move(rust_names)));
- return utils::make_ok();
- } catch (const rust::Error& e) {
- return utils::make_error(1, e.what());
- } catch (const std::exception& e) {
- return utils::make_error(1, e.what());
- }
-}
-
-Result Table::NewUpsertWriter(UpsertWriter& out, const std::vector<size_t>&
column_indices) {
- if (!Available()) {
- return utils::make_error(1, "Table not available");
- }
-
- try {
- rust::Vec<size_t> rust_indices;
- for (size_t idx : column_indices) {
- rust_indices.push_back(idx);
- }
- out =
UpsertWriter(table_->new_upsert_writer_with_column_indices(std::move(rust_indices)));
- return utils::make_ok();
- } catch (const rust::Error& e) {
- return utils::make_error(1, e.what());
- } catch (const std::exception& e) {
- return utils::make_error(1, e.what());
- }
-}
-
-Result Table::NewLookuper(Lookuper& out) {
- if (!Available()) {
- return utils::make_error(1, "Table not available");
- }
-
- try {
- out = Lookuper(table_->new_lookuper());
- return utils::make_ok();
- } catch (const rust::Error& e) {
- return utils::make_error(1, e.what());
- } catch (const std::exception& e) {
- return utils::make_error(1, e.what());
- }
-}
-
// LogScanner implementation
LogScanner::LogScanner() noexcept = default;