This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a13656  chore: Fix issue where interleaving non-PK column with PK 
columns causes lookup panic (#346)
8a13656 is described below

commit 8a136560ff8d2e2535d2b74736bcd06d71b7e139
Author: Keith Lee <[email protected]>
AuthorDate: Tue Feb 17 12:27:54 2026 +0000

    chore: Fix issue where interleaving non-PK column with PK columns causes 
lookup panic (#346)
---
 bindings/python/src/lookup.rs          |  4 +--
 bindings/python/src/table.rs           | 52 +++++++++++++++++++++++++++++-----
 bindings/python/test/test_kv_table.py  |  4 ++-
 bindings/python/test/test_log_table.py |  3 ++
 4 files changed, 53 insertions(+), 10 deletions(-)

diff --git a/bindings/python/src/lookup.rs b/bindings/python/src/lookup.rs
index 718f8e5..e2edbf4 100644
--- a/bindings/python/src/lookup.rs
+++ b/bindings/python/src/lookup.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::table::{internal_row_to_dict, python_to_sparse_generic_row};
+use crate::table::{internal_row_to_dict, python_to_dense_generic_row};
 use crate::*;
 use pyo3_async_runtimes::tokio::future_into_py;
 use std::sync::Arc;
@@ -53,7 +53,7 @@ impl Lookuper {
         pk: &Bound<'_, PyAny>,
     ) -> PyResult<Bound<'py, PyAny>> {
         let pk_indices = self.table_info.get_schema().primary_key_indexes();
-        let generic_row = python_to_sparse_generic_row(pk, &self.table_info, 
&pk_indices)?;
+        let generic_row = python_to_dense_generic_row(pk, &self.table_info, 
&pk_indices)?;
         let inner = self.inner.clone();
         let table_info = self.table_info.clone();
 
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 8085215..c3ea248 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -783,6 +783,7 @@ fn process_sequence(
     target_indices: &[usize],
     fields: &[fcore::metadata::DataField],
     datums: &mut [fcore::row::Datum<'static>],
+    sparse: bool,
 ) -> PyResult<()> {
     if seq.len()? != target_indices.len() {
         return Err(FlussError::new_err(format!(
@@ -794,7 +795,8 @@ fn process_sequence(
     for (i, &col_idx) in target_indices.iter().enumerate() {
         let field = &fields[col_idx];
         let value = seq.get_item(i)?;
-        datums[col_idx] = python_value_to_datum(&value, field.data_type())
+        let dest = if sparse { col_idx } else { i };
+        datums[dest] = python_value_to_datum(&value, field.data_type())
             .map_err(|e| FlussError::new_err(format!("Field '{}': {}", 
field.name(), e)))?;
     }
     Ok(())
@@ -806,12 +808,37 @@ pub fn python_to_sparse_generic_row(
     row: &Bound<PyAny>,
     table_info: &fcore::metadata::TableInfo,
     target_indices: &[usize],
+) -> PyResult<fcore::row::GenericRow<'static>> {
+    python_to_generic_row_inner(row, table_info, target_indices, true)
+}
+
+/// Build a dense GenericRow with exactly `target_indices.len()` fields,
+/// containing only the target column values in order.
+pub fn python_to_dense_generic_row(
+    row: &Bound<PyAny>,
+    table_info: &fcore::metadata::TableInfo,
+    target_indices: &[usize],
+) -> PyResult<fcore::row::GenericRow<'static>> {
+    python_to_generic_row_inner(row, table_info, target_indices, false)
+}
+
+/// Build a GenericRow from user input. When `sparse` is true, the row is full 
width and padded with nulls; when false, it is dense, containing only the target values in order.
+fn python_to_generic_row_inner(
+    row: &Bound<PyAny>,
+    table_info: &fcore::metadata::TableInfo,
+    target_indices: &[usize],
+    sparse: bool,
 ) -> PyResult<fcore::row::GenericRow<'static>> {
     let row_type = table_info.row_type();
     let fields = row_type.fields();
     let target_names: Vec<&str> = target_indices.iter().map(|&i| 
fields[i].name()).collect();
 
-    let mut datums: Vec<fcore::row::Datum<'static>> = 
vec![fcore::row::Datum::Null; fields.len()];
+    let num_fields = if sparse {
+        fields.len()
+    } else {
+        target_indices.len()
+    };
+    let mut datums: Vec<fcore::row::Datum<'static>> = 
vec![fcore::row::Datum::Null; num_fields];
 
     let row_input: RowInput = row.extract().map_err(|_| {
         let type_name = row
@@ -849,19 +876,30 @@ pub fn python_to_sparse_generic_row(
                 let value = dict
                     .get_item(name)?
                     .ok_or_else(|| FlussError::new_err(format!("Missing field: 
{name}")))?;
-                datums[col_idx] = python_value_to_datum(&value, 
field.data_type())
+                let dest = if sparse { col_idx } else { i };
+                datums[dest] = python_value_to_datum(&value, field.data_type())
                     .map_err(|e| FlussError::new_err(format!("Field '{name}': 
{e}")))?;
             }
         }
 
         RowInput::List(list) => {
-            let seq = list.as_sequence();
-            process_sequence(seq, target_indices, fields, &mut datums)?;
+            process_sequence(
+                list.as_sequence(),
+                target_indices,
+                fields,
+                &mut datums,
+                sparse,
+            )?;
         }
 
         RowInput::Tuple(tuple) => {
-            let seq = tuple.as_sequence();
-            process_sequence(seq, target_indices, fields, &mut datums)?;
+            process_sequence(
+                tuple.as_sequence(),
+                target_indices,
+                fields,
+                &mut datums,
+                sparse,
+            )?;
         }
     }
 
diff --git a/bindings/python/test/test_kv_table.py 
b/bindings/python/test/test_kv_table.py
index 98b0cee..36aa3e4 100644
--- a/bindings/python/test/test_kv_table.py
+++ b/bindings/python/test/test_kv_table.py
@@ -101,12 +101,14 @@ async def test_composite_primary_keys(connection, admin):
     table_path = fluss.TablePath("fluss", "py_test_composite_pk")
     await admin.drop_table(table_path, ignore_if_not_exists=True)
 
+    # PK columns intentionally interleaved with non-PK column to verify
+    # that lookup correctly handles non-contiguous primary key indices.
     schema = fluss.Schema(
         pa.schema(
             [
                 pa.field("region", pa.string()),
-                pa.field("user_id", pa.int32()),
                 pa.field("score", pa.int64()),
+                pa.field("user_id", pa.int32()),
             ]
         ),
         primary_keys=["region", "user_id"],
diff --git a/bindings/python/test/test_log_table.py 
b/bindings/python/test/test_log_table.py
index 3219f03..09586aa 100644
--- a/bindings/python/test/test_log_table.py
+++ b/bindings/python/test/test_log_table.py
@@ -179,6 +179,9 @@ async def test_list_offsets(connection, admin):
     )
     assert ts_before[0] == 0
 
+    # Intentional sleep to avoid a race condition: FlussError(code=38) "The 
timestamp is invalid"
+    await asyncio.sleep(1)
+
     # Timestamp after append should resolve to offset 3
     ts_after = await admin.list_offsets(
         table_path,

Reply via email to