This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 61aa7e1  feat: support kv tables in CPP (#288)
61aa7e1 is described below

commit 61aa7e1d4abbd6008fd84d3d1c3bfe095b36c85a
Author: Anton Borisov <[email protected]>
AuthorDate: Mon Feb 9 12:41:14 2026 +0000

    feat: support kv tables in CPP (#288)
---
 bindings/cpp/BUILD.bazel                |  33 +++
 bindings/cpp/CMakeLists.txt             |   6 +
 bindings/cpp/examples/admin_example.cpp |   5 +-
 bindings/cpp/examples/kv_example.cpp    | 488 ++++++++++++++++++++++++++++++++
 bindings/cpp/include/fluss.hpp          | 134 ++++++++-
 bindings/cpp/src/ffi_converter.hpp      |   3 +-
 bindings/cpp/src/lib.rs                 | 295 ++++++++++++++++++-
 bindings/cpp/src/table.cpp              | 235 ++++++++++++++-
 bindings/cpp/src/types.rs               | 432 +++++++++++++++++++---------
 9 files changed, 1483 insertions(+), 148 deletions(-)

diff --git a/bindings/cpp/BUILD.bazel b/bindings/cpp/BUILD.bazel
index aff8f50..0ae2ce3 100644
--- a/bindings/cpp/BUILD.bazel
+++ b/bindings/cpp/BUILD.bazel
@@ -373,3 +373,36 @@ cc_binary(
     visibility = ["//visibility:public"],
 )
 
+cc_binary(
+    name = "fluss_cpp_kv_example",
+    srcs = [
+        "examples/kv_example.cpp",
+    ],
+    deps = [":fluss_cpp"],
+    copts = [
+        "-std=c++17",
+    ] + select({
+        ":debug_mode": [
+            "-g3",
+            "-O0",
+            "-ggdb",
+            "-fno-omit-frame-pointer",
+            "-DDEBUG",
+        ],
+        ":fastbuild_mode": [
+            "-g",
+            "-O0",
+        ],
+        ":release_mode": [
+            "-O2",
+            "-DNDEBUG",
+        ],
+    }),
+    linkopts = select({
+        ":debug_mode": ["-g"],
+        ":fastbuild_mode": ["-g"],
+        ":release_mode": [],
+    }),
+    visibility = ["//visibility:public"],
+)
+
diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt
index ae70842..05c58ea 100644
--- a/bindings/cpp/CMakeLists.txt
+++ b/bindings/cpp/CMakeLists.txt
@@ -108,6 +108,12 @@ target_link_libraries(fluss_cpp_admin_example PRIVATE Arrow::arrow_shared)
 target_compile_definitions(fluss_cpp_admin_example PRIVATE ARROW_FOUND)
 target_include_directories(fluss_cpp_admin_example PUBLIC ${CPP_INCLUDE_DIR})
 
+add_executable(fluss_cpp_kv_example examples/kv_example.cpp)
+target_link_libraries(fluss_cpp_kv_example PRIVATE fluss_cpp)
+target_link_libraries(fluss_cpp_kv_example PRIVATE Arrow::arrow_shared)
+target_compile_definitions(fluss_cpp_kv_example PRIVATE ARROW_FOUND)
+target_include_directories(fluss_cpp_kv_example PUBLIC ${CPP_INCLUDE_DIR})
+
 set_target_properties(fluss_cpp
     PROPERTIES ADDITIONAL_CLEAN_FILES ${CARGO_TARGET_DIR}
 )
diff --git a/bindings/cpp/examples/admin_example.cpp b/bindings/cpp/examples/admin_example.cpp
index 7b7a333..196fe97 100644
--- a/bindings/cpp/examples/admin_example.cpp
+++ b/bindings/cpp/examples/admin_example.cpp
@@ -61,9 +61,8 @@ int main() {
 
     fluss::DatabaseInfo db_info;
     check("get_database_info", admin.GetDatabaseInfo(db_name, db_info));
-    std::cout << "Database info: name=" << db_info.database_name
-              << " comment=" << db_info.comment << " created_time=" << db_info.created_time
-              << std::endl;
+    std::cout << "Database info: name=" << db_info.database_name << " comment=" << db_info.comment
+              << " created_time=" << db_info.created_time << std::endl;
 
     std::vector<std::string> databases;
     check("list_databases", admin.ListDatabases(databases));
diff --git a/bindings/cpp/examples/kv_example.cpp b/bindings/cpp/examples/kv_example.cpp
new file mode 100644
index 0000000..daebfb2
--- /dev/null
+++ b/bindings/cpp/examples/kv_example.cpp
@@ -0,0 +1,488 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <iostream>
+#include <string>
+#include <vector>
+
+#include "fluss.hpp"
+
+static void check(const char* step, const fluss::Result& r) {
+    if (!r.Ok()) {
+        std::cerr << step << " failed: code=" << r.error_code << " msg=" << r.error_message
+                  << std::endl;
+        std::exit(1);
+    }
+}
+
+int main() {
+    const std::string bootstrap = "127.0.0.1:9123";
+
+    // 1) Connect and get Admin
+    fluss::Connection conn;
+    check("connect", fluss::Connection::Connect(bootstrap, conn));
+
+    fluss::Admin admin;
+    check("get_admin", conn.GetAdmin(admin));
+
+    fluss::TablePath kv_table_path("fluss", "kv_table_cpp_v1");
+
+    // Drop if exists
+    admin.DropTable(kv_table_path, true);
+
+    // 2) Create a KV table with primary key, including decimal and temporal types
+    auto kv_schema = fluss::Schema::NewBuilder()
+                         .AddColumn("user_id", fluss::DataType::Int())
+                         .AddColumn("name", fluss::DataType::String())
+                         .AddColumn("email", fluss::DataType::String())
+                         .AddColumn("score", fluss::DataType::Float())
+                         .AddColumn("balance", fluss::DataType::Decimal(10, 2))
+                         .AddColumn("birth_date", fluss::DataType::Date())
+                         .AddColumn("login_time", fluss::DataType::Time())
+                         .AddColumn("created_at", fluss::DataType::Timestamp())
+                         .AddColumn("last_seen", fluss::DataType::TimestampLtz())
+                         .SetPrimaryKeys({"user_id"})
+                         .Build();
+
+    auto kv_descriptor = fluss::TableDescriptor::NewBuilder()
+                             .SetSchema(kv_schema)
+                             .SetBucketCount(3)
+                             .SetComment("cpp kv table example")
+                             .Build();
+
+    check("create_kv_table", admin.CreateTable(kv_table_path, kv_descriptor, false));
+    std::cout << "Created KV table with primary key" << std::endl;
+
+    fluss::Table kv_table;
+    check("get_kv_table", conn.GetTable(kv_table_path, kv_table));
+
+    // 3) Upsert rows using name-based Set()
+    //    - Set("balance", "1234.56") auto-routes to SetDecimal (schema-aware)
+    //    - Set("created_at", ts) auto-routes to SetTimestampNtz (schema-aware)
+    //    - Set("last_seen", ts) auto-routes to SetTimestampLtz (schema-aware)
+    std::cout << "\n--- Upsert Rows ---" << std::endl;
+    fluss::UpsertWriter upsert_writer;
+    check("new_upsert_writer", kv_table.NewUpsertWriter(upsert_writer));
+
+    // Fire-and-forget upserts
+    {
+        auto row = kv_table.NewRow();
+        row.Set("user_id", 1);
+        row.Set("name", "Alice");
+        row.Set("email", "[email protected]");
+        row.Set("score", 95.5f);
+        row.Set("balance", "1234.56");
+        row.Set("birth_date", fluss::Date::FromYMD(1990, 3, 15));
+        row.Set("login_time", fluss::Time::FromHMS(9, 30, 0));
+        row.Set("created_at", fluss::Timestamp::FromMillis(1700000000000));
+        row.Set("last_seen", fluss::Timestamp::FromMillis(1700000060000));
+        check("upsert_1", upsert_writer.Upsert(row));
+    }
+    {
+        auto row = kv_table.NewRow();
+        row.Set("user_id", 2);
+        row.Set("name", "Bob");
+        row.Set("email", "[email protected]");
+        row.Set("score", 87.3f);
+        row.Set("balance", "567.89");
+        row.Set("birth_date", fluss::Date::FromYMD(1985, 7, 22));
+        row.Set("login_time", fluss::Time::FromHMS(14, 15, 30));
+        row.Set("created_at", fluss::Timestamp::FromMillis(1700000100000));
+        row.Set("last_seen", fluss::Timestamp::FromMillis(1700000200000));
+        check("upsert_2", upsert_writer.Upsert(row));
+    }
+
+    // Per-record acknowledgment
+    {
+        auto row = kv_table.NewRow();
+        row.Set("user_id", 3);
+        row.Set("name", "Charlie");
+        row.Set("email", "[email protected]");
+        row.Set("score", 92.0f);
+        row.Set("balance", "99999.99");
+        row.Set("birth_date", fluss::Date::FromYMD(2000, 1, 1));
+        row.Set("login_time", fluss::Time::FromHMS(23, 59, 59));
+        row.Set("created_at", fluss::Timestamp::FromMillis(1700000300000));
+        row.Set("last_seen", fluss::Timestamp::FromMillis(1700000400000));
+        fluss::WriteResult wr;
+        check("upsert_3", upsert_writer.Upsert(row, wr));
+        check("upsert_3_wait", wr.Wait());
+        std::cout << "Upsert acknowledged by server" << std::endl;
+    }
+
+    check("upsert_flush", upsert_writer.Flush());
+    std::cout << "Upserted 3 rows" << std::endl;
+
+    // 4) Lookup by primary key — verify all types round-trip
+    std::cout << "\n--- Lookup by Primary Key ---" << std::endl;
+    fluss::Lookuper lookuper;
+    check("new_lookuper", kv_table.NewLookuper(lookuper));
+
+    // Lookup existing key
+    {
+        auto pk_row = kv_table.NewRow();
+        pk_row.Set("user_id", 1);
+
+        bool found = false;
+        fluss::GenericRow result_row;
+        check("lookup_1", lookuper.Lookup(pk_row, found, result_row));
+        if (found) {
+            auto date = result_row.GetDate(5);
+            auto time = result_row.GetTime(6);
+            auto created = result_row.GetTimestamp(7);
+            auto seen = result_row.GetTimestamp(8);
+            std::cout << "Found user_id=1:"
+                      << "\n  name=" << result_row.GetString(1)
+                      << "\n  email=" << result_row.GetString(2)
+                      << "\n  score=" << result_row.GetFloat32(3)
+                      << "\n  balance=" << result_row.DecimalToString(4)
+                      << "\n  birth_date=" << date.Year() << "-" << date.Month() << "-"
+                      << date.Day() << "\n  login_time=" << time.Hour() << ":" << time.Minute()
+                      << ":" << time.Second() << "\n  created_at(ms)=" << created.epoch_millis
+                      << "\n  last_seen(ms)=" << seen.epoch_millis << std::endl;
+        } else {
+            std::cerr << "ERROR: Expected to find user_id=1" << std::endl;
+            std::exit(1);
+        }
+    }
+
+    // Lookup non-existing key
+    {
+        auto pk_row = kv_table.NewRow();
+        pk_row.Set("user_id", 999);
+
+        bool found = false;
+        fluss::GenericRow result_row;
+        check("lookup_999", lookuper.Lookup(pk_row, found, result_row));
+        if (!found) {
+            std::cout << "user_id=999 not found (expected)" << std::endl;
+        } else {
+            std::cerr << "ERROR: Expected user_id=999 to not be found" << std::endl;
+            std::exit(1);
+        }
+    }
+
+    // 5) Update via upsert (overwrite existing key)
+    std::cout << "\n--- Update via Upsert ---" << std::endl;
+    {
+        auto row = kv_table.NewRow();
+        row.Set("user_id", 1);
+        row.Set("name", "Alice Updated");
+        row.Set("email", "[email protected]");
+        row.Set("score", 99.0f);
+        row.Set("balance", "9999.00");
+        row.Set("birth_date", fluss::Date::FromYMD(1990, 3, 15));
+        row.Set("login_time", fluss::Time::FromHMS(10, 0, 0));
+        row.Set("created_at", fluss::Timestamp::FromMillis(1700000000000));
+        row.Set("last_seen", fluss::Timestamp::FromMillis(1700000500000));
+        fluss::WriteResult wr;
+        check("upsert_update", upsert_writer.Upsert(row, wr));
+        check("upsert_update_wait", wr.Wait());
+    }
+
+    // Verify update
+    {
+        auto pk_row = kv_table.NewRow();
+        pk_row.Set("user_id", 1);
+
+        bool found = false;
+        fluss::GenericRow result_row;
+        check("lookup_updated", lookuper.Lookup(pk_row, found, result_row));
+        if (found && result_row.GetString(1) == "Alice Updated") {
+            std::cout << "Update verified: name=" << result_row.GetString(1)
+                      << " balance=" << result_row.DecimalToString(4)
+                      << " last_seen(ms)=" << result_row.GetTimestamp(8).epoch_millis << std::endl;
+        } else {
+            std::cerr << "ERROR: Update verification failed" << std::endl;
+            std::exit(1);
+        }
+    }
+
+    // 6) Delete by primary key
+    std::cout << "\n--- Delete by Primary Key ---" << std::endl;
+    {
+        auto pk_row = kv_table.NewRow();
+        pk_row.Set("user_id", 2);
+        fluss::WriteResult wr;
+        check("delete_2", upsert_writer.Delete(pk_row, wr));
+        check("delete_2_wait", wr.Wait());
+        std::cout << "Deleted user_id=2" << std::endl;
+    }
+
+    // Verify deletion
+    {
+        auto pk_row = kv_table.NewRow();
+        pk_row.Set("user_id", 2);
+
+        bool found = false;
+        fluss::GenericRow result_row;
+        check("lookup_deleted", lookuper.Lookup(pk_row, found, result_row));
+        if (!found) {
+            std::cout << "Delete verified: user_id=2 not found" << std::endl;
+        } else {
+            std::cerr << "ERROR: Expected user_id=2 to be deleted" << std::endl;
+            std::exit(1);
+        }
+    }
+
+    // 7) Partial update by column names
+    std::cout << "\n--- Partial Update by Column Names ---" << std::endl;
+    fluss::UpsertWriter partial_writer;
+    check("new_partial_upsert_writer",
+          kv_table.NewUpsertWriter(partial_writer,
+                                   std::vector<std::string>{"user_id", "balance", "last_seen"}));
+
+    {
+        auto row = kv_table.NewRow();
+        row.Set("user_id", 3);
+        row.Set("balance", "50000.00");
+        row.Set("last_seen", fluss::Timestamp::FromMillis(1700000999000));
+        fluss::WriteResult wr;
+        check("partial_upsert", partial_writer.Upsert(row, wr));
+        check("partial_upsert_wait", wr.Wait());
+        std::cout << "Partial update: set balance=50000.00, last_seen for user_id=3" << std::endl;
+    }
+
+    // Verify partial update (other fields unchanged)
+    {
+        auto pk_row = kv_table.NewRow();
+        pk_row.Set("user_id", 3);
+
+        bool found = false;
+        fluss::GenericRow result_row;
+        check("lookup_partial", lookuper.Lookup(pk_row, found, result_row));
+        if (found) {
+            std::cout << "Partial update verified:"
+                      << "\n  name=" << result_row.GetString(1) << " (unchanged)"
+                      << "\n  balance=" << result_row.DecimalToString(4) << " (updated)"
+                      << "\n  last_seen(ms)=" << result_row.GetTimestamp(8).epoch_millis
+                      << " (updated)" << std::endl;
+        } else {
+            std::cerr << "ERROR: Expected to find user_id=3" << std::endl;
+            std::exit(1);
+        }
+    }
+
+    // 8) Partial update by column indices (using index-based setters for lower overhead)
+    std::cout << "\n--- Partial Update by Column Indices ---" << std::endl;
+    fluss::UpsertWriter partial_writer_idx;
+    // Columns: 0=user_id (PK), 1=name — update name only
+    check("new_partial_upsert_writer_idx",
+          kv_table.NewUpsertWriter(partial_writer_idx, std::vector<size_t>{0, 1}));
+
+    {
+        // Index-based setters: lighter than name-based, useful for hot paths
+        fluss::GenericRow row;
+        row.SetInt32(0, 3);                   // user_id (PK)
+        row.SetString(1, "Charlie Updated");  // name
+        fluss::WriteResult wr;
+        check("partial_upsert_idx", partial_writer_idx.Upsert(row, wr));
+        check("partial_upsert_idx_wait", wr.Wait());
+        std::cout << "Partial update by indices: set name='Charlie Updated' for user_id=3"
+                  << std::endl;
+    }
+
+    // Verify: name changed, balance/last_seen unchanged from previous partial update
+    {
+        auto pk_row = kv_table.NewRow();
+        pk_row.Set("user_id", 3);
+
+        bool found = false;
+        fluss::GenericRow result_row;
+        check("lookup_partial_idx", lookuper.Lookup(pk_row, found, result_row));
+        if (found) {
+            std::cout << "Partial update by indices verified:"
+                      << "\n  name=" << result_row.GetString(1) << " (updated)"
+                      << "\n  balance=" << result_row.DecimalToString(4) << " (unchanged)"
+                      << "\n  last_seen(ms)=" << result_row.GetTimestamp(8).epoch_millis
+                      << " (unchanged)" << std::endl;
+        } else {
+            std::cerr << "ERROR: Expected to find user_id=3" << std::endl;
+            std::exit(1);
+        }
+    }
+
+    // Cleanup
+    check("drop_kv_table", admin.DropTable(kv_table_path, true));
+
+    // 9) Partitioned KV table
+    std::cout << "\n--- Partitioned KV Table ---" << std::endl;
+    fluss::TablePath part_kv_path("fluss", "partitioned_kv_cpp_v1");
+    admin.DropTable(part_kv_path, true);
+
+    auto part_kv_schema = fluss::Schema::NewBuilder()
+                              .AddColumn("region", fluss::DataType::String())
+                              .AddColumn("user_id", fluss::DataType::Int())
+                              .AddColumn("name", fluss::DataType::String())
+                              .AddColumn("score", fluss::DataType::BigInt())
+                              .SetPrimaryKeys({"region", "user_id"})
+                              .Build();
+
+    auto part_kv_descriptor = fluss::TableDescriptor::NewBuilder()
+                                  .SetSchema(part_kv_schema)
+                                  .SetPartitionKeys({"region"})
+                                  .SetComment("partitioned kv table example")
+                                  .Build();
+
+    check("create_part_kv", admin.CreateTable(part_kv_path, part_kv_descriptor, false));
+    std::cout << "Created partitioned KV table" << std::endl;
+
+    // Create partitions
+    check("create_US", admin.CreatePartition(part_kv_path, {{"region", "US"}}));
+    check("create_EU", admin.CreatePartition(part_kv_path, {{"region", "EU"}}));
+    check("create_APAC", admin.CreatePartition(part_kv_path, {{"region", "APAC"}}));
+    std::cout << "Created partitions: US, EU, APAC" << std::endl;
+
+    fluss::Table part_kv_table;
+    check("get_part_kv_table", conn.GetTable(part_kv_path, part_kv_table));
+
+    fluss::UpsertWriter part_writer;
+    check("new_part_writer", part_kv_table.NewUpsertWriter(part_writer));
+
+    // Upsert rows across partitions
+    struct TestRow {
+        const char* region;
+        int32_t user_id;
+        const char* name;
+        int64_t score;
+    };
+    TestRow test_data[] = {
+        {"US", 1, "Gustave", 100}, {"US", 2, "Lune", 200},   {"EU", 1, "Sciel", 150},
+        {"EU", 2, "Maelle", 250},  {"APAC", 1, "Noco", 300},
+    };
+
+    for (const auto& td : test_data) {
+        auto row = part_kv_table.NewRow();
+        row.Set("region", td.region);
+        row.Set("user_id", td.user_id);
+        row.Set("name", td.name);
+        row.Set("score", td.score);
+        check("part_upsert", part_writer.Upsert(row));
+    }
+    check("part_flush", part_writer.Flush());
+    std::cout << "Upserted 5 rows across 3 partitions" << std::endl;
+
+    // Lookup all rows
+    fluss::Lookuper part_lookuper;
+    check("new_part_lookuper", part_kv_table.NewLookuper(part_lookuper));
+
+    for (const auto& td : test_data) {
+        auto pk = part_kv_table.NewRow();
+        pk.Set("region", td.region);
+        pk.Set("user_id", td.user_id);
+
+        bool found = false;
+        fluss::GenericRow result;
+        check("part_lookup", part_lookuper.Lookup(pk, found, result));
+        if (!found) {
+            std::cerr << "ERROR: Expected to find region=" << td.region << " user_id=" << td.user_id
+                      << std::endl;
+            std::exit(1);
+        }
+        if (result.GetString(2) != td.name || result.GetInt64(3) != td.score) {
+            std::cerr << "ERROR: Data mismatch for region=" << td.region
+                      << " user_id=" << td.user_id << std::endl;
+            std::exit(1);
+        }
+    }
+    std::cout << "All 5 rows verified across partitions" << std::endl;
+
+    // Update within a partition
+    {
+        auto row = part_kv_table.NewRow();
+        row.Set("region", "US");
+        row.Set("user_id", 1);
+        row.Set("name", "Gustave Updated");
+        row.Set("score", static_cast<int64_t>(999));
+        fluss::WriteResult wr;
+        check("part_update", part_writer.Upsert(row, wr));
+        check("part_update_wait", wr.Wait());
+    }
+    {
+        auto pk = part_kv_table.NewRow();
+        pk.Set("region", "US");
+        pk.Set("user_id", 1);
+        bool found = false;
+        fluss::GenericRow result;
+        check("part_lookup_updated", part_lookuper.Lookup(pk, found, result));
+        if (!found || result.GetString(2) != "Gustave Updated" || result.GetInt64(3) != 999) {
+            std::cerr << "ERROR: Partition update verification failed" << std::endl;
+            std::exit(1);
+        }
+        std::cout << "Update verified: US/1 name=" << result.GetString(2)
+                  << " score=" << result.GetInt64(3) << std::endl;
+    }
+
+    // Lookup in non-existent partition
+    {
+        auto pk = part_kv_table.NewRow();
+        pk.Set("region", "UNKNOWN");
+        pk.Set("user_id", 1);
+        bool found = false;
+        fluss::GenericRow result;
+        check("part_lookup_unknown", part_lookuper.Lookup(pk, found, result));
+        if (found) {
+            std::cerr << "ERROR: Expected UNKNOWN partition lookup to return not found"
+                      << std::endl;
+            std::exit(1);
+        }
+        std::cout << "UNKNOWN partition lookup: not found (expected)" << std::endl;
+    }
+
+    // Delete within a partition
+    {
+        auto pk = part_kv_table.NewRow();
+        pk.Set("region", "EU");
+        pk.Set("user_id", 1);
+        fluss::WriteResult wr;
+        check("part_delete", part_writer.Delete(pk, wr));
+        check("part_delete_wait", wr.Wait());
+    }
+    {
+        auto pk = part_kv_table.NewRow();
+        pk.Set("region", "EU");
+        pk.Set("user_id", 1);
+        bool found = false;
+        fluss::GenericRow result;
+        check("part_lookup_deleted", part_lookuper.Lookup(pk, found, result));
+        if (found) {
+            std::cerr << "ERROR: Expected EU/1 to be deleted" << std::endl;
+            std::exit(1);
+        }
+        std::cout << "Delete verified: EU/1 not found" << std::endl;
+    }
+
+    // Verify other record in same partition still exists
+    {
+        auto pk = part_kv_table.NewRow();
+        pk.Set("region", "EU");
+        pk.Set("user_id", 2);
+        bool found = false;
+        fluss::GenericRow result;
+        check("part_lookup_eu2", part_lookuper.Lookup(pk, found, result));
+        if (!found || result.GetString(2) != "Maelle") {
+            std::cerr << "ERROR: Expected EU/2 (Maelle) to still exist" << std::endl;
+            std::exit(1);
+        }
+        std::cout << "EU/2 still exists: name=" << result.GetString(2) << std::endl;
+    }
+
+    check("drop_part_kv", admin.DropTable(part_kv_path, true));
+    std::cout << "\nKV table example completed successfully!" << std::endl;
+
+    return 0;
+}
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 50dffae..c635c81 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -41,6 +41,8 @@ struct Table;
 struct AppendWriter;
 struct WriteResult;
 struct LogScanner;
+struct UpsertWriter;
+struct Lookuper;
 }  // namespace ffi
 
 struct Date {
@@ -490,6 +492,7 @@ struct GenericRow {
 
     size_t FieldCount() const { return fields.size(); }
 
+    // ── Index-based getters ──────────────────────────────────────────
     DatumType GetType(size_t idx) const { return GetField(idx).GetType(); }
     bool IsNull(size_t idx) const { return GetField(idx).IsNull(); }
     bool GetBool(size_t idx) const { return GetTypedField(idx, DatumType::Bool).GetBool(); }
@@ -528,6 +531,7 @@ struct GenericRow {
         return d.DecimalToString();
     }
 
+    // ── Index-based setters ──────────────────────────────────────────
     void SetNull(size_t idx) {
         EnsureSize(idx);
         fields[idx] = Datum::Null();
@@ -593,8 +597,77 @@ struct GenericRow {
         fields[idx] = Datum::DecimalString(value);
     }
 
+    // ── Name-based setters (require schema — see Table::NewRow()) ───
+    void Set(const std::string& name, std::nullptr_t) { SetNull(Resolve(name)); }
+    void Set(const std::string& name, bool v) { SetBool(Resolve(name), v); }
+    void Set(const std::string& name, int32_t v) { SetInt32(Resolve(name), v); }
+    void Set(const std::string& name, int64_t v) { SetInt64(Resolve(name), v); }
+    void Set(const std::string& name, float v) { SetFloat32(Resolve(name), v); }
+    void Set(const std::string& name, double v) { SetFloat64(Resolve(name), v); }
+    // const char* overload to prevent "string literal" → bool conversion
+    void Set(const std::string& name, const char* v) {
+        auto [idx, type] = ResolveColumn(name);
+        if (type == TypeId::Decimal) {
+            SetDecimal(idx, v);
+        } else if (type == TypeId::String) {
+            SetString(idx, v);
+        } else {
+            throw std::runtime_error("GenericRow::Set: column '" + name +
+                                     "' is not a string or decimal column");
+        }
+    }
+    void Set(const std::string& name, std::string v) {
+        auto [idx, type] = ResolveColumn(name);
+        if (type == TypeId::Decimal) {
+            SetDecimal(idx, v);
+        } else if (type == TypeId::String) {
+            SetString(idx, std::move(v));
+        } else {
+            throw std::runtime_error("GenericRow::Set: column '" + name +
+                                     "' is not a string or decimal column");
+        }
+    }
+    void Set(const std::string& name, std::vector<uint8_t> v) {
+        SetBytes(Resolve(name), std::move(v));
+    }
+    void Set(const std::string& name, fluss::Date d) { SetDate(Resolve(name), d); }
+    void Set(const std::string& name, fluss::Time t) { SetTime(Resolve(name), t); }
+    void Set(const std::string& name, fluss::Timestamp ts) {
+        auto [idx, type] = ResolveColumn(name);
+        if (type == TypeId::TimestampLtz) {
+            SetTimestampLtz(idx, ts);
+        } else if (type == TypeId::Timestamp) {
+            SetTimestampNtz(idx, ts);
+        } else {
+            throw std::runtime_error("GenericRow::Set: column '" + name +
+                                     "' is not a timestamp column");
+        }
+    }
+
    private:
+    friend class Table;
+    struct ColumnInfo {
+        size_t index;
+        TypeId type_id;
+    };
+    using ColumnMap = std::unordered_map<std::string, ColumnInfo>;
     std::vector<Datum> fields;
+    std::shared_ptr<ColumnMap> column_map_;
+
+    size_t Resolve(const std::string& name) const { return ResolveColumn(name).index; }
+
+    const ColumnInfo& ResolveColumn(const std::string& name) const {
+        if (!column_map_) {
+            throw std::runtime_error(
+                "GenericRow: name-based Set() requires a schema. "
+                "Use Table::NewRow() to create a schema-aware row.");
+        }
+        auto it = column_map_->find(name);
+        if (it == column_map_->end()) {
+            throw std::runtime_error("GenericRow: unknown column '" + name + "'");
+        }
+        return it->second;
+    }
 
     const Datum& GetField(size_t idx) const {
         if (idx >= fields.size()) {
@@ -725,6 +798,8 @@ struct DatabaseInfo {
 };
 
 class AppendWriter;
+class UpsertWriter;
+class Lookuper;
 class WriteResult;
 class LogScanner;
 class Admin;
@@ -792,8 +867,7 @@ class Admin {
                          const std::unordered_map<std::string, std::string>& 
partition_spec,
                          bool ignore_if_not_exists = false);
 
-    Result CreateDatabase(const std::string& database_name,
-                          const DatabaseDescriptor& descriptor,
+    Result CreateDatabase(const std::string& database_name, const DatabaseDescriptor& descriptor,
                           bool ignore_if_exists = false);
 
     Result DropDatabase(const std::string& database_name, bool ignore_if_not_exists = false,
@@ -833,7 +907,13 @@ class Table {
 
     bool Available() const;
 
+    GenericRow NewRow() const;
+
     Result NewAppendWriter(AppendWriter& out);
+    Result NewUpsertWriter(UpsertWriter& out);
+    Result NewUpsertWriter(UpsertWriter& out, const std::vector<std::string>& column_names);
+    Result NewUpsertWriter(UpsertWriter& out, const std::vector<size_t>& column_indices);
+    Result NewLookuper(Lookuper& out);
     TableScan NewScan();
 
     TableInfo GetTableInfo() const;
@@ -846,7 +926,10 @@ class Table {
     Table(ffi::Table* table) noexcept;
 
     void Destroy() noexcept;
+    const std::shared_ptr<GenericRow::ColumnMap>& GetColumnMap() const;
+
     ffi::Table* table_{nullptr};
+    mutable std::shared_ptr<GenericRow::ColumnMap> column_map_;
 };
 
 class TableScan {
@@ -887,6 +970,7 @@ class WriteResult {
 
    private:
     friend class AppendWriter;
+    friend class UpsertWriter;
     WriteResult(ffi::WriteResult* inner) noexcept;
 
     void Destroy() noexcept;
@@ -917,6 +1001,52 @@ class AppendWriter {
     ffi::AppendWriter* writer_{nullptr};
 };
 
+class UpsertWriter {
+   public:
+    UpsertWriter() noexcept;
+    ~UpsertWriter() noexcept;
+
+    UpsertWriter(const UpsertWriter&) = delete;
+    UpsertWriter& operator=(const UpsertWriter&) = delete;
+    UpsertWriter(UpsertWriter&& other) noexcept;
+    UpsertWriter& operator=(UpsertWriter&& other) noexcept;
+
+    bool Available() const;
+
+    Result Upsert(const GenericRow& row);
+    Result Upsert(const GenericRow& row, WriteResult& out);
+    Result Delete(const GenericRow& row);
+    Result Delete(const GenericRow& row, WriteResult& out);
+    Result Flush();
+
+   private:
+    friend class Table;
+    UpsertWriter(ffi::UpsertWriter* writer) noexcept;
+    void Destroy() noexcept;
+    ffi::UpsertWriter* writer_{nullptr};
+};
+
+class Lookuper {
+   public:
+    Lookuper() noexcept;
+    ~Lookuper() noexcept;
+
+    Lookuper(const Lookuper&) = delete;
+    Lookuper& operator=(const Lookuper&) = delete;
+    Lookuper(Lookuper&& other) noexcept;
+    Lookuper& operator=(Lookuper&& other) noexcept;
+
+    bool Available() const;
+
+    Result Lookup(const GenericRow& pk_row, bool& found, GenericRow& out);
+
+   private:
+    friend class Table;
+    Lookuper(ffi::Lookuper* lookuper) noexcept;
+    void Destroy() noexcept;
+    ffi::Lookuper* lookuper_{nullptr};
+};
+
 class LogScanner {
    public:
     LogScanner() noexcept;
diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp
index 8fc8415..40676e5 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -290,8 +290,7 @@ inline LakeSnapshot from_ffi_lake_snapshot(const ffi::FfiLakeSnapshot& ffi_snaps
     return snapshot;
 }
 
-inline ffi::FfiDatabaseDescriptor to_ffi_database_descriptor(
-    const DatabaseDescriptor& desc) {
+inline ffi::FfiDatabaseDescriptor to_ffi_database_descriptor(const 
DatabaseDescriptor& desc) {
     ffi::FfiDatabaseDescriptor ffi_desc;
     ffi_desc.comment = rust::String(desc.comment);
     for (const auto& [k, v] : desc.properties) {
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 5f3e7e9..ee7f1d8 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -182,6 +182,12 @@ mod ffi {
         bucket_offsets: Vec<FfiBucketOffsetPair>,
     }
 
+    /// Outcome of `Lookuper::lookup` as passed across the FFI boundary.
+    struct FfiLookupResult {
+        /// Status of the call; on failure `found` is `false` and `row` has no fields.
+        result: FfiResult,
+        /// `true` only when a row was found and converted successfully.
+        found: bool,
+        /// The matching row when `found` is `true`; otherwise a row with no fields.
+        row: FfiGenericRow,
+    }
+
     struct FfiLakeSnapshotResult {
         result: FfiResult,
         lake_snapshot: FfiLakeSnapshot,
@@ -242,6 +248,8 @@ mod ffi {
         type AppendWriter;
         type WriteResult;
         type LogScanner;
+        type UpsertWriter;
+        type Lookuper;
 
         // Connection
         fn new_connection(bootstrap_server: &str) -> Result<*mut Connection>;
@@ -330,6 +338,16 @@ mod ffi {
         fn get_table_info_from_table(self: &Table) -> FfiTableInfo;
         fn get_table_path(self: &Table) -> FfiTablePath;
         fn has_primary_key(self: &Table) -> bool;
+        fn new_upsert_writer(self: &Table) -> Result<*mut UpsertWriter>;
+        fn new_upsert_writer_with_column_names(
+            self: &Table,
+            column_names: Vec<String>,
+        ) -> Result<*mut UpsertWriter>;
+        fn new_upsert_writer_with_column_indices(
+            self: &Table,
+            column_indices: Vec<usize>,
+        ) -> Result<*mut UpsertWriter>;
+        fn new_lookuper(self: &Table) -> Result<*mut Lookuper>;
 
         // AppendWriter
         unsafe fn delete_append_writer(writer: *mut AppendWriter);
@@ -339,6 +357,16 @@ mod ffi {
         // WriteResult — dropped automatically via rust::Box, or call wait() 
for ack
         fn wait(self: &mut WriteResult) -> FfiResult;
 
+        // UpsertWriter
+        unsafe fn delete_upsert_writer(writer: *mut UpsertWriter);
+        fn upsert(self: &mut UpsertWriter, row: &FfiGenericRow) -> 
Result<Box<WriteResult>>;
+        fn delete_row(self: &mut UpsertWriter, row: &FfiGenericRow) -> 
Result<Box<WriteResult>>;
+        fn upsert_flush(self: &mut UpsertWriter) -> FfiResult;
+
+        // Lookuper
+        unsafe fn delete_lookuper(lookuper: *mut Lookuper);
+        fn lookup(self: &mut Lookuper, pk_row: &FfiGenericRow) -> 
FfiLookupResult;
+
         // LogScanner
         unsafe fn delete_log_scanner(scanner: *mut LogScanner);
         fn subscribe(self: &LogScanner, bucket_id: i32, start_offset: i64) -> 
FfiResult;
@@ -399,6 +427,16 @@ pub struct LogScanner {
     projected_columns: Vec<fcore::metadata::Column>,
 }
 
+/// Binding-side upsert writer handed to C++ as an opaque pointer.
+pub struct UpsertWriter {
+    /// The core client writer that performs upserts, deletes and flushes.
+    inner: fcore::client::UpsertWriter,
+    /// Table metadata; its schema is used to convert and pad incoming rows.
+    table_info: fcore::metadata::TableInfo,
+}
+
+/// Binding-side lookuper handed to C++ as an opaque pointer.
+pub struct Lookuper {
+    /// The core client lookuper that performs the lookup.
+    inner: fcore::client::Lookuper,
+    /// Table metadata; used to convert key rows in and result rows out.
+    table_info: fcore::metadata::TableInfo,
+}
+
 fn ok_result() -> ffi::FfiResult {
     ffi::FfiResult {
         error_code: 0,
@@ -1045,6 +1083,113 @@ impl Table {
     fn has_primary_key(&self) -> bool {
         self.has_pk
     }
+
+    /// Creates an upsert writer for this table.
+    ///
+    /// On success the writer is boxed and returned as a raw pointer (see
+    /// `delete_upsert_writer` for releasing it). On failure the returned message names
+    /// the construction step that failed.
+    fn new_upsert_writer(&self) -> Result<*mut UpsertWriter, String> {
+        // Hold the guard returned by `RUNTIME.enter()` until this function returns.
+        let _runtime_guard = RUNTIME.enter();
+
+        let table = fcore::client::FlussTable::new(
+            &self.connection,
+            self.metadata.clone(),
+            self.table_info.clone(),
+        );
+
+        let upsert = match table.new_upsert() {
+            Ok(upsert) => upsert,
+            Err(e) => return Err(format!("Failed to create upsert: {e}")),
+        };
+
+        let inner = match upsert.create_writer() {
+            Ok(writer) => writer,
+            Err(e) => return Err(format!("Failed to create upsert writer: {e}")),
+        };
+
+        let boxed = Box::new(UpsertWriter {
+            inner,
+            table_info: self.table_info.clone(),
+        });
+        Ok(Box::into_raw(boxed))
+    }
+
+    /// Creates an upsert writer configured for a partial update of the named columns.
+    ///
+    /// On success the writer is boxed and returned as a raw pointer (see
+    /// `delete_upsert_writer` for releasing it). On failure the returned message names
+    /// the construction step that failed.
+    fn new_upsert_writer_with_column_names(
+        &self,
+        column_names: Vec<String>,
+    ) -> Result<*mut UpsertWriter, String> {
+        // Hold the guard returned by `RUNTIME.enter()` until this function returns.
+        let _runtime_guard = RUNTIME.enter();
+
+        let table = fcore::client::FlussTable::new(
+            &self.connection,
+            self.metadata.clone(),
+            self.table_info.clone(),
+        );
+
+        let upsert = match table.new_upsert() {
+            Ok(upsert) => upsert,
+            Err(e) => return Err(format!("Failed to create upsert: {e}")),
+        };
+
+        // The core API takes borrowed names, so view the owned strings as `&str`.
+        let names: Vec<&str> = column_names.iter().map(String::as_str).collect();
+        let upsert = match upsert.partial_update_with_column_names(&names) {
+            Ok(upsert) => upsert,
+            Err(e) => return Err(format!("Failed to set partial update columns: {e}")),
+        };
+
+        let inner = match upsert.create_writer() {
+            Ok(writer) => writer,
+            Err(e) => return Err(format!("Failed to create upsert writer: {e}")),
+        };
+
+        let boxed = Box::new(UpsertWriter {
+            inner,
+            table_info: self.table_info.clone(),
+        });
+        Ok(Box::into_raw(boxed))
+    }
+
+    /// Creates an upsert writer configured for a partial update of the columns at the
+    /// given indices.
+    ///
+    /// On success the writer is boxed and returned as a raw pointer (see
+    /// `delete_upsert_writer` for releasing it). On failure the returned message names
+    /// the construction step that failed.
+    fn new_upsert_writer_with_column_indices(
+        &self,
+        column_indices: Vec<usize>,
+    ) -> Result<*mut UpsertWriter, String> {
+        // Hold the guard returned by `RUNTIME.enter()` until this function returns.
+        let _runtime_guard = RUNTIME.enter();
+
+        let table = fcore::client::FlussTable::new(
+            &self.connection,
+            self.metadata.clone(),
+            self.table_info.clone(),
+        );
+
+        let upsert = match table.new_upsert() {
+            Ok(upsert) => upsert,
+            Err(e) => return Err(format!("Failed to create upsert: {e}")),
+        };
+
+        let upsert = match upsert.partial_update(Some(column_indices)) {
+            Ok(upsert) => upsert,
+            Err(e) => return Err(format!("Failed to set partial update columns: {e}")),
+        };
+
+        let inner = match upsert.create_writer() {
+            Ok(writer) => writer,
+            Err(e) => return Err(format!("Failed to create upsert writer: {e}")),
+        };
+
+        let boxed = Box::new(UpsertWriter {
+            inner,
+            table_info: self.table_info.clone(),
+        });
+        Ok(Box::into_raw(boxed))
+    }
+
+    /// Creates a lookuper for this table.
+    ///
+    /// On success the lookuper is boxed and returned as a raw pointer (see
+    /// `delete_lookuper` for releasing it). On failure the returned message names the
+    /// construction step that failed.
+    fn new_lookuper(&self) -> Result<*mut Lookuper, String> {
+        // Hold the guard returned by `RUNTIME.enter()` until this function returns.
+        let _runtime_guard = RUNTIME.enter();
+
+        let table = fcore::client::FlussTable::new(
+            &self.connection,
+            self.metadata.clone(),
+            self.table_info.clone(),
+        );
+
+        let lookup = match table.new_lookup() {
+            Ok(lookup) => lookup,
+            Err(e) => return Err(format!("Failed to create lookup: {e}")),
+        };
+
+        let inner = match lookup.create_lookuper() {
+            Ok(lookuper) => lookuper,
+            Err(e) => return Err(format!("Failed to create lookuper: {e}")),
+        };
+
+        let boxed = Box::new(Lookuper {
+            inner,
+            table_info: self.table_info.clone(),
+        });
+        Ok(Box::into_raw(boxed))
+    }
 }
 
 // AppendWriter implementation
@@ -1095,6 +1240,137 @@ impl WriteResult {
     }
 }
 
+// UpsertWriter implementation
+/// Frees an `UpsertWriter` that was handed out as a raw pointer. Null is ignored.
+///
+/// # Safety
+///
+/// `writer` must be null or a pointer produced by `Box::into_raw` for an
+/// `UpsertWriter` that has not been freed yet.
+unsafe fn delete_upsert_writer(writer: *mut UpsertWriter) {
+    if writer.is_null() {
+        return;
+    }
+    // SAFETY: non-null was checked above; the caller guarantees the pointer came from
+    // `Box::into_raw` and is freed only once.
+    unsafe {
+        drop(Box::from_raw(writer));
+    }
+}
+
+impl UpsertWriter {
+    /// Extends `row` with `Datum::Null` values up to the schema's column count.
+    ///
+    /// Rows that already hold at least that many values are returned untouched, so
+    /// callers may set only the leading fields they care about.
+    fn pad_row<'a>(&self, mut row: fcore::row::GenericRow<'a>) -> fcore::row::GenericRow<'a> {
+        let width = self.table_info.get_schema().columns().len();
+        if row.values.len() >= width {
+            return row;
+        }
+        row.values.resize(width, fcore::row::Datum::Null);
+        row
+    }
+
+    /// Converts `row` against the table schema, pads it to full width and submits it
+    /// as an upsert.
+    ///
+    /// Returns the pending write wrapped in a `WriteResult`, or an error message when
+    /// the conversion or the submission fails.
+    fn upsert(&mut self, row: &ffi::FfiGenericRow) -> Result<Box<WriteResult>, String> {
+        let padded = types::ffi_row_to_core(row, Some(self.table_info.get_schema()))
+            .map(|converted| self.pad_row(converted))
+            .map_err(|e| e.to_string())?;
+
+        let pending = match self.inner.upsert(&padded) {
+            Ok(pending) => pending,
+            Err(e) => return Err(format!("Failed to upsert: {e}")),
+        };
+
+        Ok(Box::new(WriteResult {
+            inner: Some(pending),
+        }))
+    }
+
+    /// Converts `row` against the table schema, pads it to full width and submits it
+    /// as a delete.
+    ///
+    /// Returns the pending write wrapped in a `WriteResult`, or an error message when
+    /// the conversion or the submission fails.
+    fn delete_row(&mut self, row: &ffi::FfiGenericRow) -> Result<Box<WriteResult>, String> {
+        let padded = types::ffi_row_to_core(row, Some(self.table_info.get_schema()))
+            .map(|converted| self.pad_row(converted))
+            .map_err(|e| e.to_string())?;
+
+        let pending = match self.inner.delete(&padded) {
+            Ok(pending) => pending,
+            Err(e) => return Err(format!("Failed to delete: {e}")),
+        };
+
+        Ok(Box::new(WriteResult {
+            inner: Some(pending),
+        }))
+    }
+
+    /// Blocks on the inner writer's `flush` and reports the outcome as an `FfiResult`
+    /// (error code 1 with the error text on failure).
+    fn upsert_flush(&mut self) -> ffi::FfiResult {
+        RUNTIME
+            .block_on(async { self.inner.flush().await })
+            .map_or_else(|e| err_result(1, e.to_string()), |_| ok_result())
+    }
+}
+
+// Lookuper implementation
+/// Frees a `Lookuper` that was handed out as a raw pointer. Null is ignored.
+///
+/// # Safety
+///
+/// `lookuper` must be null or a pointer produced by `Box::into_raw` for a `Lookuper`
+/// that has not been freed yet.
+unsafe fn delete_lookuper(lookuper: *mut Lookuper) {
+    if lookuper.is_null() {
+        return;
+    }
+    // SAFETY: non-null was checked above; the caller guarantees the pointer came from
+    // `Box::into_raw` and is freed only once.
+    unsafe {
+        drop(Box::from_raw(lookuper));
+    }
+}
+
+impl Lookuper {
+    /// Extends `row` with `Datum::Null` values up to the schema's column count, so the
+    /// key row handed to the core lookuper is always full width.
+    ///
+    /// Rows that already hold at least that many values are returned untouched.
+    fn pad_row<'a>(&self, mut row: fcore::row::GenericRow<'a>) -> fcore::row::GenericRow<'a> {
+        let width = self.table_info.get_schema().columns().len();
+        if row.values.len() >= width {
+            return row;
+        }
+        row.values.resize(width, fcore::row::Datum::Null);
+        row
+    }
+
+    /// Looks up the row identified by `pk_row`.
+    ///
+    /// The key is converted against the table schema and padded, the lookup is run to
+    /// completion on `RUNTIME`, and a single resulting row (if any) is converted back
+    /// to its FFI form. A missing row is a success with `found == false`; every
+    /// failure yields error code 1, `found == false` and a row with no fields.
+    fn lookup(&mut self, pk_row: &ffi::FfiGenericRow) -> ffi::FfiLookupResult {
+        // All failure paths share one shape; build it in a single place.
+        let failure = |message: String| ffi::FfiLookupResult {
+            result: err_result(1, message),
+            found: false,
+            row: ffi::FfiGenericRow { fields: vec![] },
+        };
+
+        let key = match types::ffi_row_to_core(pk_row, Some(self.table_info.get_schema())) {
+            Ok(converted) => self.pad_row(converted),
+            Err(e) => return failure(e.to_string()),
+        };
+
+        let fetched = match RUNTIME.block_on(self.inner.lookup(&key)) {
+            Ok(fetched) => fetched,
+            Err(e) => return failure(e.to_string()),
+        };
+
+        let row = match fetched.get_single_row() {
+            Ok(Some(row)) => row,
+            Ok(None) => {
+                return ffi::FfiLookupResult {
+                    result: ok_result(),
+                    found: false,
+                    row: ffi::FfiGenericRow { fields: vec![] },
+                };
+            }
+            Err(e) => return failure(e.to_string()),
+        };
+
+        match types::internal_row_to_ffi_row(&row, &self.table_info) {
+            Ok(ffi_row) => ffi::FfiLookupResult {
+                result: ok_result(),
+                found: true,
+                row: ffi_row,
+            },
+            Err(e) => failure(e.to_string()),
+        }
+    }
+}
+
 // LogScanner implementation
 unsafe fn delete_log_scanner(scanner: *mut LogScanner) {
     if !scanner.is_null() {
@@ -1258,13 +1534,18 @@ impl LogScanner {
             let result = RUNTIME.block_on(async { inner.poll(timeout).await });
 
             match result {
-                Ok(records) => ffi::FfiScanRecordsResult {
-                    result: ok_result(),
-                    scan_records: types::core_scan_records_to_ffi(
-                        &records,
-                        &self.projected_columns,
-                    ),
-                },
+                Ok(records) => {
+                    match types::core_scan_records_to_ffi(&records, 
&self.projected_columns) {
+                        Ok(scan_records) => ffi::FfiScanRecordsResult {
+                            result: ok_result(),
+                            scan_records,
+                        },
+                        Err(e) => ffi::FfiScanRecordsResult {
+                            result: err_result(1, e.to_string()),
+                            scan_records: ffi::FfiScanRecords { records: 
vec![] },
+                        },
+                    }
+                }
                 Err(e) => ffi::FfiScanRecordsResult {
                     result: err_result(1, e.to_string()),
                     scan_records: ffi::FfiScanRecords { records: vec![] },
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index 4425b5f..5b2f66c 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -92,12 +92,16 @@ void Table::Destroy() noexcept {
     }
 }
 
-Table::Table(Table&& other) noexcept : table_(other.table_) { other.table_ = 
nullptr; }
+Table::Table(Table&& other) noexcept
+    : table_(other.table_), column_map_(std::move(other.column_map_)) {
+    other.table_ = nullptr;
+}
 
 Table& Table::operator=(Table&& other) noexcept {
     if (this != &other) {
         Destroy();
         table_ = other.table_;
+        column_map_ = std::move(other.column_map_);
         other.table_ = nullptr;
     }
     return *this;
@@ -111,7 +115,7 @@ Result Table::NewAppendWriter(AppendWriter& out) {
     }
 
     try {
-        out.writer_ = table_->new_append_writer();
+        out = AppendWriter(table_->new_append_writer());
         return utils::make_ok();
     } catch (const rust::Error& e) {
         return utils::make_error(1, e.what());
@@ -177,6 +181,24 @@ Result TableScan::CreateRecordBatchScanner(LogScanner& out) {
     }
 }
 
+// Returns the cached column-name -> {index, type id} map, building it from the
+// table schema on first use. When the table is not available nothing is built
+// and the (possibly null) cached pointer is returned as-is.
+const std::shared_ptr<GenericRow::ColumnMap>& Table::GetColumnMap() const {
+    if (column_map_ || !Available()) {
+        return column_map_;
+    }
+
+    auto info = GetTableInfo();
+    column_map_ = std::make_shared<GenericRow::ColumnMap>();
+    auto& by_name = *column_map_;
+    size_t index = 0;
+    for (auto& column : info.schema.columns) {
+        by_name[column.name] = {index, column.data_type.id()};
+        ++index;
+    }
+    return column_map_;
+}
+
+// Creates an empty row that shares this table's cached column map.
+GenericRow Table::NewRow() const {
+    GenericRow fresh;
+    fresh.column_map_ = GetColumnMap();
+    return fresh;
+}
+
 TableInfo Table::GetTableInfo() const {
     if (!Available()) {
         return TableInfo{};
@@ -281,7 +303,7 @@ Result AppendWriter::Append(const GenericRow& row, WriteResult& out) {
     try {
         auto ffi_row = utils::to_ffi_generic_row(row);
         auto rust_box = writer_->append(ffi_row);
-        out.inner_ = rust_box.into_raw();
+        out = WriteResult(rust_box.into_raw());
         return utils::make_ok();
     } catch (const rust::Error& e) {
         return utils::make_error(1, e.what());
@@ -299,6 +321,213 @@ Result AppendWriter::Flush() {
     return utils::from_ffi_result(ffi_result);
 }
 
+// UpsertWriter implementation
+// A default-constructed writer holds no pointer.
+UpsertWriter::UpsertWriter() noexcept = default;
+
+// Wraps the given raw writer pointer.
+UpsertWriter::UpsertWriter(ffi::UpsertWriter* writer) noexcept : writer_(writer) {}
+
+UpsertWriter::~UpsertWriter() noexcept { Destroy(); }
+
+// Releases the held writer, if any, and resets the pointer to null.
+void UpsertWriter::Destroy() noexcept {
+    if (!writer_) {
+        return;
+    }
+    ffi::delete_upsert_writer(writer_);
+    writer_ = nullptr;
+}
+
+// Takes over the other writer's pointer and leaves it empty.
+UpsertWriter::UpsertWriter(UpsertWriter&& other) noexcept {
+    writer_ = other.writer_;
+    other.writer_ = nullptr;
+}
+
+// Releases the current writer, then takes over the other one's pointer.
+UpsertWriter& UpsertWriter::operator=(UpsertWriter&& other) noexcept {
+    if (this == &other) {
+        return *this;
+    }
+    Destroy();
+    writer_ = other.writer_;
+    other.writer_ = nullptr;
+    return *this;
+}
+
+bool UpsertWriter::Available() const { return writer_ != nullptr; }
+
+// Upserts `row` without returning a handle: the local WriteResult is destroyed
+// when this function returns.
+Result UpsertWriter::Upsert(const GenericRow& row) {
+    WriteResult discarded;
+    return Upsert(row, discarded);
+}
+
+// Upserts `row` and stores the resulting write handle in `out`. Returns error
+// code 1 when the writer is not available or the call throws.
+Result UpsertWriter::Upsert(const GenericRow& row, WriteResult& out) {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    try {
+        auto converted = utils::to_ffi_generic_row(row);
+        auto pending = writer_->upsert(converted);
+        // Take over the raw pointer from the box and wrap it in a WriteResult.
+        auto* raw = pending.into_raw();
+        out = WriteResult(raw);
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+// Deletes `row` without returning a handle: the local WriteResult is destroyed
+// when this function returns.
+Result UpsertWriter::Delete(const GenericRow& row) {
+    WriteResult discarded;
+    return Delete(row, discarded);
+}
+
+// Deletes `row` and stores the resulting write handle in `out`. Returns error
+// code 1 when the writer is not available or the call throws.
+Result UpsertWriter::Delete(const GenericRow& row, WriteResult& out) {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    try {
+        auto converted = utils::to_ffi_generic_row(row);
+        auto pending = writer_->delete_row(converted);
+        // Take over the raw pointer from the box and wrap it in a WriteResult.
+        auto* raw = pending.into_raw();
+        out = WriteResult(raw);
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+// Flushes the writer and converts the FFI status into a Result. Returns error
+// code 1 when the writer is not available.
+Result UpsertWriter::Flush() {
+    if (!Available()) {
+        return utils::make_error(1, "UpsertWriter not available");
+    }
+
+    return utils::from_ffi_result(writer_->upsert_flush());
+}
+
+// Lookuper implementation
+// A default-constructed lookuper holds no pointer.
+Lookuper::Lookuper() noexcept = default;
+
+// Wraps the given raw lookuper pointer.
+Lookuper::Lookuper(ffi::Lookuper* lookuper) noexcept : lookuper_(lookuper) {}
+
+Lookuper::~Lookuper() noexcept { Destroy(); }
+
+// Releases the held lookuper, if any, and resets the pointer to null.
+void Lookuper::Destroy() noexcept {
+    if (!lookuper_) {
+        return;
+    }
+    ffi::delete_lookuper(lookuper_);
+    lookuper_ = nullptr;
+}
+
+// Takes over the other lookuper's pointer and leaves it empty.
+Lookuper::Lookuper(Lookuper&& other) noexcept {
+    lookuper_ = other.lookuper_;
+    other.lookuper_ = nullptr;
+}
+
+// Releases the current lookuper, then takes over the other one's pointer.
+Lookuper& Lookuper::operator=(Lookuper&& other) noexcept {
+    if (this == &other) {
+        return *this;
+    }
+    Destroy();
+    lookuper_ = other.lookuper_;
+    other.lookuper_ = nullptr;
+    return *this;
+}
+
+bool Lookuper::Available() const { return lookuper_ != nullptr; }
+
+// Looks up the row identified by `pk_row`.
+//
+// `found` is always written: it is true only when the lookup succeeded and a
+// row was returned, and false on every error path (including an unavailable
+// lookuper) and when no row matches. `out` is assigned only when `found` is
+// true. Failures are reported as a Result with error code 1.
+Result Lookuper::Lookup(const GenericRow& pk_row, bool& found, GenericRow& out) {
+    // Start from "not found" so that every early return below, including the
+    // unavailable case, leaves the caller with a defined value.
+    found = false;
+
+    if (!Available()) {
+        return utils::make_error(1, "Lookuper not available");
+    }
+
+    try {
+        auto ffi_row = utils::to_ffi_generic_row(pk_row);
+        auto ffi_result = lookuper_->lookup(ffi_row);
+        auto result = utils::from_ffi_result(ffi_result.result);
+        if (!result.Ok()) {
+            return result;
+        }
+        if (ffi_result.found) {
+            out = utils::from_ffi_generic_row(ffi_result.row);
+            // Flip the flag only after the row was converted successfully.
+            found = true;
+        }
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        found = false;
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        found = false;
+        return utils::make_error(1, e.what());
+    }
+}
+
+// Table KV methods
+// Creates an upsert writer for this table and move-assigns it into `out`.
+// Returns error code 1 when the table is not available or creation throws.
+Result Table::NewUpsertWriter(UpsertWriter& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        auto* raw = table_->new_upsert_writer();
+        out = UpsertWriter(raw);
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+// Creates an upsert writer restricted to the named columns and move-assigns it
+// into `out`. Returns error code 1 when the table is not available or creation
+// throws.
+Result Table::NewUpsertWriter(UpsertWriter& out, const std::vector<std::string>& column_names) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        // Copy the names into the Rust-side vector type expected by the bridge.
+        rust::Vec<rust::String> names;
+        for (const auto& column_name : column_names) {
+            names.push_back(rust::String(column_name));
+        }
+        auto* raw = table_->new_upsert_writer_with_column_names(std::move(names));
+        out = UpsertWriter(raw);
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+// Creates an upsert writer restricted to the columns at the given indices and
+// move-assigns it into `out`. Returns error code 1 when the table is not
+// available or creation throws.
+Result Table::NewUpsertWriter(UpsertWriter& out, const std::vector<size_t>& column_indices) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        // Copy the indices into the Rust-side vector type expected by the bridge.
+        rust::Vec<size_t> indices;
+        for (size_t column_index : column_indices) {
+            indices.push_back(column_index);
+        }
+        auto* raw = table_->new_upsert_writer_with_column_indices(std::move(indices));
+        out = UpsertWriter(raw);
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+// Creates a lookuper for this table and move-assigns it into `out`. Returns
+// error code 1 when the table is not available or creation throws.
+Result Table::NewLookuper(Lookuper& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        auto* raw = table_->new_lookuper();
+        out = Lookuper(raw);
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
 // LogScanner implementation
 LogScanner::LogScanner() noexcept = default;
 
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index 65b9b04..17aa872 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -316,7 +316,22 @@ pub fn ffi_row_to_core<'a>(
         let datum = match field.datum_type {
             DATUM_TYPE_NULL => Datum::Null,
             DATUM_TYPE_BOOL => Datum::Bool(field.bool_val),
-            DATUM_TYPE_INT32 => Datum::Int32(field.i32_val),
+            DATUM_TYPE_INT32 => match schema
+                .and_then(|s| s.columns().get(idx))
+                .map(|c| c.data_type())
+            {
+                Some(fcore::metadata::DataType::TinyInt(_)) => {
+                    Datum::Int8(i8::try_from(field.i32_val).map_err(|_| {
+                        anyhow!("Column {idx}: {} overflows TinyInt", 
field.i32_val)
+                    })?)
+                }
+                Some(fcore::metadata::DataType::SmallInt(_)) => {
+                    Datum::Int16(i16::try_from(field.i32_val).map_err(|_| {
+                        anyhow!("Column {idx}: {} overflows SmallInt", 
field.i32_val)
+                    })?)
+                }
+                _ => Datum::Int32(field.i32_val),
+            },
             DATUM_TYPE_INT64 => Datum::Int64(field.i64_val),
             DATUM_TYPE_FLOAT32 => Datum::Float32(field.f32_val.into()),
             DATUM_TYPE_FLOAT64 => Datum::Float64(field.f64_val.into()),
@@ -360,11 +375,11 @@ pub fn ffi_row_to_core<'a>(
             DATUM_TYPE_TIME => 
Datum::Time(fcore::row::Time::new(field.i32_val)),
             DATUM_TYPE_TIMESTAMP_NTZ => Datum::TimestampNtz(
                 fcore::row::TimestampNtz::from_millis_nanos(field.i64_val, 
field.i32_val)
-                    .unwrap_or_else(|_| 
fcore::row::TimestampNtz::new(field.i64_val)),
+                    .map_err(|e| anyhow!("Column {idx}: {e}"))?,
             ),
             DATUM_TYPE_TIMESTAMP_LTZ => Datum::TimestampLtz(
                 fcore::row::TimestampLtz::from_millis_nanos(field.i64_val, 
field.i32_val)
-                    .unwrap_or_else(|_| 
fcore::row::TimestampLtz::new(field.i64_val)),
+                    .map_err(|e| anyhow!("Column {idx}: {e}"))?,
             ),
             other => return Err(anyhow!("Column {idx}: unknown datum type 
{other}")),
         };
@@ -377,7 +392,7 @@ pub fn ffi_row_to_core<'a>(
 pub fn core_scan_records_to_ffi(
     records: &fcore::record::ScanRecords,
     columns: &[fcore::metadata::Column],
-) -> ffi::FfiScanRecords {
+) -> Result<ffi::FfiScanRecords> {
     let mut ffi_records = Vec::new();
 
     // Iterate over all buckets and their records
@@ -385,7 +400,7 @@ pub fn core_scan_records_to_ffi(
         let bucket_id = table_bucket.bucket_id();
         for record in bucket_records {
             let row = record.row();
-            let fields = core_row_to_ffi_fields(row, columns);
+            let fields = core_row_to_ffi_fields(row, columns)?;
 
             ffi_records.push(ffi::FfiScanRecord {
                 bucket_id,
@@ -396,32 +411,15 @@ pub fn core_scan_records_to_ffi(
         }
     }
 
-    ffi::FfiScanRecords {
+    Ok(ffi::FfiScanRecords {
         records: ffi_records,
-    }
+    })
 }
 
 fn core_row_to_ffi_fields(
     row: &fcore::row::ColumnarRow,
     columns: &[fcore::metadata::Column],
-) -> Vec<ffi::FfiDatum> {
-    fn new_datum(datum_type: i32) -> ffi::FfiDatum {
-        ffi::FfiDatum {
-            datum_type,
-            bool_val: false,
-            i32_val: 0,
-            i64_val: 0,
-            f32_val: 0.0,
-            f64_val: 0.0,
-            string_val: String::new(),
-            bytes_val: vec![],
-            decimal_precision: 0,
-            decimal_scale: 0,
-            i128_hi: 0,
-            i128_lo: 0,
-        }
-    }
-
+) -> Result<Vec<ffi::FfiDatum>> {
     let record_batch = row.get_record_batch();
     let schema = record_batch.schema();
     let row_id = row.get_row_id();
@@ -430,124 +428,135 @@ fn core_row_to_ffi_fields(
 
     for (i, field) in schema.fields().iter().enumerate() {
         if row.is_null_at(i) {
-            fields.push(new_datum(DATUM_TYPE_NULL));
+            fields.push(ffi::FfiDatum::default());
             continue;
         }
 
         let datum = match field.data_type() {
-            ArrowDataType::Boolean => {
-                let mut datum = new_datum(DATUM_TYPE_BOOL);
-                datum.bool_val = row.get_boolean(i);
-                datum
-            }
-            ArrowDataType::Int8 => {
-                let mut datum = new_datum(DATUM_TYPE_INT32);
-                datum.i32_val = row.get_byte(i) as i32;
-                datum
-            }
-            ArrowDataType::Int16 => {
-                let mut datum = new_datum(DATUM_TYPE_INT32);
-                datum.i32_val = row.get_short(i) as i32;
-                datum
-            }
-            ArrowDataType::Int32 => {
-                let mut datum = new_datum(DATUM_TYPE_INT32);
-                datum.i32_val = row.get_int(i);
-                datum
-            }
-            ArrowDataType::Int64 => {
-                let mut datum = new_datum(DATUM_TYPE_INT64);
-                datum.i64_val = row.get_long(i);
-                datum
-            }
-            ArrowDataType::Float32 => {
-                let mut datum = new_datum(DATUM_TYPE_FLOAT32);
-                datum.f32_val = row.get_float(i);
-                datum
-            }
-            ArrowDataType::Float64 => {
-                let mut datum = new_datum(DATUM_TYPE_FLOAT64);
-                datum.f64_val = row.get_double(i);
-                datum
-            }
-            ArrowDataType::Utf8 => {
-                let mut datum = new_datum(DATUM_TYPE_STRING);
-                // todo: avoid copy string
-                datum.string_val = row.get_string(i).to_string();
-                datum
-            }
+            ArrowDataType::Boolean => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_BOOL,
+                bool_val: row.get_boolean(i),
+                ..Default::default()
+            },
+            ArrowDataType::Int8 => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_INT32,
+                i32_val: row.get_byte(i) as i32,
+                ..Default::default()
+            },
+            ArrowDataType::Int16 => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_INT32,
+                i32_val: row.get_short(i) as i32,
+                ..Default::default()
+            },
+            ArrowDataType::Int32 => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_INT32,
+                i32_val: row.get_int(i),
+                ..Default::default()
+            },
+            ArrowDataType::Int64 => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_INT64,
+                i64_val: row.get_long(i),
+                ..Default::default()
+            },
+            ArrowDataType::Float32 => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_FLOAT32,
+                f32_val: row.get_float(i),
+                ..Default::default()
+            },
+            ArrowDataType::Float64 => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_FLOAT64,
+                f64_val: row.get_double(i),
+                ..Default::default()
+            },
+            ArrowDataType::Utf8 => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_STRING,
+                string_val: row.get_string(i).to_string(),
+                ..Default::default()
+            },
             ArrowDataType::LargeUtf8 => {
                 let array = record_batch
                     .column(i)
                     .as_any()
                     .downcast_ref::<LargeStringArray>()
-                    .expect("LargeUtf8 column expected");
-                let mut datum = new_datum(DATUM_TYPE_STRING);
-                datum.string_val = array.value(row_id).to_string();
-                datum
-            }
-            ArrowDataType::Binary => {
-                let mut datum = new_datum(DATUM_TYPE_BYTES);
-                // todo: avoid copy bytes for blob
-                datum.bytes_val = row.get_bytes(i).to_vec();
-                datum
-            }
-            ArrowDataType::FixedSizeBinary(len) => {
-                let mut datum = new_datum(DATUM_TYPE_BYTES);
-                datum.bytes_val = row.get_binary(i, *len as usize).to_vec();
-                datum
+                    .ok_or_else(|| anyhow!("Column {i}: expected LargeUtf8 
array"))?;
+                ffi::FfiDatum {
+                    datum_type: DATUM_TYPE_STRING,
+                    string_val: array.value(row_id).to_string(),
+                    ..Default::default()
+                }
             }
+            ArrowDataType::Binary => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_BYTES,
+                bytes_val: row.get_bytes(i).to_vec(),
+                ..Default::default()
+            },
+            ArrowDataType::FixedSizeBinary(len) => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_BYTES,
+                bytes_val: row.get_binary(i, *len as usize).to_vec(),
+                ..Default::default()
+            },
             ArrowDataType::LargeBinary => {
                 let array = record_batch
                     .column(i)
                     .as_any()
                     .downcast_ref::<LargeBinaryArray>()
-                    .expect("LargeBinary column expected");
-                let mut datum = new_datum(DATUM_TYPE_BYTES);
-                datum.bytes_val = array.value(row_id).to_vec();
-                datum
+                    .ok_or_else(|| anyhow!("Column {i}: expected LargeBinary 
array"))?;
+                ffi::FfiDatum {
+                    datum_type: DATUM_TYPE_BYTES,
+                    bytes_val: array.value(row_id).to_vec(),
+                    ..Default::default()
+                }
             }
             ArrowDataType::Date32 => {
                 let array = record_batch
                     .column(i)
                     .as_any()
                     .downcast_ref::<Date32Array>()
-                    .expect("Date32 column expected");
-                let mut datum = new_datum(DATUM_TYPE_DATE);
-                datum.i32_val = array.value(row_id);
-                datum
+                    .ok_or_else(|| anyhow!("Column {i}: expected Date32 
array"))?;
+                ffi::FfiDatum {
+                    datum_type: DATUM_TYPE_DATE,
+                    i32_val: array.value(row_id),
+                    ..Default::default()
+                }
             }
             ArrowDataType::Timestamp(unit, _tz) => {
                 let datum_type = match columns.get(i).map(|c| c.data_type()) {
                     Some(fcore::metadata::DataType::TimestampLTz(_)) => 
DATUM_TYPE_TIMESTAMP_LTZ,
                     _ => DATUM_TYPE_TIMESTAMP_NTZ,
                 };
-                let mut datum = new_datum(datum_type);
+                let mut datum = ffi::FfiDatum {
+                    datum_type,
+                    ..Default::default()
+                };
                 match unit {
                     TimeUnit::Second => {
                         let array = record_batch
                             .column(i)
                             .as_any()
                             .downcast_ref::<TimestampSecondArray>()
-                            .expect("Timestamp(second) column expected");
+                            .ok_or_else(|| {
+                                anyhow!("Column {i}: expected 
Timestamp(second) array")
+                            })?;
                         datum.i64_val = array.value(row_id) * 
MILLIS_PER_SECOND;
-                        datum.i32_val = 0;
                     }
                     TimeUnit::Millisecond => {
                         let array = record_batch
                             .column(i)
                             .as_any()
                             .downcast_ref::<TimestampMillisecondArray>()
-                            .expect("Timestamp(millisecond) column expected");
+                            .ok_or_else(|| {
+                                anyhow!("Column {i}: expected 
Timestamp(millisecond) array")
+                            })?;
                         datum.i64_val = array.value(row_id);
-                        datum.i32_val = 0;
                     }
                     TimeUnit::Microsecond => {
                         let array = record_batch
                             .column(i)
                             .as_any()
                             .downcast_ref::<TimestampMicrosecondArray>()
-                            .expect("Timestamp(microsecond) column expected");
+                            .ok_or_else(|| {
+                                anyhow!("Column {i}: expected 
Timestamp(microsecond) array")
+                            })?;
                         let micros = array.value(row_id);
                         datum.i64_val = micros.div_euclid(MICROS_PER_MILLI);
                         datum.i32_val =
@@ -558,7 +567,9 @@ fn core_row_to_ffi_fields(
                             .column(i)
                             .as_any()
                             .downcast_ref::<TimestampNanosecondArray>()
-                            .expect("Timestamp(nanosecond) column expected");
+                            .ok_or_else(|| {
+                                anyhow!("Column {i}: expected Timestamp(nanosecond) array")
+                            })?;
                         let nanos = array.value(row_id);
                         datum.i64_val = nanos.div_euclid(NANOS_PER_MILLI);
                         datum.i32_val = nanos.rem_euclid(NANOS_PER_MILLI) as i32;
@@ -572,22 +583,26 @@ fn core_row_to_ffi_fields(
                         .column(i)
                         .as_any()
                         .downcast_ref::<Time32SecondArray>()
-                        .expect("Time32(second) column expected");
-                    let mut datum = new_datum(DATUM_TYPE_TIME);
-                    datum.i32_val = array.value(row_id) * MILLIS_PER_SECOND as i32;
-                    datum
+                        .ok_or_else(|| anyhow!("Column {i}: expected Time32(second) array"))?;
+                    ffi::FfiDatum {
+                        datum_type: DATUM_TYPE_TIME,
+                        i32_val: array.value(row_id) * MILLIS_PER_SECOND as i32,
+                        ..Default::default()
+                    }
                 }
                 TimeUnit::Millisecond => {
                     let array = record_batch
                         .column(i)
                         .as_any()
                         .downcast_ref::<Time32MillisecondArray>()
-                        .expect("Time32(millisecond) column expected");
-                    let mut datum = new_datum(DATUM_TYPE_TIME);
-                    datum.i32_val = array.value(row_id);
-                    datum
+                        .ok_or_else(|| anyhow!("Column {i}: expected Time32(millisecond) array"))?;
+                    ffi::FfiDatum {
+                        datum_type: DATUM_TYPE_TIME,
+                        i32_val: array.value(row_id),
+                        ..Default::default()
+                    }
                 }
-                _ => panic!("Will never come here. Unsupported Time32 unit for column {i}"),
+                _ => return Err(anyhow!("Column {i}: unsupported Time32 unit")),
             },
             ArrowDataType::Time64(unit) => match unit {
                 TimeUnit::Microsecond => {
@@ -595,55 +610,210 @@ fn core_row_to_ffi_fields(
                         .column(i)
                         .as_any()
                         .downcast_ref::<Time64MicrosecondArray>()
-                        .expect("Time64(microsecond) column expected");
-                    let mut datum = new_datum(DATUM_TYPE_TIME);
-                    datum.i32_val = (array.value(row_id) / MICROS_PER_MILLI) as i32;
-                    datum
+                        .ok_or_else(|| anyhow!("Column {i}: expected Time64(microsecond) array"))?;
+                    ffi::FfiDatum {
+                        datum_type: DATUM_TYPE_TIME,
+                        i32_val: (array.value(row_id) / MICROS_PER_MILLI) as i32,
+                        ..Default::default()
+                    }
                 }
                 TimeUnit::Nanosecond => {
                     let array = record_batch
                         .column(i)
                         .as_any()
                         .downcast_ref::<Time64NanosecondArray>()
-                        .expect("Time64(nanosecond) column expected");
-                    let mut datum = new_datum(DATUM_TYPE_TIME);
-                    datum.i32_val = (array.value(row_id) / NANOS_PER_MILLI) as i32;
-                    datum
+                        .ok_or_else(|| anyhow!("Column {i}: expected Time64(nanosecond) array"))?;
+                    ffi::FfiDatum {
+                        datum_type: DATUM_TYPE_TIME,
+                        i32_val: (array.value(row_id) / NANOS_PER_MILLI) as i32,
+                        ..Default::default()
+                    }
                 }
-                _ => panic!("Will never come here. Unsupported Time64 unit for column {i}"),
+                _ => return Err(anyhow!("Column {i}: unsupported Time64 unit")),
             },
             ArrowDataType::Decimal128(precision, scale) => {
                 let array = record_batch
                     .column(i)
                     .as_any()
                     .downcast_ref::<Decimal128Array>()
-                    .expect("Decimal128 column expected");
+                    .ok_or_else(|| anyhow!("Column {i}: expected Decimal128 array"))?;
                 let i128_val = array.value(row_id);
 
                 if fcore::row::Decimal::is_compact_precision(*precision as u32) {
-                    let mut datum = new_datum(DATUM_TYPE_DECIMAL_I64);
-                    datum.i64_val = i128_val as i64;
-                    datum.decimal_precision = *precision as i32;
-                    datum.decimal_scale = *scale as i32;
-                    datum
+                    ffi::FfiDatum {
+                        datum_type: DATUM_TYPE_DECIMAL_I64,
+                        i64_val: i128_val as i64,
+                        decimal_precision: *precision as i32,
+                        decimal_scale: *scale as i32,
+                        ..Default::default()
+                    }
                 } else {
-                    let mut datum = new_datum(DATUM_TYPE_DECIMAL_I128);
-                    datum.i128_hi = (i128_val >> 64) as i64;
-                    datum.i128_lo = i128_val as i64;
-                    datum.decimal_precision = *precision as i32;
-                    datum.decimal_scale = *scale as i32;
-                    datum
+                    ffi::FfiDatum {
+                        datum_type: DATUM_TYPE_DECIMAL_I128,
+                        i128_hi: (i128_val >> 64) as i64,
+                        i128_lo: i128_val as i64,
+                        decimal_precision: *precision as i32,
+                        decimal_scale: *scale as i32,
+                        ..Default::default()
+                    }
                 }
             }
-            other => panic!(
-                "Will never come here. Unsupported Arrow data type for column {i}: {other:?}"
-            ),
+            other => return Err(anyhow!("Column {i}: unsupported Arrow data type {other:?}")),
+        };
+
+        fields.push(datum);
+    }
+
+    Ok(fields)
+}
+
+impl Default for ffi::FfiDatum {
+    fn default() -> Self {
+        Self {
+            datum_type: DATUM_TYPE_NULL,
+            bool_val: false,
+            i32_val: 0,
+            i64_val: 0,
+            f32_val: 0.0,
+            f64_val: 0.0,
+            string_val: String::new(),
+            bytes_val: vec![],
+            decimal_precision: 0,
+            decimal_scale: 0,
+            i128_hi: 0,
+            i128_lo: 0,
+        }
+    }
+}
+
+/// Convert any InternalRow to FfiGenericRow using Fluss schema metadata.
+/// Used for lookup results (CompactedRow) where Arrow schema is unavailable.
+pub fn internal_row_to_ffi_row(
+    row: &dyn fcore::row::InternalRow,
+    table_info: &fcore::metadata::TableInfo,
+) -> Result<ffi::FfiGenericRow> {
+    let schema = table_info.get_schema();
+    let columns = schema.columns();
+    let mut fields = Vec::with_capacity(columns.len());
+
+    for (i, col) in columns.iter().enumerate() {
+        if row.is_null_at(i) {
+            fields.push(ffi::FfiDatum::default());
+            continue;
+        }
+
+        let datum = match col.data_type() {
+            fcore::metadata::DataType::Boolean(_) => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_BOOL,
+                bool_val: row.get_boolean(i),
+                ..Default::default()
+            },
+            fcore::metadata::DataType::TinyInt(_) => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_INT32,
+                i32_val: row.get_byte(i) as i32,
+                ..Default::default()
+            },
+            fcore::metadata::DataType::SmallInt(_) => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_INT32,
+                i32_val: row.get_short(i) as i32,
+                ..Default::default()
+            },
+            fcore::metadata::DataType::Int(_) => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_INT32,
+                i32_val: row.get_int(i),
+                ..Default::default()
+            },
+            fcore::metadata::DataType::BigInt(_) => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_INT64,
+                i64_val: row.get_long(i),
+                ..Default::default()
+            },
+            fcore::metadata::DataType::Float(_) => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_FLOAT32,
+                f32_val: row.get_float(i),
+                ..Default::default()
+            },
+            fcore::metadata::DataType::Double(_) => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_FLOAT64,
+                f64_val: row.get_double(i),
+                ..Default::default()
+            },
+            fcore::metadata::DataType::String(_) => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_STRING,
+                string_val: row.get_string(i).to_string(),
+                ..Default::default()
+            },
+            fcore::metadata::DataType::Bytes(_) => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_BYTES,
+                bytes_val: row.get_bytes(i).to_vec(),
+                ..Default::default()
+            },
+            fcore::metadata::DataType::Date(_) => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_DATE,
+                i32_val: row.get_date(i).get_inner(),
+                ..Default::default()
+            },
+            fcore::metadata::DataType::Time(_) => ffi::FfiDatum {
+                datum_type: DATUM_TYPE_TIME,
+                i32_val: row.get_time(i).get_inner(),
+                ..Default::default()
+            },
+            fcore::metadata::DataType::Timestamp(dt) => {
+                let ts = row.get_timestamp_ntz(i, dt.precision());
+                ffi::FfiDatum {
+                    datum_type: DATUM_TYPE_TIMESTAMP_NTZ,
+                    i64_val: ts.get_millisecond(),
+                    i32_val: ts.get_nano_of_millisecond(),
+                    ..Default::default()
+                }
+            }
+            fcore::metadata::DataType::TimestampLTz(dt) => {
+                let ts = row.get_timestamp_ltz(i, dt.precision());
+                ffi::FfiDatum {
+                    datum_type: DATUM_TYPE_TIMESTAMP_LTZ,
+                    i64_val: ts.get_epoch_millisecond(),
+                    i32_val: ts.get_nano_of_millisecond(),
+                    ..Default::default()
+                }
+            }
+            fcore::metadata::DataType::Decimal(dt) => {
+                let precision = dt.precision();
+                let scale = dt.scale();
+                let decimal = row.get_decimal(i, precision as usize, scale as usize);
+                if fcore::row::Decimal::is_compact_precision(precision) {
+                    ffi::FfiDatum {
+                        datum_type: DATUM_TYPE_DECIMAL_I64,
+                        i64_val: decimal.to_unscaled_long().map_err(|e| {
+                            anyhow!("Column {i}: compact decimal conversion failed: {e}")
+                        })?,
+                        decimal_precision: precision as i32,
+                        decimal_scale: scale as i32,
+                        ..Default::default()
+                    }
+                } else {
+                    let bd = decimal.to_big_decimal();
+                    let (unscaled, _) = bd.into_bigint_and_exponent();
+                    use bigdecimal::ToPrimitive;
+                    let i128_val = unscaled.to_i128().ok_or_else(|| {
+                        anyhow!("Column {i}: decimal unscaled value does not fit in i128")
+                    })?;
+                    ffi::FfiDatum {
+                        datum_type: DATUM_TYPE_DECIMAL_I128,
+                        i128_hi: (i128_val >> 64) as i64,
+                        i128_lo: i128_val as i64,
+                        decimal_precision: precision as i32,
+                        decimal_scale: scale as i32,
+                        ..Default::default()
+                    }
+                }
+            }
+            other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")),
         };
 
         fields.push(datum);
     }
 
-    fields
+    Ok(ffi::FfiGenericRow { fields })
 }
 
 pub fn core_lake_snapshot_to_ffi(snapshot: &fcore::metadata::LakeSnapshot) -> ffi::FfiLakeSnapshot {

Reply via email to