This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new d23cd95 fix: write valid empty BinaryRow bytes for empty table stats (#349)
d23cd95 is described below
commit d23cd95e98bf88c51941bd3a4d3926a851822d65
Author: shyjsarah <[email protected]>
AuthorDate: Tue Jun 2 20:41:35 2026 +0800
fix: write valid empty BinaryRow bytes for empty table stats (#349)
Empty `BinaryTableStats` previously stored `Vec::new()` for `min_values` /
`max_values`. Java readers parse those fields via
`SerializationUtils.deserializeBinaryRow`, which requires at minimum a
4-byte big-endian arity prefix and throws `BufferUnderflowException` on
zero-length input. The most visible casualty is a non-partitioned table
written via paimon-rust: any Spark/Flink read of its manifest list crashes
inside `SimpleStats.fromRow` because `_PARTITION_STATS._MIN_VALUES` is
empty bytes.
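Introduce `BinaryTableStats::empty()` that returns stats backed by
`EMPTY_SERIALIZED_ROW` (the arity=0 BinaryRow, 12 bytes) and route every
existing call site through it: the production fix in
`compute_partition_stats`, the Avro decode fallbacks for missing
`_PARTITION_STATS` / `key_stats` / `value_stats`, and the test fixtures
that copied the old pattern.
Also build the partition-stats min/max rows with `BinaryRowBuilder`
(`build_partition_stats_row`) instead of `datums_to_binary_row`, so a
partitioned table whose partition values are all null still writes a
decodable arity-N row with those fields marked null, rather than bytes the
Java reader cannot decode.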
Add regression tests that round-trip the empty stats through
`BinaryRow::from_serialized_bytes` — the same protocol the Java reader
uses — so future copies of `vec![]` are caught locally.
---
.../paimon/src/spec/avro/manifest_entry_decode.rs | 8 +-
.../src/spec/avro/manifest_file_meta_decode.rs | 2 +-
crates/paimon/src/spec/manifest.rs | 2 +-
crates/paimon/src/spec/stats.rs | 52 ++++++++++-
crates/paimon/src/table/data_evolution_writer.rs | 2 +-
crates/paimon/src/table/referenced_files.rs | 4 +-
crates/paimon/src/table/table_commit.rs | 102 ++++++++++++++++++---
7 files changed, 151 insertions(+), 21 deletions(-)
diff --git a/crates/paimon/src/spec/avro/manifest_entry_decode.rs
b/crates/paimon/src/spec/avro/manifest_entry_decode.rs
index 1d3ffcf..df251bd 100644
--- a/crates/paimon/src/spec/avro/manifest_entry_decode.rs
+++ b/crates/paimon/src/spec/avro/manifest_entry_decode.rs
@@ -270,8 +270,8 @@ fn decode_data_file_meta(
row_count: row_count.unwrap_or(0),
min_key: min_key.unwrap_or_default(),
max_key: max_key.unwrap_or_default(),
- key_stats: key_stats.unwrap_or_else(|| BinaryTableStats::new(vec![],
vec![], vec![])),
- value_stats: value_stats.unwrap_or_else(||
BinaryTableStats::new(vec![], vec![], vec![])),
+ key_stats: key_stats.unwrap_or_else(BinaryTableStats::empty),
+ value_stats: value_stats.unwrap_or_else(BinaryTableStats::empty),
min_sequence_number: min_sequence_number.unwrap_or(0),
max_sequence_number: max_sequence_number.unwrap_or(0),
schema_id: schema_id.unwrap_or(0),
@@ -391,8 +391,8 @@ fn default_data_file_meta() -> DataFileMeta {
row_count: 0,
min_key: vec![],
max_key: vec![],
- key_stats: BinaryTableStats::new(vec![], vec![], vec![]),
- value_stats: BinaryTableStats::new(vec![], vec![], vec![]),
+ key_stats: BinaryTableStats::empty(),
+ value_stats: BinaryTableStats::empty(),
min_sequence_number: 0,
max_sequence_number: 0,
schema_id: 0,
diff --git a/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs
b/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs
index 7b30c52..fa5cd09 100644
--- a/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs
+++ b/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs
@@ -72,7 +72,7 @@ impl AvroRecordDecode for ManifestFileMeta {
file_size.unwrap_or(0),
num_added_files.unwrap_or(0),
num_deleted_files.unwrap_or(0),
- partition_stats.unwrap_or_else(|| BinaryTableStats::new(vec![],
vec![], vec![])),
+ partition_stats.unwrap_or_else(BinaryTableStats::empty),
schema_id.unwrap_or(0),
min_bucket,
max_bucket,
diff --git a/crates/paimon/src/spec/manifest.rs
b/crates/paimon/src/spec/manifest.rs
index cd16133..bebd09e 100644
--- a/crates/paimon/src/spec/manifest.rs
+++ b/crates/paimon/src/spec/manifest.rs
@@ -125,7 +125,7 @@ mod tests {
use crate::spec::ManifestEntry;
fn entry(kind: FileKind, file_name: &str, level: i32) -> ManifestEntry
{
- let stats = BinaryTableStats::new(vec![], vec![], vec![]);
+ let stats = BinaryTableStats::empty();
let file = DataFileMeta {
file_name: file_name.to_string(),
file_size: 100,
diff --git a/crates/paimon/src/spec/stats.rs b/crates/paimon/src/spec/stats.rs
index 2c389a3..4ae9b24 100644
--- a/crates/paimon/src/spec/stats.rs
+++ b/crates/paimon/src/spec/stats.rs
@@ -18,7 +18,7 @@
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt::{Display, Formatter};
-use super::{extract_datum_from_arrow, BinaryRowBuilder, DataType, Datum};
+use super::{extract_datum_from_arrow, BinaryRowBuilder, DataType, Datum,
EMPTY_SERIALIZED_ROW};
use arrow_array::RecordBatch;
/// Deserialize `_NULL_COUNTS` which in Avro is `["null",
{"type":"array","items":["null","long"]}]`.
@@ -94,6 +94,21 @@ impl BinaryTableStats {
null_counts,
}
}
+
+ /// Stats with empty (arity=0) BinaryRow bytes for min/max and no null
counts.
+ ///
+ /// Use this whenever there are no columns to collect stats for (e.g. a
non-partitioned
+ /// table's `partition_stats`, or a writer producing no key/value stats
columns). Writing
+ /// `Vec::new()` here breaks the Java reader:
`SerializationUtils.deserializeBinaryRow`
+ /// requires at least the 4-byte BE arity prefix and throws
`BufferUnderflowException` on
+ /// zero-length input.
+ pub fn empty() -> BinaryTableStats {
+ Self {
+ min_values: EMPTY_SERIALIZED_ROW.clone(),
+ max_values: EMPTY_SERIALIZED_ROW.clone(),
+ null_counts: Vec::new(),
+ }
+ }
}
impl Display for BinaryTableStats {
@@ -154,3 +169,38 @@ pub fn compute_column_stats(
null_counts,
))
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::spec::BinaryRow;
+
+ /// Empty stats must produce min/max bytes that the Java side's
+ /// `SerializationUtils.deserializeBinaryRow` accepts: at minimum a 4-byte BE
+ /// arity prefix. A bare `Vec::new()` would trigger `BufferUnderflowException`
+ /// when Spark/Flink read manifests written for a non-partitioned table.
+ #[test]
+ fn empty_stats_carries_arity_prefix_parseable_by_reader() {
+ let stats = BinaryTableStats::empty();
+ // Length checks run before any decoding, so a regression back to `Vec::new()`
+ // fails with an explicit message instead of an opaque decode error.
+ assert!(
+ stats.min_values().len() >= 4,
+ "min_values must contain at least the 4-byte arity prefix"
+ );
+ assert!(
+ stats.max_values().len() >= 4,
+ "max_values must contain at least the 4-byte arity prefix"
+ );
+ assert!(
+ stats.null_counts().is_empty(),
+ "null_counts stays empty so the Java reader short-circuits to
EMPTY_STATS"
+ );
+
+ // Round-trip through `BinaryRow::from_serialized_bytes`, which follows the same
+ // serialized-row protocol (4-byte BE arity prefix) as the Java reader.
+ let min_row = BinaryRow::from_serialized_bytes(stats.min_values())
+ .expect("min_values must decode as a BinaryRow");
+ let max_row = BinaryRow::from_serialized_bytes(stats.max_values())
+ .expect("max_values must decode as a BinaryRow");
+ // No stats columns, so both rows must decode with zero fields.
+ assert_eq!(min_row.arity(), 0);
+ assert_eq!(max_row.arity(), 0);
+ }
+}
diff --git a/crates/paimon/src/table/data_evolution_writer.rs
b/crates/paimon/src/table/data_evolution_writer.rs
index fe9e234..f63ac24 100644
--- a/crates/paimon/src/table/data_evolution_writer.rs
+++ b/crates/paimon/src/table/data_evolution_writer.rs
@@ -670,7 +670,7 @@ mod tests {
write_cols: Option<Vec<String>>,
) -> DataFileMeta {
use crate::spec::stats::BinaryTableStats;
- let empty_stats = BinaryTableStats::new(vec![], vec![], vec![]);
+ let empty_stats = BinaryTableStats::empty();
DataFileMeta {
file_name: file_name.to_string(),
file_size: 0,
diff --git a/crates/paimon/src/table/referenced_files.rs
b/crates/paimon/src/table/referenced_files.rs
index a170b3c..755b00f 100644
--- a/crates/paimon/src/table/referenced_files.rs
+++ b/crates/paimon/src/table/referenced_files.rs
@@ -1063,8 +1063,8 @@ mod tests {
row_count: 100,
min_key: vec![],
max_key: vec![],
- key_stats: BinaryTableStats::new(vec![], vec![], vec![]),
- value_stats: BinaryTableStats::new(vec![], vec![], vec![]),
+ key_stats: BinaryTableStats::empty(),
+ value_stats: BinaryTableStats::empty(),
min_sequence_number: 0,
max_sequence_number: 0,
schema_id: 0,
diff --git a/crates/paimon/src/table/table_commit.rs
b/crates/paimon/src/table/table_commit.rs
index 274dd96..27ade42 100644
--- a/crates/paimon/src/table/table_commit.rs
+++ b/crates/paimon/src/table/table_commit.rs
@@ -24,9 +24,9 @@ use crate::io::FileIO;
use crate::spec::stats::BinaryTableStats;
use crate::spec::FileKind;
use crate::spec::{
- datums_to_binary_row, extract_datum, BinaryRow, CommitKind, CoreOptions,
DataType, Datum,
- IndexManifest, IndexManifestEntry, Manifest, ManifestEntry,
ManifestFileMeta, ManifestList,
- PartitionStatistics, Snapshot,
+ datums_to_binary_row, extract_datum, BinaryRow, BinaryRowBuilder,
CommitKind, CoreOptions,
+ DataType, Datum, IndexManifest, IndexManifestEntry, Manifest,
ManifestEntry, ManifestFileMeta,
+ ManifestList, PartitionStatistics, Snapshot,
};
use crate::table::commit_message::CommitMessage;
use crate::table::partition_filter::PartitionFilter;
@@ -932,7 +932,7 @@ impl TableCommit {
let num_fields = partition_fields.len();
if num_fields == 0 || entries.is_empty() {
- return Ok(BinaryTableStats::new(vec![], vec![], vec![]));
+ return Ok(BinaryTableStats::empty());
}
let data_types: Vec<_> = partition_fields
@@ -970,11 +970,8 @@ impl TableCommit {
}
}
- let min_datums: Vec<_> = mins.iter().zip(data_types.iter()).collect();
- let max_datums: Vec<_> = maxs.iter().zip(data_types.iter()).collect();
-
- let min_bytes = datums_to_binary_row(&min_datums);
- let max_bytes = datums_to_binary_row(&max_datums);
+ let min_bytes = build_partition_stats_row(&mins, &data_types);
+ let max_bytes = build_partition_stats_row(&maxs, &data_types);
let null_counts = null_counts.into_iter().map(Some).collect();
Ok(BinaryTableStats::new(min_bytes, max_bytes, null_counts))
@@ -1127,6 +1124,20 @@ impl TableCommit {
}
}
+/// Serialize one partition-stats row (the per-field minimums or maximums) as a BinaryRow.
+///
+/// Unlike `datums_to_binary_row`, this always returns a valid arity-N row, even when
+/// every datum is `None`. The all-null case must still decode on the Java side, so each
+/// `None` is written as a null field instead of being dropped.
+///
+/// `datums` and `data_types` are parallel slices with one entry per partition field.
+/// `zip` stops at the shorter slice, so a length mismatch would silently leave trailing
+/// fields of the row unwritten; that invariant is checked in debug builds.
+fn build_partition_stats_row(datums: &[Option<Datum>], data_types: &[DataType]) -> Vec<u8> {
+    debug_assert_eq!(
+        datums.len(),
+        data_types.len(),
+        "partition stats need exactly one data type per datum"
+    );
+    let mut builder = BinaryRowBuilder::new(datums.len() as i32);
+    for (pos, (datum_opt, data_type)) in datums.iter().zip(data_types).enumerate() {
+        match datum_opt {
+            Some(d) => builder.write_datum(pos, d, data_type),
+            None => builder.set_null_at(pos),
+        }
+    }
+    builder.build_serialized()
+}
+
/// Plan for resolving commit entries.
enum CommitEntriesPlan {
/// Caller-provided entries. May contain `FileKind::Delete` entries from
CoW
@@ -1234,8 +1245,8 @@ mod tests {
row_count,
min_key: vec![],
max_key: vec![],
- key_stats: BinaryTableStats::new(vec![], vec![], vec![]),
- value_stats: BinaryTableStats::new(vec![], vec![], vec![]),
+ key_stats: BinaryTableStats::empty(),
+ value_stats: BinaryTableStats::empty(),
min_sequence_number: 0,
max_sequence_number: 0,
schema_id: 0,
@@ -1837,6 +1848,75 @@ mod tests {
);
}
+ /// Regression: a non-partitioned table (e.g. `CREATE TABLE test_pk (...
PRIMARY KEY ...)`)
+ /// must still emit `_PARTITION_STATS._MIN_VALUES`/`_MAX_VALUES` carrying
the 4-byte BE
+ /// arity prefix; otherwise Java readers like Spark/Flink hit
+ /// `BufferUnderflowException` inside
`SerializationUtils.deserializeBinaryRow`.
+ #[test]
+ fn compute_partition_stats_no_partition_fields_returns_decodable_empty() {
+ let file_io = test_file_io();
+ let commit = setup_commit(&file_io, "memory:/test_no_partition_stats");
+
+ let entry = ManifestEntry::new(
+ FileKind::Add,
+ vec![],
+ 0,
+ 1,
+ test_data_file("data-0.parquet", 1),
+ 2,
+ );
+
+ let stats = commit.compute_partition_stats(&[entry]).unwrap();
+ BinaryRow::from_serialized_bytes(stats.min_values())
+ .expect("min_values must decode via the same protocol as Java's
deserializeBinaryRow");
+ BinaryRow::from_serialized_bytes(stats.max_values())
+ .expect("max_values must decode via the same protocol as Java's
deserializeBinaryRow");
+ assert!(stats.null_counts().is_empty());
+ }
+
+    /// Regression: when there are no entries at all, the empty stats we return must also
+    /// satisfy the protocol (the same Java reader path runs on them) and decode to
+    /// arity-0 rows.
+    #[test]
+    fn compute_partition_stats_empty_entries_returns_decodable_empty() {
+        let file_io = test_file_io();
+        let commit = setup_partitioned_commit(&file_io, "memory:/test_no_entries_stats");
+
+        let stats = commit.compute_partition_stats(&[]).unwrap();
+        let min_row = BinaryRow::from_serialized_bytes(stats.min_values()).unwrap();
+        let max_row = BinaryRow::from_serialized_bytes(stats.max_values()).unwrap();
+        assert_eq!(min_row.arity(), 0);
+        assert_eq!(max_row.arity(), 0);
+        assert!(stats.null_counts().is_empty());
+    }
+
+ /// Regression: partitioned table with an all-null partition row must
still emit
+ /// decodable min/max bytes (otherwise Java hits
`BufferUnderflowException`).
+ #[test]
+ fn
compute_partition_stats_all_null_partition_values_returns_decodable_bytes() {
+ let file_io = test_file_io();
+ let commit = setup_partitioned_commit(&file_io,
"memory:/test_all_null_partition_stats");
+
+ let mut builder = BinaryRowBuilder::new(1);
+ builder.set_null_at(0);
+ let null_partition = builder.build_serialized();
+
+ let entry = ManifestEntry::new(
+ FileKind::Add,
+ null_partition,
+ 0,
+ 1,
+ test_data_file("data-null-pt.parquet", 1),
+ 2,
+ );
+
+ let stats = commit.compute_partition_stats(&[entry]).unwrap();
+ let min_row =
BinaryRow::from_serialized_bytes(stats.min_values()).unwrap();
+ let max_row =
BinaryRow::from_serialized_bytes(stats.max_values()).unwrap();
+ assert_eq!(min_row.arity(), 1);
+ assert_eq!(max_row.arity(), 1);
+ assert!(min_row.is_null_at(0));
+ assert!(max_row.is_null_at(0));
+ assert_eq!(stats.null_counts(), &vec![Some(1)]);
+ }
+
/// `write_manifest_file` must aggregate min/max bucket and level across
entries so the
/// Java reader can prune manifests by bucket / level (see
apache/paimon#5345). This
/// drives a real commit so all the call-site plumbing is exercised end to
end.