This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new d23cd95  fix: write valid empty BinaryRow bytes for empty table stats (#349)
d23cd95 is described below

commit d23cd95e98bf88c51941bd3a4d3926a851822d65
Author: shyjsarah <[email protected]>
AuthorDate: Tue Jun 2 20:41:35 2026 +0800

    fix: write valid empty BinaryRow bytes for empty table stats (#349)
    
    Empty `BinaryTableStats` previously stored `Vec::new()` for `min_values` /
    `max_values`. Java readers parse those fields via
    `SerializationUtils.deserializeBinaryRow`, which requires at minimum a
    4-byte big-endian arity prefix and throws `BufferUnderflowException` on
    zero-length input. The most visible casualty is a non-partitioned table
    written via paimon-rust: any Spark/Flink read of its manifest list crashes
    inside `SimpleStats.fromRow` because `_PARTITION_STATS._MIN_VALUES` is
    empty bytes.
    
    Introduce `BinaryTableStats::empty()` that returns stats backed by
    `EMPTY_SERIALIZED_ROW` (the arity=0 BinaryRow, 12 bytes) and route every
    existing call site through it: the production fix in
    `compute_partition_stats`, the Avro decode fallbacks for missing
    `_PARTITION_STATS` / `key_stats` / `value_stats`, and the test fixtures
    that copied the old pattern.
    
    Add regression tests that round-trip the empty stats through
    `BinaryRow::from_serialized_bytes` — the same protocol the Java reader
    uses — so future copies of `vec![]` are caught locally.
---
 .../paimon/src/spec/avro/manifest_entry_decode.rs  |   8 +-
 .../src/spec/avro/manifest_file_meta_decode.rs     |   2 +-
 crates/paimon/src/spec/manifest.rs                 |   2 +-
 crates/paimon/src/spec/stats.rs                    |  52 ++++++++++-
 crates/paimon/src/table/data_evolution_writer.rs   |   2 +-
 crates/paimon/src/table/referenced_files.rs        |   4 +-
 crates/paimon/src/table/table_commit.rs            | 102 ++++++++++++++++++---
 7 files changed, 151 insertions(+), 21 deletions(-)

diff --git a/crates/paimon/src/spec/avro/manifest_entry_decode.rs b/crates/paimon/src/spec/avro/manifest_entry_decode.rs
index 1d3ffcf..df251bd 100644
--- a/crates/paimon/src/spec/avro/manifest_entry_decode.rs
+++ b/crates/paimon/src/spec/avro/manifest_entry_decode.rs
@@ -270,8 +270,8 @@ fn decode_data_file_meta(
         row_count: row_count.unwrap_or(0),
         min_key: min_key.unwrap_or_default(),
         max_key: max_key.unwrap_or_default(),
-        key_stats: key_stats.unwrap_or_else(|| BinaryTableStats::new(vec![], 
vec![], vec![])),
-        value_stats: value_stats.unwrap_or_else(|| 
BinaryTableStats::new(vec![], vec![], vec![])),
+        key_stats: key_stats.unwrap_or_else(BinaryTableStats::empty),
+        value_stats: value_stats.unwrap_or_else(BinaryTableStats::empty),
         min_sequence_number: min_sequence_number.unwrap_or(0),
         max_sequence_number: max_sequence_number.unwrap_or(0),
         schema_id: schema_id.unwrap_or(0),
@@ -391,8 +391,8 @@ fn default_data_file_meta() -> DataFileMeta {
         row_count: 0,
         min_key: vec![],
         max_key: vec![],
-        key_stats: BinaryTableStats::new(vec![], vec![], vec![]),
-        value_stats: BinaryTableStats::new(vec![], vec![], vec![]),
+        key_stats: BinaryTableStats::empty(),
+        value_stats: BinaryTableStats::empty(),
         min_sequence_number: 0,
         max_sequence_number: 0,
         schema_id: 0,
diff --git a/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs b/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs
index 7b30c52..fa5cd09 100644
--- a/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs
+++ b/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs
@@ -72,7 +72,7 @@ impl AvroRecordDecode for ManifestFileMeta {
             file_size.unwrap_or(0),
             num_added_files.unwrap_or(0),
             num_deleted_files.unwrap_or(0),
-            partition_stats.unwrap_or_else(|| BinaryTableStats::new(vec![], 
vec![], vec![])),
+            partition_stats.unwrap_or_else(BinaryTableStats::empty),
             schema_id.unwrap_or(0),
             min_bucket,
             max_bucket,
diff --git a/crates/paimon/src/spec/manifest.rs b/crates/paimon/src/spec/manifest.rs
index cd16133..bebd09e 100644
--- a/crates/paimon/src/spec/manifest.rs
+++ b/crates/paimon/src/spec/manifest.rs
@@ -125,7 +125,7 @@ mod tests {
         use crate::spec::ManifestEntry;
 
         fn entry(kind: FileKind, file_name: &str, level: i32) -> ManifestEntry 
{
-            let stats = BinaryTableStats::new(vec![], vec![], vec![]);
+            let stats = BinaryTableStats::empty();
             let file = DataFileMeta {
                 file_name: file_name.to_string(),
                 file_size: 100,
diff --git a/crates/paimon/src/spec/stats.rs b/crates/paimon/src/spec/stats.rs
index 2c389a3..4ae9b24 100644
--- a/crates/paimon/src/spec/stats.rs
+++ b/crates/paimon/src/spec/stats.rs
@@ -18,7 +18,7 @@
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
 use std::fmt::{Display, Formatter};
 
-use super::{extract_datum_from_arrow, BinaryRowBuilder, DataType, Datum};
+use super::{extract_datum_from_arrow, BinaryRowBuilder, DataType, Datum, 
EMPTY_SERIALIZED_ROW};
 use arrow_array::RecordBatch;
 
 /// Deserialize `_NULL_COUNTS` which in Avro is `["null", 
{"type":"array","items":["null","long"]}]`.
@@ -94,6 +94,21 @@ impl BinaryTableStats {
             null_counts,
         }
     }
+
+    /// Stats with empty (arity=0) BinaryRow bytes for min/max and no null 
counts.
+    ///
+    /// Use this whenever there are no columns to collect stats for (e.g. a 
non-partitioned
+    /// table's `partition_stats`, or a writer producing no key/value stats 
columns). Writing
+    /// `Vec::new()` here breaks the Java reader: 
`SerializationUtils.deserializeBinaryRow`
+    /// requires at least the 4-byte BE arity prefix and throws 
`BufferUnderflowException` on
+    /// zero-length input.
+    pub fn empty() -> BinaryTableStats {
+        Self {
+            min_values: EMPTY_SERIALIZED_ROW.clone(),
+            max_values: EMPTY_SERIALIZED_ROW.clone(),
+            null_counts: Vec::new(),
+        }
+    }
 }
 
 impl Display for BinaryTableStats {
@@ -154,3 +169,38 @@ pub fn compute_column_stats(
         null_counts,
     ))
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::spec::BinaryRow;
+
+    /// Empty stats must produce min/max bytes that the Java side's
+    /// `SerializationUtils.deserializeBinaryRow` accepts: at minimum a 4-byte 
BE
+    /// arity prefix. A bare `Vec::new()` would trigger 
`BufferUnderflowException`
+    /// when Spark/Flink read manifests written for a non-partitioned table.
+    #[test]
+    fn empty_stats_carries_arity_prefix_parseable_by_reader() {
+        let stats = BinaryTableStats::empty();
+        assert!(
+            stats.min_values().len() >= 4,
+            "min_values must contain at least the 4-byte arity prefix"
+        );
+        assert!(
+            stats.max_values().len() >= 4,
+            "max_values must contain at least the 4-byte arity prefix"
+        );
+        assert!(
+            stats.null_counts().is_empty(),
+            "null_counts stays empty so the Java reader short-circuits to 
EMPTY_STATS"
+        );
+
+        // Round-trip through the same parser the Java reader uses (4-byte BE 
arity).
+        let min_row = BinaryRow::from_serialized_bytes(stats.min_values())
+            .expect("min_values must decode as a BinaryRow");
+        let max_row = BinaryRow::from_serialized_bytes(stats.max_values())
+            .expect("max_values must decode as a BinaryRow");
+        assert_eq!(min_row.arity(), 0);
+        assert_eq!(max_row.arity(), 0);
+    }
+}
diff --git a/crates/paimon/src/table/data_evolution_writer.rs b/crates/paimon/src/table/data_evolution_writer.rs
index fe9e234..f63ac24 100644
--- a/crates/paimon/src/table/data_evolution_writer.rs
+++ b/crates/paimon/src/table/data_evolution_writer.rs
@@ -670,7 +670,7 @@ mod tests {
         write_cols: Option<Vec<String>>,
     ) -> DataFileMeta {
         use crate::spec::stats::BinaryTableStats;
-        let empty_stats = BinaryTableStats::new(vec![], vec![], vec![]);
+        let empty_stats = BinaryTableStats::empty();
         DataFileMeta {
             file_name: file_name.to_string(),
             file_size: 0,
diff --git a/crates/paimon/src/table/referenced_files.rs b/crates/paimon/src/table/referenced_files.rs
index a170b3c..755b00f 100644
--- a/crates/paimon/src/table/referenced_files.rs
+++ b/crates/paimon/src/table/referenced_files.rs
@@ -1063,8 +1063,8 @@ mod tests {
             row_count: 100,
             min_key: vec![],
             max_key: vec![],
-            key_stats: BinaryTableStats::new(vec![], vec![], vec![]),
-            value_stats: BinaryTableStats::new(vec![], vec![], vec![]),
+            key_stats: BinaryTableStats::empty(),
+            value_stats: BinaryTableStats::empty(),
             min_sequence_number: 0,
             max_sequence_number: 0,
             schema_id: 0,
diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs
index 274dd96..27ade42 100644
--- a/crates/paimon/src/table/table_commit.rs
+++ b/crates/paimon/src/table/table_commit.rs
@@ -24,9 +24,9 @@ use crate::io::FileIO;
 use crate::spec::stats::BinaryTableStats;
 use crate::spec::FileKind;
 use crate::spec::{
-    datums_to_binary_row, extract_datum, BinaryRow, CommitKind, CoreOptions, 
DataType, Datum,
-    IndexManifest, IndexManifestEntry, Manifest, ManifestEntry, 
ManifestFileMeta, ManifestList,
-    PartitionStatistics, Snapshot,
+    datums_to_binary_row, extract_datum, BinaryRow, BinaryRowBuilder, 
CommitKind, CoreOptions,
+    DataType, Datum, IndexManifest, IndexManifestEntry, Manifest, 
ManifestEntry, ManifestFileMeta,
+    ManifestList, PartitionStatistics, Snapshot,
 };
 use crate::table::commit_message::CommitMessage;
 use crate::table::partition_filter::PartitionFilter;
@@ -932,7 +932,7 @@ impl TableCommit {
         let num_fields = partition_fields.len();
 
         if num_fields == 0 || entries.is_empty() {
-            return Ok(BinaryTableStats::new(vec![], vec![], vec![]));
+            return Ok(BinaryTableStats::empty());
         }
 
         let data_types: Vec<_> = partition_fields
@@ -970,11 +970,8 @@ impl TableCommit {
             }
         }
 
-        let min_datums: Vec<_> = mins.iter().zip(data_types.iter()).collect();
-        let max_datums: Vec<_> = maxs.iter().zip(data_types.iter()).collect();
-
-        let min_bytes = datums_to_binary_row(&min_datums);
-        let max_bytes = datums_to_binary_row(&max_datums);
+        let min_bytes = build_partition_stats_row(&mins, &data_types);
+        let max_bytes = build_partition_stats_row(&maxs, &data_types);
         let null_counts = null_counts.into_iter().map(Some).collect();
 
         Ok(BinaryTableStats::new(min_bytes, max_bytes, null_counts))
@@ -1127,6 +1124,20 @@ impl TableCommit {
     }
 }
 
+/// Serialized BinaryRow for partition stats; unlike `datums_to_binary_row`, 
returns a
+/// valid arity-N row even when every datum is `None` (the all-null case must 
still
+/// decode on the Java side).
+fn build_partition_stats_row(datums: &[Option<Datum>], data_types: 
&[DataType]) -> Vec<u8> {
+    let mut builder = BinaryRowBuilder::new(datums.len() as i32);
+    for (pos, (datum_opt, data_type)) in 
datums.iter().zip(data_types.iter()).enumerate() {
+        match datum_opt {
+            Some(d) => builder.write_datum(pos, d, data_type),
+            None => builder.set_null_at(pos),
+        }
+    }
+    builder.build_serialized()
+}
+
 /// Plan for resolving commit entries.
 enum CommitEntriesPlan {
     /// Caller-provided entries. May contain `FileKind::Delete` entries from 
CoW
@@ -1234,8 +1245,8 @@ mod tests {
             row_count,
             min_key: vec![],
             max_key: vec![],
-            key_stats: BinaryTableStats::new(vec![], vec![], vec![]),
-            value_stats: BinaryTableStats::new(vec![], vec![], vec![]),
+            key_stats: BinaryTableStats::empty(),
+            value_stats: BinaryTableStats::empty(),
             min_sequence_number: 0,
             max_sequence_number: 0,
             schema_id: 0,
@@ -1837,6 +1848,75 @@ mod tests {
         );
     }
 
+    /// Regression: a non-partitioned table (e.g. `CREATE TABLE test_pk (... 
PRIMARY KEY ...)`)
+    /// must still emit `_PARTITION_STATS._MIN_VALUES`/`_MAX_VALUES` carrying 
the 4-byte BE
+    /// arity prefix; otherwise Java readers like Spark/Flink hit
+    /// `BufferUnderflowException` inside 
`SerializationUtils.deserializeBinaryRow`.
+    #[test]
+    fn compute_partition_stats_no_partition_fields_returns_decodable_empty() {
+        let file_io = test_file_io();
+        let commit = setup_commit(&file_io, "memory:/test_no_partition_stats");
+
+        let entry = ManifestEntry::new(
+            FileKind::Add,
+            vec![],
+            0,
+            1,
+            test_data_file("data-0.parquet", 1),
+            2,
+        );
+
+        let stats = commit.compute_partition_stats(&[entry]).unwrap();
+        BinaryRow::from_serialized_bytes(stats.min_values())
+            .expect("min_values must decode via the same protocol as Java's 
deserializeBinaryRow");
+        BinaryRow::from_serialized_bytes(stats.max_values())
+            .expect("max_values must decode via the same protocol as Java's 
deserializeBinaryRow");
+        assert!(stats.null_counts().is_empty());
+    }
+
+    /// Regression: when there are no entries at all, the empty stats we 
return must also
+    /// satisfy the protocol — same Java reader path runs on it.
+    #[test]
+    fn compute_partition_stats_empty_entries_returns_decodable_empty() {
+        let file_io = test_file_io();
+        let commit = setup_partitioned_commit(&file_io, 
"memory:/test_no_entries_stats");
+
+        let stats = commit.compute_partition_stats(&[]).unwrap();
+        BinaryRow::from_serialized_bytes(stats.min_values()).unwrap();
+        BinaryRow::from_serialized_bytes(stats.max_values()).unwrap();
+        assert!(stats.null_counts().is_empty());
+    }
+
+    /// Regression: partitioned table with an all-null partition row must 
still emit
+    /// decodable min/max bytes (otherwise Java hits 
`BufferUnderflowException`).
+    #[test]
+    fn 
compute_partition_stats_all_null_partition_values_returns_decodable_bytes() {
+        let file_io = test_file_io();
+        let commit = setup_partitioned_commit(&file_io, 
"memory:/test_all_null_partition_stats");
+
+        let mut builder = BinaryRowBuilder::new(1);
+        builder.set_null_at(0);
+        let null_partition = builder.build_serialized();
+
+        let entry = ManifestEntry::new(
+            FileKind::Add,
+            null_partition,
+            0,
+            1,
+            test_data_file("data-null-pt.parquet", 1),
+            2,
+        );
+
+        let stats = commit.compute_partition_stats(&[entry]).unwrap();
+        let min_row = 
BinaryRow::from_serialized_bytes(stats.min_values()).unwrap();
+        let max_row = 
BinaryRow::from_serialized_bytes(stats.max_values()).unwrap();
+        assert_eq!(min_row.arity(), 1);
+        assert_eq!(max_row.arity(), 1);
+        assert!(min_row.is_null_at(0));
+        assert!(max_row.is_null_at(0));
+        assert_eq!(stats.null_counts(), &vec![Some(1)]);
+    }
+
     /// `write_manifest_file` must aggregate min/max bucket and level across 
entries so the
     /// Java reader can prune manifests by bucket / level (see 
apache/paimon#5345). This
     /// drives a real commit so all the call-site plumbing is exercised end to 
end.

Reply via email to