This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b811b9  Reject unsupported types in stats_columns at writer creation time (#20)
1b811b9 is described below

commit 1b811b9ab73a6f38ee16518bb980bac14a409184
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed May 20 21:48:40 2026 +0800

    Reject unsupported types in stats_columns at writer creation time (#20)
    
    Previously, stats_columns entries that were out of range or referred to
    an unsupported type were silently filtered out, so the returned
    statistics list did not correspond positionally to the input
    stats_columns. Now the writer returns an error if any stats_columns
    index is out of range or refers to a type that does not support
    statistics. Also adds Javadoc/docstring comments clarifying that
    getRowGroupStatistics returns entries in the same order as
    stats_columns.
---
 core/src/writer.rs                                 | 114 ++++++++++++++++++---
 .../org/apache/paimon/mosaic/MosaicReader.java     |   4 +
 .../org/apache/paimon/mosaic/MosaicWriter.java     |   4 +
 python/mosaic/mosaic.py                            |  10 ++
 4 files changed, 115 insertions(+), 17 deletions(-)

diff --git a/core/src/writer.rs b/core/src/writer.rs
index a529bcf..fb4ef13 100644
--- a/core/src/writer.rs
+++ b/core/src/writer.rs
@@ -99,10 +99,14 @@ impl<S: OutputFile> MosaicWriter<S> {
     pub fn new(out: S, schema: &Schema, options: WriterOptions) -> 
io::Result<Self> {
         let mosaic_schema = MosaicSchema::from_arrow(schema, 
options.num_buckets)
             .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
-        Ok(Self::from_mosaic_schema(out, mosaic_schema, options))
+        Self::from_mosaic_schema(out, mosaic_schema, options)
     }
 
-    pub fn from_mosaic_schema(out: S, schema: MosaicSchema, options: 
WriterOptions) -> Self {
+    pub fn from_mosaic_schema(
+        out: S,
+        schema: MosaicSchema,
+        options: WriterOptions,
+    ) -> io::Result<Self> {
         let num_buckets = schema.num_buckets;
         let mut bucket_writers = Vec::with_capacity(num_buckets);
 
@@ -126,20 +130,32 @@ impl<S: OutputFile> MosaicWriter<S> {
         let stats_collector = if options.stats_columns.is_empty() {
             None
         } else {
-            let cols: Vec<(usize, arrow_schema::DataType)> = options
-                .stats_columns
-                .iter()
-                .filter(|&&idx| {
-                    idx < schema.columns.len()
-                        && 
stats::supports_stats(&schema.columns[idx].data_type)
-                })
-                .map(|&idx| (idx, schema.columns[idx].data_type.clone()))
-                .collect();
-            if cols.is_empty() {
-                None
-            } else {
-                Some(StatsCollector::new(&cols))
+            let mut cols: Vec<(usize, arrow_schema::DataType)> =
+                Vec::with_capacity(options.stats_columns.len());
+            for &idx in &options.stats_columns {
+                if idx >= schema.columns.len() {
+                    return Err(io::Error::new(
+                        io::ErrorKind::InvalidInput,
+                        format!(
+                            "stats_columns index {} is out of range (schema 
has {} columns)",
+                            idx,
+                            schema.columns.len()
+                        ),
+                    ));
+                }
+                let dt = &schema.columns[idx].data_type;
+                if !stats::supports_stats(dt) {
+                    return Err(io::Error::new(
+                        io::ErrorKind::InvalidInput,
+                        format!(
+                            "stats_columns index {} has unsupported type {:?} 
for statistics",
+                            idx, dt
+                        ),
+                    ));
+                }
+                cols.push((idx, dt.clone()));
             }
+            Some(StatsCollector::new(&cols))
         };
 
         let active_buckets: Vec<usize> = bucket_writers
@@ -154,7 +170,7 @@ impl<S: OutputFile> MosaicWriter<S> {
             0.3
         };
 
-        MosaicWriter {
+        Ok(MosaicWriter {
             out,
             schema,
             bucket_writers,
@@ -172,7 +188,7 @@ impl<S: OutputFile> MosaicWriter<S> {
             total_compressed: 0,
             stats_collector,
             closed: false,
-        }
+        })
     }
 
     pub fn schema(&self) -> &MosaicSchema {
@@ -748,4 +764,68 @@ mod tests {
         let result = writer.write_batch(&batch2);
         assert!(result.is_ok());
     }
+
+    #[test]
+    fn test_stats_columns_out_of_range() {
+        let arrow_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int64, true),
+        ]);
+        let out = MemOutputFile::new();
+        let result = MosaicWriter::new(
+            out,
+            &arrow_schema,
+            WriterOptions {
+                num_buckets: 1,
+                stats_columns: vec![5],
+                ..Default::default()
+            },
+        );
+        assert!(result.is_err());
+        let err = result.err().expect("should be an error");
+        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
+        assert!(err.to_string().contains("out of range"));
+    }
+
+    #[test]
+    fn test_stats_columns_unsupported_type() {
+        let arrow_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Binary, true),
+        ]);
+        let out = MemOutputFile::new();
+        let result = MosaicWriter::new(
+            out,
+            &arrow_schema,
+            WriterOptions {
+                num_buckets: 1,
+                stats_columns: vec![0, 1],
+                ..Default::default()
+            },
+        );
+        assert!(result.is_err());
+        let err = result.err().expect("should be an error");
+        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
+        assert!(err.to_string().contains("unsupported type"));
+    }
+
+    #[test]
+    fn test_stats_columns_valid() {
+        let arrow_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int64, true),
+            Field::new("c", DataType::Utf8, true),
+        ]);
+        let out = MemOutputFile::new();
+        let result = MosaicWriter::new(
+            out,
+            &arrow_schema,
+            WriterOptions {
+                num_buckets: 1,
+                stats_columns: vec![0, 1, 2],
+                ..Default::default()
+            },
+        );
+        assert!(result.is_ok());
+    }
 }
diff --git a/java/src/main/java/org/apache/paimon/mosaic/MosaicReader.java b/java/src/main/java/org/apache/paimon/mosaic/MosaicReader.java
index f24822e..cc5ec70 100644
--- a/java/src/main/java/org/apache/paimon/mosaic/MosaicReader.java
+++ b/java/src/main/java/org/apache/paimon/mosaic/MosaicReader.java
@@ -110,6 +110,10 @@ public class MosaicReader implements AutoCloseable {
         return result;
     }
 
+    /**
+     * Returns column statistics for the given row group. The returned list follows the same order
+     * as the {@code statsColumns} specified in {@link WriterOptions} when the file was written.
+     */
     public List<ColumnStatistics> getRowGroupStatistics(int rgIndex) {
         int n = NativeLib.nativeReaderRowGroupNumStats(handle, rgIndex);
         if (n < 0) {
diff --git a/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java b/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
index 9448199..434b514 100644
--- a/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
+++ b/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
@@ -106,6 +106,10 @@ public class MosaicWriter implements AutoCloseable {
         return rowGroupStats.size();
     }
 
+    /**
+     * Returns column statistics for the given row group. The returned list follows the same order
+     * as the {@code statsColumns} specified in {@link WriterOptions}.
+     */
     public List<ColumnStatistics> getRowGroupStatistics(int rgIndex) {
         if (rowGroupStats == null) {
             throw new IllegalStateException("writer is not closed yet");
diff --git a/python/mosaic/mosaic.py b/python/mosaic/mosaic.py
index 66178a8..a68c086 100644
--- a/python/mosaic/mosaic.py
+++ b/python/mosaic/mosaic.py
@@ -195,6 +195,11 @@ class MosaicWriter:
         return len(self._row_group_stats)
 
     def get_row_group_statistics(self, rg_index):
+        """Returns column statistics for the given row group.
+
+        The returned list follows the same order as the stats_columns
+        specified in WriterOptions.
+        """
         if self._row_group_stats is None:
             raise RuntimeError("writer is not closed yet")
         return self._row_group_stats[rg_index]
@@ -365,6 +370,11 @@ class MosaicReader:
         return out.value
 
     def get_row_group_statistics(self, rg_index):
+        """Returns column statistics for the given row group.
+
+        The returned list follows the same order as the stats_columns
+        specified in WriterOptions when the file was written.
+        """
         n_out = ctypes.c_uint32(0)
         rc = lib.mosaic_reader_row_group_num_stats(self._handle, rg_index, 
ctypes.byref(n_out))
         if rc != 0:

Reply via email to