This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 10637b7  chore: Builder pattern, cleanup, consistent API (#296)
10637b7 is described below

commit 10637b7f13a3c18550e8c27d7ac8e49e36e34ab7
Author: Anton Borisov <[email protected]>
AuthorDate: Wed Feb 11 04:03:10 2026 +0000

    chore: Builder pattern, cleanup, consistent API (#296)
---
 bindings/cpp/examples/example.cpp    |   6 +-
 bindings/cpp/examples/kv_example.cpp | 109 ++++++-------
 bindings/cpp/include/fluss.hpp       |  72 ++++++++-
 bindings/cpp/src/admin.cpp           |  49 +++---
 bindings/cpp/src/connection.cpp      |   6 +-
 bindings/cpp/src/lib.rs              | 295 ++++++++++-------------------------
 bindings/cpp/src/table.cpp           | 190 +++++++++++-----------
 7 files changed, 328 insertions(+), 399 deletions(-)

diff --git a/bindings/cpp/examples/example.cpp 
b/bindings/cpp/examples/example.cpp
index 47087e5..59f1ed0 100644
--- a/bindings/cpp/examples/example.cpp
+++ b/bindings/cpp/examples/example.cpp
@@ -79,7 +79,7 @@ int main() {
 
     // 5) Write rows with scalar and temporal values
     fluss::AppendWriter writer;
-    check("new_append_writer", table.NewAppendWriter(writer));
+    check("new_append_writer", table.NewAppend().CreateWriter(writer));
 
     struct RowData {
         int id;
@@ -423,7 +423,7 @@ int main() {
     check("get_decimal_table", conn.GetTable(decimal_table_path, 
decimal_table));
 
     fluss::AppendWriter decimal_writer;
-    check("new_decimal_writer", decimal_table.NewAppendWriter(decimal_writer));
+    check("new_decimal_writer", 
decimal_table.NewAppend().CreateWriter(decimal_writer));
 
     // Just provide the value — Rust resolves (p,s) from schema
     {
@@ -512,7 +512,7 @@ int main() {
     check("get_partitioned_table", conn.GetTable(partitioned_table_path, 
partitioned_table));
 
     fluss::AppendWriter partitioned_writer;
-    check("new_partitioned_writer", 
partitioned_table.NewAppendWriter(partitioned_writer));
+    check("new_partitioned_writer", 
partitioned_table.NewAppend().CreateWriter(partitioned_writer));
 
     struct PartitionedRow {
         int id;
diff --git a/bindings/cpp/examples/kv_example.cpp 
b/bindings/cpp/examples/kv_example.cpp
index daebfb2..2a40db3 100644
--- a/bindings/cpp/examples/kv_example.cpp
+++ b/bindings/cpp/examples/kv_example.cpp
@@ -76,7 +76,7 @@ int main() {
     //    - Set("last_seen", ts) auto-routes to SetTimestampLtz (schema-aware)
     std::cout << "\n--- Upsert Rows ---" << std::endl;
     fluss::UpsertWriter upsert_writer;
-    check("new_upsert_writer", kv_table.NewUpsertWriter(upsert_writer));
+    check("new_upsert_writer", 
kv_table.NewUpsert().CreateWriter(upsert_writer));
 
     // Fire-and-forget upserts
     {
@@ -130,7 +130,7 @@ int main() {
     // 4) Lookup by primary key — verify all types round-trip
     std::cout << "\n--- Lookup by Primary Key ---" << std::endl;
     fluss::Lookuper lookuper;
-    check("new_lookuper", kv_table.NewLookuper(lookuper));
+    check("new_lookuper", kv_table.NewLookup().CreateLookuper(lookuper));
 
     // Lookup existing key
     {
@@ -242,9 +242,9 @@ int main() {
     // 7) Partial update by column names
     std::cout << "\n--- Partial Update by Column Names ---" << std::endl;
     fluss::UpsertWriter partial_writer;
-    check("new_partial_upsert_writer",
-          kv_table.NewUpsertWriter(partial_writer,
-                                   std::vector<std::string>{"user_id", 
"balance", "last_seen"}));
+    check("new_partial_upsert_writer", kv_table.NewUpsert()
+                                           .PartialUpdateByName({"user_id", 
"balance", "last_seen"})
+                                           .CreateWriter(partial_writer));
 
     {
         auto row = kv_table.NewRow();
@@ -282,7 +282,7 @@ int main() {
     fluss::UpsertWriter partial_writer_idx;
     // Columns: 0=user_id (PK), 1=name — update name only
     check("new_partial_upsert_writer_idx",
-          kv_table.NewUpsertWriter(partial_writer_idx, std::vector<size_t>{0, 
1}));
+          kv_table.NewUpsert().PartialUpdateByIndex({0, 
1}).CreateWriter(partial_writer_idx));
 
     {
         // Index-based setters: lighter than name-based, useful for hot paths
@@ -321,37 +321,39 @@ int main() {
 
     // 9) Partitioned KV table
     std::cout << "\n--- Partitioned KV Table ---" << std::endl;
-    fluss::TablePath part_kv_path("fluss", "partitioned_kv_cpp_v1");
-    admin.DropTable(part_kv_path, true);
-
-    auto part_kv_schema = fluss::Schema::NewBuilder()
-                              .AddColumn("region", fluss::DataType::String())
-                              .AddColumn("user_id", fluss::DataType::Int())
-                              .AddColumn("name", fluss::DataType::String())
-                              .AddColumn("score", fluss::DataType::BigInt())
-                              .SetPrimaryKeys({"region", "user_id"})
-                              .Build();
-
-    auto part_kv_descriptor = fluss::TableDescriptor::NewBuilder()
-                                  .SetSchema(part_kv_schema)
-                                  .SetPartitionKeys({"region"})
-                                  .SetComment("partitioned kv table example")
-                                  .Build();
-
-    check("create_part_kv", admin.CreateTable(part_kv_path, 
part_kv_descriptor, false));
+    fluss::TablePath partitioned_kv_path("fluss", "partitioned_kv_cpp_v1");
+    admin.DropTable(partitioned_kv_path, true);
+
+    auto partitioned_kv_schema = fluss::Schema::NewBuilder()
+                                     .AddColumn("region", 
fluss::DataType::String())
+                                     .AddColumn("user_id", 
fluss::DataType::Int())
+                                     .AddColumn("name", 
fluss::DataType::String())
+                                     .AddColumn("score", 
fluss::DataType::BigInt())
+                                     .SetPrimaryKeys({"region", "user_id"})
+                                     .Build();
+
+    auto partitioned_kv_descriptor = fluss::TableDescriptor::NewBuilder()
+                                         .SetSchema(partitioned_kv_schema)
+                                         .SetPartitionKeys({"region"})
+                                         .SetComment("partitioned kv table 
example")
+                                         .Build();
+
+    check("create_partitioned_kv",
+          admin.CreateTable(partitioned_kv_path, partitioned_kv_descriptor, 
false));
     std::cout << "Created partitioned KV table" << std::endl;
 
     // Create partitions
-    check("create_US", admin.CreatePartition(part_kv_path, {{"region", 
"US"}}));
-    check("create_EU", admin.CreatePartition(part_kv_path, {{"region", 
"EU"}}));
-    check("create_APAC", admin.CreatePartition(part_kv_path, {{"region", 
"APAC"}}));
+    check("create_US", admin.CreatePartition(partitioned_kv_path, {{"region", 
"US"}}));
+    check("create_EU", admin.CreatePartition(partitioned_kv_path, {{"region", 
"EU"}}));
+    check("create_APAC", admin.CreatePartition(partitioned_kv_path, 
{{"region", "APAC"}}));
     std::cout << "Created partitions: US, EU, APAC" << std::endl;
 
-    fluss::Table part_kv_table;
-    check("get_part_kv_table", conn.GetTable(part_kv_path, part_kv_table));
+    fluss::Table partitioned_kv_table;
+    check("get_partitioned_kv_table", conn.GetTable(partitioned_kv_path, 
partitioned_kv_table));
 
-    fluss::UpsertWriter part_writer;
-    check("new_part_writer", part_kv_table.NewUpsertWriter(part_writer));
+    fluss::UpsertWriter partitioned_writer;
+    check("new_partitioned_writer",
+          partitioned_kv_table.NewUpsert().CreateWriter(partitioned_writer));
 
     // Upsert rows across partitions
     struct TestRow {
@@ -366,28 +368,29 @@ int main() {
     };
 
     for (const auto& td : test_data) {
-        auto row = part_kv_table.NewRow();
+        auto row = partitioned_kv_table.NewRow();
         row.Set("region", td.region);
         row.Set("user_id", td.user_id);
         row.Set("name", td.name);
         row.Set("score", td.score);
-        check("part_upsert", part_writer.Upsert(row));
+        check("partitioned_upsert", partitioned_writer.Upsert(row));
     }
-    check("part_flush", part_writer.Flush());
+    check("partitioned_flush", partitioned_writer.Flush());
     std::cout << "Upserted 5 rows across 3 partitions" << std::endl;
 
     // Lookup all rows
-    fluss::Lookuper part_lookuper;
-    check("new_part_lookuper", part_kv_table.NewLookuper(part_lookuper));
+    fluss::Lookuper partitioned_lookuper;
+    check("new_partitioned_lookuper",
+          
partitioned_kv_table.NewLookup().CreateLookuper(partitioned_lookuper));
 
     for (const auto& td : test_data) {
-        auto pk = part_kv_table.NewRow();
+        auto pk = partitioned_kv_table.NewRow();
         pk.Set("region", td.region);
         pk.Set("user_id", td.user_id);
 
         bool found = false;
         fluss::GenericRow result;
-        check("part_lookup", part_lookuper.Lookup(pk, found, result));
+        check("partitioned_lookup", partitioned_lookuper.Lookup(pk, found, 
result));
         if (!found) {
             std::cerr << "ERROR: Expected to find region=" << td.region << " 
user_id=" << td.user_id
                       << std::endl;
@@ -403,22 +406,22 @@ int main() {
 
     // Update within a partition
     {
-        auto row = part_kv_table.NewRow();
+        auto row = partitioned_kv_table.NewRow();
         row.Set("region", "US");
         row.Set("user_id", 1);
         row.Set("name", "Gustave Updated");
         row.Set("score", static_cast<int64_t>(999));
         fluss::WriteResult wr;
-        check("part_update", part_writer.Upsert(row, wr));
-        check("part_update_wait", wr.Wait());
+        check("partitioned_update", partitioned_writer.Upsert(row, wr));
+        check("partitioned_update_wait", wr.Wait());
     }
     {
-        auto pk = part_kv_table.NewRow();
+        auto pk = partitioned_kv_table.NewRow();
         pk.Set("region", "US");
         pk.Set("user_id", 1);
         bool found = false;
         fluss::GenericRow result;
-        check("part_lookup_updated", part_lookuper.Lookup(pk, found, result));
+        check("partitioned_lookup_updated", partitioned_lookuper.Lookup(pk, 
found, result));
         if (!found || result.GetString(2) != "Gustave Updated" || 
result.GetInt64(3) != 999) {
             std::cerr << "ERROR: Partition update verification failed" << 
std::endl;
             std::exit(1);
@@ -429,12 +432,12 @@ int main() {
 
     // Lookup in non-existent partition
     {
-        auto pk = part_kv_table.NewRow();
+        auto pk = partitioned_kv_table.NewRow();
         pk.Set("region", "UNKNOWN");
         pk.Set("user_id", 1);
         bool found = false;
         fluss::GenericRow result;
-        check("part_lookup_unknown", part_lookuper.Lookup(pk, found, result));
+        check("partitioned_lookup_unknown", partitioned_lookuper.Lookup(pk, 
found, result));
         if (found) {
             std::cerr << "ERROR: Expected UNKNOWN partition lookup to return 
not found"
                       << std::endl;
@@ -445,20 +448,20 @@ int main() {
 
     // Delete within a partition
     {
-        auto pk = part_kv_table.NewRow();
+        auto pk = partitioned_kv_table.NewRow();
         pk.Set("region", "EU");
         pk.Set("user_id", 1);
         fluss::WriteResult wr;
-        check("part_delete", part_writer.Delete(pk, wr));
-        check("part_delete_wait", wr.Wait());
+        check("partitioned_delete", partitioned_writer.Delete(pk, wr));
+        check("partitioned_delete_wait", wr.Wait());
     }
     {
-        auto pk = part_kv_table.NewRow();
+        auto pk = partitioned_kv_table.NewRow();
         pk.Set("region", "EU");
         pk.Set("user_id", 1);
         bool found = false;
         fluss::GenericRow result;
-        check("part_lookup_deleted", part_lookuper.Lookup(pk, found, result));
+        check("partitioned_lookup_deleted", partitioned_lookuper.Lookup(pk, 
found, result));
         if (found) {
             std::cerr << "ERROR: Expected EU/1 to be deleted" << std::endl;
             std::exit(1);
@@ -468,12 +471,12 @@ int main() {
 
     // Verify other record in same partition still exists
     {
-        auto pk = part_kv_table.NewRow();
+        auto pk = partitioned_kv_table.NewRow();
         pk.Set("region", "EU");
         pk.Set("user_id", 2);
         bool found = false;
         fluss::GenericRow result;
-        check("part_lookup_eu2", part_lookuper.Lookup(pk, found, result));
+        check("partitioned_lookup_eu2", partitioned_lookuper.Lookup(pk, found, 
result));
         if (!found || result.GetString(2) != "Maelle") {
             std::cerr << "ERROR: Expected EU/2 (Maelle) to still exist" << 
std::endl;
             std::exit(1);
@@ -481,7 +484,7 @@ int main() {
         std::cout << "EU/2 still exists: name=" << result.GetString(2) << 
std::endl;
     }
 
-    check("drop_part_kv", admin.DropTable(part_kv_path, true));
+    check("drop_partitioned_kv", admin.DropTable(partitioned_kv_path, true));
     std::cout << "\nKV table example completed successfully!" << std::endl;
 
     return 0;
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 41aae67..1806616 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -804,6 +804,9 @@ class WriteResult;
 class LogScanner;
 class Admin;
 class Table;
+class TableAppend;
+class TableUpsert;
+class TableLookup;
 class TableScan;
 
 class Connection {
@@ -909,11 +912,9 @@ class Table {
 
     GenericRow NewRow() const;
 
-    Result NewAppendWriter(AppendWriter& out);
-    Result NewUpsertWriter(UpsertWriter& out);
-    Result NewUpsertWriter(UpsertWriter& out, const std::vector<std::string>& 
column_names);
-    Result NewUpsertWriter(UpsertWriter& out, const std::vector<size_t>& 
column_indices);
-    Result NewLookuper(Lookuper& out);
+    TableAppend NewAppend();
+    TableUpsert NewUpsert();
+    TableLookup NewLookup();
     TableScan NewScan();
 
     TableInfo GetTableInfo() const;
@@ -922,6 +923,9 @@ class Table {
 
    private:
     friend class Connection;
+    friend class TableAppend;
+    friend class TableUpsert;
+    friend class TableLookup;
     friend class TableScan;
     Table(ffi::Table* table) noexcept;
 
@@ -932,6 +936,61 @@ class Table {
     mutable std::shared_ptr<GenericRow::ColumnMap> column_map_;
 };
 
+class TableAppend {
+   public:
+    TableAppend(const TableAppend&) = delete;
+    TableAppend& operator=(const TableAppend&) = delete;
+    TableAppend(TableAppend&&) noexcept = default;
+    TableAppend& operator=(TableAppend&&) noexcept = default;
+
+    Result CreateWriter(AppendWriter& out);
+
+   private:
+    friend class Table;
+    explicit TableAppend(ffi::Table* table) noexcept;
+
+    ffi::Table* table_{nullptr};
+};
+
+class TableUpsert {
+   public:
+    TableUpsert(const TableUpsert&) = delete;
+    TableUpsert& operator=(const TableUpsert&) = delete;
+    TableUpsert(TableUpsert&&) noexcept = default;
+    TableUpsert& operator=(TableUpsert&&) noexcept = default;
+
+    TableUpsert& PartialUpdateByIndex(std::vector<size_t> column_indices);
+    TableUpsert& PartialUpdateByName(std::vector<std::string> column_names);
+
+    Result CreateWriter(UpsertWriter& out);
+
+   private:
+    friend class Table;
+    explicit TableUpsert(ffi::Table* table) noexcept;
+
+    std::vector<size_t> ResolveNameProjection() const;
+
+    ffi::Table* table_{nullptr};
+    std::vector<size_t> column_indices_;
+    std::vector<std::string> column_names_;
+};
+
+class TableLookup {
+   public:
+    TableLookup(const TableLookup&) = delete;
+    TableLookup& operator=(const TableLookup&) = delete;
+    TableLookup(TableLookup&&) noexcept = default;
+    TableLookup& operator=(TableLookup&&) noexcept = default;
+
+    Result CreateLookuper(Lookuper& out);
+
+   private:
+    friend class Table;
+    explicit TableLookup(ffi::Table* table) noexcept;
+
+    ffi::Table* table_{nullptr};
+};
+
 class TableScan {
    public:
     TableScan(const TableScan&) = delete;
@@ -999,6 +1058,7 @@ class AppendWriter {
 
    private:
     friend class Table;
+    friend class TableAppend;
     AppendWriter(ffi::AppendWriter* writer) noexcept;
 
     void Destroy() noexcept;
@@ -1025,6 +1085,7 @@ class UpsertWriter {
 
    private:
     friend class Table;
+    friend class TableUpsert;
     UpsertWriter(ffi::UpsertWriter* writer) noexcept;
     void Destroy() noexcept;
     ffi::UpsertWriter* writer_{nullptr};
@@ -1046,6 +1107,7 @@ class Lookuper {
 
    private:
     friend class Table;
+    friend class TableLookup;
     Lookuper(ffi::Lookuper* lookuper) noexcept;
     void Destroy() noexcept;
     ffi::Lookuper* lookuper_{nullptr};
diff --git a/bindings/cpp/src/admin.cpp b/bindings/cpp/src/admin.cpp
index 77c95d3..7925256 100644
--- a/bindings/cpp/src/admin.cpp
+++ b/bindings/cpp/src/admin.cpp
@@ -17,9 +17,9 @@
  * under the License.
  */
 
+#include "ffi_converter.hpp"
 #include "fluss.hpp"
 #include "lib.rs.h"
-#include "ffi_converter.hpp"
 #include "rust/cxx.h"
 
 namespace fluss {
@@ -37,9 +37,7 @@ void Admin::Destroy() noexcept {
     }
 }
 
-Admin::Admin(Admin&& other) noexcept : admin_(other.admin_) {
-    other.admin_ = nullptr;
-}
+Admin::Admin(Admin&& other) noexcept : admin_(other.admin_) { other.admin_ = 
nullptr; }
 
 Admin& Admin::operator=(Admin&& other) noexcept {
     if (this != &other) {
@@ -52,8 +50,7 @@ Admin& Admin::operator=(Admin&& other) noexcept {
 
 bool Admin::Available() const { return admin_ != nullptr; }
 
-Result Admin::CreateTable(const TablePath& table_path,
-                          const TableDescriptor& descriptor,
+Result Admin::CreateTable(const TablePath& table_path, const TableDescriptor& 
descriptor,
                           bool ignore_if_exists) {
     if (!Available()) {
         return utils::make_error(1, "Admin not available");
@@ -109,17 +106,16 @@ Result Admin::GetLatestLakeSnapshot(const TablePath& 
table_path, LakeSnapshot& o
 }
 
 // function for common list offsets functionality
-Result Admin::DoListOffsets(const TablePath& table_path,
-                         const std::vector<int32_t>& bucket_ids,
-                         const OffsetQuery& offset_query,
-                         std::unordered_map<int32_t, int64_t>& out,
-                         const std::string* partition_name) {
+Result Admin::DoListOffsets(const TablePath& table_path, const 
std::vector<int32_t>& bucket_ids,
+                            const OffsetQuery& offset_query,
+                            std::unordered_map<int32_t, int64_t>& out,
+                            const std::string* partition_name) {
     if (!Available()) {
         return utils::make_error(1, "Admin not available");
     }
 
     auto ffi_path = utils::to_ffi_table_path(table_path);
-    
+
     rust::Vec<int32_t> rust_bucket_ids;
     for (int32_t id : bucket_ids) {
         rust_bucket_ids.push_back(id);
@@ -131,11 +127,12 @@ Result Admin::DoListOffsets(const TablePath& table_path,
 
     ffi::FfiListOffsetsResult ffi_result;
     if (partition_name != nullptr) {
-        ffi_result = admin_->list_partition_offsets(ffi_path, 
rust::String(*partition_name), std::move(rust_bucket_ids), ffi_query);
+        ffi_result = admin_->list_partition_offsets(ffi_path, 
rust::String(*partition_name),
+                                                    
std::move(rust_bucket_ids), ffi_query);
     } else {
         ffi_result = admin_->list_offsets(ffi_path, 
std::move(rust_bucket_ids), ffi_query);
     }
-    
+
     auto result = utils::from_ffi_result(ffi_result.result);
     if (result.Ok()) {
         out.clear();
@@ -147,23 +144,20 @@ Result Admin::DoListOffsets(const TablePath& table_path,
     return result;
 }
 
-Result Admin::ListOffsets(const TablePath& table_path,
-                          const std::vector<int32_t>& bucket_ids,
+Result Admin::ListOffsets(const TablePath& table_path, const 
std::vector<int32_t>& bucket_ids,
                           const OffsetQuery& offset_query,
                           std::unordered_map<int32_t, int64_t>& out) {
     return DoListOffsets(table_path, bucket_ids, offset_query, out);
 }
 
-Result Admin::ListPartitionOffsets(const TablePath& table_path,
-                                const std::string& partition_name,
-                                const std::vector<int32_t>& bucket_ids,
-                                const OffsetQuery& offset_query,
-                                std::unordered_map<int32_t, int64_t>& out) {
+Result Admin::ListPartitionOffsets(const TablePath& table_path, const 
std::string& partition_name,
+                                   const std::vector<int32_t>& bucket_ids,
+                                   const OffsetQuery& offset_query,
+                                   std::unordered_map<int32_t, int64_t>& out) {
     return DoListOffsets(table_path, bucket_ids, offset_query, out, 
&partition_name);
 }
 
-Result Admin::ListPartitionInfos(const TablePath& table_path,
-                                 std::vector<PartitionInfo>& out) {
+Result Admin::ListPartitionInfos(const TablePath& table_path, 
std::vector<PartitionInfo>& out) {
     if (!Available()) {
         return utils::make_error(1, "Admin not available");
     }
@@ -221,21 +215,18 @@ Result Admin::DropPartition(const TablePath& table_path,
         rust_spec.push_back(std::move(kv));
     }
 
-    auto ffi_result =
-        admin_->drop_partition(ffi_path, std::move(rust_spec), 
ignore_if_not_exists);
+    auto ffi_result = admin_->drop_partition(ffi_path, std::move(rust_spec), 
ignore_if_not_exists);
     return utils::from_ffi_result(ffi_result);
 }
 
-Result Admin::CreateDatabase(const std::string& database_name,
-                             const DatabaseDescriptor& descriptor,
+Result Admin::CreateDatabase(const std::string& database_name, const 
DatabaseDescriptor& descriptor,
                              bool ignore_if_exists) {
     if (!Available()) {
         return utils::make_error(1, "Admin not available");
     }
 
     auto ffi_desc = utils::to_ffi_database_descriptor(descriptor);
-    auto ffi_result =
-        admin_->create_database(rust::Str(database_name), ffi_desc, 
ignore_if_exists);
+    auto ffi_result = admin_->create_database(rust::Str(database_name), 
ffi_desc, ignore_if_exists);
     return utils::from_ffi_result(ffi_result);
 }
 
diff --git a/bindings/cpp/src/connection.cpp b/bindings/cpp/src/connection.cpp
index ea884cd..4fbfafb 100644
--- a/bindings/cpp/src/connection.cpp
+++ b/bindings/cpp/src/connection.cpp
@@ -17,9 +17,9 @@
  * under the License.
  */
 
+#include "ffi_converter.hpp"
 #include "fluss.hpp"
 #include "lib.rs.h"
-#include "ffi_converter.hpp"
 #include "rust/cxx.h"
 
 namespace fluss {
@@ -35,9 +35,7 @@ void Connection::Destroy() noexcept {
     }
 }
 
-Connection::Connection(Connection&& other) noexcept : conn_(other.conn_) {
-    other.conn_ = nullptr;
-}
+Connection::Connection(Connection&& other) noexcept : conn_(other.conn_) { 
other.conn_ = nullptr; }
 
 Connection& Connection::operator=(Connection&& other) noexcept {
     if (this != &other) {
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index ee7f1d8..9b1b5ef 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -325,25 +325,15 @@ mod ffi {
         // Table
         unsafe fn delete_table(table: *mut Table);
         fn new_append_writer(self: &Table) -> Result<*mut AppendWriter>;
-        fn new_log_scanner(self: &Table) -> Result<*mut LogScanner>;
-        fn new_log_scanner_with_projection(
-            self: &Table,
-            column_indices: Vec<usize>,
-        ) -> Result<*mut LogScanner>;
-        fn new_record_batch_log_scanner(self: &Table) -> Result<*mut 
LogScanner>;
-        fn new_record_batch_log_scanner_with_projection(
+        fn create_scanner(
             self: &Table,
             column_indices: Vec<usize>,
+            batch: bool,
         ) -> Result<*mut LogScanner>;
         fn get_table_info_from_table(self: &Table) -> FfiTableInfo;
         fn get_table_path(self: &Table) -> FfiTablePath;
         fn has_primary_key(self: &Table) -> bool;
-        fn new_upsert_writer(self: &Table) -> Result<*mut UpsertWriter>;
-        fn new_upsert_writer_with_column_names(
-            self: &Table,
-            column_names: Vec<String>,
-        ) -> Result<*mut UpsertWriter>;
-        fn new_upsert_writer_with_column_indices(
+        fn create_upsert_writer(
             self: &Table,
             column_indices: Vec<usize>,
         ) -> Result<*mut UpsertWriter>;
@@ -919,153 +909,86 @@ unsafe fn delete_table(table: *mut Table) {
 }
 
 impl Table {
-    fn new_append_writer(&self) -> Result<*mut AppendWriter, String> {
-        let _enter = RUNTIME.enter();
-
-        let fluss_table = fcore::client::FlussTable::new(
+    fn fluss_table(&self) -> fcore::client::FlussTable<'_> {
+        fcore::client::FlussTable::new(
             &self.connection,
             self.metadata.clone(),
             self.table_info.clone(),
-        );
-
-        let table_append = match fluss_table.new_append() {
-            Ok(a) => a,
-            Err(e) => return Err(format!("Failed to create append: {e}")),
-        };
-
-        let writer = match table_append.create_writer() {
-            Ok(w) => w,
-            Err(e) => return Err(format!("Failed to create writer: {e}")),
-        };
-        let writer = Box::into_raw(Box::new(AppendWriter {
-            inner: writer,
-            table_info: self.table_info.clone(),
-        }));
-        Ok(writer)
+        )
     }
 
-    fn new_log_scanner(&self) -> Result<*mut LogScanner, String> {
-        RUNTIME.block_on(async {
-            let fluss_table = fcore::client::FlussTable::new(
-                &self.connection,
-                self.metadata.clone(),
-                self.table_info.clone(),
-            );
-
-            let scanner = fluss_table
-                .new_scan()
-                .create_log_scanner()
-                .map_err(|e| format!("Failed to create log scanner: {e}"))?;
-
-            let scanner_ptr = Box::into_raw(Box::new(LogScanner {
-                inner: Some(scanner),
-                inner_batch: None,
-                projected_columns: 
self.table_info.get_schema().columns().to_vec(),
-            }));
-
-            Ok(scanner_ptr)
-        })
-    }
-
-    fn new_log_scanner_with_projection(
+    fn resolve_projected_columns(
         &self,
-        column_indices: Vec<usize>,
-    ) -> Result<*mut LogScanner, String> {
-        RUNTIME.block_on(async {
-            let fluss_table = fcore::client::FlussTable::new(
-                &self.connection,
-                self.metadata.clone(),
-                self.table_info.clone(),
-            );
-
-            let all_columns = self.table_info.get_schema().columns();
-            let projected_columns: Vec<_> = column_indices
-                .iter()
-                .map(|&i| {
-                    all_columns.get(i).cloned().ok_or_else(|| {
-                        format!(
-                            "Invalid column index {i}: schema has {} columns",
-                            all_columns.len()
-                        )
-                    })
+        indices: &[usize],
+    ) -> Result<Vec<fcore::metadata::Column>, String> {
+        let all_columns = self.table_info.get_schema().columns();
+        indices
+            .iter()
+            .map(|&i| {
+                all_columns.get(i).cloned().ok_or_else(|| {
+                    format!(
+                        "Invalid column index {i}: schema has {} columns",
+                        all_columns.len()
+                    )
                 })
-                .collect::<Result<_, String>>()?;
-
-            let log_scanner = fluss_table
-                .new_scan()
-                .project(&column_indices)
-                .map_err(|e| format!("Failed to project columns: {e}"))?
-                .create_log_scanner()
-                .map_err(|e| format!("Failed to create log scanner: {e}"))?;
-
-            let scanner = Box::into_raw(Box::new(LogScanner {
-                inner: Some(log_scanner),
-                inner_batch: None,
-                projected_columns,
-            }));
-            Ok(scanner)
-        })
+            })
+            .collect()
     }
 
-    fn new_record_batch_log_scanner(&self) -> Result<*mut LogScanner, String> {
-        RUNTIME.block_on(async {
-            let fluss_table = fcore::client::FlussTable::new(
-                &self.connection,
-                self.metadata.clone(),
-                self.table_info.clone(),
-            );
-
-            let batch_scanner = fluss_table
-                .new_scan()
-                .create_record_batch_log_scanner()
-                .map_err(|e| format!("Failed to create record batch log 
scanner: {e}"))?;
-
-            let scanner = Box::into_raw(Box::new(LogScanner {
-                inner: None,
-                inner_batch: Some(batch_scanner),
-                projected_columns: 
self.table_info.get_schema().columns().to_vec(),
-            }));
-            Ok(scanner)
-        })
+    fn new_append_writer(&self) -> Result<*mut AppendWriter, String> {
+        let _enter = RUNTIME.enter();
+
+        let table_append = self
+            .fluss_table()
+            .new_append()
+            .map_err(|e| format!("Failed to create append: {e}"))?;
+
+        let writer = table_append
+            .create_writer()
+            .map_err(|e| format!("Failed to create writer: {e}"))?;
+
+        Ok(Box::into_raw(Box::new(AppendWriter {
+            inner: writer,
+            table_info: self.table_info.clone(),
+        })))
     }
 
-    fn new_record_batch_log_scanner_with_projection(
+    fn create_scanner(
         &self,
         column_indices: Vec<usize>,
+        batch: bool,
     ) -> Result<*mut LogScanner, String> {
         RUNTIME.block_on(async {
-            let fluss_table = fcore::client::FlussTable::new(
-                &self.connection,
-                self.metadata.clone(),
-                self.table_info.clone(),
-            );
-
-            let all_columns = self.table_info.get_schema().columns();
-            let projected_columns: Vec<_> = column_indices
-                .iter()
-                .map(|&i| {
-                    all_columns.get(i).cloned().ok_or_else(|| {
-                        format!(
-                            "Invalid column index {i}: schema has {} columns",
-                            all_columns.len()
-                        )
-                    })
-                })
-                .collect::<Result<_, String>>()?;
-
-            let batch_scanner = fluss_table
-                .new_scan()
-                .project(&column_indices)
-                .map_err(|e| format!("Failed to project columns: {e}"))?
-                .create_record_batch_log_scanner()
-                .map_err(|e| format!("Failed to create record batch log 
scanner: {e}"))?;
-
-            let scanner = Box::into_raw(Box::new(LogScanner {
-                inner: None,
-                inner_batch: Some(batch_scanner),
+            let fluss_table = self.fluss_table();
+            let scan = fluss_table.new_scan();
+
+            let (projected_columns, scan) = if column_indices.is_empty() {
+                (self.table_info.get_schema().columns().to_vec(), scan)
+            } else {
+                let cols = self.resolve_projected_columns(&column_indices)?;
+                let scan = scan
+                    .project(&column_indices)
+                    .map_err(|e| format!("Failed to project columns: {e}"))?;
+                (cols, scan)
+            };
+
+            let (inner, inner_batch) = if batch {
+                let batch_scanner = scan
+                    .create_record_batch_log_scanner()
+                    .map_err(|e| format!("Failed to create record batch log 
scanner: {e}"))?;
+                (None, Some(batch_scanner))
+            } else {
+                let log_scanner = scan
+                    .create_log_scanner()
+                    .map_err(|e| format!("Failed to create log scanner: 
{e}"))?;
+                (Some(log_scanner), None)
+            };
+
+            Ok(Box::into_raw(Box::new(LogScanner {
+                inner,
+                inner_batch,
                 projected_columns,
-            }));
-            Ok(scanner)
+            })))
         })
     }
 
@@ -1084,79 +1007,24 @@ impl Table {
         self.has_pk
     }
 
-    fn new_upsert_writer(&self) -> Result<*mut UpsertWriter, String> {
-        let _enter = RUNTIME.enter();
-
-        let fluss_table = fcore::client::FlussTable::new(
-            &self.connection,
-            self.metadata.clone(),
-            self.table_info.clone(),
-        );
-
-        let table_upsert = fluss_table
-            .new_upsert()
-            .map_err(|e| format!("Failed to create upsert: {e}"))?;
-
-        let writer = table_upsert
-            .create_writer()
-            .map_err(|e| format!("Failed to create upsert writer: {e}"))?;
-
-        Ok(Box::into_raw(Box::new(UpsertWriter {
-            inner: writer,
-            table_info: self.table_info.clone(),
-        })))
-    }
-
-    fn new_upsert_writer_with_column_names(
-        &self,
-        column_names: Vec<String>,
-    ) -> Result<*mut UpsertWriter, String> {
-        let _enter = RUNTIME.enter();
-
-        let fluss_table = fcore::client::FlussTable::new(
-            &self.connection,
-            self.metadata.clone(),
-            self.table_info.clone(),
-        );
-
-        let table_upsert = fluss_table
-            .new_upsert()
-            .map_err(|e| format!("Failed to create upsert: {e}"))?;
-
-        let col_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
-        let table_upsert = table_upsert
-            .partial_update_with_column_names(&col_refs)
-            .map_err(|e| format!("Failed to set partial update columns: {e}"))?;
-
-        let writer = table_upsert
-            .create_writer()
-            .map_err(|e| format!("Failed to create upsert writer: {e}"))?;
-
-        Ok(Box::into_raw(Box::new(UpsertWriter {
-            inner: writer,
-            table_info: self.table_info.clone(),
-        })))
-    }
-
-    fn new_upsert_writer_with_column_indices(
+    fn create_upsert_writer(
         &self,
         column_indices: Vec<usize>,
     ) -> Result<*mut UpsertWriter, String> {
         let _enter = RUNTIME.enter();
 
-        let fluss_table = fcore::client::FlussTable::new(
-            &self.connection,
-            self.metadata.clone(),
-            self.table_info.clone(),
-        );
-
-        let table_upsert = fluss_table
+        let table_upsert = self
+            .fluss_table()
             .new_upsert()
             .map_err(|e| format!("Failed to create upsert: {e}"))?;
 
-        let table_upsert = table_upsert
-            .partial_update(Some(column_indices))
-            .map_err(|e| format!("Failed to set partial update columns: {e}"))?;
+        let table_upsert = if column_indices.is_empty() {
+            table_upsert
+        } else {
+            table_upsert
+                .partial_update(Some(column_indices))
+                .map_err(|e| format!("Failed to set partial update columns: {e}"))?
+        };
 
         let writer = table_upsert
             .create_writer()
@@ -1171,13 +1039,8 @@ impl Table {
     fn new_lookuper(&self) -> Result<*mut Lookuper, String> {
         let _enter = RUNTIME.enter();
 
-        let fluss_table = fcore::client::FlussTable::new(
-            &self.connection,
-            self.metadata.clone(),
-            self.table_info.clone(),
-        );
-
-        let table_lookup = fluss_table
+        let table_lookup = self
+            .fluss_table()
             .new_lookup()
             .map_err(|e| format!("Failed to create lookup: {e}"))?;
 
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index a266363..da4dc30 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -109,8 +109,19 @@ Table& Table::operator=(Table&& other) noexcept {
 
 bool Table::Available() const { return table_ != nullptr; }
 
-Result Table::NewAppendWriter(AppendWriter& out) {
-    if (!Available()) {
+TableAppend Table::NewAppend() { return TableAppend(table_); }
+
+TableUpsert Table::NewUpsert() { return TableUpsert(table_); }
+
+TableLookup Table::NewLookup() { return TableLookup(table_); }
+
+TableScan Table::NewScan() { return TableScan(table_); }
+
+// TableAppend implementation
+TableAppend::TableAppend(ffi::Table* table) noexcept : table_(table) {}
+
+Result TableAppend::CreateWriter(AppendWriter& out) {
+    if (table_ == nullptr) {
         return utils::make_error(1, "Table not available");
     }
 
@@ -124,7 +135,86 @@ Result Table::NewAppendWriter(AppendWriter& out) {
     }
 }
 
-TableScan Table::NewScan() { return TableScan(table_); }
+// TableUpsert implementation
+TableUpsert::TableUpsert(ffi::Table* table) noexcept : table_(table) {}
+
+TableUpsert& TableUpsert::PartialUpdateByIndex(std::vector<size_t> column_indices) {
+    if (column_indices.empty()) {
+        throw std::invalid_argument("PartialUpdateByIndex requires at least one column");
+    }
+    column_indices_ = std::move(column_indices);
+    column_names_.clear();
+    return *this;
+}
+
+TableUpsert& TableUpsert::PartialUpdateByName(std::vector<std::string> column_names) {
+    if (column_names.empty()) {
+        throw std::invalid_argument("PartialUpdateByName requires at least one column");
+    }
+    column_names_ = std::move(column_names);
+    column_indices_.clear();
+    return *this;
+}
+
+std::vector<size_t> TableUpsert::ResolveNameProjection() const {
+    auto ffi_info = table_->get_table_info_from_table();
+    const auto& columns = ffi_info.schema.columns;
+
+    std::vector<size_t> indices;
+    for (const auto& name : column_names_) {
+        bool found = false;
+        for (size_t i = 0; i < columns.size(); ++i) {
+            if (std::string(columns[i].name) == name) {
+                indices.push_back(i);
+                found = true;
+                break;
+            }
+        }
+        if (!found) {
+            throw std::runtime_error("Column '" + name + "' not found");
+        }
+    }
+    return indices;
+}
+
+Result TableUpsert::CreateWriter(UpsertWriter& out) {
+    if (table_ == nullptr) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        auto resolved_indices = !column_names_.empty() ? ResolveNameProjection() : column_indices_;
+
+        rust::Vec<size_t> rust_indices;
+        for (size_t idx : resolved_indices) {
+            rust_indices.push_back(idx);
+        }
+        out = UpsertWriter(table_->create_upsert_writer(std::move(rust_indices)));
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+// TableLookup implementation
+TableLookup::TableLookup(ffi::Table* table) noexcept : table_(table) {}
+
+Result TableLookup::CreateLookuper(Lookuper& out) {
+    if (table_ == nullptr) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        out = Lookuper(table_->new_lookuper());
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
 
 // TableScan implementation
 TableScan::TableScan(ffi::Table* table) noexcept : table_(table) {}
@@ -169,15 +259,11 @@ Result TableScan::CreateLogScanner(LogScanner& out) {
 
     try {
         auto resolved_indices = !name_projection_.empty() ? ResolveNameProjection() : projection_;
-        if (!resolved_indices.empty()) {
-            rust::Vec<size_t> rust_indices;
-            for (size_t idx : resolved_indices) {
-                rust_indices.push_back(idx);
-            }
-            out.scanner_ = table_->new_log_scanner_with_projection(std::move(rust_indices));
-        } else {
-            out.scanner_ = table_->new_log_scanner();
+        rust::Vec<size_t> rust_indices;
+        for (size_t idx : resolved_indices) {
+            rust_indices.push_back(idx);
         }
+        out.scanner_ = table_->create_scanner(std::move(rust_indices), false);
         return utils::make_ok();
     } catch (const rust::Error& e) {
         return utils::make_error(1, e.what());
@@ -193,16 +279,11 @@ Result TableScan::CreateRecordBatchScanner(LogScanner& out) {
 
     try {
         auto resolved_indices = !name_projection_.empty() ? ResolveNameProjection() : projection_;
-        if (!resolved_indices.empty()) {
-            rust::Vec<size_t> rust_indices;
-            for (size_t idx : resolved_indices) {
-                rust_indices.push_back(idx);
-            }
-            out.scanner_ =
-                table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices));
-        } else {
-            out.scanner_ = table_->new_record_batch_log_scanner();
+        rust::Vec<size_t> rust_indices;
+        for (size_t idx : resolved_indices) {
+            rust_indices.push_back(idx);
         }
+        out.scanner_ = table_->create_scanner(std::move(rust_indices), true);
         return utils::make_ok();
     } catch (const rust::Error& e) {
         return utils::make_error(1, e.what());
@@ -489,75 +570,6 @@ Result Lookuper::Lookup(const GenericRow& pk_row, bool& found, GenericRow& out)
     }
 }
 
-// Table KV methods
-Result Table::NewUpsertWriter(UpsertWriter& out) {
-    if (!Available()) {
-        return utils::make_error(1, "Table not available");
-    }
-
-    try {
-        out = UpsertWriter(table_->new_upsert_writer());
-        return utils::make_ok();
-    } catch (const rust::Error& e) {
-        return utils::make_error(1, e.what());
-    } catch (const std::exception& e) {
-        return utils::make_error(1, e.what());
-    }
-}
-
-Result Table::NewUpsertWriter(UpsertWriter& out, const std::vector<std::string>& column_names) {
-    if (!Available()) {
-        return utils::make_error(1, "Table not available");
-    }
-
-    try {
-        rust::Vec<rust::String> rust_names;
-        for (const auto& name : column_names) {
-            rust_names.push_back(rust::String(name));
-        }
-        out = UpsertWriter(table_->new_upsert_writer_with_column_names(std::move(rust_names)));
-        return utils::make_ok();
-    } catch (const rust::Error& e) {
-        return utils::make_error(1, e.what());
-    } catch (const std::exception& e) {
-        return utils::make_error(1, e.what());
-    }
-}
-
-Result Table::NewUpsertWriter(UpsertWriter& out, const std::vector<size_t>& column_indices) {
-    if (!Available()) {
-        return utils::make_error(1, "Table not available");
-    }
-
-    try {
-        rust::Vec<size_t> rust_indices;
-        for (size_t idx : column_indices) {
-            rust_indices.push_back(idx);
-        }
-        out = UpsertWriter(table_->new_upsert_writer_with_column_indices(std::move(rust_indices)));
-        return utils::make_ok();
-    } catch (const rust::Error& e) {
-        return utils::make_error(1, e.what());
-    } catch (const std::exception& e) {
-        return utils::make_error(1, e.what());
-    }
-}
-
-Result Table::NewLookuper(Lookuper& out) {
-    if (!Available()) {
-        return utils::make_error(1, "Table not available");
-    }
-
-    try {
-        out = Lookuper(table_->new_lookuper());
-        return utils::make_ok();
-    } catch (const rust::Error& e) {
-        return utils::make_error(1, e.what());
-    } catch (const std::exception& e) {
-        return utils::make_error(1, e.what());
-    }
-}
-
 // LogScanner implementation
 LogScanner::LogScanner() noexcept = default;
 


Reply via email to