This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 61aa7e1 feat: support kv tables in CPP (#288)
61aa7e1 is described below
commit 61aa7e1d4abbd6008fd84d3d1c3bfe095b36c85a
Author: Anton Borisov <[email protected]>
AuthorDate: Mon Feb 9 12:41:14 2026 +0000
feat: support kv tables in CPP (#288)
---
bindings/cpp/BUILD.bazel | 33 +++
bindings/cpp/CMakeLists.txt | 6 +
bindings/cpp/examples/admin_example.cpp | 5 +-
bindings/cpp/examples/kv_example.cpp | 488 ++++++++++++++++++++++++++++++++
bindings/cpp/include/fluss.hpp | 134 ++++++++-
bindings/cpp/src/ffi_converter.hpp | 3 +-
bindings/cpp/src/lib.rs | 295 ++++++++++++++++++-
bindings/cpp/src/table.cpp | 235 ++++++++++++++-
bindings/cpp/src/types.rs | 432 +++++++++++++++++++---------
9 files changed, 1483 insertions(+), 148 deletions(-)
diff --git a/bindings/cpp/BUILD.bazel b/bindings/cpp/BUILD.bazel
index aff8f50..0ae2ce3 100644
--- a/bindings/cpp/BUILD.bazel
+++ b/bindings/cpp/BUILD.bazel
@@ -373,3 +373,36 @@ cc_binary(
visibility = ["//visibility:public"],
)
+cc_binary(
+ name = "fluss_cpp_kv_example",
+ srcs = [
+ "examples/kv_example.cpp",
+ ],
+ deps = [":fluss_cpp"],
+ copts = [
+ "-std=c++17",
+ ] + select({
+ ":debug_mode": [
+ "-g3",
+ "-O0",
+ "-ggdb",
+ "-fno-omit-frame-pointer",
+ "-DDEBUG",
+ ],
+ ":fastbuild_mode": [
+ "-g",
+ "-O0",
+ ],
+ ":release_mode": [
+ "-O2",
+ "-DNDEBUG",
+ ],
+ }),
+ linkopts = select({
+ ":debug_mode": ["-g"],
+ ":fastbuild_mode": ["-g"],
+ ":release_mode": [],
+ }),
+ visibility = ["//visibility:public"],
+)
+
diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt
index ae70842..05c58ea 100644
--- a/bindings/cpp/CMakeLists.txt
+++ b/bindings/cpp/CMakeLists.txt
@@ -108,6 +108,12 @@ target_link_libraries(fluss_cpp_admin_example PRIVATE
Arrow::arrow_shared)
target_compile_definitions(fluss_cpp_admin_example PRIVATE ARROW_FOUND)
target_include_directories(fluss_cpp_admin_example PUBLIC ${CPP_INCLUDE_DIR})
+add_executable(fluss_cpp_kv_example examples/kv_example.cpp)
+target_link_libraries(fluss_cpp_kv_example PRIVATE fluss_cpp)
+target_link_libraries(fluss_cpp_kv_example PRIVATE Arrow::arrow_shared)
+target_compile_definitions(fluss_cpp_kv_example PRIVATE ARROW_FOUND)
+target_include_directories(fluss_cpp_kv_example PUBLIC ${CPP_INCLUDE_DIR})
+
set_target_properties(fluss_cpp
PROPERTIES ADDITIONAL_CLEAN_FILES ${CARGO_TARGET_DIR}
)
diff --git a/bindings/cpp/examples/admin_example.cpp
b/bindings/cpp/examples/admin_example.cpp
index 7b7a333..196fe97 100644
--- a/bindings/cpp/examples/admin_example.cpp
+++ b/bindings/cpp/examples/admin_example.cpp
@@ -61,9 +61,8 @@ int main() {
fluss::DatabaseInfo db_info;
check("get_database_info", admin.GetDatabaseInfo(db_name, db_info));
- std::cout << "Database info: name=" << db_info.database_name
- << " comment=" << db_info.comment << " created_time=" <<
db_info.created_time
- << std::endl;
+ std::cout << "Database info: name=" << db_info.database_name << "
comment=" << db_info.comment
+ << " created_time=" << db_info.created_time << std::endl;
std::vector<std::string> databases;
check("list_databases", admin.ListDatabases(databases));
diff --git a/bindings/cpp/examples/kv_example.cpp
b/bindings/cpp/examples/kv_example.cpp
new file mode 100644
index 0000000..daebfb2
--- /dev/null
+++ b/bindings/cpp/examples/kv_example.cpp
@@ -0,0 +1,488 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <iostream>
+#include <string>
+#include <vector>
+
+#include "fluss.hpp"
+
+static void check(const char* step, const fluss::Result& r) {
+ if (!r.Ok()) {
+ std::cerr << step << " failed: code=" << r.error_code << " msg=" <<
r.error_message
+ << std::endl;
+ std::exit(1);
+ }
+}
+
+int main() {
+ const std::string bootstrap = "127.0.0.1:9123";
+
+ // 1) Connect and get Admin
+ fluss::Connection conn;
+ check("connect", fluss::Connection::Connect(bootstrap, conn));
+
+ fluss::Admin admin;
+ check("get_admin", conn.GetAdmin(admin));
+
+ fluss::TablePath kv_table_path("fluss", "kv_table_cpp_v1");
+
+ // Drop if exists
+ admin.DropTable(kv_table_path, true);
+
+ // 2) Create a KV table with primary key, including decimal and temporal
types
+ auto kv_schema = fluss::Schema::NewBuilder()
+ .AddColumn("user_id", fluss::DataType::Int())
+ .AddColumn("name", fluss::DataType::String())
+ .AddColumn("email", fluss::DataType::String())
+ .AddColumn("score", fluss::DataType::Float())
+ .AddColumn("balance", fluss::DataType::Decimal(10, 2))
+ .AddColumn("birth_date", fluss::DataType::Date())
+ .AddColumn("login_time", fluss::DataType::Time())
+ .AddColumn("created_at", fluss::DataType::Timestamp())
+ .AddColumn("last_seen",
fluss::DataType::TimestampLtz())
+ .SetPrimaryKeys({"user_id"})
+ .Build();
+
+ auto kv_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(kv_schema)
+ .SetBucketCount(3)
+ .SetComment("cpp kv table example")
+ .Build();
+
+ check("create_kv_table", admin.CreateTable(kv_table_path, kv_descriptor,
false));
+ std::cout << "Created KV table with primary key" << std::endl;
+
+ fluss::Table kv_table;
+ check("get_kv_table", conn.GetTable(kv_table_path, kv_table));
+
+ // 3) Upsert rows using name-based Set()
+ // - Set("balance", "1234.56") auto-routes to SetDecimal (schema-aware)
+ // - Set("created_at", ts) auto-routes to SetTimestampNtz (schema-aware)
+ // - Set("last_seen", ts) auto-routes to SetTimestampLtz (schema-aware)
+ std::cout << "\n--- Upsert Rows ---" << std::endl;
+ fluss::UpsertWriter upsert_writer;
+ check("new_upsert_writer", kv_table.NewUpsertWriter(upsert_writer));
+
+ // Fire-and-forget upserts
+ {
+ auto row = kv_table.NewRow();
+ row.Set("user_id", 1);
+ row.Set("name", "Alice");
+ row.Set("email", "[email protected]");
+ row.Set("score", 95.5f);
+ row.Set("balance", "1234.56");
+ row.Set("birth_date", fluss::Date::FromYMD(1990, 3, 15));
+ row.Set("login_time", fluss::Time::FromHMS(9, 30, 0));
+ row.Set("created_at", fluss::Timestamp::FromMillis(1700000000000));
+ row.Set("last_seen", fluss::Timestamp::FromMillis(1700000060000));
+ check("upsert_1", upsert_writer.Upsert(row));
+ }
+ {
+ auto row = kv_table.NewRow();
+ row.Set("user_id", 2);
+ row.Set("name", "Bob");
+ row.Set("email", "[email protected]");
+ row.Set("score", 87.3f);
+ row.Set("balance", "567.89");
+ row.Set("birth_date", fluss::Date::FromYMD(1985, 7, 22));
+ row.Set("login_time", fluss::Time::FromHMS(14, 15, 30));
+ row.Set("created_at", fluss::Timestamp::FromMillis(1700000100000));
+ row.Set("last_seen", fluss::Timestamp::FromMillis(1700000200000));
+ check("upsert_2", upsert_writer.Upsert(row));
+ }
+
+ // Per-record acknowledgment
+ {
+ auto row = kv_table.NewRow();
+ row.Set("user_id", 3);
+ row.Set("name", "Charlie");
+ row.Set("email", "[email protected]");
+ row.Set("score", 92.0f);
+ row.Set("balance", "99999.99");
+ row.Set("birth_date", fluss::Date::FromYMD(2000, 1, 1));
+ row.Set("login_time", fluss::Time::FromHMS(23, 59, 59));
+ row.Set("created_at", fluss::Timestamp::FromMillis(1700000300000));
+ row.Set("last_seen", fluss::Timestamp::FromMillis(1700000400000));
+ fluss::WriteResult wr;
+ check("upsert_3", upsert_writer.Upsert(row, wr));
+ check("upsert_3_wait", wr.Wait());
+ std::cout << "Upsert acknowledged by server" << std::endl;
+ }
+
+ check("upsert_flush", upsert_writer.Flush());
+ std::cout << "Upserted 3 rows" << std::endl;
+
+ // 4) Lookup by primary key — verify all types round-trip
+ std::cout << "\n--- Lookup by Primary Key ---" << std::endl;
+ fluss::Lookuper lookuper;
+ check("new_lookuper", kv_table.NewLookuper(lookuper));
+
+ // Lookup existing key
+ {
+ auto pk_row = kv_table.NewRow();
+ pk_row.Set("user_id", 1);
+
+ bool found = false;
+ fluss::GenericRow result_row;
+ check("lookup_1", lookuper.Lookup(pk_row, found, result_row));
+ if (found) {
+ auto date = result_row.GetDate(5);
+ auto time = result_row.GetTime(6);
+ auto created = result_row.GetTimestamp(7);
+ auto seen = result_row.GetTimestamp(8);
+ std::cout << "Found user_id=1:"
+ << "\n name=" << result_row.GetString(1)
+ << "\n email=" << result_row.GetString(2)
+ << "\n score=" << result_row.GetFloat32(3)
+ << "\n balance=" << result_row.DecimalToString(4)
+ << "\n birth_date=" << date.Year() << "-" <<
date.Month() << "-"
+ << date.Day() << "\n login_time=" << time.Hour() << ":"
<< time.Minute()
+ << ":" << time.Second() << "\n created_at(ms)=" <<
created.epoch_millis
+ << "\n last_seen(ms)=" << seen.epoch_millis <<
std::endl;
+ } else {
+ std::cerr << "ERROR: Expected to find user_id=1" << std::endl;
+ std::exit(1);
+ }
+ }
+
+ // Lookup non-existing key
+ {
+ auto pk_row = kv_table.NewRow();
+ pk_row.Set("user_id", 999);
+
+ bool found = false;
+ fluss::GenericRow result_row;
+ check("lookup_999", lookuper.Lookup(pk_row, found, result_row));
+ if (!found) {
+ std::cout << "user_id=999 not found (expected)" << std::endl;
+ } else {
+ std::cerr << "ERROR: Expected user_id=999 to not be found" <<
std::endl;
+ std::exit(1);
+ }
+ }
+
+ // 5) Update via upsert (overwrite existing key)
+ std::cout << "\n--- Update via Upsert ---" << std::endl;
+ {
+ auto row = kv_table.NewRow();
+ row.Set("user_id", 1);
+ row.Set("name", "Alice Updated");
+ row.Set("email", "[email protected]");
+ row.Set("score", 99.0f);
+ row.Set("balance", "9999.00");
+ row.Set("birth_date", fluss::Date::FromYMD(1990, 3, 15));
+ row.Set("login_time", fluss::Time::FromHMS(10, 0, 0));
+ row.Set("created_at", fluss::Timestamp::FromMillis(1700000000000));
+ row.Set("last_seen", fluss::Timestamp::FromMillis(1700000500000));
+ fluss::WriteResult wr;
+ check("upsert_update", upsert_writer.Upsert(row, wr));
+ check("upsert_update_wait", wr.Wait());
+ }
+
+ // Verify update
+ {
+ auto pk_row = kv_table.NewRow();
+ pk_row.Set("user_id", 1);
+
+ bool found = false;
+ fluss::GenericRow result_row;
+ check("lookup_updated", lookuper.Lookup(pk_row, found, result_row));
+ if (found && result_row.GetString(1) == "Alice Updated") {
+ std::cout << "Update verified: name=" << result_row.GetString(1)
+ << " balance=" << result_row.DecimalToString(4)
+ << " last_seen(ms)=" <<
result_row.GetTimestamp(8).epoch_millis << std::endl;
+ } else {
+ std::cerr << "ERROR: Update verification failed" << std::endl;
+ std::exit(1);
+ }
+ }
+
+ // 6) Delete by primary key
+ std::cout << "\n--- Delete by Primary Key ---" << std::endl;
+ {
+ auto pk_row = kv_table.NewRow();
+ pk_row.Set("user_id", 2);
+ fluss::WriteResult wr;
+ check("delete_2", upsert_writer.Delete(pk_row, wr));
+ check("delete_2_wait", wr.Wait());
+ std::cout << "Deleted user_id=2" << std::endl;
+ }
+
+ // Verify deletion
+ {
+ auto pk_row = kv_table.NewRow();
+ pk_row.Set("user_id", 2);
+
+ bool found = false;
+ fluss::GenericRow result_row;
+ check("lookup_deleted", lookuper.Lookup(pk_row, found, result_row));
+ if (!found) {
+ std::cout << "Delete verified: user_id=2 not found" << std::endl;
+ } else {
+ std::cerr << "ERROR: Expected user_id=2 to be deleted" <<
std::endl;
+ std::exit(1);
+ }
+ }
+
+ // 7) Partial update by column names
+ std::cout << "\n--- Partial Update by Column Names ---" << std::endl;
+ fluss::UpsertWriter partial_writer;
+ check("new_partial_upsert_writer",
+ kv_table.NewUpsertWriter(partial_writer,
+ std::vector<std::string>{"user_id",
"balance", "last_seen"}));
+
+ {
+ auto row = kv_table.NewRow();
+ row.Set("user_id", 3);
+ row.Set("balance", "50000.00");
+ row.Set("last_seen", fluss::Timestamp::FromMillis(1700000999000));
+ fluss::WriteResult wr;
+ check("partial_upsert", partial_writer.Upsert(row, wr));
+ check("partial_upsert_wait", wr.Wait());
+ std::cout << "Partial update: set balance=50000.00, last_seen for
user_id=3" << std::endl;
+ }
+
+ // Verify partial update (other fields unchanged)
+ {
+ auto pk_row = kv_table.NewRow();
+ pk_row.Set("user_id", 3);
+
+ bool found = false;
+ fluss::GenericRow result_row;
+ check("lookup_partial", lookuper.Lookup(pk_row, found, result_row));
+ if (found) {
+ std::cout << "Partial update verified:"
+ << "\n name=" << result_row.GetString(1) << "
(unchanged)"
+ << "\n balance=" << result_row.DecimalToString(4) << "
(updated)"
+ << "\n last_seen(ms)=" <<
result_row.GetTimestamp(8).epoch_millis
+ << " (updated)" << std::endl;
+ } else {
+ std::cerr << "ERROR: Expected to find user_id=3" << std::endl;
+ std::exit(1);
+ }
+ }
+
+ // 8) Partial update by column indices (using index-based setters for
lower overhead)
+ std::cout << "\n--- Partial Update by Column Indices ---" << std::endl;
+ fluss::UpsertWriter partial_writer_idx;
+ // Columns: 0=user_id (PK), 1=name — update name only
+ check("new_partial_upsert_writer_idx",
+ kv_table.NewUpsertWriter(partial_writer_idx, std::vector<size_t>{0,
1}));
+
+ {
+ // Index-based setters: lighter than name-based, useful for hot paths
+ fluss::GenericRow row;
+ row.SetInt32(0, 3); // user_id (PK)
+ row.SetString(1, "Charlie Updated"); // name
+ fluss::WriteResult wr;
+ check("partial_upsert_idx", partial_writer_idx.Upsert(row, wr));
+ check("partial_upsert_idx_wait", wr.Wait());
+ std::cout << "Partial update by indices: set name='Charlie Updated'
for user_id=3"
+ << std::endl;
+ }
+
+ // Verify: name changed, balance/last_seen unchanged from previous partial
update
+ {
+ auto pk_row = kv_table.NewRow();
+ pk_row.Set("user_id", 3);
+
+ bool found = false;
+ fluss::GenericRow result_row;
+ check("lookup_partial_idx", lookuper.Lookup(pk_row, found,
result_row));
+ if (found) {
+ std::cout << "Partial update by indices verified:"
+ << "\n name=" << result_row.GetString(1) << " (updated)"
+ << "\n balance=" << result_row.DecimalToString(4) << "
(unchanged)"
+ << "\n last_seen(ms)=" <<
result_row.GetTimestamp(8).epoch_millis
+ << " (unchanged)" << std::endl;
+ } else {
+ std::cerr << "ERROR: Expected to find user_id=3" << std::endl;
+ std::exit(1);
+ }
+ }
+
+ // Cleanup
+ check("drop_kv_table", admin.DropTable(kv_table_path, true));
+
+ // 9) Partitioned KV table
+ std::cout << "\n--- Partitioned KV Table ---" << std::endl;
+ fluss::TablePath part_kv_path("fluss", "partitioned_kv_cpp_v1");
+ admin.DropTable(part_kv_path, true);
+
+ auto part_kv_schema = fluss::Schema::NewBuilder()
+ .AddColumn("region", fluss::DataType::String())
+ .AddColumn("user_id", fluss::DataType::Int())
+ .AddColumn("name", fluss::DataType::String())
+ .AddColumn("score", fluss::DataType::BigInt())
+ .SetPrimaryKeys({"region", "user_id"})
+ .Build();
+
+ auto part_kv_descriptor = fluss::TableDescriptor::NewBuilder()
+ .SetSchema(part_kv_schema)
+ .SetPartitionKeys({"region"})
+ .SetComment("partitioned kv table example")
+ .Build();
+
+ check("create_part_kv", admin.CreateTable(part_kv_path,
part_kv_descriptor, false));
+ std::cout << "Created partitioned KV table" << std::endl;
+
+ // Create partitions
+ check("create_US", admin.CreatePartition(part_kv_path, {{"region",
"US"}}));
+ check("create_EU", admin.CreatePartition(part_kv_path, {{"region",
"EU"}}));
+ check("create_APAC", admin.CreatePartition(part_kv_path, {{"region",
"APAC"}}));
+ std::cout << "Created partitions: US, EU, APAC" << std::endl;
+
+ fluss::Table part_kv_table;
+ check("get_part_kv_table", conn.GetTable(part_kv_path, part_kv_table));
+
+ fluss::UpsertWriter part_writer;
+ check("new_part_writer", part_kv_table.NewUpsertWriter(part_writer));
+
+ // Upsert rows across partitions
+ struct TestRow {
+ const char* region;
+ int32_t user_id;
+ const char* name;
+ int64_t score;
+ };
+ TestRow test_data[] = {
+ {"US", 1, "Gustave", 100}, {"US", 2, "Lune", 200}, {"EU", 1,
"Sciel", 150},
+ {"EU", 2, "Maelle", 250}, {"APAC", 1, "Noco", 300},
+ };
+
+ for (const auto& td : test_data) {
+ auto row = part_kv_table.NewRow();
+ row.Set("region", td.region);
+ row.Set("user_id", td.user_id);
+ row.Set("name", td.name);
+ row.Set("score", td.score);
+ check("part_upsert", part_writer.Upsert(row));
+ }
+ check("part_flush", part_writer.Flush());
+ std::cout << "Upserted 5 rows across 3 partitions" << std::endl;
+
+ // Lookup all rows
+ fluss::Lookuper part_lookuper;
+ check("new_part_lookuper", part_kv_table.NewLookuper(part_lookuper));
+
+ for (const auto& td : test_data) {
+ auto pk = part_kv_table.NewRow();
+ pk.Set("region", td.region);
+ pk.Set("user_id", td.user_id);
+
+ bool found = false;
+ fluss::GenericRow result;
+ check("part_lookup", part_lookuper.Lookup(pk, found, result));
+ if (!found) {
+ std::cerr << "ERROR: Expected to find region=" << td.region << "
user_id=" << td.user_id
+ << std::endl;
+ std::exit(1);
+ }
+ if (result.GetString(2) != td.name || result.GetInt64(3) != td.score) {
+ std::cerr << "ERROR: Data mismatch for region=" << td.region
+ << " user_id=" << td.user_id << std::endl;
+ std::exit(1);
+ }
+ }
+ std::cout << "All 5 rows verified across partitions" << std::endl;
+
+ // Update within a partition
+ {
+ auto row = part_kv_table.NewRow();
+ row.Set("region", "US");
+ row.Set("user_id", 1);
+ row.Set("name", "Gustave Updated");
+ row.Set("score", static_cast<int64_t>(999));
+ fluss::WriteResult wr;
+ check("part_update", part_writer.Upsert(row, wr));
+ check("part_update_wait", wr.Wait());
+ }
+ {
+ auto pk = part_kv_table.NewRow();
+ pk.Set("region", "US");
+ pk.Set("user_id", 1);
+ bool found = false;
+ fluss::GenericRow result;
+ check("part_lookup_updated", part_lookuper.Lookup(pk, found, result));
+ if (!found || result.GetString(2) != "Gustave Updated" ||
result.GetInt64(3) != 999) {
+ std::cerr << "ERROR: Partition update verification failed" <<
std::endl;
+ std::exit(1);
+ }
+ std::cout << "Update verified: US/1 name=" << result.GetString(2)
+ << " score=" << result.GetInt64(3) << std::endl;
+ }
+
+ // Lookup in non-existent partition
+ {
+ auto pk = part_kv_table.NewRow();
+ pk.Set("region", "UNKNOWN");
+ pk.Set("user_id", 1);
+ bool found = false;
+ fluss::GenericRow result;
+ check("part_lookup_unknown", part_lookuper.Lookup(pk, found, result));
+ if (found) {
+ std::cerr << "ERROR: Expected UNKNOWN partition lookup to return
not found"
+ << std::endl;
+ std::exit(1);
+ }
+ std::cout << "UNKNOWN partition lookup: not found (expected)" <<
std::endl;
+ }
+
+ // Delete within a partition
+ {
+ auto pk = part_kv_table.NewRow();
+ pk.Set("region", "EU");
+ pk.Set("user_id", 1);
+ fluss::WriteResult wr;
+ check("part_delete", part_writer.Delete(pk, wr));
+ check("part_delete_wait", wr.Wait());
+ }
+ {
+ auto pk = part_kv_table.NewRow();
+ pk.Set("region", "EU");
+ pk.Set("user_id", 1);
+ bool found = false;
+ fluss::GenericRow result;
+ check("part_lookup_deleted", part_lookuper.Lookup(pk, found, result));
+ if (found) {
+ std::cerr << "ERROR: Expected EU/1 to be deleted" << std::endl;
+ std::exit(1);
+ }
+ std::cout << "Delete verified: EU/1 not found" << std::endl;
+ }
+
+ // Verify other record in same partition still exists
+ {
+ auto pk = part_kv_table.NewRow();
+ pk.Set("region", "EU");
+ pk.Set("user_id", 2);
+ bool found = false;
+ fluss::GenericRow result;
+ check("part_lookup_eu2", part_lookuper.Lookup(pk, found, result));
+ if (!found || result.GetString(2) != "Maelle") {
+ std::cerr << "ERROR: Expected EU/2 (Maelle) to still exist" <<
std::endl;
+ std::exit(1);
+ }
+ std::cout << "EU/2 still exists: name=" << result.GetString(2) <<
std::endl;
+ }
+
+ check("drop_part_kv", admin.DropTable(part_kv_path, true));
+ std::cout << "\nKV table example completed successfully!" << std::endl;
+
+ return 0;
+}
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 50dffae..c635c81 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -41,6 +41,8 @@ struct Table;
struct AppendWriter;
struct WriteResult;
struct LogScanner;
+struct UpsertWriter;
+struct Lookuper;
} // namespace ffi
struct Date {
@@ -490,6 +492,7 @@ struct GenericRow {
size_t FieldCount() const { return fields.size(); }
+ // ── Index-based getters ──────────────────────────────────────────
DatumType GetType(size_t idx) const { return GetField(idx).GetType(); }
bool IsNull(size_t idx) const { return GetField(idx).IsNull(); }
bool GetBool(size_t idx) const { return GetTypedField(idx,
DatumType::Bool).GetBool(); }
@@ -528,6 +531,7 @@ struct GenericRow {
return d.DecimalToString();
}
+ // ── Index-based setters ──────────────────────────────────────────
void SetNull(size_t idx) {
EnsureSize(idx);
fields[idx] = Datum::Null();
@@ -593,8 +597,77 @@ struct GenericRow {
fields[idx] = Datum::DecimalString(value);
}
+ // ── Name-based setters (require schema — see Table::NewRow()) ───
+ void Set(const std::string& name, std::nullptr_t) {
SetNull(Resolve(name)); }
+ void Set(const std::string& name, bool v) { SetBool(Resolve(name), v); }
+ void Set(const std::string& name, int32_t v) { SetInt32(Resolve(name), v);
}
+ void Set(const std::string& name, int64_t v) { SetInt64(Resolve(name), v);
}
+ void Set(const std::string& name, float v) { SetFloat32(Resolve(name), v);
}
+ void Set(const std::string& name, double v) { SetFloat64(Resolve(name),
v); }
+ // const char* overload to prevent "string literal" → bool conversion
+ void Set(const std::string& name, const char* v) {
+ auto [idx, type] = ResolveColumn(name);
+ if (type == TypeId::Decimal) {
+ SetDecimal(idx, v);
+ } else if (type == TypeId::String) {
+ SetString(idx, v);
+ } else {
+ throw std::runtime_error("GenericRow::Set: column '" + name +
+ "' is not a string or decimal column");
+ }
+ }
+ void Set(const std::string& name, std::string v) {
+ auto [idx, type] = ResolveColumn(name);
+ if (type == TypeId::Decimal) {
+ SetDecimal(idx, v);
+ } else if (type == TypeId::String) {
+ SetString(idx, std::move(v));
+ } else {
+ throw std::runtime_error("GenericRow::Set: column '" + name +
+ "' is not a string or decimal column");
+ }
+ }
+ void Set(const std::string& name, std::vector<uint8_t> v) {
+ SetBytes(Resolve(name), std::move(v));
+ }
+ void Set(const std::string& name, fluss::Date d) { SetDate(Resolve(name),
d); }
+ void Set(const std::string& name, fluss::Time t) { SetTime(Resolve(name),
t); }
+ void Set(const std::string& name, fluss::Timestamp ts) {
+ auto [idx, type] = ResolveColumn(name);
+ if (type == TypeId::TimestampLtz) {
+ SetTimestampLtz(idx, ts);
+ } else if (type == TypeId::Timestamp) {
+ SetTimestampNtz(idx, ts);
+ } else {
+ throw std::runtime_error("GenericRow::Set: column '" + name +
+ "' is not a timestamp column");
+ }
+ }
+
private:
+ friend class Table;
+ struct ColumnInfo {
+ size_t index;
+ TypeId type_id;
+ };
+ using ColumnMap = std::unordered_map<std::string, ColumnInfo>;
std::vector<Datum> fields;
+ std::shared_ptr<ColumnMap> column_map_;
+
+ size_t Resolve(const std::string& name) const { return
ResolveColumn(name).index; }
+
+ const ColumnInfo& ResolveColumn(const std::string& name) const {
+ if (!column_map_) {
+ throw std::runtime_error(
+ "GenericRow: name-based Set() requires a schema. "
+ "Use Table::NewRow() to create a schema-aware row.");
+ }
+ auto it = column_map_->find(name);
+ if (it == column_map_->end()) {
+ throw std::runtime_error("GenericRow: unknown column '" + name +
"'");
+ }
+ return it->second;
+ }
const Datum& GetField(size_t idx) const {
if (idx >= fields.size()) {
@@ -725,6 +798,8 @@ struct DatabaseInfo {
};
class AppendWriter;
+class UpsertWriter;
+class Lookuper;
class WriteResult;
class LogScanner;
class Admin;
@@ -792,8 +867,7 @@ class Admin {
const std::unordered_map<std::string, std::string>&
partition_spec,
bool ignore_if_not_exists = false);
- Result CreateDatabase(const std::string& database_name,
- const DatabaseDescriptor& descriptor,
+ Result CreateDatabase(const std::string& database_name, const
DatabaseDescriptor& descriptor,
bool ignore_if_exists = false);
Result DropDatabase(const std::string& database_name, bool
ignore_if_not_exists = false,
@@ -833,7 +907,13 @@ class Table {
bool Available() const;
+ GenericRow NewRow() const;
+
Result NewAppendWriter(AppendWriter& out);
+ Result NewUpsertWriter(UpsertWriter& out);
+ Result NewUpsertWriter(UpsertWriter& out, const std::vector<std::string>&
column_names);
+ Result NewUpsertWriter(UpsertWriter& out, const std::vector<size_t>&
column_indices);
+ Result NewLookuper(Lookuper& out);
TableScan NewScan();
TableInfo GetTableInfo() const;
@@ -846,7 +926,10 @@ class Table {
Table(ffi::Table* table) noexcept;
void Destroy() noexcept;
+ const std::shared_ptr<GenericRow::ColumnMap>& GetColumnMap() const;
+
ffi::Table* table_{nullptr};
+ mutable std::shared_ptr<GenericRow::ColumnMap> column_map_;
};
class TableScan {
@@ -887,6 +970,7 @@ class WriteResult {
private:
friend class AppendWriter;
+ friend class UpsertWriter;
WriteResult(ffi::WriteResult* inner) noexcept;
void Destroy() noexcept;
@@ -917,6 +1001,52 @@ class AppendWriter {
ffi::AppendWriter* writer_{nullptr};
};
+class UpsertWriter {
+ public:
+ UpsertWriter() noexcept;
+ ~UpsertWriter() noexcept;
+
+ UpsertWriter(const UpsertWriter&) = delete;
+ UpsertWriter& operator=(const UpsertWriter&) = delete;
+ UpsertWriter(UpsertWriter&& other) noexcept;
+ UpsertWriter& operator=(UpsertWriter&& other) noexcept;
+
+ bool Available() const;
+
+ Result Upsert(const GenericRow& row);
+ Result Upsert(const GenericRow& row, WriteResult& out);
+ Result Delete(const GenericRow& row);
+ Result Delete(const GenericRow& row, WriteResult& out);
+ Result Flush();
+
+ private:
+ friend class Table;
+ UpsertWriter(ffi::UpsertWriter* writer) noexcept;
+ void Destroy() noexcept;
+ ffi::UpsertWriter* writer_{nullptr};
+};
+
+class Lookuper {
+ public:
+ Lookuper() noexcept;
+ ~Lookuper() noexcept;
+
+ Lookuper(const Lookuper&) = delete;
+ Lookuper& operator=(const Lookuper&) = delete;
+ Lookuper(Lookuper&& other) noexcept;
+ Lookuper& operator=(Lookuper&& other) noexcept;
+
+ bool Available() const;
+
+ Result Lookup(const GenericRow& pk_row, bool& found, GenericRow& out);
+
+ private:
+ friend class Table;
+ Lookuper(ffi::Lookuper* lookuper) noexcept;
+ void Destroy() noexcept;
+ ffi::Lookuper* lookuper_{nullptr};
+};
+
class LogScanner {
public:
LogScanner() noexcept;
diff --git a/bindings/cpp/src/ffi_converter.hpp
b/bindings/cpp/src/ffi_converter.hpp
index 8fc8415..40676e5 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -290,8 +290,7 @@ inline LakeSnapshot from_ffi_lake_snapshot(const
ffi::FfiLakeSnapshot& ffi_snaps
return snapshot;
}
-inline ffi::FfiDatabaseDescriptor to_ffi_database_descriptor(
- const DatabaseDescriptor& desc) {
+inline ffi::FfiDatabaseDescriptor to_ffi_database_descriptor(const
DatabaseDescriptor& desc) {
ffi::FfiDatabaseDescriptor ffi_desc;
ffi_desc.comment = rust::String(desc.comment);
for (const auto& [k, v] : desc.properties) {
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 5f3e7e9..ee7f1d8 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -182,6 +182,12 @@ mod ffi {
bucket_offsets: Vec<FfiBucketOffsetPair>,
}
+ struct FfiLookupResult {
+ result: FfiResult,
+ found: bool,
+ row: FfiGenericRow,
+ }
+
struct FfiLakeSnapshotResult {
result: FfiResult,
lake_snapshot: FfiLakeSnapshot,
@@ -242,6 +248,8 @@ mod ffi {
type AppendWriter;
type WriteResult;
type LogScanner;
+ type UpsertWriter;
+ type Lookuper;
// Connection
fn new_connection(bootstrap_server: &str) -> Result<*mut Connection>;
@@ -330,6 +338,16 @@ mod ffi {
fn get_table_info_from_table(self: &Table) -> FfiTableInfo;
fn get_table_path(self: &Table) -> FfiTablePath;
fn has_primary_key(self: &Table) -> bool;
+ fn new_upsert_writer(self: &Table) -> Result<*mut UpsertWriter>;
+ fn new_upsert_writer_with_column_names(
+ self: &Table,
+ column_names: Vec<String>,
+ ) -> Result<*mut UpsertWriter>;
+ fn new_upsert_writer_with_column_indices(
+ self: &Table,
+ column_indices: Vec<usize>,
+ ) -> Result<*mut UpsertWriter>;
+ fn new_lookuper(self: &Table) -> Result<*mut Lookuper>;
// AppendWriter
unsafe fn delete_append_writer(writer: *mut AppendWriter);
@@ -339,6 +357,16 @@ mod ffi {
// WriteResult — dropped automatically via rust::Box, or call wait()
for ack
fn wait(self: &mut WriteResult) -> FfiResult;
+ // UpsertWriter
+ unsafe fn delete_upsert_writer(writer: *mut UpsertWriter);
+ fn upsert(self: &mut UpsertWriter, row: &FfiGenericRow) ->
Result<Box<WriteResult>>;
+ fn delete_row(self: &mut UpsertWriter, row: &FfiGenericRow) ->
Result<Box<WriteResult>>;
+ fn upsert_flush(self: &mut UpsertWriter) -> FfiResult;
+
+ // Lookuper
+ unsafe fn delete_lookuper(lookuper: *mut Lookuper);
+ fn lookup(self: &mut Lookuper, pk_row: &FfiGenericRow) ->
FfiLookupResult;
+
// LogScanner
unsafe fn delete_log_scanner(scanner: *mut LogScanner);
fn subscribe(self: &LogScanner, bucket_id: i32, start_offset: i64) ->
FfiResult;
@@ -399,6 +427,16 @@ pub struct LogScanner {
projected_columns: Vec<fcore::metadata::Column>,
}
+pub struct UpsertWriter {
+ inner: fcore::client::UpsertWriter,
+ table_info: fcore::metadata::TableInfo,
+}
+
+pub struct Lookuper {
+ inner: fcore::client::Lookuper,
+ table_info: fcore::metadata::TableInfo,
+}
+
fn ok_result() -> ffi::FfiResult {
ffi::FfiResult {
error_code: 0,
@@ -1045,6 +1083,113 @@ impl Table {
fn has_primary_key(&self) -> bool {
self.has_pk
}
+
+ fn new_upsert_writer(&self) -> Result<*mut UpsertWriter, String> {
+ let _enter = RUNTIME.enter();
+
+ let fluss_table = fcore::client::FlussTable::new(
+ &self.connection,
+ self.metadata.clone(),
+ self.table_info.clone(),
+ );
+
+ let table_upsert = fluss_table
+ .new_upsert()
+ .map_err(|e| format!("Failed to create upsert: {e}"))?;
+
+ let writer = table_upsert
+ .create_writer()
+ .map_err(|e| format!("Failed to create upsert writer: {e}"))?;
+
+ Ok(Box::into_raw(Box::new(UpsertWriter {
+ inner: writer,
+ table_info: self.table_info.clone(),
+ })))
+ }
+
+ fn new_upsert_writer_with_column_names(
+ &self,
+ column_names: Vec<String>,
+ ) -> Result<*mut UpsertWriter, String> {
+ let _enter = RUNTIME.enter();
+
+ let fluss_table = fcore::client::FlussTable::new(
+ &self.connection,
+ self.metadata.clone(),
+ self.table_info.clone(),
+ );
+
+ let table_upsert = fluss_table
+ .new_upsert()
+ .map_err(|e| format!("Failed to create upsert: {e}"))?;
+
+ let col_refs: Vec<&str> = column_names.iter().map(|s|
s.as_str()).collect();
+ let table_upsert = table_upsert
+ .partial_update_with_column_names(&col_refs)
+ .map_err(|e| format!("Failed to set partial update columns:
{e}"))?;
+
+ let writer = table_upsert
+ .create_writer()
+ .map_err(|e| format!("Failed to create upsert writer: {e}"))?;
+
+ Ok(Box::into_raw(Box::new(UpsertWriter {
+ inner: writer,
+ table_info: self.table_info.clone(),
+ })))
+ }
+
+ fn new_upsert_writer_with_column_indices(
+ &self,
+ column_indices: Vec<usize>,
+ ) -> Result<*mut UpsertWriter, String> {
+ let _enter = RUNTIME.enter();
+
+ let fluss_table = fcore::client::FlussTable::new(
+ &self.connection,
+ self.metadata.clone(),
+ self.table_info.clone(),
+ );
+
+ let table_upsert = fluss_table
+ .new_upsert()
+ .map_err(|e| format!("Failed to create upsert: {e}"))?;
+
+ let table_upsert = table_upsert
+ .partial_update(Some(column_indices))
+ .map_err(|e| format!("Failed to set partial update columns:
{e}"))?;
+
+ let writer = table_upsert
+ .create_writer()
+ .map_err(|e| format!("Failed to create upsert writer: {e}"))?;
+
+ Ok(Box::into_raw(Box::new(UpsertWriter {
+ inner: writer,
+ table_info: self.table_info.clone(),
+ })))
+ }
+
+ fn new_lookuper(&self) -> Result<*mut Lookuper, String> {
+ let _enter = RUNTIME.enter();
+
+ let fluss_table = fcore::client::FlussTable::new(
+ &self.connection,
+ self.metadata.clone(),
+ self.table_info.clone(),
+ );
+
+ let table_lookup = fluss_table
+ .new_lookup()
+ .map_err(|e| format!("Failed to create lookup: {e}"))?;
+
+ let lookuper = table_lookup
+ .create_lookuper()
+ .map_err(|e| format!("Failed to create lookuper: {e}"))?;
+
+ Ok(Box::into_raw(Box::new(Lookuper {
+ inner: lookuper,
+ table_info: self.table_info.clone(),
+ })))
+ }
}
// AppendWriter implementation
@@ -1095,6 +1240,137 @@ impl WriteResult {
}
}
+// UpsertWriter implementation
+unsafe fn delete_upsert_writer(writer: *mut UpsertWriter) {
+ if !writer.is_null() {
+ unsafe {
+ drop(Box::from_raw(writer));
+ }
+ }
+}
+
impl UpsertWriter {
    /// Pad row with Null to full schema width.
    /// This allows callers to only set the fields they care about.
    fn pad_row<'a>(&self, mut row: fcore::row::GenericRow<'a>) -> fcore::row::GenericRow<'a> {
        let num_columns = self.table_info.get_schema().columns().len();
        if row.values.len() < num_columns {
            // Only ever grows the row; a row already at (or above) schema
            // width is returned unchanged.
            row.values.resize(num_columns, fcore::row::Datum::Null);
        }
        row
    }

    /// Convert an FFI row to a core row (padded to schema width) and submit
    /// it as an upsert. Returns a `WriteResult` wrapping the pending write
    /// future; the write is not awaited here.
    fn upsert(&mut self, row: &ffi::FfiGenericRow) -> Result<Box<WriteResult>, String> {
        let schema = self.table_info.get_schema();
        // Schema is passed so FFI ints can be narrowed to the column's type.
        let generic_row = types::ffi_row_to_core(row, Some(schema)).map_err(|e| e.to_string())?;
        let generic_row = self.pad_row(generic_row);

        let result_future = self
            .inner
            .upsert(&generic_row)
            .map_err(|e| format!("Failed to upsert: {e}"))?;

        Ok(Box::new(WriteResult {
            inner: Some(result_future),
        }))
    }

    /// Same pipeline as `upsert`, but issues a delete for the given key row.
    fn delete_row(&mut self, row: &ffi::FfiGenericRow) -> Result<Box<WriteResult>, String> {
        let schema = self.table_info.get_schema();
        let generic_row = types::ffi_row_to_core(row, Some(schema)).map_err(|e| e.to_string())?;
        let generic_row = self.pad_row(generic_row);

        let result_future = self
            .inner
            .delete(&generic_row)
            .map_err(|e| format!("Failed to delete: {e}"))?;

        Ok(Box::new(WriteResult {
            inner: Some(result_future),
        }))
    }

    /// Block the caller until the inner writer's flush completes, mapping
    /// the outcome to an FFI result (error code 1 on failure).
    fn upsert_flush(&mut self) -> ffi::FfiResult {
        let result = RUNTIME.block_on(async { self.inner.flush().await });

        match result {
            Ok(_) => ok_result(),
            Err(e) => err_result(1, e.to_string()),
        }
    }
}
+
+// Lookuper implementation
+unsafe fn delete_lookuper(lookuper: *mut Lookuper) {
+ if !lookuper.is_null() {
+ unsafe {
+ drop(Box::from_raw(lookuper));
+ }
+ }
+}
+
+impl Lookuper {
+ /// Pad row with Null to full schema width (same as UpsertWriter::pad_row).
+ /// Ensures the PK row is always full-width, matching Python's behavior.
+ fn pad_row<'a>(&self, mut row: fcore::row::GenericRow<'a>) ->
fcore::row::GenericRow<'a> {
+ let num_columns = self.table_info.get_schema().columns().len();
+ if row.values.len() < num_columns {
+ row.values.resize(num_columns, fcore::row::Datum::Null);
+ }
+ row
+ }
+
+ fn lookup(&mut self, pk_row: &ffi::FfiGenericRow) -> ffi::FfiLookupResult {
+ let schema = self.table_info.get_schema();
+ let generic_row = match types::ffi_row_to_core(pk_row, Some(schema)) {
+ Ok(r) => self.pad_row(r),
+ Err(e) => {
+ return ffi::FfiLookupResult {
+ result: err_result(1, e.to_string()),
+ found: false,
+ row: ffi::FfiGenericRow { fields: vec![] },
+ };
+ }
+ };
+
+ let lookup_result = match
RUNTIME.block_on(self.inner.lookup(&generic_row)) {
+ Ok(r) => r,
+ Err(e) => {
+ return ffi::FfiLookupResult {
+ result: err_result(1, e.to_string()),
+ found: false,
+ row: ffi::FfiGenericRow { fields: vec![] },
+ };
+ }
+ };
+
+ match lookup_result.get_single_row() {
+ Ok(Some(row)) => match types::internal_row_to_ffi_row(&row,
&self.table_info) {
+ Ok(ffi_row) => ffi::FfiLookupResult {
+ result: ok_result(),
+ found: true,
+ row: ffi_row,
+ },
+ Err(e) => ffi::FfiLookupResult {
+ result: err_result(1, e.to_string()),
+ found: false,
+ row: ffi::FfiGenericRow { fields: vec![] },
+ },
+ },
+ Ok(None) => ffi::FfiLookupResult {
+ result: ok_result(),
+ found: false,
+ row: ffi::FfiGenericRow { fields: vec![] },
+ },
+ Err(e) => ffi::FfiLookupResult {
+ result: err_result(1, e.to_string()),
+ found: false,
+ row: ffi::FfiGenericRow { fields: vec![] },
+ },
+ }
+ }
+}
+
// LogScanner implementation
unsafe fn delete_log_scanner(scanner: *mut LogScanner) {
if !scanner.is_null() {
@@ -1258,13 +1534,18 @@ impl LogScanner {
let result = RUNTIME.block_on(async { inner.poll(timeout).await });
match result {
- Ok(records) => ffi::FfiScanRecordsResult {
- result: ok_result(),
- scan_records: types::core_scan_records_to_ffi(
- &records,
- &self.projected_columns,
- ),
- },
+ Ok(records) => {
+ match types::core_scan_records_to_ffi(&records,
&self.projected_columns) {
+ Ok(scan_records) => ffi::FfiScanRecordsResult {
+ result: ok_result(),
+ scan_records,
+ },
+ Err(e) => ffi::FfiScanRecordsResult {
+ result: err_result(1, e.to_string()),
+ scan_records: ffi::FfiScanRecords { records:
vec![] },
+ },
+ }
+ }
Err(e) => ffi::FfiScanRecordsResult {
result: err_result(1, e.to_string()),
scan_records: ffi::FfiScanRecords { records: vec![] },
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
index 4425b5f..5b2f66c 100644
--- a/bindings/cpp/src/table.cpp
+++ b/bindings/cpp/src/table.cpp
@@ -92,12 +92,16 @@ void Table::Destroy() noexcept {
}
}
-Table::Table(Table&& other) noexcept : table_(other.table_) { other.table_ =
nullptr; }
+Table::Table(Table&& other) noexcept
+ : table_(other.table_), column_map_(std::move(other.column_map_)) {
+ other.table_ = nullptr;
+}
Table& Table::operator=(Table&& other) noexcept {
if (this != &other) {
Destroy();
table_ = other.table_;
+ column_map_ = std::move(other.column_map_);
other.table_ = nullptr;
}
return *this;
@@ -111,7 +115,7 @@ Result Table::NewAppendWriter(AppendWriter& out) {
}
try {
- out.writer_ = table_->new_append_writer();
+ out = AppendWriter(table_->new_append_writer());
return utils::make_ok();
} catch (const rust::Error& e) {
return utils::make_error(1, e.what());
@@ -177,6 +181,24 @@ Result TableScan::CreateRecordBatchScanner(LogScanner&
out) {
}
}
// Lazily build and cache the name -> (index, type id) map from the table
// schema. The cache is only populated once, and only while the underlying
// table handle is available; otherwise the (possibly null) cached pointer
// is returned as-is.
// NOTE(review): this const method writes column_map_ (presumably a mutable
// member) without synchronization — confirm Table is not shared across
// threads, or guard this initialization.
const std::shared_ptr<GenericRow::ColumnMap>& Table::GetColumnMap() const {
    if (!column_map_ && Available()) {
        auto info = GetTableInfo();
        column_map_ = std::make_shared<GenericRow::ColumnMap>();
        for (size_t i = 0; i < info.schema.columns.size(); ++i) {
            (*column_map_)[info.schema.columns[i].name] = {i,
                                                          info.schema.columns[i].data_type.id()};
        }
    }
    return column_map_;
}
+
// Create an empty GenericRow pre-wired with this table's column map, so
// callers can set fields by column name.
GenericRow Table::NewRow() const {
    GenericRow row;
    row.column_map_ = GetColumnMap();
    return row;
}
+
TableInfo Table::GetTableInfo() const {
if (!Available()) {
return TableInfo{};
@@ -281,7 +303,7 @@ Result AppendWriter::Append(const GenericRow& row,
WriteResult& out) {
try {
auto ffi_row = utils::to_ffi_generic_row(row);
auto rust_box = writer_->append(ffi_row);
- out.inner_ = rust_box.into_raw();
+ out = WriteResult(rust_box.into_raw());
return utils::make_ok();
} catch (const rust::Error& e) {
return utils::make_error(1, e.what());
@@ -299,6 +321,213 @@ Result AppendWriter::Flush() {
return utils::from_ffi_result(ffi_result);
}
// UpsertWriter implementation
//
// UpsertWriter owns a raw ffi::UpsertWriter* allocated on the Rust side.
// It is move-only: moves transfer the pointer and null out the source, and
// Destroy() releases it back through the FFI deleter exactly once.
UpsertWriter::UpsertWriter() noexcept = default;

UpsertWriter::UpsertWriter(ffi::UpsertWriter* writer) noexcept : writer_(writer) {}

UpsertWriter::~UpsertWriter() noexcept { Destroy(); }

void UpsertWriter::Destroy() noexcept {
    if (writer_) {
        // Hand the pointer back to Rust for deallocation; null afterwards so
        // repeated Destroy() calls are safe.
        ffi::delete_upsert_writer(writer_);
        writer_ = nullptr;
    }
}

UpsertWriter::UpsertWriter(UpsertWriter&& other) noexcept : writer_(other.writer_) {
    other.writer_ = nullptr;
}

UpsertWriter& UpsertWriter::operator=(UpsertWriter&& other) noexcept {
    if (this != &other) {
        Destroy();  // release any writer we already own before taking other's
        writer_ = other.writer_;
        other.writer_ = nullptr;
    }
    return *this;
}

bool UpsertWriter::Available() const { return writer_ != nullptr; }
+
+Result UpsertWriter::Upsert(const GenericRow& row) {
+ WriteResult wr;
+ return Upsert(row, wr);
+}
+
+Result UpsertWriter::Upsert(const GenericRow& row, WriteResult& out) {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(row);
+ auto rust_box = writer_->upsert(ffi_row);
+ out = WriteResult(rust_box.into_raw());
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result UpsertWriter::Delete(const GenericRow& row) {
+ WriteResult wr;
+ return Delete(row, wr);
+}
+
+Result UpsertWriter::Delete(const GenericRow& row, WriteResult& out) {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ try {
+ auto ffi_row = utils::to_ffi_generic_row(row);
+ auto rust_box = writer_->delete_row(ffi_row);
+ out = WriteResult(rust_box.into_raw());
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result UpsertWriter::Flush() {
+ if (!Available()) {
+ return utils::make_error(1, "UpsertWriter not available");
+ }
+
+ auto ffi_result = writer_->upsert_flush();
+ return utils::from_ffi_result(ffi_result);
+}
+
// Lookuper implementation
//
// Lookuper owns a raw ffi::Lookuper* allocated on the Rust side. It is
// move-only; Destroy() releases the handle through the FFI deleter once.
Lookuper::Lookuper() noexcept = default;

Lookuper::Lookuper(ffi::Lookuper* lookuper) noexcept : lookuper_(lookuper) {}

Lookuper::~Lookuper() noexcept { Destroy(); }

void Lookuper::Destroy() noexcept {
    if (lookuper_) {
        ffi::delete_lookuper(lookuper_);
        lookuper_ = nullptr;
    }
}

Lookuper::Lookuper(Lookuper&& other) noexcept : lookuper_(other.lookuper_) {
    other.lookuper_ = nullptr;
}

Lookuper& Lookuper::operator=(Lookuper&& other) noexcept {
    if (this != &other) {
        Destroy();  // release any handle we already own before taking other's
        lookuper_ = other.lookuper_;
        other.lookuper_ = nullptr;
    }
    return *this;
}

bool Lookuper::Available() const { return lookuper_ != nullptr; }

// Look up a single row by primary key.
// On return: `found` tells whether the key exists; `out` is assigned only
// when found (it is left untouched on a miss). A miss is an OK Result;
// conversion/transport failures come back as error Results with found=false.
Result Lookuper::Lookup(const GenericRow& pk_row, bool& found, GenericRow& out) {
    if (!Available()) {
        return utils::make_error(1, "Lookuper not available");
    }

    try {
        auto ffi_row = utils::to_ffi_generic_row(pk_row);
        auto ffi_result = lookuper_->lookup(ffi_row);
        auto result = utils::from_ffi_result(ffi_result.result);
        if (!result.Ok()) {
            found = false;
            return result;
        }
        found = ffi_result.found;
        if (found) {
            out = utils::from_ffi_generic_row(ffi_result.row);
        }
        return utils::make_ok();
    } catch (const rust::Error& e) {
        found = false;
        return utils::make_error(1, e.what());
    } catch (const std::exception& e) {
        found = false;
        return utils::make_error(1, e.what());
    }
}
+
+// Table KV methods
+Result Table::NewUpsertWriter(UpsertWriter& out) {
+ if (!Available()) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ out = UpsertWriter(table_->new_upsert_writer());
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result Table::NewUpsertWriter(UpsertWriter& out, const
std::vector<std::string>& column_names) {
+ if (!Available()) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ rust::Vec<rust::String> rust_names;
+ for (const auto& name : column_names) {
+ rust_names.push_back(rust::String(name));
+ }
+ out =
UpsertWriter(table_->new_upsert_writer_with_column_names(std::move(rust_names)));
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result Table::NewUpsertWriter(UpsertWriter& out, const std::vector<size_t>&
column_indices) {
+ if (!Available()) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ rust::Vec<size_t> rust_indices;
+ for (size_t idx : column_indices) {
+ rust_indices.push_back(idx);
+ }
+ out =
UpsertWriter(table_->new_upsert_writer_with_column_indices(std::move(rust_indices)));
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
+Result Table::NewLookuper(Lookuper& out) {
+ if (!Available()) {
+ return utils::make_error(1, "Table not available");
+ }
+
+ try {
+ out = Lookuper(table_->new_lookuper());
+ return utils::make_ok();
+ } catch (const rust::Error& e) {
+ return utils::make_error(1, e.what());
+ } catch (const std::exception& e) {
+ return utils::make_error(1, e.what());
+ }
+}
+
// LogScanner implementation
LogScanner::LogScanner() noexcept = default;
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index 65b9b04..17aa872 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -316,7 +316,22 @@ pub fn ffi_row_to_core<'a>(
let datum = match field.datum_type {
DATUM_TYPE_NULL => Datum::Null,
DATUM_TYPE_BOOL => Datum::Bool(field.bool_val),
- DATUM_TYPE_INT32 => Datum::Int32(field.i32_val),
+ DATUM_TYPE_INT32 => match schema
+ .and_then(|s| s.columns().get(idx))
+ .map(|c| c.data_type())
+ {
+ Some(fcore::metadata::DataType::TinyInt(_)) => {
+ Datum::Int8(i8::try_from(field.i32_val).map_err(|_| {
+ anyhow!("Column {idx}: {} overflows TinyInt",
field.i32_val)
+ })?)
+ }
+ Some(fcore::metadata::DataType::SmallInt(_)) => {
+ Datum::Int16(i16::try_from(field.i32_val).map_err(|_| {
+ anyhow!("Column {idx}: {} overflows SmallInt",
field.i32_val)
+ })?)
+ }
+ _ => Datum::Int32(field.i32_val),
+ },
DATUM_TYPE_INT64 => Datum::Int64(field.i64_val),
DATUM_TYPE_FLOAT32 => Datum::Float32(field.f32_val.into()),
DATUM_TYPE_FLOAT64 => Datum::Float64(field.f64_val.into()),
@@ -360,11 +375,11 @@ pub fn ffi_row_to_core<'a>(
DATUM_TYPE_TIME =>
Datum::Time(fcore::row::Time::new(field.i32_val)),
DATUM_TYPE_TIMESTAMP_NTZ => Datum::TimestampNtz(
fcore::row::TimestampNtz::from_millis_nanos(field.i64_val,
field.i32_val)
- .unwrap_or_else(|_|
fcore::row::TimestampNtz::new(field.i64_val)),
+ .map_err(|e| anyhow!("Column {idx}: {e}"))?,
),
DATUM_TYPE_TIMESTAMP_LTZ => Datum::TimestampLtz(
fcore::row::TimestampLtz::from_millis_nanos(field.i64_val,
field.i32_val)
- .unwrap_or_else(|_|
fcore::row::TimestampLtz::new(field.i64_val)),
+ .map_err(|e| anyhow!("Column {idx}: {e}"))?,
),
other => return Err(anyhow!("Column {idx}: unknown datum type
{other}")),
};
@@ -377,7 +392,7 @@ pub fn ffi_row_to_core<'a>(
pub fn core_scan_records_to_ffi(
records: &fcore::record::ScanRecords,
columns: &[fcore::metadata::Column],
-) -> ffi::FfiScanRecords {
+) -> Result<ffi::FfiScanRecords> {
let mut ffi_records = Vec::new();
// Iterate over all buckets and their records
@@ -385,7 +400,7 @@ pub fn core_scan_records_to_ffi(
let bucket_id = table_bucket.bucket_id();
for record in bucket_records {
let row = record.row();
- let fields = core_row_to_ffi_fields(row, columns);
+ let fields = core_row_to_ffi_fields(row, columns)?;
ffi_records.push(ffi::FfiScanRecord {
bucket_id,
@@ -396,32 +411,15 @@ pub fn core_scan_records_to_ffi(
}
}
- ffi::FfiScanRecords {
+ Ok(ffi::FfiScanRecords {
records: ffi_records,
- }
+ })
}
fn core_row_to_ffi_fields(
row: &fcore::row::ColumnarRow,
columns: &[fcore::metadata::Column],
-) -> Vec<ffi::FfiDatum> {
- fn new_datum(datum_type: i32) -> ffi::FfiDatum {
- ffi::FfiDatum {
- datum_type,
- bool_val: false,
- i32_val: 0,
- i64_val: 0,
- f32_val: 0.0,
- f64_val: 0.0,
- string_val: String::new(),
- bytes_val: vec![],
- decimal_precision: 0,
- decimal_scale: 0,
- i128_hi: 0,
- i128_lo: 0,
- }
- }
-
+) -> Result<Vec<ffi::FfiDatum>> {
let record_batch = row.get_record_batch();
let schema = record_batch.schema();
let row_id = row.get_row_id();
@@ -430,124 +428,135 @@ fn core_row_to_ffi_fields(
for (i, field) in schema.fields().iter().enumerate() {
if row.is_null_at(i) {
- fields.push(new_datum(DATUM_TYPE_NULL));
+ fields.push(ffi::FfiDatum::default());
continue;
}
let datum = match field.data_type() {
- ArrowDataType::Boolean => {
- let mut datum = new_datum(DATUM_TYPE_BOOL);
- datum.bool_val = row.get_boolean(i);
- datum
- }
- ArrowDataType::Int8 => {
- let mut datum = new_datum(DATUM_TYPE_INT32);
- datum.i32_val = row.get_byte(i) as i32;
- datum
- }
- ArrowDataType::Int16 => {
- let mut datum = new_datum(DATUM_TYPE_INT32);
- datum.i32_val = row.get_short(i) as i32;
- datum
- }
- ArrowDataType::Int32 => {
- let mut datum = new_datum(DATUM_TYPE_INT32);
- datum.i32_val = row.get_int(i);
- datum
- }
- ArrowDataType::Int64 => {
- let mut datum = new_datum(DATUM_TYPE_INT64);
- datum.i64_val = row.get_long(i);
- datum
- }
- ArrowDataType::Float32 => {
- let mut datum = new_datum(DATUM_TYPE_FLOAT32);
- datum.f32_val = row.get_float(i);
- datum
- }
- ArrowDataType::Float64 => {
- let mut datum = new_datum(DATUM_TYPE_FLOAT64);
- datum.f64_val = row.get_double(i);
- datum
- }
- ArrowDataType::Utf8 => {
- let mut datum = new_datum(DATUM_TYPE_STRING);
- // todo: avoid copy string
- datum.string_val = row.get_string(i).to_string();
- datum
- }
+ ArrowDataType::Boolean => ffi::FfiDatum {
+ datum_type: DATUM_TYPE_BOOL,
+ bool_val: row.get_boolean(i),
+ ..Default::default()
+ },
+ ArrowDataType::Int8 => ffi::FfiDatum {
+ datum_type: DATUM_TYPE_INT32,
+ i32_val: row.get_byte(i) as i32,
+ ..Default::default()
+ },
+ ArrowDataType::Int16 => ffi::FfiDatum {
+ datum_type: DATUM_TYPE_INT32,
+ i32_val: row.get_short(i) as i32,
+ ..Default::default()
+ },
+ ArrowDataType::Int32 => ffi::FfiDatum {
+ datum_type: DATUM_TYPE_INT32,
+ i32_val: row.get_int(i),
+ ..Default::default()
+ },
+ ArrowDataType::Int64 => ffi::FfiDatum {
+ datum_type: DATUM_TYPE_INT64,
+ i64_val: row.get_long(i),
+ ..Default::default()
+ },
+ ArrowDataType::Float32 => ffi::FfiDatum {
+ datum_type: DATUM_TYPE_FLOAT32,
+ f32_val: row.get_float(i),
+ ..Default::default()
+ },
+ ArrowDataType::Float64 => ffi::FfiDatum {
+ datum_type: DATUM_TYPE_FLOAT64,
+ f64_val: row.get_double(i),
+ ..Default::default()
+ },
+ ArrowDataType::Utf8 => ffi::FfiDatum {
+ datum_type: DATUM_TYPE_STRING,
+ string_val: row.get_string(i).to_string(),
+ ..Default::default()
+ },
ArrowDataType::LargeUtf8 => {
let array = record_batch
.column(i)
.as_any()
.downcast_ref::<LargeStringArray>()
- .expect("LargeUtf8 column expected");
- let mut datum = new_datum(DATUM_TYPE_STRING);
- datum.string_val = array.value(row_id).to_string();
- datum
- }
- ArrowDataType::Binary => {
- let mut datum = new_datum(DATUM_TYPE_BYTES);
- // todo: avoid copy bytes for blob
- datum.bytes_val = row.get_bytes(i).to_vec();
- datum
- }
- ArrowDataType::FixedSizeBinary(len) => {
- let mut datum = new_datum(DATUM_TYPE_BYTES);
- datum.bytes_val = row.get_binary(i, *len as usize).to_vec();
- datum
+ .ok_or_else(|| anyhow!("Column {i}: expected LargeUtf8
array"))?;
+ ffi::FfiDatum {
+ datum_type: DATUM_TYPE_STRING,
+ string_val: array.value(row_id).to_string(),
+ ..Default::default()
+ }
}
+ ArrowDataType::Binary => ffi::FfiDatum {
+ datum_type: DATUM_TYPE_BYTES,
+ bytes_val: row.get_bytes(i).to_vec(),
+ ..Default::default()
+ },
+ ArrowDataType::FixedSizeBinary(len) => ffi::FfiDatum {
+ datum_type: DATUM_TYPE_BYTES,
+ bytes_val: row.get_binary(i, *len as usize).to_vec(),
+ ..Default::default()
+ },
ArrowDataType::LargeBinary => {
let array = record_batch
.column(i)
.as_any()
.downcast_ref::<LargeBinaryArray>()
- .expect("LargeBinary column expected");
- let mut datum = new_datum(DATUM_TYPE_BYTES);
- datum.bytes_val = array.value(row_id).to_vec();
- datum
+ .ok_or_else(|| anyhow!("Column {i}: expected LargeBinary
array"))?;
+ ffi::FfiDatum {
+ datum_type: DATUM_TYPE_BYTES,
+ bytes_val: array.value(row_id).to_vec(),
+ ..Default::default()
+ }
}
ArrowDataType::Date32 => {
let array = record_batch
.column(i)
.as_any()
.downcast_ref::<Date32Array>()
- .expect("Date32 column expected");
- let mut datum = new_datum(DATUM_TYPE_DATE);
- datum.i32_val = array.value(row_id);
- datum
+ .ok_or_else(|| anyhow!("Column {i}: expected Date32
array"))?;
+ ffi::FfiDatum {
+ datum_type: DATUM_TYPE_DATE,
+ i32_val: array.value(row_id),
+ ..Default::default()
+ }
}
ArrowDataType::Timestamp(unit, _tz) => {
let datum_type = match columns.get(i).map(|c| c.data_type()) {
Some(fcore::metadata::DataType::TimestampLTz(_)) =>
DATUM_TYPE_TIMESTAMP_LTZ,
_ => DATUM_TYPE_TIMESTAMP_NTZ,
};
- let mut datum = new_datum(datum_type);
+ let mut datum = ffi::FfiDatum {
+ datum_type,
+ ..Default::default()
+ };
match unit {
TimeUnit::Second => {
let array = record_batch
.column(i)
.as_any()
.downcast_ref::<TimestampSecondArray>()
- .expect("Timestamp(second) column expected");
+ .ok_or_else(|| {
+ anyhow!("Column {i}: expected
Timestamp(second) array")
+ })?;
datum.i64_val = array.value(row_id) *
MILLIS_PER_SECOND;
- datum.i32_val = 0;
}
TimeUnit::Millisecond => {
let array = record_batch
.column(i)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
- .expect("Timestamp(millisecond) column expected");
+ .ok_or_else(|| {
+ anyhow!("Column {i}: expected
Timestamp(millisecond) array")
+ })?;
datum.i64_val = array.value(row_id);
- datum.i32_val = 0;
}
TimeUnit::Microsecond => {
let array = record_batch
.column(i)
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
- .expect("Timestamp(microsecond) column expected");
+ .ok_or_else(|| {
+ anyhow!("Column {i}: expected
Timestamp(microsecond) array")
+ })?;
let micros = array.value(row_id);
datum.i64_val = micros.div_euclid(MICROS_PER_MILLI);
datum.i32_val =
@@ -558,7 +567,9 @@ fn core_row_to_ffi_fields(
.column(i)
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
- .expect("Timestamp(nanosecond) column expected");
+ .ok_or_else(|| {
+ anyhow!("Column {i}: expected
Timestamp(nanosecond) array")
+ })?;
let nanos = array.value(row_id);
datum.i64_val = nanos.div_euclid(NANOS_PER_MILLI);
datum.i32_val = nanos.rem_euclid(NANOS_PER_MILLI) as
i32;
@@ -572,22 +583,26 @@ fn core_row_to_ffi_fields(
.column(i)
.as_any()
.downcast_ref::<Time32SecondArray>()
- .expect("Time32(second) column expected");
- let mut datum = new_datum(DATUM_TYPE_TIME);
- datum.i32_val = array.value(row_id) * MILLIS_PER_SECOND as
i32;
- datum
+ .ok_or_else(|| anyhow!("Column {i}: expected
Time32(second) array"))?;
+ ffi::FfiDatum {
+ datum_type: DATUM_TYPE_TIME,
+ i32_val: array.value(row_id) * MILLIS_PER_SECOND as
i32,
+ ..Default::default()
+ }
}
TimeUnit::Millisecond => {
let array = record_batch
.column(i)
.as_any()
.downcast_ref::<Time32MillisecondArray>()
- .expect("Time32(millisecond) column expected");
- let mut datum = new_datum(DATUM_TYPE_TIME);
- datum.i32_val = array.value(row_id);
- datum
+ .ok_or_else(|| anyhow!("Column {i}: expected
Time32(millisecond) array"))?;
+ ffi::FfiDatum {
+ datum_type: DATUM_TYPE_TIME,
+ i32_val: array.value(row_id),
+ ..Default::default()
+ }
}
- _ => panic!("Will never come here. Unsupported Time32 unit for
column {i}"),
+ _ => return Err(anyhow!("Column {i}: unsupported Time32
unit")),
},
ArrowDataType::Time64(unit) => match unit {
TimeUnit::Microsecond => {
@@ -595,55 +610,210 @@ fn core_row_to_ffi_fields(
.column(i)
.as_any()
.downcast_ref::<Time64MicrosecondArray>()
- .expect("Time64(microsecond) column expected");
- let mut datum = new_datum(DATUM_TYPE_TIME);
- datum.i32_val = (array.value(row_id) / MICROS_PER_MILLI)
as i32;
- datum
+ .ok_or_else(|| anyhow!("Column {i}: expected
Time64(microsecond) array"))?;
+ ffi::FfiDatum {
+ datum_type: DATUM_TYPE_TIME,
+ i32_val: (array.value(row_id) / MICROS_PER_MILLI) as
i32,
+ ..Default::default()
+ }
}
TimeUnit::Nanosecond => {
let array = record_batch
.column(i)
.as_any()
.downcast_ref::<Time64NanosecondArray>()
- .expect("Time64(nanosecond) column expected");
- let mut datum = new_datum(DATUM_TYPE_TIME);
- datum.i32_val = (array.value(row_id) / NANOS_PER_MILLI) as
i32;
- datum
+ .ok_or_else(|| anyhow!("Column {i}: expected
Time64(nanosecond) array"))?;
+ ffi::FfiDatum {
+ datum_type: DATUM_TYPE_TIME,
+ i32_val: (array.value(row_id) / NANOS_PER_MILLI) as
i32,
+ ..Default::default()
+ }
}
- _ => panic!("Will never come here. Unsupported Time64 unit for
column {i}"),
+ _ => return Err(anyhow!("Column {i}: unsupported Time64
unit")),
},
ArrowDataType::Decimal128(precision, scale) => {
let array = record_batch
.column(i)
.as_any()
.downcast_ref::<Decimal128Array>()
- .expect("Decimal128 column expected");
+ .ok_or_else(|| anyhow!("Column {i}: expected Decimal128
array"))?;
let i128_val = array.value(row_id);
if fcore::row::Decimal::is_compact_precision(*precision as
u32) {
- let mut datum = new_datum(DATUM_TYPE_DECIMAL_I64);
- datum.i64_val = i128_val as i64;
- datum.decimal_precision = *precision as i32;
- datum.decimal_scale = *scale as i32;
- datum
+ ffi::FfiDatum {
+ datum_type: DATUM_TYPE_DECIMAL_I64,
+ i64_val: i128_val as i64,
+ decimal_precision: *precision as i32,
+ decimal_scale: *scale as i32,
+ ..Default::default()
+ }
} else {
- let mut datum = new_datum(DATUM_TYPE_DECIMAL_I128);
- datum.i128_hi = (i128_val >> 64) as i64;
- datum.i128_lo = i128_val as i64;
- datum.decimal_precision = *precision as i32;
- datum.decimal_scale = *scale as i32;
- datum
+ ffi::FfiDatum {
+ datum_type: DATUM_TYPE_DECIMAL_I128,
+ i128_hi: (i128_val >> 64) as i64,
+ i128_lo: i128_val as i64,
+ decimal_precision: *precision as i32,
+ decimal_scale: *scale as i32,
+ ..Default::default()
+ }
}
}
- other => panic!(
- "Will never come here. Unsupported Arrow data type for column
{i}: {other:?}"
- ),
+ other => return Err(anyhow!("Column {i}: unsupported Arrow data
type {other:?}")),
+ };
+
+ fields.push(datum);
+ }
+
+ Ok(fields)
+}
+
/// All-zero datum tagged as NULL; used both for SQL NULLs and as the base
/// for `..Default::default()` struct-update syntax when building typed
/// datums. Hand-written because the cxx-generated FFI struct cannot derive
/// `Default`.
impl Default for ffi::FfiDatum {
    fn default() -> Self {
        Self {
            datum_type: DATUM_TYPE_NULL,
            bool_val: false,
            i32_val: 0,
            i64_val: 0,
            f32_val: 0.0,
            f64_val: 0.0,
            string_val: String::new(),
            bytes_val: vec![],
            decimal_precision: 0,
            decimal_scale: 0,
            i128_hi: 0,
            i128_lo: 0,
        }
    }
}
+
/// Convert any InternalRow to FfiGenericRow using Fluss schema metadata.
/// Used for lookup results (CompactedRow) where Arrow schema is unavailable.
///
/// The Fluss column types drive the dispatch (one match arm per supported
/// `DataType`). NULL cells become a default (NULL-tagged) datum. Returns an
/// error for unsupported column types or decimals that cannot be represented.
pub fn internal_row_to_ffi_row(
    row: &dyn fcore::row::InternalRow,
    table_info: &fcore::metadata::TableInfo,
) -> Result<ffi::FfiGenericRow> {
    let schema = table_info.get_schema();
    let columns = schema.columns();
    let mut fields = Vec::with_capacity(columns.len());

    for (i, col) in columns.iter().enumerate() {
        if row.is_null_at(i) {
            // FfiDatum::default() is tagged DATUM_TYPE_NULL.
            fields.push(ffi::FfiDatum::default());
            continue;
        }

        let datum = match col.data_type() {
            fcore::metadata::DataType::Boolean(_) => ffi::FfiDatum {
                datum_type: DATUM_TYPE_BOOL,
                bool_val: row.get_boolean(i),
                ..Default::default()
            },
            // TinyInt/SmallInt are widened into the shared i32 slot; the
            // FFI layer only distinguishes INT32.
            fcore::metadata::DataType::TinyInt(_) => ffi::FfiDatum {
                datum_type: DATUM_TYPE_INT32,
                i32_val: row.get_byte(i) as i32,
                ..Default::default()
            },
            fcore::metadata::DataType::SmallInt(_) => ffi::FfiDatum {
                datum_type: DATUM_TYPE_INT32,
                i32_val: row.get_short(i) as i32,
                ..Default::default()
            },
            fcore::metadata::DataType::Int(_) => ffi::FfiDatum {
                datum_type: DATUM_TYPE_INT32,
                i32_val: row.get_int(i),
                ..Default::default()
            },
            fcore::metadata::DataType::BigInt(_) => ffi::FfiDatum {
                datum_type: DATUM_TYPE_INT64,
                i64_val: row.get_long(i),
                ..Default::default()
            },
            fcore::metadata::DataType::Float(_) => ffi::FfiDatum {
                datum_type: DATUM_TYPE_FLOAT32,
                f32_val: row.get_float(i),
                ..Default::default()
            },
            fcore::metadata::DataType::Double(_) => ffi::FfiDatum {
                datum_type: DATUM_TYPE_FLOAT64,
                f64_val: row.get_double(i),
                ..Default::default()
            },
            fcore::metadata::DataType::String(_) => ffi::FfiDatum {
                datum_type: DATUM_TYPE_STRING,
                string_val: row.get_string(i).to_string(),
                ..Default::default()
            },
            fcore::metadata::DataType::Bytes(_) => ffi::FfiDatum {
                datum_type: DATUM_TYPE_BYTES,
                bytes_val: row.get_bytes(i).to_vec(),
                ..Default::default()
            },
            fcore::metadata::DataType::Date(_) => ffi::FfiDatum {
                datum_type: DATUM_TYPE_DATE,
                i32_val: row.get_date(i).get_inner(),
                ..Default::default()
            },
            fcore::metadata::DataType::Time(_) => ffi::FfiDatum {
                datum_type: DATUM_TYPE_TIME,
                i32_val: row.get_time(i).get_inner(),
                ..Default::default()
            },
            // Timestamps split into millis (i64_val) + nano-of-milli (i32_val).
            fcore::metadata::DataType::Timestamp(dt) => {
                let ts = row.get_timestamp_ntz(i, dt.precision());
                ffi::FfiDatum {
                    datum_type: DATUM_TYPE_TIMESTAMP_NTZ,
                    i64_val: ts.get_millisecond(),
                    i32_val: ts.get_nano_of_millisecond(),
                    ..Default::default()
                }
            }
            fcore::metadata::DataType::TimestampLTz(dt) => {
                let ts = row.get_timestamp_ltz(i, dt.precision());
                ffi::FfiDatum {
                    datum_type: DATUM_TYPE_TIMESTAMP_LTZ,
                    i64_val: ts.get_epoch_millisecond(),
                    i32_val: ts.get_nano_of_millisecond(),
                    ..Default::default()
                }
            }
            fcore::metadata::DataType::Decimal(dt) => {
                let precision = dt.precision();
                let scale = dt.scale();
                let decimal = row.get_decimal(i, precision as usize, scale as usize);
                if fcore::row::Decimal::is_compact_precision(precision) {
                    // Compact precision fits the unscaled value in an i64.
                    ffi::FfiDatum {
                        datum_type: DATUM_TYPE_DECIMAL_I64,
                        i64_val: decimal.to_unscaled_long().map_err(|e| {
                            anyhow!("Column {i}: compact decimal conversion failed: {e}")
                        })?,
                        decimal_precision: precision as i32,
                        decimal_scale: scale as i32,
                        ..Default::default()
                    }
                } else {
                    // Wide decimals cross the FFI boundary as an i128 split
                    // into hi/lo 64-bit halves.
                    let bd = decimal.to_big_decimal();
                    let (unscaled, _) = bd.into_bigint_and_exponent();
                    use bigdecimal::ToPrimitive;
                    let i128_val = unscaled.to_i128().ok_or_else(|| {
                        anyhow!("Column {i}: decimal unscaled value does not fit in i128")
                    })?;
                    ffi::FfiDatum {
                        datum_type: DATUM_TYPE_DECIMAL_I128,
                        i128_hi: (i128_val >> 64) as i64,
                        i128_lo: i128_val as i64,
                        decimal_precision: precision as i32,
                        decimal_scale: scale as i32,
                        ..Default::default()
                    }
                }
            }
            other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")),
        };
        fields.push(datum);
    }
    Ok(ffi::FfiGenericRow { fields })
}
pub fn core_lake_snapshot_to_ffi(snapshot: &fcore::metadata::LakeSnapshot) ->
ffi::FfiLakeSnapshot {