This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 9765235 chore: fix sparse to dense representation for CPP lookup (#348)
9765235 is described below
commit 9765235be718a09bc78582b31df106f924d40b20
Author: Anton Borisov <[email protected]>
AuthorDate: Wed Feb 18 01:42:34 2026 +0000
chore: fix sparse to dense representation for CPP lookup (#348)
---
bindings/cpp/examples/kv_example.cpp | 29 ++++++++++++++++-------------
bindings/cpp/src/lib.rs | 22 ++++++++++++++--------
2 files changed, 30 insertions(+), 21 deletions(-)
diff --git a/bindings/cpp/examples/kv_example.cpp b/bindings/cpp/examples/kv_example.cpp
index 497894f..46ed01f 100644
--- a/bindings/cpp/examples/kv_example.cpp
+++ b/bindings/cpp/examples/kv_example.cpp
@@ -371,11 +371,13 @@ int main() {
fluss::TablePath partitioned_kv_path("fluss", "partitioned_kv_cpp_v1");
admin.DropTable(partitioned_kv_path, true);
+ // PK columns intentionally interleaved with non-PK columns to verify
+ // that lookup correctly builds a dense PK-only row (not sparse
full-width).
auto partitioned_kv_schema = fluss::Schema::NewBuilder()
.AddColumn("region",
fluss::DataType::String())
+ .AddColumn("score",
fluss::DataType::BigInt())
.AddColumn("user_id",
fluss::DataType::Int())
.AddColumn("name",
fluss::DataType::String())
- .AddColumn("score",
fluss::DataType::BigInt())
.SetPrimaryKeys({"region", "user_id"})
.Build();
@@ -403,23 +405,24 @@ int main() {
partitioned_kv_table.NewUpsert().CreateWriter(partitioned_writer));
// Upsert rows across partitions
+ // Column order: region(0), score(1), user_id(2), name(3)
struct TestRow {
const char* region;
+ int64_t score;
int32_t user_id;
const char* name;
- int64_t score;
};
TestRow test_data[] = {
- {"US", 1, "Gustave", 100}, {"US", 2, "Lune", 200}, {"EU", 1,
"Sciel", 150},
- {"EU", 2, "Maelle", 250}, {"APAC", 1, "Noco", 300},
+ {"US", 100, 1, "Gustave"}, {"US", 200, 2, "Lune"}, {"EU", 150, 1,
"Sciel"},
+ {"EU", 250, 2, "Maelle"}, {"APAC", 300, 1, "Noco"},
};
for (const auto& td : test_data) {
auto row = partitioned_kv_table.NewRow();
row.Set("region", td.region);
+ row.Set("score", td.score);
row.Set("user_id", td.user_id);
row.Set("name", td.name);
- row.Set("score", td.score);
check("partitioned_upsert", partitioned_writer.Upsert(row));
}
check("partitioned_flush", partitioned_writer.Flush());
@@ -442,7 +445,7 @@ int main() {
<< std::endl;
std::exit(1);
}
- if (result.GetString(2) != td.name || result.GetInt64(3) != td.score) {
+ if (result.GetString(3) != td.name || result.GetInt64(1) != td.score) {
std::cerr << "ERROR: Data mismatch for region=" << td.region
<< " user_id=" << td.user_id << std::endl;
std::exit(1);
@@ -454,9 +457,9 @@ int main() {
{
auto row = partitioned_kv_table.NewRow();
row.Set("region", "US");
+ row.Set("score", static_cast<int64_t>(999));
row.Set("user_id", 1);
row.Set("name", "Gustave Updated");
- row.Set("score", static_cast<int64_t>(999));
fluss::WriteResult wr;
check("partitioned_update", partitioned_writer.Upsert(row, wr));
check("partitioned_update_wait", wr.Wait());
@@ -467,13 +470,13 @@ int main() {
pk.Set("user_id", 1);
fluss::LookupResult result;
check("partitioned_lookup_updated", partitioned_lookuper.Lookup(pk,
result));
- if (!result.Found() || result.GetString(2) != "Gustave Updated" ||
- result.GetInt64(3) != 999) {
+ if (!result.Found() || result.GetString(3) != "Gustave Updated" ||
+ result.GetInt64(1) != 999) {
std::cerr << "ERROR: Partition update verification failed" <<
std::endl;
std::exit(1);
}
- std::cout << "Update verified: US/1 name=" << result.GetString(2)
- << " score=" << result.GetInt64(3) << std::endl;
+ std::cout << "Update verified: US/1 name=" << result.GetString(3)
+ << " score=" << result.GetInt64(1) << std::endl;
}
// Lookup in non-existent partition
@@ -520,11 +523,11 @@ int main() {
pk.Set("user_id", 2);
fluss::LookupResult result;
check("partitioned_lookup_eu2", partitioned_lookuper.Lookup(pk,
result));
- if (!result.Found() || result.GetString(2) != "Maelle") {
+ if (!result.Found() || result.GetString(3) != "Maelle") {
std::cerr << "ERROR: Expected EU/2 (Maelle) to still exist" <<
std::endl;
std::exit(1);
}
- std::cout << "EU/2 still exists: name=" << result.GetString(2) <<
std::endl;
+ std::cout << "EU/2 still exists: name=" << result.GetString(3) <<
std::endl;
}
check("drop_partitioned_kv", admin.DropTable(partitioned_kv_path, true));
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 444b073..cb29882 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -1321,20 +1321,26 @@ unsafe fn delete_lookuper(lookuper: *mut Lookuper) {
}
impl Lookuper {
- /// Pad row with Null to full schema width (same as UpsertWriter::pad_row).
- /// Ensures the PK row is always full-width, matching Python's behavior.
- fn pad_row<'a>(&self, mut row: fcore::row::GenericRow<'a>) ->
fcore::row::GenericRow<'a> {
- let num_columns = self.table_info.get_schema().columns().len();
- if row.values.len() < num_columns {
- row.values.resize(num_columns, fcore::row::Datum::Null);
+ /// Build a dense PK-only row from a (possibly sparse) input row.
+ /// The user may set PK values at their full schema positions (e.g. [0, 2])
+ /// via name-based Set(). We compact them into [0, 1, …] to match
+ /// the lookup_row_type the core KeyEncoder expects.
+ fn dense_pk_row<'a>(&self, mut row: fcore::row::GenericRow<'a>) ->
fcore::row::GenericRow<'a> {
+ let pk_indices = self.table_info.get_schema().primary_key_indexes();
+ let mut dense = fcore::row::GenericRow::new(pk_indices.len());
+ for (dense_idx, &schema_idx) in pk_indices.iter().enumerate() {
+ if schema_idx < row.values.len() {
+ dense.values[dense_idx] =
+ std::mem::replace(&mut row.values[schema_idx],
fcore::row::Datum::Null);
+ }
}
- row
+ dense
}
fn lookup(&mut self, pk_row: &GenericRowInner) -> Box<LookupResultInner> {
let schema = self.table_info.get_schema();
let generic_row = match types::resolve_row_types(&pk_row.row,
Some(schema)) {
- Ok(r) => self.pad_row(r),
+ Ok(r) => self.dense_pk_row(r),
Err(e) => {
return Box::new(LookupResultInner::from_error(
CLIENT_ERROR_CODE,