This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 8a13656 chore: Fix issue where interleaving non-PK column with PK
columns causes lookup panic (#346)
8a13656 is described below
commit 8a136560ff8d2e2535d2b74736bcd06d71b7e139
Author: Keith Lee <[email protected]>
AuthorDate: Tue Feb 17 12:27:54 2026 +0000
chore: Fix issue where interleaving non-PK column with PK columns causes
lookup panic (#346)
---
bindings/python/src/lookup.rs | 4 +--
bindings/python/src/table.rs | 52 +++++++++++++++++++++++++++++-----
bindings/python/test/test_kv_table.py | 4 ++-
bindings/python/test/test_log_table.py | 3 ++
4 files changed, 53 insertions(+), 10 deletions(-)
diff --git a/bindings/python/src/lookup.rs b/bindings/python/src/lookup.rs
index 718f8e5..e2edbf4 100644
--- a/bindings/python/src/lookup.rs
+++ b/bindings/python/src/lookup.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::table::{internal_row_to_dict, python_to_sparse_generic_row};
+use crate::table::{internal_row_to_dict, python_to_dense_generic_row};
use crate::*;
use pyo3_async_runtimes::tokio::future_into_py;
use std::sync::Arc;
@@ -53,7 +53,7 @@ impl Lookuper {
pk: &Bound<'_, PyAny>,
) -> PyResult<Bound<'py, PyAny>> {
let pk_indices = self.table_info.get_schema().primary_key_indexes();
- let generic_row = python_to_sparse_generic_row(pk, &self.table_info,
&pk_indices)?;
+ let generic_row = python_to_dense_generic_row(pk, &self.table_info,
&pk_indices)?;
let inner = self.inner.clone();
let table_info = self.table_info.clone();
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 8085215..c3ea248 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -783,6 +783,7 @@ fn process_sequence(
target_indices: &[usize],
fields: &[fcore::metadata::DataField],
datums: &mut [fcore::row::Datum<'static>],
+ sparse: bool,
) -> PyResult<()> {
if seq.len()? != target_indices.len() {
return Err(FlussError::new_err(format!(
@@ -794,7 +795,8 @@ fn process_sequence(
for (i, &col_idx) in target_indices.iter().enumerate() {
let field = &fields[col_idx];
let value = seq.get_item(i)?;
- datums[col_idx] = python_value_to_datum(&value, field.data_type())
+ let dest = if sparse { col_idx } else { i };
+ datums[dest] = python_value_to_datum(&value, field.data_type())
.map_err(|e| FlussError::new_err(format!("Field '{}': {}",
field.name(), e)))?;
}
Ok(())
@@ -806,12 +808,37 @@ pub fn python_to_sparse_generic_row(
row: &Bound<PyAny>,
table_info: &fcore::metadata::TableInfo,
target_indices: &[usize],
+) -> PyResult<fcore::row::GenericRow<'static>> {
+ python_to_generic_row_inner(row, table_info, target_indices, true)
+}
+
+/// Build a dense GenericRow with exactly `target_indices.len()` fields,
+/// containing only the target column values in order.
+pub fn python_to_dense_generic_row(
+ row: &Bound<PyAny>,
+ table_info: &fcore::metadata::TableInfo,
+ target_indices: &[usize],
+) -> PyResult<fcore::row::GenericRow<'static>> {
+ python_to_generic_row_inner(row, table_info, target_indices, false)
+}
+
+/// Build a GenericRow from user input. When `sparse` is true, the row is full
width and padded with nulls
+fn python_to_generic_row_inner(
+ row: &Bound<PyAny>,
+ table_info: &fcore::metadata::TableInfo,
+ target_indices: &[usize],
+ sparse: bool,
) -> PyResult<fcore::row::GenericRow<'static>> {
let row_type = table_info.row_type();
let fields = row_type.fields();
let target_names: Vec<&str> = target_indices.iter().map(|&i|
fields[i].name()).collect();
- let mut datums: Vec<fcore::row::Datum<'static>> =
vec![fcore::row::Datum::Null; fields.len()];
+ let num_fields = if sparse {
+ fields.len()
+ } else {
+ target_indices.len()
+ };
+ let mut datums: Vec<fcore::row::Datum<'static>> =
vec![fcore::row::Datum::Null; num_fields];
let row_input: RowInput = row.extract().map_err(|_| {
let type_name = row
@@ -849,19 +876,30 @@ pub fn python_to_sparse_generic_row(
let value = dict
.get_item(name)?
.ok_or_else(|| FlussError::new_err(format!("Missing field:
{name}")))?;
- datums[col_idx] = python_value_to_datum(&value,
field.data_type())
+ let dest = if sparse { col_idx } else { i };
+ datums[dest] = python_value_to_datum(&value, field.data_type())
.map_err(|e| FlussError::new_err(format!("Field '{name}':
{e}")))?;
}
}
RowInput::List(list) => {
- let seq = list.as_sequence();
- process_sequence(seq, target_indices, fields, &mut datums)?;
+ process_sequence(
+ list.as_sequence(),
+ target_indices,
+ fields,
+ &mut datums,
+ sparse,
+ )?;
}
RowInput::Tuple(tuple) => {
- let seq = tuple.as_sequence();
- process_sequence(seq, target_indices, fields, &mut datums)?;
+ process_sequence(
+ tuple.as_sequence(),
+ target_indices,
+ fields,
+ &mut datums,
+ sparse,
+ )?;
}
}
diff --git a/bindings/python/test/test_kv_table.py
b/bindings/python/test/test_kv_table.py
index 98b0cee..36aa3e4 100644
--- a/bindings/python/test/test_kv_table.py
+++ b/bindings/python/test/test_kv_table.py
@@ -101,12 +101,14 @@ async def test_composite_primary_keys(connection, admin):
table_path = fluss.TablePath("fluss", "py_test_composite_pk")
await admin.drop_table(table_path, ignore_if_not_exists=True)
+ # PK columns intentionally interleaved with non-PK column to verify
+ # that lookup correctly handles non-contiguous primary key indices.
schema = fluss.Schema(
pa.schema(
[
pa.field("region", pa.string()),
- pa.field("user_id", pa.int32()),
pa.field("score", pa.int64()),
+ pa.field("user_id", pa.int32()),
]
),
primary_keys=["region", "user_id"],
diff --git a/bindings/python/test/test_log_table.py
b/bindings/python/test/test_log_table.py
index 3219f03..09586aa 100644
--- a/bindings/python/test/test_log_table.py
+++ b/bindings/python/test/test_log_table.py
@@ -179,6 +179,9 @@ async def test_list_offsets(connection, admin):
)
assert ts_before[0] == 0
+ # Intentional sleep to avoid a race condition: FlussError(code=38) "The
timestamp is invalid"
+ await asyncio.sleep(1)
+
# Timestamp after append should resolve to offset 3
ts_after = await admin.list_offsets(
table_path,