This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 9765235  chore: fix sparse to dense representation for CPP lookup (#348)
9765235 is described below

commit 9765235be718a09bc78582b31df106f924d40b20
Author: Anton Borisov <[email protected]>
AuthorDate: Wed Feb 18 01:42:34 2026 +0000

    chore: fix sparse to dense representation for CPP lookup (#348)
---
 bindings/cpp/examples/kv_example.cpp | 29 ++++++++++++++++-------------
 bindings/cpp/src/lib.rs              | 22 ++++++++++++++--------
 2 files changed, 30 insertions(+), 21 deletions(-)

diff --git a/bindings/cpp/examples/kv_example.cpp b/bindings/cpp/examples/kv_example.cpp
index 497894f..46ed01f 100644
--- a/bindings/cpp/examples/kv_example.cpp
+++ b/bindings/cpp/examples/kv_example.cpp
@@ -371,11 +371,13 @@ int main() {
     fluss::TablePath partitioned_kv_path("fluss", "partitioned_kv_cpp_v1");
     admin.DropTable(partitioned_kv_path, true);
 
+    // PK columns intentionally interleaved with non-PK columns to verify
+    // that lookup correctly builds a dense PK-only row (not sparse 
full-width).
     auto partitioned_kv_schema = fluss::Schema::NewBuilder()
                                      .AddColumn("region", 
fluss::DataType::String())
+                                     .AddColumn("score", 
fluss::DataType::BigInt())
                                      .AddColumn("user_id", 
fluss::DataType::Int())
                                      .AddColumn("name", 
fluss::DataType::String())
-                                     .AddColumn("score", 
fluss::DataType::BigInt())
                                      .SetPrimaryKeys({"region", "user_id"})
                                      .Build();
 
@@ -403,23 +405,24 @@ int main() {
           partitioned_kv_table.NewUpsert().CreateWriter(partitioned_writer));
 
     // Upsert rows across partitions
+    // Column order: region(0), score(1), user_id(2), name(3)
     struct TestRow {
         const char* region;
+        int64_t score;
         int32_t user_id;
         const char* name;
-        int64_t score;
     };
     TestRow test_data[] = {
-        {"US", 1, "Gustave", 100}, {"US", 2, "Lune", 200},   {"EU", 1, 
"Sciel", 150},
-        {"EU", 2, "Maelle", 250},  {"APAC", 1, "Noco", 300},
+        {"US", 100, 1, "Gustave"}, {"US", 200, 2, "Lune"},   {"EU", 150, 1, 
"Sciel"},
+        {"EU", 250, 2, "Maelle"},  {"APAC", 300, 1, "Noco"},
     };
 
     for (const auto& td : test_data) {
         auto row = partitioned_kv_table.NewRow();
         row.Set("region", td.region);
+        row.Set("score", td.score);
         row.Set("user_id", td.user_id);
         row.Set("name", td.name);
-        row.Set("score", td.score);
         check("partitioned_upsert", partitioned_writer.Upsert(row));
     }
     check("partitioned_flush", partitioned_writer.Flush());
@@ -442,7 +445,7 @@ int main() {
                       << std::endl;
             std::exit(1);
         }
-        if (result.GetString(2) != td.name || result.GetInt64(3) != td.score) {
+        if (result.GetString(3) != td.name || result.GetInt64(1) != td.score) {
             std::cerr << "ERROR: Data mismatch for region=" << td.region
                       << " user_id=" << td.user_id << std::endl;
             std::exit(1);
@@ -454,9 +457,9 @@ int main() {
     {
         auto row = partitioned_kv_table.NewRow();
         row.Set("region", "US");
+        row.Set("score", static_cast<int64_t>(999));
         row.Set("user_id", 1);
         row.Set("name", "Gustave Updated");
-        row.Set("score", static_cast<int64_t>(999));
         fluss::WriteResult wr;
         check("partitioned_update", partitioned_writer.Upsert(row, wr));
         check("partitioned_update_wait", wr.Wait());
@@ -467,13 +470,13 @@ int main() {
         pk.Set("user_id", 1);
         fluss::LookupResult result;
         check("partitioned_lookup_updated", partitioned_lookuper.Lookup(pk, 
result));
-        if (!result.Found() || result.GetString(2) != "Gustave Updated" ||
-            result.GetInt64(3) != 999) {
+        if (!result.Found() || result.GetString(3) != "Gustave Updated" ||
+            result.GetInt64(1) != 999) {
             std::cerr << "ERROR: Partition update verification failed" << 
std::endl;
             std::exit(1);
         }
-        std::cout << "Update verified: US/1 name=" << result.GetString(2)
-                  << " score=" << result.GetInt64(3) << std::endl;
+        std::cout << "Update verified: US/1 name=" << result.GetString(3)
+                  << " score=" << result.GetInt64(1) << std::endl;
     }
 
     // Lookup in non-existent partition
@@ -520,11 +523,11 @@ int main() {
         pk.Set("user_id", 2);
         fluss::LookupResult result;
         check("partitioned_lookup_eu2", partitioned_lookuper.Lookup(pk, 
result));
-        if (!result.Found() || result.GetString(2) != "Maelle") {
+        if (!result.Found() || result.GetString(3) != "Maelle") {
             std::cerr << "ERROR: Expected EU/2 (Maelle) to still exist" << 
std::endl;
             std::exit(1);
         }
-        std::cout << "EU/2 still exists: name=" << result.GetString(2) << 
std::endl;
+        std::cout << "EU/2 still exists: name=" << result.GetString(3) << 
std::endl;
     }
 
     check("drop_partitioned_kv", admin.DropTable(partitioned_kv_path, true));
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 444b073..cb29882 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -1321,20 +1321,26 @@ unsafe fn delete_lookuper(lookuper: *mut Lookuper) {
 }
 
 impl Lookuper {
-    /// Pad row with Null to full schema width (same as UpsertWriter::pad_row).
-    /// Ensures the PK row is always full-width, matching Python's behavior.
-    fn pad_row<'a>(&self, mut row: fcore::row::GenericRow<'a>) -> 
fcore::row::GenericRow<'a> {
-        let num_columns = self.table_info.get_schema().columns().len();
-        if row.values.len() < num_columns {
-            row.values.resize(num_columns, fcore::row::Datum::Null);
+    /// Build a dense PK-only row from a (possibly sparse) input row.
+    /// The user may set PK values at their full schema positions (e.g. [0, 2])
+    /// via name-based Set(). We compact them into [0, 1, …] to match
+    /// the lookup_row_type the core KeyEncoder expects.
+    fn dense_pk_row<'a>(&self, mut row: fcore::row::GenericRow<'a>) -> 
fcore::row::GenericRow<'a> {
+        let pk_indices = self.table_info.get_schema().primary_key_indexes();
+        let mut dense = fcore::row::GenericRow::new(pk_indices.len());
+        for (dense_idx, &schema_idx) in pk_indices.iter().enumerate() {
+            if schema_idx < row.values.len() {
+                dense.values[dense_idx] =
+                    std::mem::replace(&mut row.values[schema_idx], 
fcore::row::Datum::Null);
+            }
         }
-        row
+        dense
     }
 
     fn lookup(&mut self, pk_row: &GenericRowInner) -> Box<LookupResultInner> {
         let schema = self.table_info.get_schema();
         let generic_row = match types::resolve_row_types(&pk_row.row, 
Some(schema)) {
-            Ok(r) => self.pad_row(r),
+            Ok(r) => self.dense_pk_row(r),
             Err(e) => {
                 return Box::new(LookupResultInner::from_error(
                     CLIENT_ERROR_CODE,

Reply via email to