This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git
The following commit(s) were added to refs/heads/main by this push:
new 1b811b9 Reject unsupported types in stats_columns at writer creation
time (#20)
1b811b9 is described below
commit 1b811b9ab73a6f38ee16518bb980bac14a409184
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed May 20 21:48:40 2026 +0800
Reject unsupported types in stats_columns at writer creation time (#20)
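Previously, stats_columns entries that were out of range or referred to
an unsupported type were silently filtered out, so the returned
statistics list did not correspond positionally to the input
stats_columns (and if every entry was filtered out, statistics were
silently disabled). Now writer creation returns an InvalidInput error if
any stats_columns index is out of range or refers to a type that does
not support statistics. As a consequence,
MosaicWriter::from_mosaic_schema now returns io::Result<Self> instead of
Self. Also adds Javadoc/docstring comments clarifying that
getRowGroupStatistics returns entries in the same order as
stats_columns.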
---
core/src/writer.rs | 114 ++++++++++++++++++---
.../org/apache/paimon/mosaic/MosaicReader.java | 4 +
.../org/apache/paimon/mosaic/MosaicWriter.java | 4 +
python/mosaic/mosaic.py | 10 ++
4 files changed, 115 insertions(+), 17 deletions(-)
diff --git a/core/src/writer.rs b/core/src/writer.rs
index a529bcf..fb4ef13 100644
--- a/core/src/writer.rs
+++ b/core/src/writer.rs
@@ -99,10 +99,14 @@ impl<S: OutputFile> MosaicWriter<S> {
pub fn new(out: S, schema: &Schema, options: WriterOptions) ->
io::Result<Self> {
let mosaic_schema = MosaicSchema::from_arrow(schema,
options.num_buckets)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
- Ok(Self::from_mosaic_schema(out, mosaic_schema, options))
+ Self::from_mosaic_schema(out, mosaic_schema, options)
}
- pub fn from_mosaic_schema(out: S, schema: MosaicSchema, options:
WriterOptions) -> Self {
+ pub fn from_mosaic_schema(
+ out: S,
+ schema: MosaicSchema,
+ options: WriterOptions,
+ ) -> io::Result<Self> {
let num_buckets = schema.num_buckets;
let mut bucket_writers = Vec::with_capacity(num_buckets);
@@ -126,20 +130,32 @@ impl<S: OutputFile> MosaicWriter<S> {
let stats_collector = if options.stats_columns.is_empty() {
None
} else {
- let cols: Vec<(usize, arrow_schema::DataType)> = options
- .stats_columns
- .iter()
- .filter(|&&idx| {
- idx < schema.columns.len()
- &&
stats::supports_stats(&schema.columns[idx].data_type)
- })
- .map(|&idx| (idx, schema.columns[idx].data_type.clone()))
- .collect();
- if cols.is_empty() {
- None
- } else {
- Some(StatsCollector::new(&cols))
+ let mut cols: Vec<(usize, arrow_schema::DataType)> =
+ Vec::with_capacity(options.stats_columns.len());
+ for &idx in &options.stats_columns {
+ if idx >= schema.columns.len() {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ format!(
+ "stats_columns index {} is out of range (schema
has {} columns)",
+ idx,
+ schema.columns.len()
+ ),
+ ));
+ }
+ let dt = &schema.columns[idx].data_type;
+ if !stats::supports_stats(dt) {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ format!(
+ "stats_columns index {} has unsupported type {:?}
for statistics",
+ idx, dt
+ ),
+ ));
+ }
+ cols.push((idx, dt.clone()));
}
+ Some(StatsCollector::new(&cols))
};
let active_buckets: Vec<usize> = bucket_writers
@@ -154,7 +170,7 @@ impl<S: OutputFile> MosaicWriter<S> {
0.3
};
- MosaicWriter {
+ Ok(MosaicWriter {
out,
schema,
bucket_writers,
@@ -172,7 +188,7 @@ impl<S: OutputFile> MosaicWriter<S> {
total_compressed: 0,
stats_collector,
closed: false,
- }
+ })
}
pub fn schema(&self) -> &MosaicSchema {
@@ -748,4 +764,68 @@ mod tests {
let result = writer.write_batch(&batch2);
assert!(result.is_ok());
}
+
+ #[test]
+ fn test_stats_columns_out_of_range() {
+ let arrow_schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Int64, true),
+ ]);
+ let out = MemOutputFile::new();
+ let result = MosaicWriter::new(
+ out,
+ &arrow_schema,
+ WriterOptions {
+ num_buckets: 1,
+ stats_columns: vec![5],
+ ..Default::default()
+ },
+ );
+ assert!(result.is_err());
+ let err = result.err().expect("should be an error");
+ assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
+ assert!(err.to_string().contains("out of range"));
+ }
+
+ #[test]
+ fn test_stats_columns_unsupported_type() {
+ let arrow_schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Binary, true),
+ ]);
+ let out = MemOutputFile::new();
+ let result = MosaicWriter::new(
+ out,
+ &arrow_schema,
+ WriterOptions {
+ num_buckets: 1,
+ stats_columns: vec![0, 1],
+ ..Default::default()
+ },
+ );
+ assert!(result.is_err());
+ let err = result.err().expect("should be an error");
+ assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
+ assert!(err.to_string().contains("unsupported type"));
+ }
+
+ #[test]
+ fn test_stats_columns_valid() {
+ let arrow_schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Int64, true),
+ Field::new("c", DataType::Utf8, true),
+ ]);
+ let out = MemOutputFile::new();
+ let result = MosaicWriter::new(
+ out,
+ &arrow_schema,
+ WriterOptions {
+ num_buckets: 1,
+ stats_columns: vec![0, 1, 2],
+ ..Default::default()
+ },
+ );
+ assert!(result.is_ok());
+ }
}
diff --git a/java/src/main/java/org/apache/paimon/mosaic/MosaicReader.java
b/java/src/main/java/org/apache/paimon/mosaic/MosaicReader.java
index f24822e..cc5ec70 100644
--- a/java/src/main/java/org/apache/paimon/mosaic/MosaicReader.java
+++ b/java/src/main/java/org/apache/paimon/mosaic/MosaicReader.java
@@ -110,6 +110,10 @@ public class MosaicReader implements AutoCloseable {
return result;
}
+ /**
+ * Returns column statistics for the given row group. The returned list
follows the same order
+ * as the {@code statsColumns} specified in {@link WriterOptions} when the
file was written.
+ */
public List<ColumnStatistics> getRowGroupStatistics(int rgIndex) {
int n = NativeLib.nativeReaderRowGroupNumStats(handle, rgIndex);
if (n < 0) {
diff --git a/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
b/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
index 9448199..434b514 100644
--- a/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
+++ b/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
@@ -106,6 +106,10 @@ public class MosaicWriter implements AutoCloseable {
return rowGroupStats.size();
}
+ /**
+ * Returns column statistics for the given row group. The returned list
follows the same order
+ * as the {@code statsColumns} specified in {@link WriterOptions}.
+ */
public List<ColumnStatistics> getRowGroupStatistics(int rgIndex) {
if (rowGroupStats == null) {
throw new IllegalStateException("writer is not closed yet");
diff --git a/python/mosaic/mosaic.py b/python/mosaic/mosaic.py
index 66178a8..a68c086 100644
--- a/python/mosaic/mosaic.py
+++ b/python/mosaic/mosaic.py
@@ -195,6 +195,11 @@ class MosaicWriter:
return len(self._row_group_stats)
def get_row_group_statistics(self, rg_index):
+ """Returns column statistics for the given row group.
+
+ The returned list follows the same order as the stats_columns
+ specified in WriterOptions.
+ """
if self._row_group_stats is None:
raise RuntimeError("writer is not closed yet")
return self._row_group_stats[rg_index]
@@ -365,6 +370,11 @@ class MosaicReader:
return out.value
def get_row_group_statistics(self, rg_index):
+ """Returns column statistics for the given row group.
+
+ The returned list follows the same order as the stats_columns
+ specified in WriterOptions when the file was written.
+ """
n_out = ctypes.c_uint32(0)
rc = lib.mosaic_reader_row_group_num_stats(self._handle, rg_index,
ctypes.byref(n_out))
if rc != 0: