This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch write_stats
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git

commit 593c6433949dbcde5f8868e1002c8f7f1c349181
Author: JingsongLi <[email protected]>
AuthorDate: Wed May 20 15:57:36 2026 +0800

    Add C++ writer stats API and tests for all 4 languages
    
    - Add num_row_groups() and get_row_group_statistics() to C++ Writer class
      in mosaic.hpp, matching the documented API
    - Restructure C++ Writer lifecycle: close() no longer frees the handle,
      allowing stats access after close (destructor frees)
    - Move ColumnStatistics struct before Writer class to resolve ordering
    - Add writer stats tests for Rust, Java, Python, and C++
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 core/src/reader_tests.rs                           | 165 +++++++++++++++
 cpp/test_mosaic.cpp                                | 235 ++++++++++++++++++++-
 include/mosaic.hpp                                 |  60 ++++--
 .../apache/paimon/mosaic/MosaicRoundtripTest.java  | 175 +++++++++++++++
 python/tests/test_mosaic.py                        | 136 ++++++++++++
 5 files changed, 756 insertions(+), 15 deletions(-)

diff --git a/core/src/reader_tests.rs b/core/src/reader_tests.rs
index b519379..d716b08 100644
--- a/core/src/reader_tests.rs
+++ b/core/src/reader_tests.rs
@@ -3457,3 +3457,168 @@ fn test_file_open_single_io() {
         "file open should use only 1 IO when metadata fits in tail prefetch"
     );
 }
+
+#[test]
+fn test_writer_stats_basic() {
+    let columns = vec![
+        ("id".to_string(), DataType::Int32, true),
+        ("name".to_string(), DataType::Utf8, true),
+        ("score".to_string(), DataType::Float64, true),
+    ];
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &columns_to_arrow_schema(&columns),
+        WriterOptions {
+            compression: COMPRESSION_NONE,
+            stats_columns: vec![0, 2],
+            num_buckets: 2,
+            ..Default::default()
+        },
+    )
+    .unwrap();
+
+    let rows: Vec<Vec<Value>> = (0..100)
+        .map(|i| {
+            vec![
+                Value::Integer(i * 2),
+                Value::String(format!("row_{}", i).into_bytes()),
+                Value::Double(i as f64 * 0.5),
+            ]
+        })
+        .collect();
+    write_values(&mut writer, &columns, &rows);
+    writer.close().unwrap();
+
+    assert_eq!(writer.num_row_groups(), 1);
+    let stats = writer.row_group_stats(0);
+    assert_eq!(stats.len(), 2);
+
+    assert_eq!(stats[0].column_index, 0);
+    assert_eq!(stats[0].null_count, 0);
+    assert!(matches!(&stats[0].min, Some(Value::Integer(0))));
+    assert!(matches!(&stats[0].max, Some(Value::Integer(198))));
+
+    assert_eq!(stats[1].column_index, 2);
+    assert_eq!(stats[1].null_count, 0);
+    match &stats[1].min {
+        Some(Value::Double(v)) => assert!(v.abs() < 1e-10),
+        other => panic!("expected Double(0.0), got {:?}", other),
+    }
+    match &stats[1].max {
+        Some(Value::Double(v)) => assert!((*v - 49.5).abs() < 1e-10),
+        other => panic!("expected Double(49.5), got {:?}", other),
+    }
+}
+
+#[test]
+fn test_writer_stats_with_nulls() {
+    let columns = vec![
+        ("a".to_string(), DataType::Int32, true),
+        ("b".to_string(), DataType::Int64, true),
+    ];
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &columns_to_arrow_schema(&columns),
+        WriterOptions {
+            compression: COMPRESSION_NONE,
+            stats_columns: vec![0, 1],
+            num_buckets: 1,
+            ..Default::default()
+        },
+    )
+    .unwrap();
+
+    let rows = vec![
+        vec![Value::Integer(10), Value::Null],
+        vec![Value::Null, Value::Null],
+        vec![Value::Integer(5), Value::BigInt(100)],
+        vec![Value::Integer(20), Value::BigInt(50)],
+    ];
+    write_values(&mut writer, &columns, &rows);
+    writer.close().unwrap();
+
+    assert_eq!(writer.num_row_groups(), 1);
+    let stats = writer.row_group_stats(0);
+    assert_eq!(stats.len(), 2);
+
+    assert_eq!(stats[0].column_index, 0);
+    assert_eq!(stats[0].null_count, 1);
+    assert!(matches!(&stats[0].min, Some(Value::Integer(5))));
+    assert!(matches!(&stats[0].max, Some(Value::Integer(20))));
+
+    assert_eq!(stats[1].column_index, 1);
+    assert_eq!(stats[1].null_count, 2);
+    assert!(matches!(&stats[1].min, Some(Value::BigInt(50))));
+    assert!(matches!(&stats[1].max, Some(Value::BigInt(100))));
+}
+
+#[test]
+fn test_writer_stats_all_null() {
+    let columns = vec![("x".to_string(), DataType::Int32, true)];
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &columns_to_arrow_schema(&columns),
+        WriterOptions {
+            compression: COMPRESSION_NONE,
+            stats_columns: vec![0],
+            num_buckets: 1,
+            ..Default::default()
+        },
+    )
+    .unwrap();
+
+    let rows: Vec<Vec<Value>> = (0..10).map(|_| vec![Value::Null]).collect();
+    write_values(&mut writer, &columns, &rows);
+    writer.close().unwrap();
+
+    assert_eq!(writer.num_row_groups(), 1);
+    let stats = writer.row_group_stats(0);
+    assert_eq!(stats.len(), 1);
+    assert_eq!(stats[0].null_count, 10);
+    assert!(stats[0].min.is_none());
+    assert!(stats[0].max.is_none());
+}
+
+#[test]
+fn test_writer_stats_matches_reader_stats() {
+    let columns = vec![
+        ("id".to_string(), DataType::Int32, true),
+        ("score".to_string(), DataType::Float64, true),
+    ];
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &columns_to_arrow_schema(&columns),
+        WriterOptions {
+            compression: COMPRESSION_NONE,
+            stats_columns: vec![0, 1],
+            num_buckets: 1,
+            ..Default::default()
+        },
+    )
+    .unwrap();
+
+    let rows: Vec<Vec<Value>> = (0..50)
+        .map(|i| vec![Value::Integer(i), Value::Double(i as f64 * 2.0)])
+        .collect();
+    write_values(&mut writer, &columns, &rows);
+    writer.close().unwrap();
+
+    let writer_stats = writer.row_group_stats(0);
+
+    let data = writer.output().buf.clone();
+    let len = data.len() as u64;
+    let reader = MosaicReader::new(ByteArrayInputFile::new(data), len).unwrap();
+    let reader_stats = reader.row_group_stats(0).unwrap();
+
+    assert_eq!(writer_stats.len(), reader_stats.len());
+    for (ws, rs) in writer_stats.iter().zip(reader_stats.iter()) {
+        assert_eq!(ws.column_index, rs.column_index);
+        assert_eq!(ws.null_count, rs.null_count);
+        assert_eq!(format!("{:?}", ws.min), format!("{:?}", rs.min));
+        assert_eq!(format!("{:?}", ws.max), format!("{:?}", rs.max));
+    }
+}
diff --git a/cpp/test_mosaic.cpp b/cpp/test_mosaic.cpp
index 4de8cad..eacdfcf 100644
--- a/cpp/test_mosaic.cpp
+++ b/cpp/test_mosaic.cpp
@@ -466,6 +466,235 @@ static void test_multiple_row_groups() {
     printf("  PASS test_multiple_row_groups\n");
 }
 
+static void test_writer_stats() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32()),
+        arrow::field("name", arrow::utf8()),
+        arrow::field("score", arrow::float64()),
+    });
+
+    arrow::Int32Builder id_b;
+    arrow::StringBuilder name_b;
+    arrow::DoubleBuilder score_b;
+    for (int i = 0; i < 10; i++) {
+        assert(id_b.Append(i * 10).ok());
+        assert(name_b.Append("item_" + std::to_string(i)).ok());
+        assert(score_b.Append(i * 1.1).ok());
+    }
+    auto batch = arrow::RecordBatch::Make(schema, 10, {
+        id_b.Finish().ValueUnsafe(), name_b.Finish().ValueUnsafe(),
+        score_b.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    uint32_t stats_cols[] = {0, 2};
+    opts.stats_columns = stats_cols;
+    opts.num_stats_columns = 2;
+
+    MemBuffer write_buf;
+    struct ArrowSchema c_schema;
+    auto st = arrow::ExportSchema(*schema, &c_schema);
+    assert(st.ok());
+    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+    struct ArrowArray c_array;
+    struct ArrowSchema c_batch_schema;
+    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+    assert(st.ok());
+    writer.write(&c_array, &c_batch_schema);
+    writer.close();
+
+    ASSERT_EQ(writer.num_row_groups(), 1u);
+    auto stats = writer.get_row_group_statistics(0);
+    ASSERT_TRUE(stats.size() > 0);
+
+    for (auto& s : stats) {
+        ASSERT_TRUE(s.column_index == 0 || s.column_index == 2);
+        ASSERT_EQ(s.null_count, 0u);
+        ASSERT_TRUE(s.has_min_max());
+    }
+
+    auto id_stat = std::find_if(stats.begin(), stats.end(),
+        [](const mosaic::ColumnStatistics& s) { return s.column_index == 0; });
+    ASSERT_TRUE(id_stat != stats.end());
+    ASSERT_EQ(id_stat->min_value.size(), 4u);
+    ASSERT_EQ(id_stat->max_value.size(), 4u);
+    int32_t min_id = 0, max_id = 0;
+    memcpy(&min_id, id_stat->min_value.data(), 4);
+    memcpy(&max_id, id_stat->max_value.data(), 4);
+    min_id = __builtin_bswap32(min_id);
+    max_id = __builtin_bswap32(max_id);
+    ASSERT_EQ(min_id, 0);
+    ASSERT_EQ(max_id, 90);
+    printf("  PASS test_writer_stats\n");
+}
+
+static void test_writer_stats_with_nulls() {
+    auto schema = arrow::schema({
+        arrow::field("a", arrow::int32()),
+        arrow::field("b", arrow::int64()),
+    });
+
+    arrow::Int32Builder a_b;
+    assert(a_b.Append(10).ok());
+    assert(a_b.AppendNull().ok());
+    assert(a_b.Append(5).ok());
+    assert(a_b.Append(20).ok());
+
+    arrow::Int64Builder b_b;
+    assert(b_b.AppendNull().ok());
+    assert(b_b.AppendNull().ok());
+    assert(b_b.Append(100).ok());
+    assert(b_b.Append(50).ok());
+
+    auto batch = arrow::RecordBatch::Make(schema, 4, {
+        a_b.Finish().ValueUnsafe(), b_b.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    opts.num_buckets = 1;
+    uint32_t stats_cols[] = {0, 1};
+    opts.stats_columns = stats_cols;
+    opts.num_stats_columns = 2;
+
+    MemBuffer write_buf;
+    struct ArrowSchema c_schema;
+    auto st = arrow::ExportSchema(*schema, &c_schema);
+    assert(st.ok());
+    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+    struct ArrowArray c_array;
+    struct ArrowSchema c_batch_schema;
+    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+    assert(st.ok());
+    writer.write(&c_array, &c_batch_schema);
+    writer.close();
+
+    ASSERT_EQ(writer.num_row_groups(), 1u);
+    auto stats = writer.get_row_group_statistics(0);
+    ASSERT_EQ(stats.size(), 2u);
+
+    auto a_stat = std::find_if(stats.begin(), stats.end(),
+        [](const mosaic::ColumnStatistics& s) { return s.column_index == 0; });
+    ASSERT_TRUE(a_stat != stats.end());
+    ASSERT_EQ(a_stat->null_count, 1u);
+    ASSERT_TRUE(a_stat->has_min_max());
+    int32_t min_a = 0, max_a = 0;
+    memcpy(&min_a, a_stat->min_value.data(), 4);
+    memcpy(&max_a, a_stat->max_value.data(), 4);
+    min_a = __builtin_bswap32(min_a);
+    max_a = __builtin_bswap32(max_a);
+    ASSERT_EQ(min_a, 5);
+    ASSERT_EQ(max_a, 20);
+
+    auto b_stat = std::find_if(stats.begin(), stats.end(),
+        [](const mosaic::ColumnStatistics& s) { return s.column_index == 1; });
+    ASSERT_TRUE(b_stat != stats.end());
+    ASSERT_EQ(b_stat->null_count, 2u);
+    ASSERT_TRUE(b_stat->has_min_max());
+    int64_t min_b = 0, max_b = 0;
+    memcpy(&min_b, b_stat->min_value.data(), 8);
+    memcpy(&max_b, b_stat->max_value.data(), 8);
+    min_b = __builtin_bswap64(min_b);
+    max_b = __builtin_bswap64(max_b);
+    ASSERT_EQ(min_b, 50);
+    ASSERT_EQ(max_b, 100);
+    printf("  PASS test_writer_stats_with_nulls\n");
+}
+
+static void test_writer_stats_all_null() {
+    auto schema = arrow::schema({
+        arrow::field("x", arrow::int32()),
+    });
+
+    arrow::Int32Builder x_b;
+    assert(x_b.AppendNull().ok());
+    assert(x_b.AppendNull().ok());
+    assert(x_b.AppendNull().ok());
+
+    auto batch = arrow::RecordBatch::Make(schema, 3, {
+        x_b.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    opts.num_buckets = 1;
+    uint32_t stats_cols[] = {0};
+    opts.stats_columns = stats_cols;
+    opts.num_stats_columns = 1;
+
+    MemBuffer write_buf;
+    struct ArrowSchema c_schema;
+    auto st = arrow::ExportSchema(*schema, &c_schema);
+    assert(st.ok());
+    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+    struct ArrowArray c_array;
+    struct ArrowSchema c_batch_schema;
+    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+    assert(st.ok());
+    writer.write(&c_array, &c_batch_schema);
+    writer.close();
+
+    ASSERT_EQ(writer.num_row_groups(), 1u);
+    auto stats = writer.get_row_group_statistics(0);
+    ASSERT_EQ(stats.size(), 1u);
+    ASSERT_EQ(stats[0].null_count, 3u);
+    ASSERT_TRUE(!stats[0].has_min_max());
+    printf("  PASS test_writer_stats_all_null\n");
+}
+
+static void test_writer_stats_matches_reader() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32()),
+        arrow::field("value", arrow::float64()),
+    });
+
+    arrow::Int32Builder id_b;
+    arrow::DoubleBuilder val_b;
+    for (int i = 0; i < 20; i++) {
+        assert(id_b.Append(i * 5).ok());
+        assert(val_b.Append(i * 2.5).ok());
+    }
+    auto batch = arrow::RecordBatch::Make(schema, 20, {
+        id_b.Finish().ValueUnsafe(), val_b.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    opts.num_buckets = 1;
+    uint32_t stats_cols[] = {0, 1};
+    opts.stats_columns = stats_cols;
+    opts.num_stats_columns = 2;
+
+    MemBuffer write_buf;
+    struct ArrowSchema c_schema;
+    auto st = arrow::ExportSchema(*schema, &c_schema);
+    assert(st.ok());
+    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+    struct ArrowArray c_array;
+    struct ArrowSchema c_batch_schema;
+    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+    assert(st.ok());
+    writer.write(&c_array, &c_batch_schema);
+    writer.close();
+
+    auto writer_stats = writer.get_row_group_statistics(0);
+
+    MemBuffer buf;
+    buf.data = write_buf.data;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto reader_stats = reader.get_row_group_statistics(0);
+
+    ASSERT_EQ(writer_stats.size(), reader_stats.size());
+    for (size_t i = 0; i < writer_stats.size(); i++) {
+        ASSERT_EQ(writer_stats[i].column_index, reader_stats[i].column_index);
+        ASSERT_EQ(writer_stats[i].null_count, reader_stats[i].null_count);
+        ASSERT_EQ(writer_stats[i].min_value, reader_stats[i].min_value);
+        ASSERT_EQ(writer_stats[i].max_value, reader_stats[i].max_value);
+    }
+    printf("  PASS test_writer_stats_matches_reader\n");
+}
+
 int main() {
     printf("Running Mosaic C++ tests...\n");
     test_basic_roundtrip();
@@ -476,6 +705,10 @@ int main() {
     test_compression_zstd();
     test_schema_roundtrip();
     test_multiple_row_groups();
-    printf("All %d tests passed.\n", 8);
+    test_writer_stats();
+    test_writer_stats_with_nulls();
+    test_writer_stats_all_null();
+    test_writer_stats_matches_reader();
+    printf("All %d tests passed.\n", 12);
     return 0;
 }
diff --git a/include/mosaic.hpp b/include/mosaic.hpp
index 45c8fd1..68ce2e7 100644
--- a/include/mosaic.hpp
+++ b/include/mosaic.hpp
@@ -94,6 +94,16 @@ struct WriterOptions {
     uint32_t page_size_threshold = 32 * 1024;
 };
 
+// ======================== Statistics ========================
+
+struct ColumnStatistics {
+    uint32_t column_index;
+    uint64_t null_count;
+    std::vector<uint8_t> min_value;
+    std::vector<uint8_t> max_value;
+    bool has_min_max() const { return !min_value.empty(); }
+};
+
 class Writer {
 public:
     /// Construct a writer. `arrow_schema` is a pointer to an ArrowSchema (Arrow C Data Interface).
@@ -123,17 +133,18 @@ public:
     Writer(const Writer&) = delete;
     Writer& operator=(const Writer&) = delete;
     Writer(Writer&& other) noexcept
-        : callbacks_(std::move(other.callbacks_)), handle_(other.handle_) {
+        : callbacks_(std::move(other.callbacks_)), handle_(other.handle_), closed_(other.closed_) {
         other.handle_ = nullptr;
     }
     Writer& operator=(Writer&& other) noexcept {
         if (this != &other) {
             if (handle_) {
-                mosaic_writer_close(handle_);
+                if (!closed_) mosaic_writer_close(handle_);
                 mosaic_writer_free(handle_);
             }
             callbacks_ = std::move(other.callbacks_);
             handle_ = other.handle_;
+            closed_ = other.closed_;
             other.handle_ = nullptr;
         }
         return *this;
@@ -141,7 +152,7 @@ public:
 
     ~Writer() {
         if (handle_) {
-            mosaic_writer_close(handle_);
+            if (!closed_) mosaic_writer_close(handle_);
             mosaic_writer_free(handle_);
         }
     }
@@ -160,22 +171,43 @@ public:
     }
 
     void close() {
-        check(mosaic_writer_close(handle_));
+        if (!closed_) {
+            check(mosaic_writer_close(handle_));
+            closed_ = true;
+        }
+    }
+
+    uint32_t num_row_groups() const {
+        uint32_t out = 0;
+        check(mosaic_writer_num_row_groups(handle_, &out));
+        return out;
+    }
+
+    std::vector<ColumnStatistics> get_row_group_statistics(uint32_t rg_index) const {
+        uint32_t n = 0;
+        check(mosaic_writer_row_group_num_stats(handle_, rg_index, &n));
+        std::vector<ColumnStatistics> result;
+        result.reserve(n);
+        for (uint32_t i = 0; i < n; i++) {
+            ColumnStatistics s;
+            check(mosaic_writer_row_group_stat_column_index(handle_, rg_index, i, &s.column_index));
+            check(mosaic_writer_row_group_stat_null_count(handle_, rg_index, i, &s.null_count));
+            size_t min_len = 0, max_len = 0;
+            const uint8_t* min_ptr = mosaic_writer_row_group_stat_min(handle_, rg_index, i, &min_len);
+            const uint8_t* max_ptr = mosaic_writer_row_group_stat_max(handle_, rg_index, i, &max_len);
+            if (min_ptr && min_len > 0)
+                s.min_value.assign(min_ptr, min_ptr + min_len);
+            if (max_ptr && max_len > 0)
+                s.max_value.assign(max_ptr, max_ptr + max_len);
+            result.push_back(std::move(s));
+        }
+        return result;
     }
 
 private:
     std::shared_ptr<OutputFile> callbacks_;
     MosaicWriterHandle* handle_ = nullptr;
-};
-
-// ======================== Statistics ========================
-
-struct ColumnStatistics {
-    uint32_t column_index;
-    uint64_t null_count;
-    std::vector<uint8_t> min_value;
-    std::vector<uint8_t> max_value;
-    bool has_min_max() const { return !min_value.empty(); }
+    bool closed_ = false;
 };
 
 // ======================== Reader ========================
diff --git a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
index 523c55e..1aaf9bb 100644
--- a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
+++ b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
@@ -727,4 +727,179 @@ public class MosaicRoundtripTest {
             assertTrue(readSchema.getFields().get(1).isNullable());
         }
     }
+
+    @Test
+    public void testWriterStats() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("id", new ArrowType.Int(32, true)),
+                Field.nullable("name", ArrowType.Utf8.INSTANCE),
+                Field.nullable("score", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
+        ));
+
+        WriterOptions opts = new WriterOptions().statsColumns(0, 2);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        MosaicWriter writer = new MosaicWriter(baos, arrowSchema, opts, allocator);
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector ids = (IntVector) root.getVector("id");
+            VarCharVector names = (VarCharVector) root.getVector("name");
+            Float8Vector scores = (Float8Vector) root.getVector("score");
+
+            int n = 10;
+            ids.allocateNew(n);
+            names.allocateNew(n);
+            scores.allocateNew(n);
+
+            for (int i = 0; i < n; i++) {
+                ids.set(i, i * 10);
+                names.setSafe(i, ("item_" + i).getBytes());
+                scores.set(i, i * 1.1);
+            }
+            root.setRowCount(n);
+            writer.write(root);
+        }
+        writer.close();
+
+        assertEquals(1, writer.numRowGroups());
+        List<ColumnStatistics> stats = writer.getRowGroupStatistics(0);
+        assertTrue(stats.size() > 0);
+        for (ColumnStatistics stat : stats) {
+            assertTrue(stat.getColumnIndex() == 0 || stat.getColumnIndex() == 2);
+            assertEquals(0, stat.getNullCount());
+            assertTrue(stat.hasMinMax());
+            assertNotNull(stat.getMin());
+            assertNotNull(stat.getMax());
+        }
+
+        ColumnStatistics idStat = stats.stream().filter(s -> s.getColumnIndex() == 0).findFirst().get();
+        int minId = ByteBuffer.wrap(idStat.getMin()).order(ByteOrder.BIG_ENDIAN).getInt();
+        int maxId = ByteBuffer.wrap(idStat.getMax()).order(ByteOrder.BIG_ENDIAN).getInt();
+        assertEquals(0, minId);
+        assertEquals(90, maxId);
+    }
+
+    @Test
+    public void testWriterStatsWithNulls() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("a", new ArrowType.Int(32, true)),
+                Field.nullable("b", new ArrowType.Int(64, true))
+        ));
+
+        WriterOptions opts = new WriterOptions().statsColumns(0, 1).numBuckets(1);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        MosaicWriter writer = new MosaicWriter(baos, arrowSchema, opts, allocator);
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector aVec = (IntVector) root.getVector("a");
+            BigIntVector bVec = (BigIntVector) root.getVector("b");
+            aVec.allocateNew(4);
+            bVec.allocateNew(4);
+
+            aVec.set(0, 10);
+            aVec.setNull(1);
+            aVec.set(2, 5);
+            aVec.set(3, 20);
+
+            bVec.setNull(0);
+            bVec.setNull(1);
+            bVec.set(2, 100);
+            bVec.set(3, 50);
+
+            root.setRowCount(4);
+            writer.write(root);
+        }
+        writer.close();
+
+        assertEquals(1, writer.numRowGroups());
+        List<ColumnStatistics> stats = writer.getRowGroupStatistics(0);
+        assertEquals(2, stats.size());
+
+        ColumnStatistics aStat = stats.stream().filter(s -> s.getColumnIndex() == 0).findFirst().get();
+        assertEquals(1, aStat.getNullCount());
+        assertTrue(aStat.hasMinMax());
+        int minA = ByteBuffer.wrap(aStat.getMin()).order(ByteOrder.BIG_ENDIAN).getInt();
+        int maxA = ByteBuffer.wrap(aStat.getMax()).order(ByteOrder.BIG_ENDIAN).getInt();
+        assertEquals(5, minA);
+        assertEquals(20, maxA);
+
+        ColumnStatistics bStat = stats.stream().filter(s -> s.getColumnIndex() == 1).findFirst().get();
+        assertEquals(2, bStat.getNullCount());
+        assertTrue(bStat.hasMinMax());
+        long minB = ByteBuffer.wrap(bStat.getMin()).order(ByteOrder.BIG_ENDIAN).getLong();
+        long maxB = ByteBuffer.wrap(bStat.getMax()).order(ByteOrder.BIG_ENDIAN).getLong();
+        assertEquals(50, minB);
+        assertEquals(100, maxB);
+    }
+
+    @Test
+    public void testWriterStatsAllNull() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("x", new ArrowType.Int(32, true))
+        ));
+
+        WriterOptions opts = new WriterOptions().statsColumns(0).numBuckets(1);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        MosaicWriter writer = new MosaicWriter(baos, arrowSchema, opts, allocator);
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector xVec = (IntVector) root.getVector("x");
+            xVec.allocateNew(3);
+            xVec.setNull(0);
+            xVec.setNull(1);
+            xVec.setNull(2);
+            root.setRowCount(3);
+            writer.write(root);
+        }
+        writer.close();
+
+        assertEquals(1, writer.numRowGroups());
+        List<ColumnStatistics> stats = writer.getRowGroupStatistics(0);
+        assertEquals(1, stats.size());
+        assertEquals(3, stats.get(0).getNullCount());
+        assertFalse(stats.get(0).hasMinMax());
+    }
+
+    @Test
+    public void testWriterStatsMatchesReaderStats() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("id", new ArrowType.Int(32, true)),
+                Field.nullable("value", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
+        ));
+
+        WriterOptions opts = new WriterOptions().statsColumns(0, 1).numBuckets(1);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        MosaicWriter writer = new MosaicWriter(baos, arrowSchema, opts, allocator);
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector ids = (IntVector) root.getVector("id");
+            Float8Vector values = (Float8Vector) root.getVector("value");
+            int n = 20;
+            ids.allocateNew(n);
+            values.allocateNew(n);
+            for (int i = 0; i < n; i++) {
+                ids.set(i, i * 5);
+                values.set(i, i * 2.5);
+            }
+            root.setRowCount(n);
+            writer.write(root);
+        }
+        writer.close();
+
+        byte[] data = baos.toByteArray();
+        try (MosaicReader reader = readerFromBytes(data)) {
+            List<ColumnStatistics> writerStats = writer.getRowGroupStatistics(0);
+            List<ColumnStatistics> readerStats = reader.getRowGroupStatistics(0);
+
+            assertEquals(writerStats.size(), readerStats.size());
+            for (int i = 0; i < writerStats.size(); i++) {
+                ColumnStatistics ws = writerStats.get(i);
+                ColumnStatistics rs = readerStats.get(i);
+                assertEquals(ws.getColumnIndex(), rs.getColumnIndex());
+                assertEquals(ws.getNullCount(), rs.getNullCount());
+                assertEquals(ws.hasMinMax(), rs.hasMinMax());
+                assertArrayEquals(ws.getMin(), rs.getMin());
+                assertArrayEquals(ws.getMax(), rs.getMax());
+            }
+        }
+    }
 }
diff --git a/python/tests/test_mosaic.py b/python/tests/test_mosaic.py
index 6ec64fc..aff877e 100644
--- a/python/tests/test_mosaic.py
+++ b/python/tests/test_mosaic.py
@@ -588,3 +588,139 @@ class TestWriter:
 
         with pytest.raises(RuntimeError, match="writer is closed"):
             writer.write(batch)
+
+    def test_writer_stats_basic(self):
+        pa_schema = pa.schema(
+            [
+                pa.field("id", pa.int32()),
+                pa.field("name", pa.utf8()),
+                pa.field("score", pa.float64()),
+            ]
+        )
+
+        batch = pa.record_batch(
+            [
+                pa.array([i * 10 for i in range(10)], type=pa.int32()),
+                pa.array([f"item_{i}" for i in range(10)]),
+                pa.array([i * 1.1 for i in range(10)]),
+            ],
+            names=["id", "name", "score"],
+        )
+
+        opts = WriterOptions(stats_columns=[0, 2])
+        buf = io.BytesIO()
+        writer = MosaicWriter(buf, pa_schema, opts)
+        writer.write(batch)
+        writer.close()
+
+        assert writer.num_row_groups >= 1
+        stats = writer.get_row_group_statistics(0)
+        assert len(stats) > 0
+        for stat in stats:
+            assert isinstance(stat, ColumnStatistics)
+            assert stat.column_index in (0, 2)
+            assert stat.null_count == 0
+            assert stat.has_min_max
+            assert stat.min is not None
+            assert stat.max is not None
+
+        id_stat = next(s for s in stats if s.column_index == 0)
+        min_id = struct.unpack(">i", id_stat.min)[0]
+        max_id = struct.unpack(">i", id_stat.max)[0]
+        assert min_id == 0
+        assert max_id == 90
+
+    def test_writer_stats_with_nulls(self):
+        pa_schema = pa.schema(
+            [pa.field("a", pa.int32()), pa.field("b", pa.int64())]
+        )
+
+        batch = pa.record_batch(
+            [
+                pa.array([10, None, 5, 20], type=pa.int32()),
+                pa.array([None, None, 100, 50], type=pa.int64()),
+            ],
+            names=["a", "b"],
+        )
+
+        opts = WriterOptions(stats_columns=[0, 1], num_buckets=1)
+        buf = io.BytesIO()
+        writer = MosaicWriter(buf, pa_schema, opts)
+        writer.write(batch)
+        writer.close()
+
+        assert writer.num_row_groups == 1
+        stats = writer.get_row_group_statistics(0)
+        assert len(stats) == 2
+
+        a_stat = next(s for s in stats if s.column_index == 0)
+        assert a_stat.null_count == 1
+        assert a_stat.has_min_max
+        min_a = struct.unpack(">i", a_stat.min)[0]
+        max_a = struct.unpack(">i", a_stat.max)[0]
+        assert min_a == 5
+        assert max_a == 20
+
+        b_stat = next(s for s in stats if s.column_index == 1)
+        assert b_stat.null_count == 2
+        assert b_stat.has_min_max
+        min_b = struct.unpack(">q", b_stat.min)[0]
+        max_b = struct.unpack(">q", b_stat.max)[0]
+        assert min_b == 50
+        assert max_b == 100
+
+    def test_writer_stats_all_null(self):
+        pa_schema = pa.schema([pa.field("x", pa.int32())])
+
+        batch = pa.record_batch(
+            [pa.array([None, None, None], type=pa.int32())], names=["x"]
+        )
+
+        opts = WriterOptions(stats_columns=[0], num_buckets=1)
+        buf = io.BytesIO()
+        writer = MosaicWriter(buf, pa_schema, opts)
+        writer.write(batch)
+        writer.close()
+
+        assert writer.num_row_groups == 1
+        stats = writer.get_row_group_statistics(0)
+        assert len(stats) == 1
+        assert stats[0].null_count == 3
+        assert not stats[0].has_min_max
+        assert stats[0].min is None
+        assert stats[0].max is None
+
+    def test_writer_stats_matches_reader_stats(self):
+        pa_schema = pa.schema(
+            [
+                pa.field("id", pa.int32()),
+                pa.field("score", pa.float64()),
+            ]
+        )
+
+        batch = pa.record_batch(
+            [
+                pa.array(list(range(50)), type=pa.int32()),
+                pa.array([i * 2.0 for i in range(50)]),
+            ],
+            names=["id", "score"],
+        )
+
+        opts = WriterOptions(stats_columns=[0, 1], num_buckets=1)
+        buf = io.BytesIO()
+        writer = MosaicWriter(buf, pa_schema, opts)
+        writer.write(batch)
+        writer.close()
+
+        data = buf.getvalue()
+        with _reader_from_bytes(data) as reader:
+            for rg in range(writer.num_row_groups):
+                w_stats = writer.get_row_group_statistics(rg)
+                r_stats = reader.get_row_group_statistics(rg)
+                assert len(w_stats) == len(r_stats)
+                for ws, rs in zip(w_stats, r_stats):
+                    assert ws.column_index == rs.column_index
+                    assert ws.null_count == rs.null_count
+                    assert ws.has_min_max == rs.has_min_max
+                    assert ws.min == rs.min
+                    assert ws.max == rs.max

Reply via email to