This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git


The following commit(s) were added to refs/heads/main by this push:
     new 775f0a7  Expose ColumnStatistics from MosaicWriter for all language bindings (#16)
775f0a7 is described below

commit 775f0a75e3490b4b4c8ea6eae7a4d95213203167
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed May 20 16:10:57 2026 +0800

    Expose ColumnStatistics from MosaicWriter for all language bindings (#16)
    
    After close(), writer stats are available without re-reading the file.
    This is useful for Paimon to obtain min/max statistics immediately
    after writing for indexing and metadata purposes.
    
    Changes across all layers:
    - Rust core: add num_row_groups() and row_group_stats() to MosaicWriter
    - JNI: add 6 native functions for writer stats access
    - FFI: add C API functions for writer stats (used by Python and C++)
    - Java: MosaicWriter caches stats on close(), exposes via
      numRowGroups() and getRowGroupStatistics(rg)
    - Python: MosaicWriter collects stats on close(), exposes via
      num_row_groups property and get_row_group_statistics(rg)
    - Docs: updated all 4 language API docs (Rust, Java, Python, C++)
---
 .gitignore                                         |   4 +
 core/src/reader_tests.rs                           | 165 +++++++++++++++
 core/src/writer.rs                                 |   8 +
 cpp/test_mosaic.cpp                                | 235 ++++++++++++++++++++-
 docs/cpp-api.html                                  |  20 +-
 docs/java-api.html                                 |  20 +-
 docs/python-api.html                               |  18 +-
 docs/rust-api.html                                 |  15 +-
 ffi/src/lib.rs                                     | 169 +++++++++++++++
 include/mosaic.hpp                                 |  60 ++++--
 .../org/apache/paimon/mosaic/MosaicWriter.java     |  41 ++++
 .../java/org/apache/paimon/mosaic/NativeLib.java   |   6 +
 .../apache/paimon/mosaic/MosaicRoundtripTest.java  | 175 +++++++++++++++
 jni/src/lib.rs                                     | 143 +++++++++++++
 python/mosaic/_ffi.py                              |  20 ++
 python/mosaic/mosaic.py                            |  46 +++-
 python/tests/test_mosaic.py                        | 136 ++++++++++++
 17 files changed, 1254 insertions(+), 27 deletions(-)

diff --git a/.gitignore b/.gitignore
index 005d39a..f0b6b17 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,8 +21,12 @@ Cargo.lock
 
 # Java
 java/target/
+java/src/main/resources/native/
 *.class
 
+# C++
+cpp/build/
+
 # Python
 __pycache__/
 *.pyc
diff --git a/core/src/reader_tests.rs b/core/src/reader_tests.rs
index b519379..d716b08 100644
--- a/core/src/reader_tests.rs
+++ b/core/src/reader_tests.rs
@@ -3457,3 +3457,170 @@ fn test_file_open_single_io() {
         "file open should use only 1 IO when metadata fits in tail prefetch"
     );
 }
+
+#[test]
+fn test_writer_stats_basic() {
+    let columns = vec![
+        ("id".to_string(), DataType::Int32, true),
+        ("name".to_string(), DataType::Utf8, true),
+        ("score".to_string(), DataType::Float64, true),
+    ];
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &columns_to_arrow_schema(&columns),
+        WriterOptions {
+            compression: COMPRESSION_NONE,
+            stats_columns: vec![0, 2],
+            num_buckets: 2,
+            ..Default::default()
+        },
+    )
+    .unwrap();
+
+    let rows: Vec<Vec<Value>> = (0..100)
+        .map(|i| {
+            vec![
+                Value::Integer(i * 2),
+                Value::String(format!("row_{}", i).into_bytes()),
+                Value::Double(i as f64 * 0.5),
+            ]
+        })
+        .collect();
+    write_values(&mut writer, &columns, &rows);
+    writer.close().unwrap();
+
+    assert_eq!(writer.num_row_groups(), 1);
+    let stats = writer.row_group_stats(0);
+    assert_eq!(stats.len(), 2);
+
+    assert_eq!(stats[0].column_index, 0);
+    assert_eq!(stats[0].null_count, 0);
+    assert!(matches!(&stats[0].min, Some(Value::Integer(0))));
+    assert!(matches!(&stats[0].max, Some(Value::Integer(198))));
+
+    assert_eq!(stats[1].column_index, 2);
+    assert_eq!(stats[1].null_count, 0);
+    match &stats[1].min {
+        Some(Value::Double(v)) => assert!(v.abs() < 1e-10),
+        other => panic!("expected Double(0.0), got {:?}", other),
+    }
+    match &stats[1].max {
+        Some(Value::Double(v)) => assert!((*v - 49.5).abs() < 1e-10),
+        other => panic!("expected Double(49.5), got {:?}", other),
+    }
+}
+
+#[test]
+fn test_writer_stats_with_nulls() {
+    let columns = vec![
+        ("a".to_string(), DataType::Int32, true),
+        ("b".to_string(), DataType::Int64, true),
+    ];
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &columns_to_arrow_schema(&columns),
+        WriterOptions {
+            compression: COMPRESSION_NONE,
+            stats_columns: vec![0, 1],
+            num_buckets: 1,
+            ..Default::default()
+        },
+    )
+    .unwrap();
+
+    let rows = vec![
+        vec![Value::Integer(10), Value::Null],
+        vec![Value::Null, Value::Null],
+        vec![Value::Integer(5), Value::BigInt(100)],
+        vec![Value::Integer(20), Value::BigInt(50)],
+    ];
+    write_values(&mut writer, &columns, &rows);
+    writer.close().unwrap();
+
+    assert_eq!(writer.num_row_groups(), 1);
+    let stats = writer.row_group_stats(0);
+    assert_eq!(stats.len(), 2);
+
+    assert_eq!(stats[0].column_index, 0);
+    assert_eq!(stats[0].null_count, 1);
+    assert!(matches!(&stats[0].min, Some(Value::Integer(5))));
+    assert!(matches!(&stats[0].max, Some(Value::Integer(20))));
+
+    assert_eq!(stats[1].column_index, 1);
+    assert_eq!(stats[1].null_count, 2);
+    assert!(matches!(&stats[1].min, Some(Value::BigInt(50))));
+    assert!(matches!(&stats[1].max, Some(Value::BigInt(100))));
+}
+
+#[test]
+fn test_writer_stats_all_null() {
+    let columns = vec![("x".to_string(), DataType::Int32, true)];
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &columns_to_arrow_schema(&columns),
+        WriterOptions {
+            compression: COMPRESSION_NONE,
+            stats_columns: vec![0],
+            num_buckets: 1,
+            ..Default::default()
+        },
+    )
+    .unwrap();
+
+    let rows: Vec<Vec<Value>> = (0..10).map(|_| vec![Value::Null]).collect();
+    write_values(&mut writer, &columns, &rows);
+    writer.close().unwrap();
+
+    assert_eq!(writer.num_row_groups(), 1);
+    let stats = writer.row_group_stats(0);
+    assert_eq!(stats.len(), 1);
+    assert_eq!(stats[0].null_count, 10);
+    assert!(stats[0].min.is_none());
+    assert!(stats[0].max.is_none());
+}
+
+#[test]
+fn test_writer_stats_matches_reader_stats() {
+    let columns = vec![
+        ("id".to_string(), DataType::Int32, true),
+        ("score".to_string(), DataType::Float64, true),
+    ];
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &columns_to_arrow_schema(&columns),
+        WriterOptions {
+            compression: COMPRESSION_NONE,
+            stats_columns: vec![0, 1],
+            num_buckets: 1,
+            ..Default::default()
+        },
+    )
+    .unwrap();
+
+    let rows: Vec<Vec<Value>> = (0..50)
+        .map(|i| vec![Value::Integer(i), Value::Double(i as f64 * 2.0)])
+        .collect();
+    write_values(&mut writer, &columns, &rows);
+    writer.close().unwrap();
+    // Only row group 0 is compared below, so it must be the only one.
+    assert_eq!(writer.num_row_groups(), 1);
+
+    let writer_stats = writer.row_group_stats(0);
+
+    let data = writer.output().buf.clone();
+    let len = data.len() as u64;
+    let reader = MosaicReader::new(ByteArrayInputFile::new(data), len).unwrap();
+    let reader_stats = reader.row_group_stats(0).unwrap();
+
+    assert_eq!(writer_stats.len(), reader_stats.len());
+    for (ws, rs) in writer_stats.iter().zip(reader_stats.iter()) {
+        assert_eq!(ws.column_index, rs.column_index);
+        assert_eq!(ws.null_count, rs.null_count);
+        assert_eq!(format!("{:?}", ws.min), format!("{:?}", rs.min));
+        assert_eq!(format!("{:?}", ws.max), format!("{:?}", rs.max));
+    }
+}
diff --git a/core/src/writer.rs b/core/src/writer.rs
index fff3668..a529bcf 100644
--- a/core/src/writer.rs
+++ b/core/src/writer.rs
@@ -187,6 +187,26 @@ impl<S: OutputFile> MosaicWriter<S> {
         &mut self.out
     }
 
+    /// Returns the number of row groups recorded by this writer.
+    ///
+    /// Intended to be queried after [`close`](Self::close); calls made before
+    /// that presumably only see the row groups flushed so far.
+    pub fn num_row_groups(&self) -> usize {
+        self.row_group_metas.len()
+    }
+
+    /// Returns the column statistics recorded for row group `rg_index`.
+    ///
+    /// Each entry carries the column index, null count and optional min/max
+    /// for one of the configured stats columns.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `rg_index >= self.num_row_groups()`.
+    pub fn row_group_stats(&self, rg_index: usize) -> &[ColumnStats] {
+        &self.row_group_metas[rg_index].stats
+    }
+
     pub fn estimated_file_size(&self) -> u64 {
         let written = self.out.pos();
         let buffered_estimate = (self.current_buffered_size as f64 * self.compression_ratio) as u64;
diff --git a/cpp/test_mosaic.cpp b/cpp/test_mosaic.cpp
index 4de8cad..eacdfcf 100644
--- a/cpp/test_mosaic.cpp
+++ b/cpp/test_mosaic.cpp
@@ -466,6 +466,239 @@ static void test_multiple_row_groups() {
     printf("  PASS test_multiple_row_groups\n");
 }
 
+// Stats min/max values are big-endian encoded. Decode them portably instead of
+// memcpy + __builtin_bswap*, which is GCC/Clang-specific and only correct on
+// little-endian hosts. Callers must check the buffer size before decoding.
+static int32_t decode_be_i32(const std::vector<uint8_t>& bytes) {
+    uint32_t v = 0;
+    for (size_t i = 0; i < 4; i++) v = (v << 8) | bytes[i];
+    return static_cast<int32_t>(v);
+}
+
+static int64_t decode_be_i64(const std::vector<uint8_t>& bytes) {
+    uint64_t v = 0;
+    for (size_t i = 0; i < 8; i++) v = (v << 8) | bytes[i];
+    return static_cast<int64_t>(v);
+}
+
+static void test_writer_stats() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32()),
+        arrow::field("name", arrow::utf8()),
+        arrow::field("score", arrow::float64()),
+    });
+
+    arrow::Int32Builder id_b;
+    arrow::StringBuilder name_b;
+    arrow::DoubleBuilder score_b;
+    for (int i = 0; i < 10; i++) {
+        assert(id_b.Append(i * 10).ok());
+        assert(name_b.Append("item_" + std::to_string(i)).ok());
+        assert(score_b.Append(i * 1.1).ok());
+    }
+    auto batch = arrow::RecordBatch::Make(schema, 10, {
+        id_b.Finish().ValueUnsafe(), name_b.Finish().ValueUnsafe(),
+        score_b.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    uint32_t stats_cols[] = {0, 2};
+    opts.stats_columns = stats_cols;
+    opts.num_stats_columns = 2;
+
+    MemBuffer write_buf;
+    struct ArrowSchema c_schema;
+    auto st = arrow::ExportSchema(*schema, &c_schema);
+    assert(st.ok());
+    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+    struct ArrowArray c_array;
+    struct ArrowSchema c_batch_schema;
+    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+    assert(st.ok());
+    writer.write(&c_array, &c_batch_schema);
+    writer.close();
+
+    ASSERT_EQ(writer.num_row_groups(), 1u);
+    auto stats = writer.get_row_group_statistics(0);
+    ASSERT_EQ(stats.size(), 2u);
+
+    for (auto& s : stats) {
+        ASSERT_TRUE(s.column_index == 0 || s.column_index == 2);
+        ASSERT_EQ(s.null_count, 0u);
+        ASSERT_TRUE(s.has_min_max());
+    }
+
+    auto id_stat = std::find_if(stats.begin(), stats.end(),
+        [](const mosaic::ColumnStatistics& s) { return s.column_index == 0; });
+    ASSERT_TRUE(id_stat != stats.end());
+    ASSERT_EQ(id_stat->min_value.size(), 4u);
+    ASSERT_EQ(id_stat->max_value.size(), 4u);
+    ASSERT_EQ(decode_be_i32(id_stat->min_value), 0);
+    ASSERT_EQ(decode_be_i32(id_stat->max_value), 90);
+    printf("  PASS test_writer_stats\n");
+}
+
+static void test_writer_stats_with_nulls() {
+    auto schema = arrow::schema({
+        arrow::field("a", arrow::int32()),
+        arrow::field("b", arrow::int64()),
+    });
+
+    arrow::Int32Builder a_b;
+    assert(a_b.Append(10).ok());
+    assert(a_b.AppendNull().ok());
+    assert(a_b.Append(5).ok());
+    assert(a_b.Append(20).ok());
+
+    arrow::Int64Builder b_b;
+    assert(b_b.AppendNull().ok());
+    assert(b_b.AppendNull().ok());
+    assert(b_b.Append(100).ok());
+    assert(b_b.Append(50).ok());
+
+    auto batch = arrow::RecordBatch::Make(schema, 4, {
+        a_b.Finish().ValueUnsafe(), b_b.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    opts.num_buckets = 1;
+    uint32_t stats_cols[] = {0, 1};
+    opts.stats_columns = stats_cols;
+    opts.num_stats_columns = 2;
+
+    MemBuffer write_buf;
+    struct ArrowSchema c_schema;
+    auto st = arrow::ExportSchema(*schema, &c_schema);
+    assert(st.ok());
+    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+    struct ArrowArray c_array;
+    struct ArrowSchema c_batch_schema;
+    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+    assert(st.ok());
+    writer.write(&c_array, &c_batch_schema);
+    writer.close();
+
+    ASSERT_EQ(writer.num_row_groups(), 1u);
+    auto stats = writer.get_row_group_statistics(0);
+    ASSERT_EQ(stats.size(), 2u);
+
+    auto a_stat = std::find_if(stats.begin(), stats.end(),
+        [](const mosaic::ColumnStatistics& s) { return s.column_index == 0; });
+    ASSERT_TRUE(a_stat != stats.end());
+    ASSERT_EQ(a_stat->null_count, 1u);
+    ASSERT_TRUE(a_stat->has_min_max());
+    ASSERT_EQ(a_stat->min_value.size(), 4u);
+    ASSERT_EQ(a_stat->max_value.size(), 4u);
+    ASSERT_EQ(decode_be_i32(a_stat->min_value), 5);
+    ASSERT_EQ(decode_be_i32(a_stat->max_value), 20);
+
+    auto b_stat = std::find_if(stats.begin(), stats.end(),
+        [](const mosaic::ColumnStatistics& s) { return s.column_index == 1; });
+    ASSERT_TRUE(b_stat != stats.end());
+    ASSERT_EQ(b_stat->null_count, 2u);
+    ASSERT_TRUE(b_stat->has_min_max());
+    ASSERT_EQ(b_stat->min_value.size(), 8u);
+    ASSERT_EQ(b_stat->max_value.size(), 8u);
+    ASSERT_EQ(decode_be_i64(b_stat->min_value), 50);
+    ASSERT_EQ(decode_be_i64(b_stat->max_value), 100);
+    printf("  PASS test_writer_stats_with_nulls\n");
+}
+
+static void test_writer_stats_all_null() {
+    auto schema = arrow::schema({
+        arrow::field("x", arrow::int32()),
+    });
+
+    arrow::Int32Builder x_b;
+    assert(x_b.AppendNull().ok());
+    assert(x_b.AppendNull().ok());
+    assert(x_b.AppendNull().ok());
+
+    auto batch = arrow::RecordBatch::Make(schema, 3, {
+        x_b.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    opts.num_buckets = 1;
+    uint32_t stats_cols[] = {0};
+    opts.stats_columns = stats_cols;
+    opts.num_stats_columns = 1;
+
+    MemBuffer write_buf;
+    struct ArrowSchema c_schema;
+    auto st = arrow::ExportSchema(*schema, &c_schema);
+    assert(st.ok());
+    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+    struct ArrowArray c_array;
+    struct ArrowSchema c_batch_schema;
+    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+    assert(st.ok());
+    writer.write(&c_array, &c_batch_schema);
+    writer.close();
+
+    ASSERT_EQ(writer.num_row_groups(), 1u);
+    auto stats = writer.get_row_group_statistics(0);
+    ASSERT_EQ(stats.size(), 1u);
+    ASSERT_EQ(stats[0].null_count, 3u);
+    ASSERT_TRUE(!stats[0].has_min_max());
+    printf("  PASS test_writer_stats_all_null\n");
+}
+
+static void test_writer_stats_matches_reader() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32()),
+        arrow::field("value", arrow::float64()),
+    });
+
+    arrow::Int32Builder id_b;
+    arrow::DoubleBuilder val_b;
+    for (int i = 0; i < 20; i++) {
+        assert(id_b.Append(i * 5).ok());
+        assert(val_b.Append(i * 2.5).ok());
+    }
+    auto batch = arrow::RecordBatch::Make(schema, 20, {
+        id_b.Finish().ValueUnsafe(), val_b.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    opts.num_buckets = 1;
+    uint32_t stats_cols[] = {0, 1};
+    opts.stats_columns = stats_cols;
+    opts.num_stats_columns = 2;
+
+    MemBuffer write_buf;
+    struct ArrowSchema c_schema;
+    auto st = arrow::ExportSchema(*schema, &c_schema);
+    assert(st.ok());
+    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+    struct ArrowArray c_array;
+    struct ArrowSchema c_batch_schema;
+    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+    assert(st.ok());
+    writer.write(&c_array, &c_batch_schema);
+    writer.close();
+
+    auto writer_stats = writer.get_row_group_statistics(0);
+
+    MemBuffer buf;
+    buf.data = write_buf.data;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto reader_stats = reader.get_row_group_statistics(0);
+
+    ASSERT_EQ(writer_stats.size(), reader_stats.size());
+    for (size_t i = 0; i < writer_stats.size(); i++) {
+        ASSERT_EQ(writer_stats[i].column_index, reader_stats[i].column_index);
+        ASSERT_EQ(writer_stats[i].null_count, reader_stats[i].null_count);
+        ASSERT_EQ(writer_stats[i].min_value, reader_stats[i].min_value);
+        ASSERT_EQ(writer_stats[i].max_value, reader_stats[i].max_value);
+    }
+    printf("  PASS test_writer_stats_matches_reader\n");
+}
+
 int main() {
     printf("Running Mosaic C++ tests...\n");
     test_basic_roundtrip();
@@ -476,6 +709,10 @@ int main() {
     test_compression_zstd();
     test_schema_roundtrip();
     test_multiple_row_groups();
-    printf("All %d tests passed.\n", 8);
+    test_writer_stats();
+    test_writer_stats_with_nulls();
+    test_writer_stats_all_null();
+    test_writer_stats_matches_reader();
+    printf("All %d tests passed.\n", 12);
     return 0;
 }
diff --git a/docs/cpp-api.html b/docs/cpp-api.html
index a5f4294..79f2203 100644
--- a/docs/cpp-api.html
+++ b/docs/cpp-api.html
@@ -175,6 +175,8 @@ g++ -std=c++17 -I include/ example.cpp \
                     <tr><td><code>write(&amp;ffi_array, 
&amp;ffi_schema)</code></td><td><code>void</code></td><td>Write an Arrow 
RecordBatch via C Data Interface</td></tr>
                     
<tr><td><code>estimated_file_size()</code></td><td><code>int64_t</code></td><td>Estimated
 output file size in bytes (for file rolling)</td></tr>
                     
<tr><td><code>close()</code></td><td><code>void</code></td><td>Flush remaining 
data and write footer</td></tr>
+                    
<tr><td><code>num_row_groups()</code></td><td><code>uint32_t</code></td><td>Number
 of row groups written (available after close)</td></tr>
+                    
<tr><td><code>get_row_group_statistics(rg)</code></td><td><code>vector&lt;ColumnStatistics&gt;</code></td><td>Column
 statistics for a row group (available after close)</td></tr>
                 </tbody>
             </table>
 
@@ -262,8 +264,8 @@ reader.read_row_group(<span class="num">0</span>, 
projected, <span class="num">2
 
             <h3>Column Statistics (Filter Pushdown)</h3>
             <p>
-                When stats columns are configured during writing, the reader 
can access per-row-group
-                min/max statistics to skip row groups that don't match a 
filter predicate:
+                When stats columns are configured during writing, statistics 
are available both
+                from the writer (after close) and from the reader:
             </p>
 <pre><code><span class="cmt">// Writing with stats (arrow_schema is an 
ArrowSchema* from C Data Interface)</span>
 <span class="kw">uint32_t</span> stats_cols[] = { <span class="num">0</span>, 
<span class="num">2</span> };
@@ -272,7 +274,19 @@ opts.compression = <span class="num">1</span>;
 opts.stats_columns = stats_cols;
 opts.num_stats_columns = <span class="num">2</span>;
 <span class="ty">mosaic</span>::<span class="ty">Writer</span> 
writer(std::move(cbs), arrow_schema, opts);</code></pre>
-<pre><code><span class="cmt">// Reading stats</span>
+<pre><code><span class="cmt">// Get stats directly from the writer after 
close</span>
+writer.close();
+<span class="kw">for</span> (<span class="kw">uint32_t</span> rg = <span 
class="num">0</span>; rg &lt; writer.num_row_groups(); rg++) {
+    <span class="kw">auto</span> stats = writer.get_row_group_statistics(rg);
+    <span class="kw">for</span> (<span class="kw">const auto</span>&amp; stat 
: stats) {
+        <span class="kw">uint32_t</span> col_idx = stat.column_index;
+        <span class="kw">uint64_t</span> null_count = stat.null_count;
+        <span class="kw">if</span> (stat.has_min_max()) {
+            <span class="cmt">// stat.min_value / stat.max_value are 
std::vector&lt;uint8_t&gt;</span>
+        }
+    }
+}</code></pre>
+<pre><code><span class="cmt">// Or read stats from the reader</span>
 <span class="kw">for</span> (<span class="kw">uint32_t</span> rg = <span 
class="num">0</span>; rg &lt; reader.num_row_groups(); rg++) {
     <span class="kw">auto</span> stats = reader.get_row_group_statistics(rg);
     <span class="kw">for</span> (<span class="kw">const auto</span>&amp; stat 
: stats) {
diff --git a/docs/java-api.html b/docs/java-api.html
index edc4323..8e3446c 100644
--- a/docs/java-api.html
+++ b/docs/java-api.html
@@ -180,6 +180,8 @@
                     
<tr><td><code>write(VectorSchemaRoot)</code></td><td><code>void</code></td><td>Write
 an Arrow batch (zero-copy via C Data Interface)</td></tr>
                     
<tr><td><code>estimatedFileSize()</code></td><td><code>long</code></td><td>Estimated
 output file size in bytes (for file rolling)</td></tr>
                     
<tr><td><code>close()</code></td><td><code>void</code></td><td>Flush remaining 
data and write footer</td></tr>
+                    
<tr><td><code>numRowGroups()</code></td><td><code>int</code></td><td>Number of 
row groups written (available after close)</td></tr>
+                    
<tr><td><code>getRowGroupStatistics(rg)</code></td><td><code>List&lt;ColumnStatistics&gt;</code></td><td>Column
 statistics for a row group (available after close)</td></tr>
                 </tbody>
             </table>
 
@@ -262,8 +264,22 @@
 
             <h3>Column Statistics (Filter Pushdown)</h3>
             <p>
-                When stats columns are configured during writing, the reader 
can access per-row-group
-                min/max statistics to skip row groups that don't match a 
filter predicate:
+                When stats columns are configured during writing, statistics 
are available both
+                from the writer (after close) and from the reader. This allows 
you to obtain
+                min/max statistics immediately after writing without 
re-reading the file:
+            </p>
+<pre><code><span class="cmt">// Get stats directly from the writer after 
close</span>
+<span class="ty">MosaicWriter</span> writer = <span class="kw">new</span> 
<span class="ty">MosaicWriter</span>(baos, arrowSchema, opts, allocator);
+writer.write(root);
+writer.close();
+
+<span class="kw">for</span> (<span class="kw">int</span> rg = <span 
class="num">0</span>; rg &lt; writer.numRowGroups(); rg++) {
+    <span class="ty">List</span>&lt;<span 
class="ty">ColumnStatistics</span>&gt; stats = writer.getRowGroupStatistics(rg);
+    <span class="cmt">// use stats for indexing, metadata, etc.</span>
+}</code></pre>
+            <p>
+                The reader can also access per-row-group statistics to skip row groups
+                that don't match a filter predicate. Stats columns are chosen at write time via <code>WriterOptions</code>:
             </p>
 <pre><code><span class="ty">WriterOptions</span> opts = <span 
class="kw">new</span> <span class="ty">WriterOptions</span>()
     .statsColumns(<span class="num">0</span>, <span class="num">2</span>);  
<span class="cmt">// build stats for columns 0 and 2</span></code></pre>
diff --git a/docs/python-api.html b/docs/python-api.html
index 29f4631..b66761b 100644
--- a/docs/python-api.html
+++ b/docs/python-api.html
@@ -145,12 +145,14 @@ data = buf.getvalue()</code></pre>
             <h3>Writer Methods</h3>
             <table>
                 <thead>
-                    <tr><th>Method</th><th>Return</th><th>Description</th></tr>
+                    <tr><th>Method / 
Property</th><th>Return</th><th>Description</th></tr>
                 </thead>
                 <tbody>
                     
<tr><td><code>write(batch)</code></td><td><code>None</code></td><td>Write a 
<code>pyarrow.RecordBatch</code> or <code>pyarrow.Table</code></td></tr>
                     
<tr><td><code>estimated_file_size()</code></td><td><code>int</code></td><td>Estimated
 output file size in bytes (for file rolling)</td></tr>
                     
<tr><td><code>close()</code></td><td><code>None</code></td><td>Flush remaining 
data and write footer</td></tr>
+                    
<tr><td><code>num_row_groups</code></td><td><code>int</code></td><td>Number of 
row groups written (available after close)</td></tr>
+                    
<tr><td><code>get_row_group_statistics(rg)</code></td><td><code>list[ColumnStatistics]</code></td><td>Column
 statistics for a row group (available after close)</td></tr>
                 </tbody>
             </table>
 
@@ -216,11 +218,19 @@ batch = reader.read_row_group(rg, columns=projected)
 
             <h3>Column Statistics (Filter Pushdown)</h3>
             <p>
-                When stats columns are configured during writing, the reader 
can access per-row-group
-                min/max statistics to skip row groups that don't match a 
filter predicate:
+                When stats columns are configured during writing, statistics 
are available both
+                from the writer (after close) and from the reader:
             </p>
 <pre><code>opts = WriterOptions(stats_columns=[<span class="num">0</span>, 
<span class="num">2</span>])  <span class="cmt"># build stats for columns 0 and 
2</span></code></pre>
-<pre><code><span class="cmt"># Reading stats</span>
+<pre><code><span class="cmt"># Get stats directly from the writer after 
close</span>
+<span class="kw">with</span> MosaicWriter(buf, pa_schema, opts) <span 
class="kw">as</span> writer:
+    writer.write(batch)
+
+<span class="kw">for</span> rg <span class="kw">in</span> 
range(writer.num_row_groups):
+    <span class="kw">for</span> stat <span class="kw">in</span> 
writer.get_row_group_statistics(rg):
+        col_idx = stat.column_index
+        null_count = stat.null_count</code></pre>
+<pre><code><span class="cmt"># Or read stats from the reader</span>
 <span class="kw">for</span> rg <span class="kw">in</span> 
range(reader.num_row_groups):
     <span class="kw">for</span> stat <span class="kw">in</span> 
reader.get_row_group_statistics(rg):
         col_idx = stat.column_index
diff --git a/docs/rust-api.html b/docs/rust-api.html
index 81dcd13..bd82f7f 100644
--- a/docs/rust-api.html
+++ b/docs/rust-api.html
@@ -119,6 +119,8 @@ writer.close().unwrap();
                     
<tr><td><code>write_batch(&amp;RecordBatch)</code></td><td><code>io::Result&lt;()&gt;</code></td><td>Write
 an Arrow RecordBatch</td></tr>
                     
<tr><td><code>estimated_file_size()</code></td><td><code>u64</code></td><td>Estimated
 output file size in bytes (for file rolling)</td></tr>
                     
<tr><td><code>close()</code></td><td><code>io::Result&lt;()&gt;</code></td><td>Flush
 remaining data and write footer</td></tr>
+                    
<tr><td><code>num_row_groups()</code></td><td><code>usize</code></td><td>Number 
of row groups written (available after close)</td></tr>
+                    
<tr><td><code>row_group_stats(rg_index)</code></td><td><code>&amp;[ColumnStats]</code></td><td>Column
 statistics for a row group (available after close)</td></tr>
                 </tbody>
             </table>
 
@@ -225,7 +227,18 @@ writer.close().unwrap();
     ..Default::default()
 })?;</code></pre>
 
-            <h3>Reading Stats</h3>
+            <h3>Getting Stats from Writer (after close)</h3>
+<pre><code>writer.close()?;
+
+<span class="kw">for</span> rg_idx <span class="kw">in</span> <span 
class="num">0</span>..writer.num_row_groups() {
+    <span class="kw">let</span> stats = writer.row_group_stats(rg_idx);
+    <span class="kw">for</span> stat <span class="kw">in</span> stats {
+        println!(<span class="str">"col={} nulls={} min={:?} max={:?}"</span>,
+            stat.column_index, stat.null_count, stat.min, stat.max);
+    }
+}</code></pre>
+
+            <h3>Reading Stats from Reader</h3>
 <pre><code><span class="kw">for</span> rg_idx <span class="kw">in</span> <span 
class="num">0</span>..reader.num_row_groups() {
     <span class="kw">let</span> stats = reader.row_group_stats(rg_idx)?;
     <span class="kw">for</span> stat <span class="kw">in</span> stats {
diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs
index 7e1cd8f..e2a708a 100644
--- a/ffi/src/lib.rs
+++ b/ffi/src/lib.rs
@@ -255,6 +255,227 @@ pub unsafe extern "C" fn mosaic_writer_estimated_file_size(
     0
 }
 
+// ======================== Writer Stats ========================
+//
+// Integer accessors return 0 on success and -1 on failure; the min/max
+// accessors return a null pointer on failure. Failures are recorded with
+// `set_error`.
+
+/// Validates `rg_index` against the writer's row groups, recording an error
+/// with `set_error` when it is out of range.
+fn checked_writer_rg(h: &MosaicWriterHandle, rg_index: u32) -> Option<usize> {
+    let rg = rg_index as usize;
+    if rg >= h.inner.num_row_groups() {
+        set_error("rg_index out of range".into());
+        return None;
+    }
+    Some(rg)
+}
+
+/// Validates both `rg_index` and `stat_index`, recording an error with
+/// `set_error` when either is out of range. Returns `(rg, stat)` as `usize`.
+fn checked_writer_stat(
+    h: &MosaicWriterHandle,
+    rg_index: u32,
+    stat_index: u32,
+) -> Option<(usize, usize)> {
+    let rg = checked_writer_rg(h, rg_index)?;
+    let idx = stat_index as usize;
+    if idx >= h.inner.row_group_stats(rg).len() {
+        set_error("stat_index out of range".into());
+        return None;
+    }
+    Some((rg, idx))
+}
+
+/// Get the number of row groups in a closed writer.
+///
+/// # Safety
+///
+/// `handle` must be null or a writer handle that has not been freed, and
+/// `out` must be null or valid for writing a `u32`.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_num_row_groups(
+    handle: *const MosaicWriterHandle,
+    out: *mut u32,
+) -> c_int {
+    if handle.is_null() || out.is_null() {
+        set_error("null pointer".into());
+        return -1;
+    }
+    *out = (&*handle).inner.num_row_groups() as u32;
+    0
+}
+
+/// Get number of stats entries for a writer row group.
+///
+/// # Safety
+///
+/// `handle` must be null or a writer handle that has not been freed, and
+/// `out` must be null or valid for writing a `u32`.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_row_group_num_stats(
+    handle: *const MosaicWriterHandle,
+    rg_index: u32,
+    out: *mut u32,
+) -> c_int {
+    if handle.is_null() || out.is_null() {
+        set_error("null pointer".into());
+        return -1;
+    }
+    let h = &*handle;
+    match checked_writer_rg(h, rg_index) {
+        Some(rg) => {
+            *out = h.inner.row_group_stats(rg).len() as u32;
+            0
+        }
+        None => -1,
+    }
+}
+
+/// Get the column index for a writer stats entry.
+///
+/// # Safety
+///
+/// `handle` must be null or a writer handle that has not been freed, and
+/// `out` must be null or valid for writing a `u32`.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_row_group_stat_column_index(
+    handle: *const MosaicWriterHandle,
+    rg_index: u32,
+    stat_index: u32,
+    out: *mut u32,
+) -> c_int {
+    if handle.is_null() || out.is_null() {
+        set_error("null pointer".into());
+        return -1;
+    }
+    let h = &*handle;
+    match checked_writer_stat(h, rg_index, stat_index) {
+        Some((rg, idx)) => {
+            *out = h.inner.row_group_stats(rg)[idx].column_index as u32;
+            0
+        }
+        None => -1,
+    }
+}
+
+/// Get the null count for a writer stats entry.
+///
+/// # Safety
+///
+/// `handle` must be null or a writer handle that has not been freed, and
+/// `out` must be null or valid for writing a `u64`.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_row_group_stat_null_count(
+    handle: *const MosaicWriterHandle,
+    rg_index: u32,
+    stat_index: u32,
+    out: *mut u64,
+) -> c_int {
+    if handle.is_null() || out.is_null() {
+        set_error("null pointer".into());
+        return -1;
+    }
+    let h = &*handle;
+    match checked_writer_stat(h, rg_index, stat_index) {
+        Some((rg, idx)) => {
+            *out = h.inner.row_group_stats(rg)[idx].null_count as u64;
+            0
+        }
+        None => -1,
+    }
+}
+
+// Per-thread scratch buffers backing the pointers returned by the min/max
+// accessors below. Keeping min and max separate lets a caller hold both
+// pointers at once.
+thread_local! {
+    static WRITER_STAT_MIN_BUF: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
+    static WRITER_STAT_MAX_BUF: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
+}
+
+/// Get the min value for a writer stats entry as raw bytes (`to_be_bytes`).
+///
+/// Returns null with `*out_len == 0` when the entry has no min value (for
+/// example, every value of the column in the row group was null). Also returns
+/// null on invalid arguments, recording the error with `set_error`. The returned
+/// pointer refers to a thread-local buffer that is overwritten by the next
+/// call to this function on the same thread; copy the bytes out first.
+///
+/// # Safety
+///
+/// `handle` must be null or a writer handle that has not been freed, and
+/// `out_len` must be null or valid for writing a `usize`.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_row_group_stat_min(
+    handle: *const MosaicWriterHandle,
+    rg_index: u32,
+    stat_index: u32,
+    out_len: *mut usize,
+) -> *const u8 {
+    writer_stat_value_ptr(handle, rg_index, stat_index, out_len, true)
+}
+
+/// Get the max value for a writer stats entry as raw bytes (`to_be_bytes`).
+///
+/// Same contract as [`mosaic_writer_row_group_stat_min`], but reads the max
+/// value and uses its own thread-local buffer, so a pointer obtained from the
+/// min accessor stays valid across a call to this function.
+///
+/// # Safety
+///
+/// Same requirements as [`mosaic_writer_row_group_stat_min`].
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_row_group_stat_max(
+    handle: *const MosaicWriterHandle,
+    rg_index: u32,
+    stat_index: u32,
+    out_len: *mut usize,
+) -> *const u8 {
+    writer_stat_value_ptr(handle, rg_index, stat_index, out_len, false)
+}
+
+/// Shared implementation of the min/max accessors: encodes the selected bound
+/// with `to_be_bytes` into the matching thread-local buffer and returns a
+/// pointer into it.
+unsafe fn writer_stat_value_ptr(
+    handle: *const MosaicWriterHandle,
+    rg_index: u32,
+    stat_index: u32,
+    out_len: *mut usize,
+    is_min: bool,
+) -> *const u8 {
+    if handle.is_null() || out_len.is_null() {
+        set_error("null pointer".into());
+        return ptr::null();
+    }
+    *out_len = 0;
+    let h = &*handle;
+    let (rg, idx) = match checked_writer_stat(h, rg_index, stat_index) {
+        Some(found) => found,
+        None => return ptr::null(),
+    };
+    let stat = &h.inner.row_group_stats(rg)[idx];
+    let value = if is_min { &stat.min } else { &stat.max };
+    let v = match value {
+        Some(v) => v,
+        // No bound recorded (e.g. an all-null column): not an error.
+        None => return ptr::null(),
+    };
+    let buf_key = if is_min {
+        &WRITER_STAT_MIN_BUF
+    } else {
+        &WRITER_STAT_MAX_BUF
+    };
+    buf_key.with(|buf| {
+        let mut b = buf.borrow_mut();
+        *b = v.to_be_bytes();
+        *out_len = b.len();
+        b.as_ptr()
+    })
+}
+
 /// Write an Arrow RecordBatch to the writer via the Arrow C Data Interface.
 /// The caller provides ArrowArray and ArrowSchema pointers that represent the batch.
 /// Ownership of both structs transfers to the callee; the caller's structs are zeroed.
diff --git a/include/mosaic.hpp b/include/mosaic.hpp
index 45c8fd1..68ce2e7 100644
--- a/include/mosaic.hpp
+++ b/include/mosaic.hpp
@@ -94,6 +94,16 @@ struct WriterOptions {
     uint32_t page_size_threshold = 32 * 1024;
 };
 
+// ======================== Statistics ========================
+
+/// Statistics of one column within one row group.
+///
+/// `min_value` / `max_value` hold the raw encoded bounds and are empty when the
+/// column has no min/max (for example when every value in the row group is null).
+/// The numeric fields default to 0 so a default-constructed value is never
+/// indeterminate.
+struct ColumnStatistics {
+    uint32_t column_index = 0;
+    uint64_t null_count = 0;
+    std::vector<uint8_t> min_value;
+    std::vector<uint8_t> max_value;
+    /// True when bounds are present; only `min_value` is inspected.
+    bool has_min_max() const { return !min_value.empty(); }
+};
+
 class Writer {
 public:
     /// Construct a writer. `arrow_schema` is a pointer to an ArrowSchema 
(Arrow C Data Interface).
@@ -123,17 +133,18 @@ public:
+    // Non-copyable: a Writer owns its native handle and frees it in the destructor.
     Writer(const Writer&) = delete;
     Writer& operator=(const Writer&) = delete;
+    // Move-construct: takes over the callbacks, the handle and the closed state;
+    // `other` is left without a handle, so its destructor does nothing.
     Writer(Writer&& other) noexcept
-        : callbacks_(std::move(other.callbacks_)), handle_(other.handle_) {
+        : callbacks_(std::move(other.callbacks_)), handle_(other.handle_), 
closed_(other.closed_) {
         other.handle_ = nullptr;
     }
     Writer& operator=(Writer&& other) noexcept {
         if (this != &other) {
             if (handle_) {
-                mosaic_writer_close(handle_);
+                if (!closed_) mosaic_writer_close(handle_);
                 mosaic_writer_free(handle_);
             }
             callbacks_ = std::move(other.callbacks_);
             handle_ = other.handle_;
+            closed_ = other.closed_;
             other.handle_ = nullptr;
         }
         return *this;
@@ -141,7 +152,7 @@ public:
 
+    // Releases the native writer. It is closed here first unless `closed_` has been
+    // set by close(); any error from that implicit close is ignored.
     ~Writer() {
         if (handle_) {
-            mosaic_writer_close(handle_);
+            if (!closed_) mosaic_writer_close(handle_);
             mosaic_writer_free(handle_);
         }
     }
@@ -160,22 +171,43 @@ public:
     }
 
     void close() {
-        check(mosaic_writer_close(handle_));
+        if (!closed_) {
+            check(mosaic_writer_close(handle_));
+            closed_ = true;
+        }
+    }
+
+    uint32_t num_row_groups() const {
+        uint32_t out = 0;
+        check(mosaic_writer_num_row_groups(handle_, &out));
+        return out;
+    }
+
+    /// Copies the statistics of row group `rg_index` out of the native writer.
+    ///
+    /// The min/max pointers handed back by the FFI point into per-thread scratch
+    /// buffers, so their bytes are copied into the result straight away.
+    std::vector<ColumnStatistics> get_row_group_statistics(uint32_t rg_index) const {
+        uint32_t stat_count = 0;
+        check(mosaic_writer_row_group_num_stats(handle_, rg_index, &stat_count));
+
+        std::vector<ColumnStatistics> stats;
+        stats.reserve(stat_count);
+        for (uint32_t stat_index = 0; stat_index < stat_count; ++stat_index) {
+            ColumnStatistics entry;
+            check(mosaic_writer_row_group_stat_column_index(
+                handle_, rg_index, stat_index, &entry.column_index));
+            check(mosaic_writer_row_group_stat_null_count(
+                handle_, rg_index, stat_index, &entry.null_count));
+
+            size_t len = 0;
+            const uint8_t* bytes =
+                mosaic_writer_row_group_stat_min(handle_, rg_index, stat_index, &len);
+            if (bytes != nullptr && len != 0) {
+                entry.min_value.assign(bytes, bytes + len);
+            }
+
+            len = 0;
+            bytes = mosaic_writer_row_group_stat_max(handle_, rg_index, stat_index, &len);
+            if (bytes != nullptr && len != 0) {
+                entry.max_value.assign(bytes, bytes + len);
+            }
+
+            stats.push_back(std::move(entry));
+        }
+        return stats;
     }
 
 private:
     std::shared_ptr<OutputFile> callbacks_;
     MosaicWriterHandle* handle_ = nullptr;
-};
-
-// ======================== Statistics ========================
-
-struct ColumnStatistics {
-    uint32_t column_index;
-    uint64_t null_count;
-    std::vector<uint8_t> min_value;
-    std::vector<uint8_t> max_value;
-    bool has_min_max() const { return !min_value.empty(); }
+    // Set by close(); when true the destructor and move-assignment skip
+    // mosaic_writer_close so the native writer is not closed a second time.
+    bool closed_ = false;
 };
 
 // ======================== Reader ========================
diff --git a/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java 
b/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
index 1fa49fe..9448199 100644
--- a/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
+++ b/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
@@ -20,6 +20,9 @@
 package org.apache.paimon.mosaic;
 
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.arrow.c.ArrowArray;
 import org.apache.arrow.c.ArrowSchema;
@@ -33,6 +36,7 @@ public class MosaicWriter implements AutoCloseable {
     private long handle;
     private boolean closed;
     private final BufferAllocator allocator;
+    // Per-row-group statistics captured by close() before the native handle is freed;
+    // stays null until close() has successfully collected them.
+    private List<List<ColumnStatistics>> rowGroupStats;
 
     public MosaicWriter(OutputStream outputStream, Schema arrowSchema, 
BufferAllocator allocator) {
         this(outputStream, arrowSchema, new WriterOptions(), allocator);
@@ -95,16 +99,53 @@ public class MosaicWriter implements AutoCloseable {
         return NativeLib.nativeWriterEstimatedSize(handle);
     }
 
+    /**
+     * Returns the number of row groups written by this writer.
+     *
+     * <p>Statistics are captured by {@link #close()}, so this is only available after a
+     * successful close.
+     *
+     * @throws IllegalStateException if the writer has not been closed, or if its close failed
+     */
+    public int numRowGroups() {
+        return closedRowGroupStats().size();
+    }
+
+    /**
+     * Returns the column statistics of row group {@code rgIndex}, as captured by {@link #close()}.
+     *
+     * @param rgIndex row group index in {@code [0, numRowGroups())}
+     * @return an unmodifiable list of the statistics collected for that row group
+     * @throws IllegalStateException if the writer has not been closed, or if its close failed
+     * @throws IndexOutOfBoundsException if {@code rgIndex} is out of range
+     */
+    public List<ColumnStatistics> getRowGroupStatistics(int rgIndex) {
+        return closedRowGroupStats().get(rgIndex);
+    }
+
+    /** Returns the cached statistics, reporting precisely why they are missing when they are. */
+    private List<List<ColumnStatistics>> closedRowGroupStats() {
+        if (rowGroupStats == null) {
+            // `closed` is set before the native close runs, so closed-without-stats means it failed.
+            throw new IllegalStateException(
+                    closed
+                            ? "writer statistics are unavailable because close failed"
+                            : "writer is not closed yet");
+        }
+        return rowGroupStats;
+    }
+
+    /**
+     * Closes the native writer, captures its row-group statistics and frees the native handle.
+     * Subsequent calls are no-ops. The handle is freed even if closing or collecting fails.
+     */
     @Override
     public void close() {
         if (!closed && handle != 0) {
             closed = true;
             try {
                 NativeLib.nativeWriterClose(handle);
+                // Statistics must be read while the native writer still exists (freed below).
+                collectStatistics();
             } finally {
                 NativeLib.nativeWriterFree(handle);
                 handle = 0;
             }
         }
     }
+
+    /** Copies every row group's statistics out of the native writer; must run before it is freed. */
+    private void collectStatistics() {
+        int rowGroupCount = NativeLib.nativeWriterNumRowGroups(handle);
+        List<List<ColumnStatistics>> collected = new ArrayList<>(rowGroupCount);
+        for (int rg = 0; rg < rowGroupCount; rg++) {
+            collected.add(readRowGroupStatistics(rg));
+        }
+        this.rowGroupStats = Collections.unmodifiableList(collected);
+    }
+
+    /** Reads one row group's statistics; a non-positive native count yields an empty list. */
+    private List<ColumnStatistics> readRowGroupStatistics(int rg) {
+        int statCount = NativeLib.nativeWriterRowGroupNumStats(handle, rg);
+        if (statCount <= 0) {
+            return Collections.emptyList();
+        }
+        List<ColumnStatistics> entries = new ArrayList<>(statCount);
+        for (int i = 0; i < statCount; i++) {
+            int columnIndex = NativeLib.nativeWriterRowGroupStatColumnIndex(handle, rg, i);
+            long nullCount = NativeLib.nativeWriterRowGroupStatNullCount(handle, rg, i);
+            byte[] min = NativeLib.nativeWriterRowGroupStatMin(handle, rg, i);
+            byte[] max = NativeLib.nativeWriterRowGroupStatMax(handle, rg, i);
+            entries.add(new ColumnStatistics(columnIndex, nullCount, min, max));
+        }
+        return Collections.unmodifiableList(entries);
+    }
 }
diff --git a/java/src/main/java/org/apache/paimon/mosaic/NativeLib.java 
b/java/src/main/java/org/apache/paimon/mosaic/NativeLib.java
index d09cd9b..695173f 100644
--- a/java/src/main/java/org/apache/paimon/mosaic/NativeLib.java
+++ b/java/src/main/java/org/apache/paimon/mosaic/NativeLib.java
@@ -110,6 +110,12 @@ final class NativeLib {
     static native void nativeWriterFree(long handle);
     static native long nativeWriterEstimatedSize(long handle);
     static native void nativeWriterWriteBatch(long writerHandle, long 
arrayAddr, long schemaAddr);
+    // Writer statistics accessors. MosaicWriter calls them after nativeWriterClose and
+    // before nativeWriterFree. Invalid handles or indices yield sentinel results
+    // (negative, zero or null) rather than exceptions; min/max are big-endian bytes.
+    static native int nativeWriterNumRowGroups(long handle);
+    static native int nativeWriterRowGroupNumStats(long handle, int rgIndex);
+    static native int nativeWriterRowGroupStatColumnIndex(long handle, int 
rgIndex, int statIndex);
+    static native long nativeWriterRowGroupStatNullCount(long handle, int 
rgIndex, int statIndex);
+    static native byte[] nativeWriterRowGroupStatMin(long handle, int rgIndex, 
int statIndex);
+    static native byte[] nativeWriterRowGroupStatMax(long handle, int rgIndex, 
int statIndex);
 
     // Reader
     static native long nativeReaderOpen(Object inputFile, long fileLength);
diff --git 
a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java 
b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
index 523c55e..1aaf9bb 100644
--- a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
+++ b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
@@ -727,4 +727,179 @@ public class MosaicRoundtripTest {
             assertTrue(readSchema.getFields().get(1).isNullable());
         }
     }
+
+    /** Writer-side stats for the configured columns are available right after close(). */
+    @Test
+    public void testWriterStats() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("id", new ArrowType.Int(32, true)),
+                Field.nullable("name", ArrowType.Utf8.INSTANCE),
+                Field.nullable("score", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
+        ));
+
+        WriterOptions opts = new WriterOptions().statsColumns(0, 2);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        MosaicWriter writer = new MosaicWriter(baos, arrowSchema, opts, allocator);
+        // Close in finally so the native writer is released even if writing fails;
+        // close() is idempotent and the stats remain readable afterwards.
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector ids = (IntVector) root.getVector("id");
+            VarCharVector names = (VarCharVector) root.getVector("name");
+            Float8Vector scores = (Float8Vector) root.getVector("score");
+
+            int n = 10;
+            ids.allocateNew(n);
+            names.allocateNew(n);
+            scores.allocateNew(n);
+
+            for (int i = 0; i < n; i++) {
+                ids.set(i, i * 10);
+                names.setSafe(i, ("item_" + i).getBytes());
+                scores.set(i, i * 1.1);
+            }
+            root.setRowCount(n);
+            writer.write(root);
+        } finally {
+            writer.close();
+        }
+
+        assertEquals(1, writer.numRowGroups());
+        List<ColumnStatistics> stats = writer.getRowGroupStatistics(0);
+        assertTrue(stats.size() > 0);
+        for (ColumnStatistics stat : stats) {
+            assertTrue(stat.getColumnIndex() == 0 || stat.getColumnIndex() == 2);
+            assertEquals(0, stat.getNullCount());
+            assertTrue(stat.hasMinMax());
+            assertNotNull(stat.getMin());
+            assertNotNull(stat.getMax());
+        }
+
+        // Bounds are encoded big-endian.
+        ColumnStatistics idStat = stats.stream().filter(s -> s.getColumnIndex() == 0).findFirst().get();
+        int minId = ByteBuffer.wrap(idStat.getMin()).order(ByteOrder.BIG_ENDIAN).getInt();
+        int maxId = ByteBuffer.wrap(idStat.getMax()).order(ByteOrder.BIG_ENDIAN).getInt();
+        assertEquals(0, minId);
+        assertEquals(90, maxId);
+    }
+
+    /** Nulls are counted, and min/max are taken over the non-null values only. */
+    @Test
+    public void testWriterStatsWithNulls() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("a", new ArrowType.Int(32, true)),
+                Field.nullable("b", new ArrowType.Int(64, true))
+        ));
+
+        WriterOptions opts = new WriterOptions().statsColumns(0, 1).numBuckets(1);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        MosaicWriter writer = new MosaicWriter(baos, arrowSchema, opts, allocator);
+        // Close in finally so the native writer is released even if writing fails.
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector aVec = (IntVector) root.getVector("a");
+            BigIntVector bVec = (BigIntVector) root.getVector("b");
+            aVec.allocateNew(4);
+            bVec.allocateNew(4);
+
+            aVec.set(0, 10);
+            aVec.setNull(1);
+            aVec.set(2, 5);
+            aVec.set(3, 20);
+
+            bVec.setNull(0);
+            bVec.setNull(1);
+            bVec.set(2, 100);
+            bVec.set(3, 50);
+
+            root.setRowCount(4);
+            writer.write(root);
+        } finally {
+            writer.close();
+        }
+
+        assertEquals(1, writer.numRowGroups());
+        List<ColumnStatistics> stats = writer.getRowGroupStatistics(0);
+        assertEquals(2, stats.size());
+
+        ColumnStatistics aStat = stats.stream().filter(s -> s.getColumnIndex() == 0).findFirst().get();
+        assertEquals(1, aStat.getNullCount());
+        assertTrue(aStat.hasMinMax());
+        int minA = ByteBuffer.wrap(aStat.getMin()).order(ByteOrder.BIG_ENDIAN).getInt();
+        int maxA = ByteBuffer.wrap(aStat.getMax()).order(ByteOrder.BIG_ENDIAN).getInt();
+        assertEquals(5, minA);
+        assertEquals(20, maxA);
+
+        ColumnStatistics bStat = stats.stream().filter(s -> s.getColumnIndex() == 1).findFirst().get();
+        assertEquals(2, bStat.getNullCount());
+        assertTrue(bStat.hasMinMax());
+        long minB = ByteBuffer.wrap(bStat.getMin()).order(ByteOrder.BIG_ENDIAN).getLong();
+        long maxB = ByteBuffer.wrap(bStat.getMax()).order(ByteOrder.BIG_ENDIAN).getLong();
+        assertEquals(50, minB);
+        assertEquals(100, maxB);
+    }
+
+    /** An all-null column still reports its null count but has no min/max. */
+    @Test
+    public void testWriterStatsAllNull() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("x", new ArrowType.Int(32, true))
+        ));
+
+        WriterOptions opts = new WriterOptions().statsColumns(0).numBuckets(1);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        MosaicWriter writer = new MosaicWriter(baos, arrowSchema, opts, allocator);
+        // Close in finally so the native writer is released even if writing fails.
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector xVec = (IntVector) root.getVector("x");
+            xVec.allocateNew(3);
+            xVec.setNull(0);
+            xVec.setNull(1);
+            xVec.setNull(2);
+            root.setRowCount(3);
+            writer.write(root);
+        } finally {
+            writer.close();
+        }
+
+        assertEquals(1, writer.numRowGroups());
+        List<ColumnStatistics> stats = writer.getRowGroupStatistics(0);
+        assertEquals(1, stats.size());
+        assertEquals(3, stats.get(0).getNullCount());
+        assertFalse(stats.get(0).hasMinMax());
+    }
+
+    /** Stats captured by the writer equal those read back from the written file. */
+    @Test
+    public void testWriterStatsMatchesReaderStats() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("id", new ArrowType.Int(32, true)),
+                Field.nullable("value", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
+        ));
+
+        WriterOptions opts = new WriterOptions().statsColumns(0, 1).numBuckets(1);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        MosaicWriter writer = new MosaicWriter(baos, arrowSchema, opts, allocator);
+        // Close in finally so the native writer is released even if writing fails.
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector ids = (IntVector) root.getVector("id");
+            Float8Vector values = (Float8Vector) root.getVector("value");
+            int n = 20;
+            ids.allocateNew(n);
+            values.allocateNew(n);
+            for (int i = 0; i < n; i++) {
+                ids.set(i, i * 5);
+                values.set(i, i * 2.5);
+            }
+            root.setRowCount(n);
+            writer.write(root);
+        } finally {
+            writer.close();
+        }
+
+        byte[] data = baos.toByteArray();
+        try (MosaicReader reader = readerFromBytes(data)) {
+            List<ColumnStatistics> writerStats = writer.getRowGroupStatistics(0);
+            List<ColumnStatistics> readerStats = reader.getRowGroupStatistics(0);
+
+            assertEquals(writerStats.size(), readerStats.size());
+            for (int i = 0; i < writerStats.size(); i++) {
+                ColumnStatistics ws = writerStats.get(i);
+                ColumnStatistics rs = readerStats.get(i);
+                assertEquals(ws.getColumnIndex(), rs.getColumnIndex());
+                assertEquals(ws.getNullCount(), rs.getNullCount());
+                assertEquals(ws.hasMinMax(), rs.hasMinMax());
+                assertArrayEquals(ws.getMin(), rs.getMin());
+                assertArrayEquals(ws.getMax(), rs.getMax());
+            }
+        }
+    }
 }
diff --git a/jni/src/lib.rs b/jni/src/lib.rs
index e32ae1c..00d7f6e 100644
--- a/jni/src/lib.rs
+++ b/jni/src/lib.rs
@@ -369,6 +369,149 @@ pub extern "system" fn 
Java_org_apache_paimon_mosaic_NativeLib_nativeWriterEstim
     writer.inner.estimated_file_size() as jlong
 }
 
+// ======================== Writer Stats ========================
+
+/// Reinterprets a Java writer handle as a reference, returning `None` for the null handle.
+///
+/// # Safety
+///
+/// A non-zero `handle` must be the address of a live `WriterHandle` created by this library
+/// and not yet freed, and the returned reference must not be used after it is freed.
+unsafe fn writer_stats_handle<'a>(handle: jlong) -> Option<&'a WriterHandle> {
+    (handle as *const WriterHandle).as_ref()
+}
+
+/// Converts a Java row-group index to `usize` if it names an existing row group.
+///
+/// Negative indices are rejected explicitly instead of relying on `as usize` wrap-around.
+fn writer_stats_row_group(writer: &WriterHandle, rg_index: jint) -> Option<usize> {
+    if rg_index < 0 {
+        return None;
+    }
+    let rg = rg_index as usize;
+    if rg < writer.inner.num_row_groups() {
+        Some(rg)
+    } else {
+        None
+    }
+}
+
+/// Converts `(rg_index, stat_index)` to `usize` indices if both name an existing stats entry.
+fn writer_stats_position(
+    writer: &WriterHandle,
+    rg_index: jint,
+    stat_index: jint,
+) -> Option<(usize, usize)> {
+    let rg = writer_stats_row_group(writer, rg_index)?;
+    if stat_index < 0 {
+        return None;
+    }
+    let idx = stat_index as usize;
+    if idx < writer.inner.row_group_stats(rg).len() {
+        Some((rg, idx))
+    } else {
+        None
+    }
+}
+
+/// Copies the min (`is_min`) or max bound of a stats entry into a new Java `byte[]` holding
+/// its big-endian encoding. Returns a null array when the handle or indices are invalid, when
+/// the bound is absent or empty, or when the array cannot be created.
+fn writer_stats_bound<'a>(
+    env: &JNIEnv<'a>,
+    handle: jlong,
+    rg_index: jint,
+    stat_index: jint,
+    is_min: bool,
+) -> JByteArray<'a> {
+    // SAFETY: Java only passes 0 or a live writer handle created by this library.
+    let writer = match unsafe { writer_stats_handle(handle) } {
+        Some(writer) => writer,
+        None => return JByteArray::default(),
+    };
+    let (rg, idx) = match writer_stats_position(writer, rg_index, stat_index) {
+        Some(pos) => pos,
+        None => return JByteArray::default(),
+    };
+    let stats = writer.inner.row_group_stats(rg);
+    let bound = if is_min { &stats[idx].min } else { &stats[idx].max };
+    let bytes = match bound {
+        Some(v) => v.to_be_bytes(),
+        None => return JByteArray::default(),
+    };
+    if bytes.is_empty() {
+        return JByteArray::default();
+    }
+    env.byte_array_from_slice(&bytes).unwrap_or_default()
+}
+
+/// Returns the writer's row-group count, or 0 for a null handle.
+#[no_mangle]
+pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterNumRowGroups(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+) -> jint {
+    // SAFETY: Java only passes 0 or a live writer handle created by this library.
+    match unsafe { writer_stats_handle(handle) } {
+        Some(writer) => writer.inner.num_row_groups() as jint,
+        None => 0,
+    }
+}
+
+/// Returns the number of stats entries in row group `rg_index`, or -1 for a null handle or an
+/// out-of-range row group.
+#[no_mangle]
+pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterRowGroupNumStats(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    rg_index: jint,
+) -> jint {
+    // SAFETY: Java only passes 0 or a live writer handle created by this library.
+    let writer = match unsafe { writer_stats_handle(handle) } {
+        Some(writer) => writer,
+        None => return -1,
+    };
+    match writer_stats_row_group(writer, rg_index) {
+        Some(rg) => writer.inner.row_group_stats(rg).len() as jint,
+        None => -1,
+    }
+}
+
+/// Returns the column index of a stats entry, or -1 for an invalid handle or index.
+#[no_mangle]
+pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterRowGroupStatColumnIndex(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    rg_index: jint,
+    stat_index: jint,
+) -> jint {
+    // SAFETY: Java only passes 0 or a live writer handle created by this library.
+    let writer = match unsafe { writer_stats_handle(handle) } {
+        Some(writer) => writer,
+        None => return -1,
+    };
+    match writer_stats_position(writer, rg_index, stat_index) {
+        Some((rg, idx)) => writer.inner.row_group_stats(rg)[idx].column_index as jint,
+        None => -1,
+    }
+}
+
+/// Returns the null count of a stats entry, or -1 for an invalid handle or index (a real null
+/// count is never negative, so the sentinel cannot be confused with a genuine 0).
+#[no_mangle]
+pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterRowGroupStatNullCount(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    rg_index: jint,
+    stat_index: jint,
+) -> jlong {
+    // SAFETY: Java only passes 0 or a live writer handle created by this library.
+    let writer = match unsafe { writer_stats_handle(handle) } {
+        Some(writer) => writer,
+        None => return -1,
+    };
+    match writer_stats_position(writer, rg_index, stat_index) {
+        Some((rg, idx)) => writer.inner.row_group_stats(rg)[idx].null_count as jlong,
+        None => -1,
+    }
+}
+
+/// Returns the big-endian min value of a stats entry, or null when absent or invalid.
+#[no_mangle]
+pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterRowGroupStatMin<'a>(
+    env: JNIEnv<'a>,
+    _class: JClass<'a>,
+    handle: jlong,
+    rg_index: jint,
+    stat_index: jint,
+) -> JByteArray<'a> {
+    writer_stats_bound(&env, handle, rg_index, stat_index, true)
+}
+
+/// Returns the big-endian max value of a stats entry, or null when absent or invalid.
+#[no_mangle]
+pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterRowGroupStatMax<'a>(
+    env: JNIEnv<'a>,
+    _class: JClass<'a>,
+    handle: jlong,
+    rg_index: jint,
+    stat_index: jint,
+) -> JByteArray<'a> {
+    writer_stats_bound(&env, handle, rg_index, stat_index, false)
+}
+
 // ======================== Writer.writeBatch (Arrow C Data Interface) 
========================
 
 #[no_mangle]
diff --git a/python/mosaic/_ffi.py b/python/mosaic/_ffi.py
index a8a6551..d7b0e56 100644
--- a/python/mosaic/_ffi.py
+++ b/python/mosaic/_ffi.py
@@ -146,6 +146,26 @@ lib.mosaic_writer_estimated_file_size.restype = c_int
 lib.mosaic_writer_write_batch.argtypes = [c_void_p, c_void_p, c_void_p]
 lib.mosaic_writer_write_batch.restype = c_int
 
+# ======================== Writer Stats ========================
+
+lib.mosaic_writer_num_row_groups.argtypes = [c_void_p, POINTER(c_uint32)]
+lib.mosaic_writer_num_row_groups.restype = c_int
+
+lib.mosaic_writer_row_group_num_stats.argtypes = [c_void_p, c_uint32, 
POINTER(c_uint32)]
+lib.mosaic_writer_row_group_num_stats.restype = c_int
+
+lib.mosaic_writer_row_group_stat_column_index.argtypes = [c_void_p, c_uint32, 
c_uint32, POINTER(c_uint32)]
+lib.mosaic_writer_row_group_stat_column_index.restype = c_int
+
+lib.mosaic_writer_row_group_stat_null_count.argtypes = [c_void_p, c_uint32, 
c_uint32, POINTER(c_uint64)]
+lib.mosaic_writer_row_group_stat_null_count.restype = c_int
+
+lib.mosaic_writer_row_group_stat_min.argtypes = [c_void_p, c_uint32, c_uint32, 
POINTER(c_size_t)]
+lib.mosaic_writer_row_group_stat_min.restype = POINTER(c_uint8)
+
+lib.mosaic_writer_row_group_stat_max.argtypes = [c_void_p, c_uint32, c_uint32, 
POINTER(c_size_t)]
+lib.mosaic_writer_row_group_stat_max.restype = POINTER(c_uint8)
+
 # ======================== Reader ========================
 
 lib.mosaic_reader_open.argtypes = [MosaicInputFile]
diff --git a/python/mosaic/mosaic.py b/python/mosaic/mosaic.py
index 5f00749..70fa4f0 100644
--- a/python/mosaic/mosaic.py
+++ b/python/mosaic/mosaic.py
@@ -111,6 +111,7 @@ class MosaicWriter:
 
         self._stream = stream
         self._closed = False
+        self._row_group_stats = None
 
         self._write_callback = _ffi.WRITE_FN(self._on_write)
         self._flush_callback = _ffi.FLUSH_FN(self._on_flush)
@@ -187,14 +188,55 @@ class MosaicWriter:
             _check_error("estimated_file_size failed")
         return out.value
 
+    @property
+    def num_row_groups(self):
+        if self._row_group_stats is None:
+            raise RuntimeError("writer is not closed yet")
+        return len(self._row_group_stats)
+
+    def get_row_group_statistics(self, rg_index):
+        if self._row_group_stats is None:
+            raise RuntimeError("writer is not closed yet")
+        return self._row_group_stats[rg_index]
+
     def close(self):
         if not self._closed and self._handle:
             self._closed = True
             rc = lib.mosaic_writer_close(self._handle)
-            lib.mosaic_writer_free(self._handle)
-            self._handle = None
             if rc != 0:
+                lib.mosaic_writer_free(self._handle)
+                self._handle = None
                 _check_error("close failed")
+            self._collect_statistics()
+            lib.mosaic_writer_free(self._handle)
+            self._handle = None
+
+    def _collect_statistics(self):
+        n_rg = ctypes.c_uint32(0)
+        lib.mosaic_writer_num_row_groups(self._handle, ctypes.byref(n_rg))
+        all_stats = []
+        for rg in range(n_rg.value):
+            n_stats = ctypes.c_uint32(0)
+            lib.mosaic_writer_row_group_num_stats(self._handle, rg, 
ctypes.byref(n_stats))
+            rg_stats = []
+            for i in range(n_stats.value):
+                col_idx = ctypes.c_uint32(0)
+                null_count = ctypes.c_uint64(0)
+                lib.mosaic_writer_row_group_stat_column_index(
+                    self._handle, rg, i, ctypes.byref(col_idx))
+                lib.mosaic_writer_row_group_stat_null_count(
+                    self._handle, rg, i, ctypes.byref(null_count))
+                min_len = ctypes.c_size_t(0)
+                max_len = ctypes.c_size_t(0)
+                min_ptr = lib.mosaic_writer_row_group_stat_min(
+                    self._handle, rg, i, ctypes.byref(min_len))
+                max_ptr = lib.mosaic_writer_row_group_stat_max(
+                    self._handle, rg, i, ctypes.byref(max_len))
+                min_val = bytes(min_ptr[:min_len.value]) if min_ptr and 
min_len.value > 0 else None
+                max_val = bytes(max_ptr[:max_len.value]) if max_ptr and 
max_len.value > 0 else None
+                rg_stats.append(ColumnStatistics(col_idx.value, 
null_count.value, min_val, max_val))
+            all_stats.append(rg_stats)
+        self._row_group_stats = all_stats
 
     def __enter__(self):
         return self
diff --git a/python/tests/test_mosaic.py b/python/tests/test_mosaic.py
index 6ec64fc..aff877e 100644
--- a/python/tests/test_mosaic.py
+++ b/python/tests/test_mosaic.py
@@ -588,3 +588,139 @@ class TestWriter:
 
         with pytest.raises(RuntimeError, match="writer is closed"):
             writer.write(batch)
+
+    def test_writer_stats_basic(self):
+        pa_schema = pa.schema(
+            [
+                pa.field("id", pa.int32()),
+                pa.field("name", pa.utf8()),
+                pa.field("score", pa.float64()),
+            ]
+        )
+
+        batch = pa.record_batch(
+            [
+                pa.array([i * 10 for i in range(10)], type=pa.int32()),
+                pa.array([f"item_{i}" for i in range(10)]),
+                pa.array([i * 1.1 for i in range(10)]),
+            ],
+            names=["id", "name", "score"],
+        )
+
+        opts = WriterOptions(stats_columns=[0, 2])
+        buf = io.BytesIO()
+        writer = MosaicWriter(buf, pa_schema, opts)
+        writer.write(batch)
+        writer.close()
+
+        assert writer.num_row_groups >= 1
+        stats = writer.get_row_group_statistics(0)
+        assert len(stats) > 0
+        for stat in stats:
+            assert isinstance(stat, ColumnStatistics)
+            assert stat.column_index in (0, 2)
+            assert stat.null_count == 0
+            assert stat.has_min_max
+            assert stat.min is not None
+            assert stat.max is not None
+
+        id_stat = next(s for s in stats if s.column_index == 0)
+        min_id = struct.unpack(">i", id_stat.min)[0]
+        max_id = struct.unpack(">i", id_stat.max)[0]
+        assert min_id == 0
+        assert max_id == 90
+
+    def test_writer_stats_with_nulls(self):
+        pa_schema = pa.schema(
+            [pa.field("a", pa.int32()), pa.field("b", pa.int64())]
+        )
+
+        batch = pa.record_batch(
+            [
+                pa.array([10, None, 5, 20], type=pa.int32()),
+                pa.array([None, None, 100, 50], type=pa.int64()),
+            ],
+            names=["a", "b"],
+        )
+
+        opts = WriterOptions(stats_columns=[0, 1], num_buckets=1)
+        buf = io.BytesIO()
+        writer = MosaicWriter(buf, pa_schema, opts)
+        writer.write(batch)
+        writer.close()
+
+        assert writer.num_row_groups == 1
+        stats = writer.get_row_group_statistics(0)
+        assert len(stats) == 2
+
+        a_stat = next(s for s in stats if s.column_index == 0)
+        assert a_stat.null_count == 1
+        assert a_stat.has_min_max
+        min_a = struct.unpack(">i", a_stat.min)[0]
+        max_a = struct.unpack(">i", a_stat.max)[0]
+        assert min_a == 5
+        assert max_a == 20
+
+        b_stat = next(s for s in stats if s.column_index == 1)
+        assert b_stat.null_count == 2
+        assert b_stat.has_min_max
+        min_b = struct.unpack(">q", b_stat.min)[0]
+        max_b = struct.unpack(">q", b_stat.max)[0]
+        assert min_b == 50
+        assert max_b == 100
+
+    def test_writer_stats_all_null(self):
+        pa_schema = pa.schema([pa.field("x", pa.int32())])
+
+        batch = pa.record_batch(
+            [pa.array([None, None, None], type=pa.int32())], names=["x"]
+        )
+
+        opts = WriterOptions(stats_columns=[0], num_buckets=1)
+        buf = io.BytesIO()
+        writer = MosaicWriter(buf, pa_schema, opts)
+        writer.write(batch)
+        writer.close()
+
+        assert writer.num_row_groups == 1
+        stats = writer.get_row_group_statistics(0)
+        assert len(stats) == 1
+        assert stats[0].null_count == 3
+        assert not stats[0].has_min_max
+        assert stats[0].min is None
+        assert stats[0].max is None
+
+    def test_writer_stats_matches_reader_stats(self):
+        pa_schema = pa.schema(
+            [
+                pa.field("id", pa.int32()),
+                pa.field("score", pa.float64()),
+            ]
+        )
+
+        batch = pa.record_batch(
+            [
+                pa.array(list(range(50)), type=pa.int32()),
+                pa.array([i * 2.0 for i in range(50)]),
+            ],
+            names=["id", "score"],
+        )
+
+        opts = WriterOptions(stats_columns=[0, 1], num_buckets=1)
+        buf = io.BytesIO()
+        writer = MosaicWriter(buf, pa_schema, opts)
+        writer.write(batch)
+        writer.close()
+
+        data = buf.getvalue()
+        with _reader_from_bytes(data) as reader:
+            for rg in range(writer.num_row_groups):
+                w_stats = writer.get_row_group_statistics(rg)
+                r_stats = reader.get_row_group_statistics(rg)
+                assert len(w_stats) == len(r_stats)
+                for ws, rs in zip(w_stats, r_stats):
+                    assert ws.column_index == rs.column_index
+                    assert ws.null_count == rs.null_count
+                    assert ws.has_min_max == rs.has_min_max
+                    assert ws.min == rs.min
+                    assert ws.max == rs.max

Reply via email to