This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git
The following commit(s) were added to refs/heads/main by this push:
new 775f0a7 Expose ColumnStatistics from MosaicWriter for all language bindings (#16)
775f0a7 is described below
commit 775f0a75e3490b4b4c8ea6eae7a4d95213203167
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed May 20 16:10:57 2026 +0800
Expose ColumnStatistics from MosaicWriter for all language bindings (#16)
After close(), the writer's column statistics are available without re-reading the file.
This lets Paimon obtain min/max statistics immediately after writing, for
indexing and metadata purposes.
Changes across all layers:
- Rust core: add num_row_groups() and row_group_stats() to MosaicWriter
- JNI: add 6 native functions for writer stats access
- FFI: add C API functions for writer stats (used by Python and C++)
- Java: MosaicWriter caches stats on close() and exposes them via
numRowGroups() and getRowGroupStatistics(rg)
- Python: MosaicWriter collects stats on close() and exposes them via the
num_row_groups property and get_row_group_statistics(rg)
- C++: mosaic::Writer exposes num_row_groups() and get_row_group_statistics(rg)
- Docs: update the API docs for all four languages (Rust, Java, Python, C++)
---
.gitignore | 4 +
core/src/reader_tests.rs | 165 +++++++++++++++
core/src/writer.rs | 8 +
cpp/test_mosaic.cpp | 235 ++++++++++++++++++++-
docs/cpp-api.html | 20 +-
docs/java-api.html | 20 +-
docs/python-api.html | 18 +-
docs/rust-api.html | 15 +-
ffi/src/lib.rs | 169 +++++++++++++++
include/mosaic.hpp | 60 ++++--
.../org/apache/paimon/mosaic/MosaicWriter.java | 41 ++++
.../java/org/apache/paimon/mosaic/NativeLib.java | 6 +
.../apache/paimon/mosaic/MosaicRoundtripTest.java | 175 +++++++++++++++
jni/src/lib.rs | 143 +++++++++++++
python/mosaic/_ffi.py | 20 ++
python/mosaic/mosaic.py | 46 +++-
python/tests/test_mosaic.py | 136 ++++++++++++
17 files changed, 1254 insertions(+), 27 deletions(-)
diff --git a/.gitignore b/.gitignore
index 005d39a..f0b6b17 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,8 +21,12 @@ Cargo.lock
# Java
java/target/
+java/src/main/resources/native/
*.class
+# C++
+cpp/build/
+
# Python
__pycache__/
*.pyc
diff --git a/core/src/reader_tests.rs b/core/src/reader_tests.rs
index b519379..d716b08 100644
--- a/core/src/reader_tests.rs
+++ b/core/src/reader_tests.rs
@@ -3457,3 +3457,168 @@ fn test_file_open_single_io() {
"file open should use only 1 IO when metadata fits in tail prefetch"
);
}
+
+/// Stats read straight from the writer after close(): one row group and one
+/// entry per configured stats column (0 and 2; column 1 "name" is not tracked).
+#[test]
fn test_writer_stats_basic() {
+ let columns = vec![
+ ("id".to_string(), DataType::Int32, true),
+ ("name".to_string(), DataType::Utf8, true),
+ ("score".to_string(), DataType::Float64, true),
+ ];
+ let out = MemOutputFile::new();
+ let mut writer = MosaicWriter::new(
+ out,
+ &columns_to_arrow_schema(&columns),
+ WriterOptions {
+ compression: COMPRESSION_NONE,
+ stats_columns: vec![0, 2],
+ num_buckets: 2,
+ ..Default::default()
+ },
+ )
+ .unwrap();
+
+ let rows: Vec<Vec<Value>> = (0..100)
+ .map(|i| {
+ vec![
+ Value::Integer(i * 2),
+ Value::String(format!("row_{}", i).into_bytes()),
+ Value::Double(i as f64 * 0.5),
+ ]
+ })
+ .collect();
+ write_values(&mut writer, &columns, &rows);
+ writer.close().unwrap();
+
+ assert_eq!(writer.num_row_groups(), 1);
+ let stats = writer.row_group_stats(0);
+ assert_eq!(stats.len(), 2);
+
+ // Entries are expected in stats_columns order. ids are 0, 2, ..., 198 and
+ // scores are 0.0, 0.5, ..., 49.5.
+ assert_eq!(stats[0].column_index, 0);
+ assert_eq!(stats[0].null_count, 0);
+ assert!(matches!(&stats[0].min, Some(Value::Integer(0))));
+ assert!(matches!(&stats[0].max, Some(Value::Integer(198))));
+
+ assert_eq!(stats[1].column_index, 2);
+ assert_eq!(stats[1].null_count, 0);
+ match &stats[1].min {
+ Some(Value::Double(v)) => assert!(v.abs() < 1e-10),
+ other => panic!("expected Double(0.0), got {:?}", other),
+ }
+ match &stats[1].max {
+ Some(Value::Double(v)) => assert!((*v - 49.5).abs() < 1e-10),
+ other => panic!("expected Double(49.5), got {:?}", other),
+ }
+}
+
+/// Nulls are counted in `null_count` and ignored when computing min/max.
+#[test]
fn test_writer_stats_with_nulls() {
+ let columns = vec![
+ ("a".to_string(), DataType::Int32, true),
+ ("b".to_string(), DataType::Int64, true),
+ ];
+ let out = MemOutputFile::new();
+ let mut writer = MosaicWriter::new(
+ out,
+ &columns_to_arrow_schema(&columns),
+ WriterOptions {
+ compression: COMPRESSION_NONE,
+ stats_columns: vec![0, 1],
+ num_buckets: 1,
+ ..Default::default()
+ },
+ )
+ .unwrap();
+
+ // Column a: 10, null, 5, 20 (1 null). Column b: null, null, 100, 50 (2 nulls).
+ let rows = vec![
+ vec![Value::Integer(10), Value::Null],
+ vec![Value::Null, Value::Null],
+ vec![Value::Integer(5), Value::BigInt(100)],
+ vec![Value::Integer(20), Value::BigInt(50)],
+ ];
+ write_values(&mut writer, &columns, &rows);
+ writer.close().unwrap();
+
+ assert_eq!(writer.num_row_groups(), 1);
+ let stats = writer.row_group_stats(0);
+ assert_eq!(stats.len(), 2);
+
+ assert_eq!(stats[0].column_index, 0);
+ assert_eq!(stats[0].null_count, 1);
+ assert!(matches!(&stats[0].min, Some(Value::Integer(5))));
+ assert!(matches!(&stats[0].max, Some(Value::Integer(20))));
+
+ assert_eq!(stats[1].column_index, 1);
+ assert_eq!(stats[1].null_count, 2);
+ assert!(matches!(&stats[1].min, Some(Value::BigInt(50))));
+ assert!(matches!(&stats[1].max, Some(Value::BigInt(100))));
+}
+
+/// An all-null column still gets a stats entry, with `min`/`max` both `None`.
+#[test]
fn test_writer_stats_all_null() {
+ let columns = vec![("x".to_string(), DataType::Int32, true)];
+ let out = MemOutputFile::new();
+ let mut writer = MosaicWriter::new(
+ out,
+ &columns_to_arrow_schema(&columns),
+ WriterOptions {
+ compression: COMPRESSION_NONE,
+ stats_columns: vec![0],
+ num_buckets: 1,
+ ..Default::default()
+ },
+ )
+ .unwrap();
+
+ let rows: Vec<Vec<Value>> = (0..10).map(|_| vec![Value::Null]).collect();
+ write_values(&mut writer, &columns, &rows);
+ writer.close().unwrap();
+
+ assert_eq!(writer.num_row_groups(), 1);
+ let stats = writer.row_group_stats(0);
+ assert_eq!(stats.len(), 1);
+ assert_eq!(stats[0].null_count, 10);
+ assert!(stats[0].min.is_none());
+ assert!(stats[0].max.is_none());
+}
+
+/// Stats taken from the writer must match the stats the reader decodes from
+/// the bytes that writer produced.
+#[test]
fn test_writer_stats_matches_reader_stats() {
+ let columns = vec![
+ ("id".to_string(), DataType::Int32, true),
+ ("score".to_string(), DataType::Float64, true),
+ ];
+ let out = MemOutputFile::new();
+ let mut writer = MosaicWriter::new(
+ out,
+ &columns_to_arrow_schema(&columns),
+ WriterOptions {
+ compression: COMPRESSION_NONE,
+ stats_columns: vec![0, 1],
+ num_buckets: 1,
+ ..Default::default()
+ },
+ )
+ .unwrap();
+
+ let rows: Vec<Vec<Value>> = (0..50)
+ .map(|i| vec![Value::Integer(i), Value::Double(i as f64 * 2.0)])
+ .collect();
+ write_values(&mut writer, &columns, &rows);
+ writer.close().unwrap();
+
+ let writer_stats = writer.row_group_stats(0);
+
+ // Re-open the in-memory output as a file and read the same row group back.
+ let data = writer.output().buf.clone();
+ let len = data.len() as u64;
+ let reader = MosaicReader::new(ByteArrayInputFile::new(data),
len).unwrap();
+ let reader_stats = reader.row_group_stats(0).unwrap();
+
+ assert_eq!(writer_stats.len(), reader_stats.len());
+ for (ws, rs) in writer_stats.iter().zip(reader_stats.iter()) {
+ assert_eq!(ws.column_index, rs.column_index);
+ assert_eq!(ws.null_count, rs.null_count);
+ // Compared through the Debug form, presumably because Value does not
+ // implement PartialEq — TODO confirm.
+ assert_eq!(format!("{:?}", ws.min), format!("{:?}", rs.min));
+ assert_eq!(format!("{:?}", ws.max), format!("{:?}", rs.max));
+ }
+}
diff --git a/core/src/writer.rs b/core/src/writer.rs
index fff3668..a529bcf 100644
--- a/core/src/writer.rs
+++ b/core/src/writer.rs
@@ -187,6 +187,14 @@ impl<S: OutputFile> MosaicWriter<S> {
&mut self.out
}
+ /// Number of row groups recorded by this writer (`row_group_metas.len()`).
+ ///
+ /// Call after `close()` for the final count. Before that, rows still
+ /// buffered are presumably not yet counted — confirm against flush logic.
+ pub fn num_row_groups(&self) -> usize {
+ self.row_group_metas.len()
+ }
+
+ /// Column statistics collected for row group `rg_index`.
+ ///
+ /// The returned slice holds one entry per configured stats column (see
+ /// `WriterOptions::stats_columns`). Each entry carries `column_index`,
+ /// `null_count` and optional `min`/`max`.
+ ///
+ /// # Panics
+ ///
+ /// Panics if `rg_index >= self.num_row_groups()`.
+ pub fn row_group_stats(&self, rg_index: usize) -> &[ColumnStats] {
+ &self.row_group_metas[rg_index].stats
+ }
+
pub fn estimated_file_size(&self) -> u64 {
let written = self.out.pos();
let buffered_estimate = (self.current_buffered_size as f64 *
self.compression_ratio) as u64;
diff --git a/cpp/test_mosaic.cpp b/cpp/test_mosaic.cpp
index 4de8cad..eacdfcf 100644
--- a/cpp/test_mosaic.cpp
+++ b/cpp/test_mosaic.cpp
@@ -466,6 +466,235 @@ static void test_multiple_row_groups() {
printf(" PASS test_multiple_row_groups\n");
}
+// Writer stats for columns 0 (int32 id) and 2 (double score) of a 10-row
+// batch: no nulls, min/max present, id range decoded from raw bytes.
+// NOTE(review): the Append calls live inside assert(), so building with
+// NDEBUG drops them and the batch would be empty.
+static void test_writer_stats() {
+ auto schema = arrow::schema({
+ arrow::field("id", arrow::int32()),
+ arrow::field("name", arrow::utf8()),
+ arrow::field("score", arrow::float64()),
+ });
+
+ arrow::Int32Builder id_b;
+ arrow::StringBuilder name_b;
+ arrow::DoubleBuilder score_b;
+ for (int i = 0; i < 10; i++) {
+ assert(id_b.Append(i * 10).ok());
+ assert(name_b.Append("item_" + std::to_string(i)).ok());
+ assert(score_b.Append(i * 1.1).ok());
+ }
+ auto batch = arrow::RecordBatch::Make(schema, 10, {
+ id_b.Finish().ValueUnsafe(), name_b.Finish().ValueUnsafe(),
+ score_b.Finish().ValueUnsafe(),
+ });
+
+ mosaic::WriterOptions opts;
+ uint32_t stats_cols[] = {0, 2};
+ opts.stats_columns = stats_cols;
+ opts.num_stats_columns = 2;
+
+ MemBuffer write_buf;
+ struct ArrowSchema c_schema;
+ auto st = arrow::ExportSchema(*schema, &c_schema);
+ assert(st.ok());
+ mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+ struct ArrowArray c_array;
+ struct ArrowSchema c_batch_schema;
+ st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+ assert(st.ok());
+ writer.write(&c_array, &c_batch_schema);
+ writer.close();
+
+ ASSERT_EQ(writer.num_row_groups(), 1u);
+ auto stats = writer.get_row_group_statistics(0);
+ // NOTE(review): two stats columns were configured; ASSERT_EQ(stats.size(), 2u)
+ // would be a tighter check than "non-empty".
+ ASSERT_TRUE(stats.size() > 0);
+
+ for (auto& s : stats) {
+ ASSERT_TRUE(s.column_index == 0 || s.column_index == 2);
+ ASSERT_EQ(s.null_count, 0u);
+ ASSERT_TRUE(s.has_min_max());
+ }
+
+ auto id_stat = std::find_if(stats.begin(), stats.end(),
+ [](const mosaic::ColumnStatistics& s) { return s.column_index == 0; });
+ ASSERT_TRUE(id_stat != stats.end());
+ ASSERT_EQ(id_stat->min_value.size(), 4u);
+ ASSERT_EQ(id_stat->max_value.size(), 4u);
+ // Stat bytes are big-endian (Rust to_be_bytes). The byte swap below assumes a
+ // little-endian host, and __builtin_bswap32 is GCC/Clang-specific.
+ int32_t min_id = 0, max_id = 0;
+ memcpy(&min_id, id_stat->min_value.data(), 4);
+ memcpy(&max_id, id_stat->max_value.data(), 4);
+ min_id = __builtin_bswap32(min_id);
+ max_id = __builtin_bswap32(max_id);
+ ASSERT_EQ(min_id, 0);
+ ASSERT_EQ(max_id, 90);
+ printf(" PASS test_writer_stats\n");
+}
+
+// Nulls are counted per column and excluded from min/max:
+// a = {10, null, 5, 20}, b = {null, null, 100, 50}.
+// NOTE(review): Append calls inside assert() vanish under NDEBUG.
+static void test_writer_stats_with_nulls() {
+ auto schema = arrow::schema({
+ arrow::field("a", arrow::int32()),
+ arrow::field("b", arrow::int64()),
+ });
+
+ arrow::Int32Builder a_b;
+ assert(a_b.Append(10).ok());
+ assert(a_b.AppendNull().ok());
+ assert(a_b.Append(5).ok());
+ assert(a_b.Append(20).ok());
+
+ arrow::Int64Builder b_b;
+ assert(b_b.AppendNull().ok());
+ assert(b_b.AppendNull().ok());
+ assert(b_b.Append(100).ok());
+ assert(b_b.Append(50).ok());
+
+ auto batch = arrow::RecordBatch::Make(schema, 4, {
+ a_b.Finish().ValueUnsafe(), b_b.Finish().ValueUnsafe(),
+ });
+
+ mosaic::WriterOptions opts;
+ opts.num_buckets = 1;
+ uint32_t stats_cols[] = {0, 1};
+ opts.stats_columns = stats_cols;
+ opts.num_stats_columns = 2;
+
+ MemBuffer write_buf;
+ struct ArrowSchema c_schema;
+ auto st = arrow::ExportSchema(*schema, &c_schema);
+ assert(st.ok());
+ mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+ struct ArrowArray c_array;
+ struct ArrowSchema c_batch_schema;
+ st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+ assert(st.ok());
+ writer.write(&c_array, &c_batch_schema);
+ writer.close();
+
+ ASSERT_EQ(writer.num_row_groups(), 1u);
+ auto stats = writer.get_row_group_statistics(0);
+ ASSERT_EQ(stats.size(), 2u);
+
+ // NOTE(review): the memcpy calls below read 4/8 bytes without first
+ // asserting min_value/max_value have that size.
+ auto a_stat = std::find_if(stats.begin(), stats.end(),
+ [](const mosaic::ColumnStatistics& s) { return s.column_index == 0; });
+ ASSERT_TRUE(a_stat != stats.end());
+ ASSERT_EQ(a_stat->null_count, 1u);
+ ASSERT_TRUE(a_stat->has_min_max());
+ int32_t min_a = 0, max_a = 0;
+ memcpy(&min_a, a_stat->min_value.data(), 4);
+ memcpy(&max_a, a_stat->max_value.data(), 4);
+ min_a = __builtin_bswap32(min_a);
+ max_a = __builtin_bswap32(max_a);
+ ASSERT_EQ(min_a, 5);
+ ASSERT_EQ(max_a, 20);
+
+ auto b_stat = std::find_if(stats.begin(), stats.end(),
+ [](const mosaic::ColumnStatistics& s) { return s.column_index == 1; });
+ ASSERT_TRUE(b_stat != stats.end());
+ ASSERT_EQ(b_stat->null_count, 2u);
+ ASSERT_TRUE(b_stat->has_min_max());
+ int64_t min_b = 0, max_b = 0;
+ memcpy(&min_b, b_stat->min_value.data(), 8);
+ memcpy(&max_b, b_stat->max_value.data(), 8);
+ min_b = __builtin_bswap64(min_b);
+ max_b = __builtin_bswap64(max_b);
+ ASSERT_EQ(min_b, 50);
+ ASSERT_EQ(max_b, 100);
+ printf(" PASS test_writer_stats_with_nulls\n");
+}
+
+// A column with only nulls still gets a stats entry, counting every row as
+// null and carrying no min/max bytes.
+static void test_writer_stats_all_null() {
+ auto schema = arrow::schema({
+ arrow::field("x", arrow::int32()),
+ });
+
+ arrow::Int32Builder x_b;
+ assert(x_b.AppendNull().ok());
+ assert(x_b.AppendNull().ok());
+ assert(x_b.AppendNull().ok());
+
+ auto batch = arrow::RecordBatch::Make(schema, 3, {
+ x_b.Finish().ValueUnsafe(),
+ });
+
+ mosaic::WriterOptions opts;
+ opts.num_buckets = 1;
+ uint32_t stats_cols[] = {0};
+ opts.stats_columns = stats_cols;
+ opts.num_stats_columns = 1;
+
+ MemBuffer write_buf;
+ struct ArrowSchema c_schema;
+ auto st = arrow::ExportSchema(*schema, &c_schema);
+ assert(st.ok());
+ mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+ struct ArrowArray c_array;
+ struct ArrowSchema c_batch_schema;
+ st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+ assert(st.ok());
+ writer.write(&c_array, &c_batch_schema);
+ writer.close();
+
+ ASSERT_EQ(writer.num_row_groups(), 1u);
+ auto stats = writer.get_row_group_statistics(0);
+ ASSERT_EQ(stats.size(), 1u);
+ ASSERT_EQ(stats[0].null_count, 3u);
+ ASSERT_TRUE(!stats[0].has_min_max());
+ printf(" PASS test_writer_stats_all_null\n");
+}
+
+// Stats obtained from the writer after close() must be byte-for-byte equal to
+// the stats a reader decodes from the same output.
+static void test_writer_stats_matches_reader() {
+ auto schema = arrow::schema({
+ arrow::field("id", arrow::int32()),
+ arrow::field("value", arrow::float64()),
+ });
+
+ arrow::Int32Builder id_b;
+ arrow::DoubleBuilder val_b;
+ for (int i = 0; i < 20; i++) {
+ assert(id_b.Append(i * 5).ok());
+ assert(val_b.Append(i * 2.5).ok());
+ }
+ auto batch = arrow::RecordBatch::Make(schema, 20, {
+ id_b.Finish().ValueUnsafe(), val_b.Finish().ValueUnsafe(),
+ });
+
+ mosaic::WriterOptions opts;
+ opts.num_buckets = 1;
+ uint32_t stats_cols[] = {0, 1};
+ opts.stats_columns = stats_cols;
+ opts.num_stats_columns = 2;
+
+ MemBuffer write_buf;
+ struct ArrowSchema c_schema;
+ auto st = arrow::ExportSchema(*schema, &c_schema);
+ assert(st.ok());
+ mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+ struct ArrowArray c_array;
+ struct ArrowSchema c_batch_schema;
+ st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+ assert(st.ok());
+ writer.write(&c_array, &c_batch_schema);
+ writer.close();
+
+ auto writer_stats = writer.get_row_group_statistics(0);
+
+ // Read the written bytes back through a separate buffer copy.
+ MemBuffer buf;
+ buf.data = write_buf.data;
+ auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+ auto reader_stats = reader.get_row_group_statistics(0);
+
+ ASSERT_EQ(writer_stats.size(), reader_stats.size());
+ for (size_t i = 0; i < writer_stats.size(); i++) {
+ ASSERT_EQ(writer_stats[i].column_index, reader_stats[i].column_index);
+ ASSERT_EQ(writer_stats[i].null_count, reader_stats[i].null_count);
+ ASSERT_EQ(writer_stats[i].min_value, reader_stats[i].min_value);
+ ASSERT_EQ(writer_stats[i].max_value, reader_stats[i].max_value);
+ }
+ printf(" PASS test_writer_stats_matches_reader\n");
+}
+
int main() {
printf("Running Mosaic C++ tests...\n");
test_basic_roundtrip();
@@ -476,6 +705,10 @@ int main() {
test_compression_zstd();
test_schema_roundtrip();
test_multiple_row_groups();
- printf("All %d tests passed.\n", 8);
+ test_writer_stats();
+ test_writer_stats_with_nulls();
+ test_writer_stats_all_null();
+ test_writer_stats_matches_reader();
+ // NOTE(review): hand-maintained count; keep in sync with the test_* calls in main().
+ printf("All %d tests passed.\n", 12);
return 0;
}
diff --git a/docs/cpp-api.html b/docs/cpp-api.html
index a5f4294..79f2203 100644
--- a/docs/cpp-api.html
+++ b/docs/cpp-api.html
@@ -175,6 +175,8 @@ g++ -std=c++17 -I include/ example.cpp \
<tr><td><code>write(&ffi_array,
&ffi_schema)</code></td><td><code>void</code></td><td>Write an Arrow
RecordBatch via C Data Interface</td></tr>
<tr><td><code>estimated_file_size()</code></td><td><code>int64_t</code></td><td>Estimated
output file size in bytes (for file rolling)</td></tr>
<tr><td><code>close()</code></td><td><code>void</code></td><td>Flush remaining
data and write footer</td></tr>
+
<tr><td><code>num_row_groups()</code></td><td><code>uint32_t</code></td><td>Number
of row groups written (available after close)</td></tr>
+
<tr><td><code>get_row_group_statistics(rg)</code></td><td><code>vector<ColumnStatistics></code></td><td>Column
statistics for a row group (available after close)</td></tr>
</tbody>
</table>
@@ -262,8 +264,8 @@ reader.read_row_group(<span class="num">0</span>,
projected, <span class="num">2
<h3>Column Statistics (Filter Pushdown)</h3>
<p>
- When stats columns are configured during writing, the reader
can access per-row-group
- min/max statistics to skip row groups that don't match a
filter predicate:
+ When stats columns are configured during writing, statistics
are available both
+ from the writer (after close) and from the reader:
</p>
<pre><code><span class="cmt">// Writing with stats (arrow_schema is an
ArrowSchema* from C Data Interface)</span>
<span class="kw">uint32_t</span> stats_cols[] = { <span class="num">0</span>,
<span class="num">2</span> };
@@ -272,7 +274,19 @@ opts.compression = <span class="num">1</span>;
opts.stats_columns = stats_cols;
opts.num_stats_columns = <span class="num">2</span>;
<span class="ty">mosaic</span>::<span class="ty">Writer</span>
writer(std::move(cbs), arrow_schema, opts);</code></pre>
-<pre><code><span class="cmt">// Reading stats</span>
+<pre><code><span class="cmt">// Get stats directly from the writer after
close</span>
+writer.close();
+<span class="kw">for</span> (<span class="kw">uint32_t</span> rg = <span
class="num">0</span>; rg < writer.num_row_groups(); rg++) {
+ <span class="kw">auto</span> stats = writer.get_row_group_statistics(rg);
+ <span class="kw">for</span> (<span class="kw">const auto</span>& stat
: stats) {
+ <span class="kw">uint32_t</span> col_idx = stat.column_index;
+ <span class="kw">uint64_t</span> null_count = stat.null_count;
+ <span class="kw">if</span> (stat.has_min_max()) {
+ <span class="cmt">// stat.min_value / stat.max_value are
std::vector<uint8_t></span>
+ }
+ }
+}</code></pre>
+<pre><code><span class="cmt">// Or read stats from the reader</span>
<span class="kw">for</span> (<span class="kw">uint32_t</span> rg = <span
class="num">0</span>; rg < reader.num_row_groups(); rg++) {
<span class="kw">auto</span> stats = reader.get_row_group_statistics(rg);
<span class="kw">for</span> (<span class="kw">const auto</span>& stat
: stats) {
diff --git a/docs/java-api.html b/docs/java-api.html
index edc4323..8e3446c 100644
--- a/docs/java-api.html
+++ b/docs/java-api.html
@@ -180,6 +180,8 @@
<tr><td><code>write(VectorSchemaRoot)</code></td><td><code>void</code></td><td>Write
an Arrow batch (zero-copy via C Data Interface)</td></tr>
<tr><td><code>estimatedFileSize()</code></td><td><code>long</code></td><td>Estimated
output file size in bytes (for file rolling)</td></tr>
<tr><td><code>close()</code></td><td><code>void</code></td><td>Flush remaining
data and write footer</td></tr>
+
<tr><td><code>numRowGroups()</code></td><td><code>int</code></td><td>Number of
row groups written (available after close)</td></tr>
+
<tr><td><code>getRowGroupStatistics(rg)</code></td><td><code>List<ColumnStatistics></code></td><td>Column
statistics for a row group (available after close)</td></tr>
</tbody>
</table>
@@ -262,8 +264,22 @@
<h3>Column Statistics (Filter Pushdown)</h3>
<p>
- When stats columns are configured during writing, the reader
can access per-row-group
- min/max statistics to skip row groups that don't match a
filter predicate:
+ When stats columns are configured during writing, statistics
are available both
+ from the writer (after close) and from the reader. This allows
you to obtain
+ min/max statistics immediately after writing without
re-reading the file:
+ </p>
+<pre><code><span class="cmt">// Get stats directly from the writer after
close</span>
+<span class="ty">MosaicWriter</span> writer = <span class="kw">new</span>
<span class="ty">MosaicWriter</span>(baos, arrowSchema, opts, allocator);
+writer.write(root);
+writer.close();
+
+<span class="kw">for</span> (<span class="kw">int</span> rg = <span
class="num">0</span>; rg < writer.numRowGroups(); rg++) {
+ <span class="ty">List</span><<span
class="ty">ColumnStatistics</span>> stats = writer.getRowGroupStatistics(rg);
+ <span class="cmt">// use stats for indexing, metadata, etc.</span>
+}</code></pre>
+ <p>
+ The reader can also access per-row-group statistics to skip
row groups
+ that don't match a filter predicate:
</p>
<pre><code><span class="ty">WriterOptions</span> opts = <span
class="kw">new</span> <span class="ty">WriterOptions</span>()
.statsColumns(<span class="num">0</span>, <span class="num">2</span>);
<span class="cmt">// build stats for columns 0 and 2</span></code></pre>
diff --git a/docs/python-api.html b/docs/python-api.html
index 29f4631..b66761b 100644
--- a/docs/python-api.html
+++ b/docs/python-api.html
@@ -145,12 +145,14 @@ data = buf.getvalue()</code></pre>
<h3>Writer Methods</h3>
<table>
<thead>
- <tr><th>Method</th><th>Return</th><th>Description</th></tr>
+ <tr><th>Method /
Property</th><th>Return</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>write(batch)</code></td><td><code>None</code></td><td>Write a
<code>pyarrow.RecordBatch</code> or <code>pyarrow.Table</code></td></tr>
<tr><td><code>estimated_file_size()</code></td><td><code>int</code></td><td>Estimated
output file size in bytes (for file rolling)</td></tr>
<tr><td><code>close()</code></td><td><code>None</code></td><td>Flush remaining
data and write footer</td></tr>
+
<tr><td><code>num_row_groups</code></td><td><code>int</code></td><td>Number of
row groups written (available after close)</td></tr>
+
<tr><td><code>get_row_group_statistics(rg)</code></td><td><code>list[ColumnStatistics]</code></td><td>Column
statistics for a row group (available after close)</td></tr>
</tbody>
</table>
@@ -216,11 +218,19 @@ batch = reader.read_row_group(rg, columns=projected)
<h3>Column Statistics (Filter Pushdown)</h3>
<p>
- When stats columns are configured during writing, the reader
can access per-row-group
- min/max statistics to skip row groups that don't match a
filter predicate:
+ When stats columns are configured during writing, statistics
are available both
+ from the writer (after close) and from the reader:
</p>
<pre><code>opts = WriterOptions(stats_columns=[<span class="num">0</span>,
<span class="num">2</span>]) <span class="cmt"># build stats for columns 0 and
2</span></code></pre>
-<pre><code><span class="cmt"># Reading stats</span>
+<pre><code><span class="cmt"># Get stats directly from the writer after
close</span>
+<span class="kw">with</span> MosaicWriter(buf, pa_schema, opts) <span
class="kw">as</span> writer:
+ writer.write(batch)
+
+<span class="kw">for</span> rg <span class="kw">in</span>
range(writer.num_row_groups):
+ <span class="kw">for</span> stat <span class="kw">in</span>
writer.get_row_group_statistics(rg):
+ col_idx = stat.column_index
+ null_count = stat.null_count</code></pre>
+<pre><code><span class="cmt"># Or read stats from the reader</span>
<span class="kw">for</span> rg <span class="kw">in</span>
range(reader.num_row_groups):
<span class="kw">for</span> stat <span class="kw">in</span>
reader.get_row_group_statistics(rg):
col_idx = stat.column_index
diff --git a/docs/rust-api.html b/docs/rust-api.html
index 81dcd13..bd82f7f 100644
--- a/docs/rust-api.html
+++ b/docs/rust-api.html
@@ -119,6 +119,8 @@ writer.close().unwrap();
<tr><td><code>write_batch(&RecordBatch)</code></td><td><code>io::Result<()></code></td><td>Write
an Arrow RecordBatch</td></tr>
<tr><td><code>estimated_file_size()</code></td><td><code>u64</code></td><td>Estimated
output file size in bytes (for file rolling)</td></tr>
<tr><td><code>close()</code></td><td><code>io::Result<()></code></td><td>Flush
remaining data and write footer</td></tr>
+
<tr><td><code>num_row_groups()</code></td><td><code>usize</code></td><td>Number
of row groups written (available after close)</td></tr>
+
<tr><td><code>row_group_stats(rg_index)</code></td><td><code>&[ColumnStats]</code></td><td>Column
statistics for a row group (available after close)</td></tr>
</tbody>
</table>
@@ -225,7 +227,18 @@ writer.close().unwrap();
..Default::default()
})?;</code></pre>
- <h3>Reading Stats</h3>
+ <h3>Getting Stats from Writer (after close)</h3>
+<pre><code>writer.close()?;
+
+<span class="kw">for</span> rg_idx <span class="kw">in</span> <span
class="num">0</span>..writer.num_row_groups() {
+ <span class="kw">let</span> stats = writer.row_group_stats(rg_idx);
+ <span class="kw">for</span> stat <span class="kw">in</span> stats {
+ println!(<span class="str">"col={} nulls={} min={:?} max={:?}"</span>,
+ stat.column_index, stat.null_count, stat.min, stat.max);
+ }
+}</code></pre>
+
+ <h3>Reading Stats from Reader</h3>
<pre><code><span class="kw">for</span> rg_idx <span class="kw">in</span> <span
class="num">0</span>..reader.num_row_groups() {
<span class="kw">let</span> stats = reader.row_group_stats(rg_idx)?;
<span class="kw">for</span> stat <span class="kw">in</span> stats {
diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs
index 7e1cd8f..e2a708a 100644
--- a/ffi/src/lib.rs
+++ b/ffi/src/lib.rs
@@ -255,6 +255,175 @@ pub unsafe extern "C" fn
mosaic_writer_estimated_file_size(
0
}
+// ======================== Writer Stats ========================
+
+/// Get the number of row groups in a closed writer.
+///
+/// Writes the count to `*out` and returns 0. Returns -1 and records
+/// "null pointer" via `set_error` if `handle` or `out` is null.
+///
+/// Whether the writer has been closed is not checked here. Call it after
+/// `mosaic_writer_close` so the count is final.
+/// NOTE(review): the `usize as u32` cast silently truncates past `u32::MAX`.
+///
+/// # Safety
+///
+/// `handle` must be null or point to a live writer handle, and `out` must be
+/// null or valid for writes.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_num_row_groups(
+ handle: *const MosaicWriterHandle,
+ out: *mut u32,
+) -> c_int {
+ if handle.is_null() || out.is_null() {
+ set_error("null pointer".into());
+ return -1;
+ }
+ *out = (&*handle).inner.num_row_groups() as u32;
+ 0
+}
+
+/// Get number of stats entries for a writer row group.
+///
+/// On success writes the entry count to `*out` and returns 0. Returns -1, with
+/// an error recorded via `set_error`, if `handle`/`out` is null or `rg_index`
+/// is not below the writer's row-group count.
+///
+/// # Safety
+///
+/// `handle` must be null or point to a live writer handle, and `out` must be
+/// null or valid for writes.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_row_group_num_stats(
+ handle: *const MosaicWriterHandle,
+ rg_index: u32,
+ out: *mut u32,
+) -> c_int {
+ if handle.is_null() || out.is_null() {
+ set_error("null pointer".into());
+ return -1;
+ }
+ let h = &*handle;
+ let rg = rg_index as usize;
+ // Bounds-check here because row_group_stats() indexes without checking.
+ if rg >= h.inner.num_row_groups() {
+ set_error("rg_index out of range".into());
+ return -1;
+ }
+ *out = h.inner.row_group_stats(rg).len() as u32;
+ 0
+}
+
+/// Get the column index for a writer stats entry.
+///
+/// Writes the schema column index of entry `stat_index` in row group
+/// `rg_index` to `*out` and returns 0. Returns -1, with an error recorded via
+/// `set_error`, on a null pointer or an out-of-range `rg_index`/`stat_index`.
+///
+/// # Safety
+///
+/// `handle` must be null or point to a live writer handle, and `out` must be
+/// null or valid for writes.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_row_group_stat_column_index(
+ handle: *const MosaicWriterHandle,
+ rg_index: u32,
+ stat_index: u32,
+ out: *mut u32,
+) -> c_int {
+ if handle.is_null() || out.is_null() {
+ set_error("null pointer".into());
+ return -1;
+ }
+ let h = &*handle;
+ let rg = rg_index as usize;
+ if rg >= h.inner.num_row_groups() {
+ set_error("rg_index out of range".into());
+ return -1;
+ }
+ let stats = h.inner.row_group_stats(rg);
+ let idx = stat_index as usize;
+ if idx >= stats.len() {
+ set_error("stat_index out of range".into());
+ return -1;
+ }
+ *out = stats[idx].column_index as u32;
+ 0
+}
+
+/// Get the null count for a writer stats entry.
+///
+/// Writes the number of nulls seen for entry `stat_index` of row group
+/// `rg_index` to `*out` and returns 0. Returns -1, with an error recorded via
+/// `set_error`, on a null pointer or an out-of-range `rg_index`/`stat_index`.
+///
+/// # Safety
+///
+/// `handle` must be null or point to a live writer handle, and `out` must be
+/// null or valid for writes.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_row_group_stat_null_count(
+ handle: *const MosaicWriterHandle,
+ rg_index: u32,
+ stat_index: u32,
+ out: *mut u64,
+) -> c_int {
+ if handle.is_null() || out.is_null() {
+ set_error("null pointer".into());
+ return -1;
+ }
+ let h = &*handle;
+ let rg = rg_index as usize;
+ if rg >= h.inner.num_row_groups() {
+ set_error("rg_index out of range".into());
+ return -1;
+ }
+ let stats = h.inner.row_group_stats(rg);
+ let idx = stat_index as usize;
+ if idx >= stats.len() {
+ set_error("stat_index out of range".into());
+ return -1;
+ }
+ *out = stats[idx].null_count as u64;
+ 0
+}
+
+// Per-thread scratch buffers behind the pointers returned by the writer
+// min/max stat accessors. Each lookup overwrites the buffer it uses and returns
+// a pointer into it. That pointer is valid only until the next min (resp. max)
+// lookup on the same thread, and only while that thread is alive.
+thread_local! {
+ static WRITER_STAT_MIN_BUF: RefCell<Vec<u8>> = const {
RefCell::new(Vec::new()) };
+ static WRITER_STAT_MAX_BUF: RefCell<Vec<u8>> = const {
RefCell::new(Vec::new()) };
+}
+
+/// Get the min value for a writer stats entry as raw bytes.
+///
+/// Returns a pointer to the min value serialized with `to_be_bytes()` and
+/// stores its length in `*out_len`. Returns null when the entry has no min
+/// (e.g. an all-null column) or when an argument is invalid. Callers should
+/// initialize `*out_len` to 0 and treat a null result as "no value".
+///
+/// The bytes live in a thread-local buffer owned by this library. Copy them
+/// before calling this function again on the same thread, and never free the
+/// pointer.
+///
+/// # Safety
+///
+/// `handle` must be null or point to a live writer handle; `out_len` must be
+/// null or valid for writes.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_row_group_stat_min(
+ handle: *const MosaicWriterHandle,
+ rg_index: u32,
+ stat_index: u32,
+ out_len: *mut usize,
+) -> *const u8 {
+ writer_stat_value_ptr(handle, rg_index, stat_index, out_len, true)
+}
+
+/// Get the max value for a writer stats entry as raw bytes.
+///
+/// Returns a pointer to the max value serialized with `to_be_bytes()` and
+/// stores its length in `*out_len`. Returns null when the entry has no max
+/// (e.g. an all-null column) or when an argument is invalid. Callers should
+/// initialize `*out_len` to 0 and treat a null result as "no value".
+///
+/// The bytes live in a thread-local buffer separate from the min buffer, so a
+/// min and a max pointer can be held at the same time. Copy the bytes before
+/// calling this function again on the same thread, and never free the pointer.
+///
+/// # Safety
+///
+/// `handle` must be null or point to a live writer handle; `out_len` must be
+/// null or valid for writes.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_writer_row_group_stat_max(
+ handle: *const MosaicWriterHandle,
+ rg_index: u32,
+ stat_index: u32,
+ out_len: *mut usize,
+) -> *const u8 {
+ writer_stat_value_ptr(handle, rg_index, stat_index, out_len, false)
+}
+
+/// Shared implementation of `mosaic_writer_row_group_stat_min` (`is_min`) and
+/// `mosaic_writer_row_group_stat_max` (`!is_min`).
+///
+/// Serializes the selected value with `to_be_bytes()` into this thread's
+/// scratch buffer (`WRITER_STAT_MIN_BUF` or `WRITER_STAT_MAX_BUF`). It stores
+/// the byte length in `*out_len` and returns a pointer into that buffer. The
+/// pointer is invalidated by the next call that uses the same buffer on this
+/// thread.
+///
+/// Returns null, leaving `*out_len == 0` whenever `out_len` is non-null, when:
+/// - the entry has no value (e.g. every row in the column was null); no error;
+/// - `handle` or `out_len` is null; "null pointer" is recorded via `set_error`;
+/// - `rg_index` / `stat_index` is out of range; an error is recorded.
+/// The error messages match the other writer-stats accessors.
+///
+/// # Safety
+///
+/// `handle` must be null or point to a live writer handle; `out_len` must be
+/// null or valid for writes.
+unsafe fn writer_stat_value_ptr(
+ handle: *const MosaicWriterHandle,
+ rg_index: u32,
+ stat_index: u32,
+ out_len: *mut usize,
+ is_min: bool,
+) -> *const u8 {
+ // Zero the length up front so every early return reports "no bytes",
+ // including the null-handle case, which previously left it unwritten.
+ if !out_len.is_null() {
+ *out_len = 0;
+ }
+ if handle.is_null() || out_len.is_null() {
+ set_error("null pointer".into());
+ return ptr::null();
+ }
+ let h = &*handle;
+ let rg = rg_index as usize;
+ if rg >= h.inner.num_row_groups() {
+ set_error("rg_index out of range".into());
+ return ptr::null();
+ }
+ let stats = h.inner.row_group_stats(rg);
+ let idx = stat_index as usize;
+ if idx >= stats.len() {
+ set_error("stat_index out of range".into());
+ return ptr::null();
+ }
+ // Min and max use separate buffers so both pointers can be held at once.
+ let (value, buf_ref) = if is_min {
+ (&stats[idx].min, &WRITER_STAT_MIN_BUF)
+ } else {
+ (&stats[idx].max, &WRITER_STAT_MAX_BUF)
+ };
+ match value {
+ Some(v) => {
+ let bytes = v.to_be_bytes();
+ buf_ref.with(|buf| {
+ let mut b = buf.borrow_mut();
+ *b = bytes;
+ *out_len = b.len();
+ b.as_ptr()
+ })
+ }
+ // No min/max recorded (e.g. an all-null column): not an error.
+ None => ptr::null(),
+ }
+}
+
/// Write an Arrow RecordBatch to the writer via the Arrow C Data Interface.
/// The caller provides ArrowArray and ArrowSchema pointers that represent the
batch.
/// Ownership of both structs transfers to the callee; the caller's structs
are zeroed.
diff --git a/include/mosaic.hpp b/include/mosaic.hpp
index 45c8fd1..68ce2e7 100644
--- a/include/mosaic.hpp
+++ b/include/mosaic.hpp
@@ -94,6 +94,16 @@ struct WriterOptions {
uint32_t page_size_threshold = 32 * 1024;
};
+// ======================== Statistics ========================
+
+/// Per-column statistics for one row group, as returned by
+/// Writer::get_row_group_statistics and Reader::get_row_group_statistics.
+///
+/// For writer stats, min_value / max_value are copies of the Rust-side
+/// `to_be_bytes()` serialization; the tests decode integers as big-endian.
+/// Both stay empty when no min/max was recorded, e.g. for an all-null column.
++struct ColumnStatistics {
+ uint32_t column_index;
+ uint64_t null_count;
+ std::vector<uint8_t> min_value;
+ std::vector<uint8_t> max_value;
+ /// True when a min/max is present. Only min_value is inspected.
+ /// NOTE(review): a value whose serialization is zero bytes (possibly an
+ /// empty string) would also read as "no min/max" — confirm.
+ bool has_min_max() const { return !min_value.empty(); }
+};
+
class Writer {
public:
/// Construct a writer. `arrow_schema` is a pointer to an ArrowSchema
(Arrow C Data Interface).
@@ -123,17 +133,18 @@ public:
Writer(const Writer&) = delete;
Writer& operator=(const Writer&) = delete;
Writer(Writer&& other) noexcept
- : callbacks_(std::move(other.callbacks_)), handle_(other.handle_) {
+ : callbacks_(std::move(other.callbacks_)), handle_(other.handle_),
closed_(other.closed_) {
other.handle_ = nullptr;
}
Writer& operator=(Writer&& other) noexcept {
if (this != &other) {
if (handle_) {
- mosaic_writer_close(handle_);
+ if (!closed_) mosaic_writer_close(handle_);
mosaic_writer_free(handle_);
}
callbacks_ = std::move(other.callbacks_);
handle_ = other.handle_;
+ closed_ = other.closed_;
other.handle_ = nullptr;
}
return *this;
@@ -141,7 +152,7 @@ public:
~Writer() {
if (handle_) {
- mosaic_writer_close(handle_);
+ if (!closed_) mosaic_writer_close(handle_);
mosaic_writer_free(handle_);
}
}
@@ -160,22 +171,43 @@ public:
}
void close() {
- check(mosaic_writer_close(handle_));
+ if (!closed_) {
+ check(mosaic_writer_close(handle_));
+ closed_ = true;
+ }
+ }
+
+ uint32_t num_row_groups() const {
+ uint32_t out = 0;
+ check(mosaic_writer_num_row_groups(handle_, &out));
+ return out;
+ }
+
+    /// Column statistics of row group `rg_index`, one entry per statistic the native
+    /// writer reports. Status codes of the count/index/null-count getters go through
+    /// check(); min/max bytes are copied out of the buffers the getters return.
+    std::vector<ColumnStatistics> get_row_group_statistics(uint32_t rg_index) const {
+        uint32_t stat_count = 0;
+        check(mosaic_writer_row_group_num_stats(handle_, rg_index, &stat_count));
+
+        // Copies a C-API byte range; a null pointer or zero length yields an empty vector.
+        auto copy_bytes = [](const uint8_t* data, size_t len) {
+            return (data != nullptr && len > 0) ? std::vector<uint8_t>(data, data + len)
+                                                : std::vector<uint8_t>();
+        };
+
+        std::vector<ColumnStatistics> stats;
+        stats.reserve(stat_count);
+        for (uint32_t idx = 0; idx < stat_count; ++idx) {
+            ColumnStatistics stat{};
+            check(mosaic_writer_row_group_stat_column_index(handle_, rg_index, idx,
+                                                            &stat.column_index));
+            check(mosaic_writer_row_group_stat_null_count(handle_, rg_index, idx,
+                                                          &stat.null_count));
+            size_t min_len = 0;
+            size_t max_len = 0;
+            const uint8_t* min_data =
+                mosaic_writer_row_group_stat_min(handle_, rg_index, idx, &min_len);
+            const uint8_t* max_data =
+                mosaic_writer_row_group_stat_max(handle_, rg_index, idx, &max_len);
+            stat.min_value = copy_bytes(min_data, min_len);
+            stat.max_value = copy_bytes(max_data, max_len);
+            stats.push_back(std::move(stat));
+        }
+        return stats;
}
private:
std::shared_ptr<OutputFile> callbacks_;
MosaicWriterHandle* handle_ = nullptr;
-};
-
-// ======================== Statistics ========================
-
-struct ColumnStatistics {
- uint32_t column_index;
- uint64_t null_count;
- std::vector<uint8_t> min_value;
- std::vector<uint8_t> max_value;
- bool has_min_max() const { return !min_value.empty(); }
+    // True once close() has succeeded; stops the destructor and move assignment
+    // from calling mosaic_writer_close on this handle a second time.
+ bool closed_ = false;
};
// ======================== Reader ========================
diff --git a/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
b/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
index 1fa49fe..9448199 100644
--- a/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
+++ b/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
@@ -20,6 +20,9 @@
package org.apache.paimon.mosaic;
import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
@@ -33,6 +36,7 @@ public class MosaicWriter implements AutoCloseable {
private long handle;
private boolean closed;
private final BufferAllocator allocator;
+    // Per-row-group column statistics, captured by close() while the native handle is
+    // still alive (before nativeWriterFree); stays null until collected successfully.
+ private List<List<ColumnStatistics>> rowGroupStats;
public MosaicWriter(OutputStream outputStream, Schema arrowSchema,
BufferAllocator allocator) {
this(outputStream, arrowSchema, new WriterOptions(), allocator);
@@ -95,16 +99,53 @@ public class MosaicWriter implements AutoCloseable {
return NativeLib.nativeWriterEstimatedSize(handle);
}
+    /**
+     * Returns the number of row groups written, as captured when the writer was closed.
+     *
+     * @throws IllegalStateException if {@link #close()} has not been called, or if it
+     *     failed before the statistics could be collected
+     */
+    public int numRowGroups() {
+        return requireStatistics().size();
+    }
+
+    /**
+     * Returns the column statistics of one row group, as captured when the writer was closed.
+     *
+     * @param rgIndex zero-based row group index
+     * @return an unmodifiable list with one entry per statistic the native writer reported
+     * @throws IllegalStateException if statistics are unavailable (see {@link #numRowGroups()})
+     * @throws IndexOutOfBoundsException if {@code rgIndex} is out of range
+     */
+    public List<ColumnStatistics> getRowGroupStatistics(int rgIndex) {
+        return requireStatistics().get(rgIndex);
+    }
+
+    /** Returns the cached statistics, or throws if they were never collected. */
+    private List<List<ColumnStatistics>> requireStatistics() {
+        if (rowGroupStats == null) {
+            // close() sets `closed` before calling into native code, so "closed but no
+            // statistics" means the native close (or statistics collection) failed.
+            throw new IllegalStateException(
+                    closed
+                            ? "writer statistics are unavailable because close() failed"
+                            : "writer is not closed yet");
+        }
+        return rowGroupStats;
+    }
+
+    /**
+     * Closes the native writer, caches its per-row-group statistics and frees the native
+     * handle. The handle is freed even if closing or collecting statistics throws; once
+     * {@code closed} is set, further calls do nothing.
+     */
@Override
public void close() {
if (!closed && handle != 0) {
closed = true;
try {
NativeLib.nativeWriterClose(handle);
+            // Statistics must be read before nativeWriterFree releases the handle below.
+ collectStatistics();
} finally {
NativeLib.nativeWriterFree(handle);
handle = 0;
}
}
}
+
+ private void collectStatistics() {
+ int numRg = NativeLib.nativeWriterNumRowGroups(handle);
+ List<List<ColumnStatistics>> allStats = new ArrayList<>(numRg);
+ for (int rg = 0; rg < numRg; rg++) {
+ int n = NativeLib.nativeWriterRowGroupNumStats(handle, rg);
+ if (n <= 0) {
+ allStats.add(Collections.emptyList());
+ continue;
+ }
+ List<ColumnStatistics> rgStats = new ArrayList<>(n);
+ for (int i = 0; i < n; i++) {
+ rgStats.add(new ColumnStatistics(
+ NativeLib.nativeWriterRowGroupStatColumnIndex(handle,
rg, i),
+ NativeLib.nativeWriterRowGroupStatNullCount(handle,
rg, i),
+ NativeLib.nativeWriterRowGroupStatMin(handle, rg, i),
+ NativeLib.nativeWriterRowGroupStatMax(handle, rg, i)));
+ }
+ allStats.add(Collections.unmodifiableList(rgStats));
+ }
+ this.rowGroupStats = Collections.unmodifiableList(allStats);
+ }
}
diff --git a/java/src/main/java/org/apache/paimon/mosaic/NativeLib.java
b/java/src/main/java/org/apache/paimon/mosaic/NativeLib.java
index d09cd9b..695173f 100644
--- a/java/src/main/java/org/apache/paimon/mosaic/NativeLib.java
+++ b/java/src/main/java/org/apache/paimon/mosaic/NativeLib.java
@@ -110,6 +110,12 @@ final class NativeLib {
static native void nativeWriterFree(long handle);
static native long nativeWriterEstimatedSize(long handle);
static native void nativeWriterWriteBatch(long writerHandle, long
arrayAddr, long schemaAddr);
+    // Writer statistics. MosaicWriter.close() reads these after nativeWriterClose and
+    // before nativeWriterFree. The native side reports a zero handle or an invalid
+    // index through sentinel return values instead of throwing (noted per method).
+    // Returns 0 for a zero handle.
+ static native int nativeWriterNumRowGroups(long handle);
+    // Returns 0 for a zero handle and -1 for an invalid row-group index.
+ static native int nativeWriterRowGroupNumStats(long handle, int rgIndex);
+    // Returns -1 for a zero handle or an invalid index.
+ static native int nativeWriterRowGroupStatColumnIndex(long handle, int
rgIndex, int statIndex);
+    // Returns 0 for a zero handle or an invalid index.
+ static native long nativeWriterRowGroupStatNullCount(long handle, int
rgIndex, int statIndex);
+    // Min/max as big-endian bytes (Rust to_be_bytes()); null when there is no bound,
+    // for a zero handle, or for an invalid index.
+ static native byte[] nativeWriterRowGroupStatMin(long handle, int rgIndex,
int statIndex);
+ static native byte[] nativeWriterRowGroupStatMax(long handle, int rgIndex,
int statIndex);
// Reader
static native long nativeReaderOpen(Object inputFile, long fileLength);
diff --git
a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
index 523c55e..1aaf9bb 100644
--- a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
+++ b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
@@ -727,4 +727,179 @@ public class MosaicRoundtripTest {
assertTrue(readSchema.getFields().get(1).isNullable());
}
}
+
+    /**
+     * Writes one batch with statistics configured for columns 0 ("id") and 2 ("score") and
+     * checks the statistics the writer exposes after close(); column 1 ("name") must not
+     * appear.
+     */
+ @Test
+ public void testWriterStats() {
+ Schema arrowSchema = new Schema(Arrays.asList(
+ Field.nullable("id", new ArrowType.Int(32, true)),
+ Field.nullable("name", ArrowType.Utf8.INSTANCE),
+ Field.nullable("score", new
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
+ ));
+
+ WriterOptions opts = new WriterOptions().statsColumns(0, 2);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ MosaicWriter writer = new MosaicWriter(baos, arrowSchema, opts,
allocator);
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema,
allocator)) {
+ IntVector ids = (IntVector) root.getVector("id");
+ VarCharVector names = (VarCharVector) root.getVector("name");
+ Float8Vector scores = (Float8Vector) root.getVector("score");
+
+ int n = 10;
+ ids.allocateNew(n);
+ names.allocateNew(n);
+ scores.allocateNew(n);
+
+ for (int i = 0; i < n; i++) {
+ ids.set(i, i * 10);
+ names.setSafe(i, ("item_" + i).getBytes());
+ scores.set(i, i * 1.1);
+ }
+ root.setRowCount(n);
+ writer.write(root);
+ }
+ writer.close();
+
+ assertEquals(1, writer.numRowGroups());
+ List<ColumnStatistics> stats = writer.getRowGroupStatistics(0);
+ assertTrue(stats.size() > 0);
+ for (ColumnStatistics stat : stats) {
+ assertTrue(stat.getColumnIndex() == 0 || stat.getColumnIndex() ==
2);
+ assertEquals(0, stat.getNullCount());
+ assertTrue(stat.hasMinMax());
+ assertNotNull(stat.getMin());
+ assertNotNull(stat.getMax());
+ }
+
+        // Bounds are big-endian encoded (the JNI layer uses to_be_bytes()).
+ ColumnStatistics idStat = stats.stream().filter(s ->
s.getColumnIndex() == 0).findFirst().get();
+ int minId =
ByteBuffer.wrap(idStat.getMin()).order(ByteOrder.BIG_ENDIAN).getInt();
+ int maxId =
ByteBuffer.wrap(idStat.getMax()).order(ByteOrder.BIG_ENDIAN).getInt();
+ assertEquals(0, minId);
+ assertEquals(90, maxId);
+ }
+
+    /**
+     * Nulls must be counted in the null count and excluded from min/max:
+     * a = [10, null, 5, 20] gives min 5 / max 20, b = [null, null, 100, 50] gives 50 / 100.
+     */
+ @Test
+ public void testWriterStatsWithNulls() {
+ Schema arrowSchema = new Schema(Arrays.asList(
+ Field.nullable("a", new ArrowType.Int(32, true)),
+ Field.nullable("b", new ArrowType.Int(64, true))
+ ));
+
+ WriterOptions opts = new WriterOptions().statsColumns(0,
1).numBuckets(1);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ MosaicWriter writer = new MosaicWriter(baos, arrowSchema, opts,
allocator);
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema,
allocator)) {
+ IntVector aVec = (IntVector) root.getVector("a");
+ BigIntVector bVec = (BigIntVector) root.getVector("b");
+ aVec.allocateNew(4);
+ bVec.allocateNew(4);
+
+ aVec.set(0, 10);
+ aVec.setNull(1);
+ aVec.set(2, 5);
+ aVec.set(3, 20);
+
+ bVec.setNull(0);
+ bVec.setNull(1);
+ bVec.set(2, 100);
+ bVec.set(3, 50);
+
+ root.setRowCount(4);
+ writer.write(root);
+ }
+ writer.close();
+
+ assertEquals(1, writer.numRowGroups());
+ List<ColumnStatistics> stats = writer.getRowGroupStatistics(0);
+ assertEquals(2, stats.size());
+
+ ColumnStatistics aStat = stats.stream().filter(s -> s.getColumnIndex()
== 0).findFirst().get();
+ assertEquals(1, aStat.getNullCount());
+ assertTrue(aStat.hasMinMax());
+ int minA =
ByteBuffer.wrap(aStat.getMin()).order(ByteOrder.BIG_ENDIAN).getInt();
+ int maxA =
ByteBuffer.wrap(aStat.getMax()).order(ByteOrder.BIG_ENDIAN).getInt();
+ assertEquals(5, minA);
+ assertEquals(20, maxA);
+
+ ColumnStatistics bStat = stats.stream().filter(s -> s.getColumnIndex()
== 1).findFirst().get();
+ assertEquals(2, bStat.getNullCount());
+ assertTrue(bStat.hasMinMax());
+ long minB =
ByteBuffer.wrap(bStat.getMin()).order(ByteOrder.BIG_ENDIAN).getLong();
+ long maxB =
ByteBuffer.wrap(bStat.getMax()).order(ByteOrder.BIG_ENDIAN).getLong();
+ assertEquals(50, minB);
+ assertEquals(100, maxB);
+ }
+
+    /** An all-null column reports its null count but no min/max. */
+ @Test
+ public void testWriterStatsAllNull() {
+ Schema arrowSchema = new Schema(Arrays.asList(
+ Field.nullable("x", new ArrowType.Int(32, true))
+ ));
+
+ WriterOptions opts = new WriterOptions().statsColumns(0).numBuckets(1);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ MosaicWriter writer = new MosaicWriter(baos, arrowSchema, opts,
allocator);
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema,
allocator)) {
+ IntVector xVec = (IntVector) root.getVector("x");
+ xVec.allocateNew(3);
+ xVec.setNull(0);
+ xVec.setNull(1);
+ xVec.setNull(2);
+ root.setRowCount(3);
+ writer.write(root);
+ }
+ writer.close();
+
+ assertEquals(1, writer.numRowGroups());
+ List<ColumnStatistics> stats = writer.getRowGroupStatistics(0);
+ assertEquals(1, stats.size());
+ assertEquals(3, stats.get(0).getNullCount());
+ assertFalse(stats.get(0).hasMinMax());
+ }
+
+    /**
+     * Statistics cached by the writer on close() must be identical, entry by entry, to the
+     * statistics MosaicReader reads back from the written bytes.
+     */
+ @Test
+ public void testWriterStatsMatchesReaderStats() {
+ Schema arrowSchema = new Schema(Arrays.asList(
+ Field.nullable("id", new ArrowType.Int(32, true)),
+ Field.nullable("value", new
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
+ ));
+
+ WriterOptions opts = new WriterOptions().statsColumns(0,
1).numBuckets(1);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ MosaicWriter writer = new MosaicWriter(baos, arrowSchema, opts,
allocator);
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema,
allocator)) {
+ IntVector ids = (IntVector) root.getVector("id");
+ Float8Vector values = (Float8Vector) root.getVector("value");
+ int n = 20;
+ ids.allocateNew(n);
+ values.allocateNew(n);
+ for (int i = 0; i < n; i++) {
+ ids.set(i, i * 5);
+ values.set(i, i * 2.5);
+ }
+ root.setRowCount(n);
+ writer.write(root);
+ }
+ writer.close();
+
+ byte[] data = baos.toByteArray();
+ try (MosaicReader reader = readerFromBytes(data)) {
+            // The writer's native handle is already freed here; its stats come from the cache.
+ List<ColumnStatistics> writerStats =
writer.getRowGroupStatistics(0);
+ List<ColumnStatistics> readerStats =
reader.getRowGroupStatistics(0);
+
+ assertEquals(writerStats.size(), readerStats.size());
+ for (int i = 0; i < writerStats.size(); i++) {
+ ColumnStatistics ws = writerStats.get(i);
+ ColumnStatistics rs = readerStats.get(i);
+ assertEquals(ws.getColumnIndex(), rs.getColumnIndex());
+ assertEquals(ws.getNullCount(), rs.getNullCount());
+ assertEquals(ws.hasMinMax(), rs.hasMinMax());
+ assertArrayEquals(ws.getMin(), rs.getMin());
+ assertArrayEquals(ws.getMax(), rs.getMax());
+ }
+ }
+ }
}
diff --git a/jni/src/lib.rs b/jni/src/lib.rs
index e32ae1c..00d7f6e 100644
--- a/jni/src/lib.rs
+++ b/jni/src/lib.rs
@@ -369,6 +369,149 @@ pub extern "system" fn
Java_org_apache_paimon_mosaic_NativeLib_nativeWriterEstim
writer.inner.estimated_file_size() as jlong
}
+// ======================== Writer Stats ========================
+//
+// Accessors used by the Java MosaicWriter to copy row-group statistics out of the
+// native writer (after nativeWriterClose, before nativeWriterFree). None of them
+// throw: a zero handle or an invalid index is reported through the sentinel return
+// value documented on each function.
+
+/// Resolves `handle` to its writer and validates `rg_index` against the writer's
+/// row-group count.
+///
+/// Returns `None` for a zero handle, or for a row-group index that is negative or
+/// not less than the number of row groups.
+fn resolve_writer_row_group<'w>(handle: jlong, rg_index: jint) -> Option<(&'w WriterHandle, usize)> {
+    if handle == 0 || rg_index < 0 {
+        return None;
+    }
+    // SAFETY: same contract as the other writer entry points in this file: a non-zero
+    // `handle` points to a live `WriterHandle` that has not been freed yet.
+    let writer = unsafe { &*(handle as *const WriterHandle) };
+    let rg = rg_index as usize;
+    if rg < writer.inner.num_row_groups() {
+        Some((writer, rg))
+    } else {
+        None
+    }
+}
+
+/// Returns `stats[stat_index]`, or `None` when `stat_index` is negative or out of range.
+fn writer_stat_at<S>(stats: &[S], stat_index: jint) -> Option<&S> {
+    if stat_index < 0 {
+        None
+    } else {
+        stats.get(stat_index as usize)
+    }
+}
+
+/// Copies encoded statistic bytes into a new Java `byte[]`.
+///
+/// Empty input, or a failed JNI allocation, yields a null array reference.
+fn writer_stat_bytes<'a>(env: &JNIEnv<'a>, bytes: &[u8]) -> JByteArray<'a> {
+    if bytes.is_empty() {
+        return JByteArray::default();
+    }
+    env.byte_array_from_slice(bytes).unwrap_or_default()
+}
+
+/// Row-group count reported by the core writer; `0` for a zero handle.
+#[no_mangle]
+pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterNumRowGroups(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+) -> jint {
+    if handle == 0 {
+        return 0;
+    }
+    // SAFETY: a non-zero handle points to a live `WriterHandle` (see
+    // `resolve_writer_row_group`).
+    let writer = unsafe { &*(handle as *const WriterHandle) };
+    writer.inner.num_row_groups() as jint
+}
+
+/// Number of column statistics in row group `rg_index`: `0` for a zero handle,
+/// `-1` for an invalid row-group index.
+#[no_mangle]
+pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterRowGroupNumStats(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    rg_index: jint,
+) -> jint {
+    if handle == 0 {
+        return 0;
+    }
+    match resolve_writer_row_group(handle, rg_index) {
+        Some((writer, rg)) => writer.inner.row_group_stats(rg).len() as jint,
+        None => -1,
+    }
+}
+
+/// Column index of statistic `stat_index` in row group `rg_index`; `-1` for a zero
+/// handle or an invalid index.
+#[no_mangle]
+pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterRowGroupStatColumnIndex(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    rg_index: jint,
+    stat_index: jint,
+) -> jint {
+    resolve_writer_row_group(handle, rg_index)
+        .and_then(|(writer, rg)| {
+            let stats = writer.inner.row_group_stats(rg);
+            let stat = writer_stat_at(&stats[..], stat_index)?;
+            Some(stat.column_index as jint)
+        })
+        .unwrap_or(-1)
+}
+
+/// Null count of statistic `stat_index` in row group `rg_index`; `0` for a zero
+/// handle or an invalid index.
+#[no_mangle]
+pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterRowGroupStatNullCount(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    rg_index: jint,
+    stat_index: jint,
+) -> jlong {
+    resolve_writer_row_group(handle, rg_index)
+        .and_then(|(writer, rg)| {
+            let stats = writer.inner.row_group_stats(rg);
+            let stat = writer_stat_at(&stats[..], stat_index)?;
+            Some(stat.null_count as jlong)
+        })
+        .unwrap_or(0)
+}
+
+/// Big-endian encoded minimum of statistic `stat_index` in row group `rg_index`.
+/// Returns a null `byte[]` when there is no minimum (e.g. every value was null), for
+/// a zero handle, or for an invalid index.
+#[no_mangle]
+pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterRowGroupStatMin<'a>(
+    env: JNIEnv<'a>,
+    _class: JClass<'a>,
+    handle: jlong,
+    rg_index: jint,
+    stat_index: jint,
+) -> JByteArray<'a> {
+    resolve_writer_row_group(handle, rg_index)
+        .and_then(|(writer, rg)| {
+            let stats = writer.inner.row_group_stats(rg);
+            let min_value = writer_stat_at(&stats[..], stat_index)?.min.as_ref()?;
+            let bytes = min_value.to_be_bytes();
+            Some(writer_stat_bytes(&env, &bytes))
+        })
+        .unwrap_or_default()
+}
+
+/// Big-endian encoded maximum of statistic `stat_index` in row group `rg_index`.
+/// Returns a null `byte[]` when there is no maximum (e.g. every value was null), for
+/// a zero handle, or for an invalid index.
+#[no_mangle]
+pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterRowGroupStatMax<'a>(
+    env: JNIEnv<'a>,
+    _class: JClass<'a>,
+    handle: jlong,
+    rg_index: jint,
+    stat_index: jint,
+) -> JByteArray<'a> {
+    resolve_writer_row_group(handle, rg_index)
+        .and_then(|(writer, rg)| {
+            let stats = writer.inner.row_group_stats(rg);
+            let max_value = writer_stat_at(&stats[..], stat_index)?.max.as_ref()?;
+            let bytes = max_value.to_be_bytes();
+            Some(writer_stat_bytes(&env, &bytes))
+        })
+        .unwrap_or_default()
+}
+
// ======================== Writer.writeBatch (Arrow C Data Interface)
========================
#[no_mangle]
diff --git a/python/mosaic/_ffi.py b/python/mosaic/_ffi.py
index a8a6551..d7b0e56 100644
--- a/python/mosaic/_ffi.py
+++ b/python/mosaic/_ffi.py
@@ -146,6 +146,26 @@ lib.mosaic_writer_estimated_file_size.restype = c_int
lib.mosaic_writer_write_batch.argtypes = [c_void_p, c_void_p, c_void_p]
lib.mosaic_writer_write_batch.restype = c_int
+# ======================== Writer Stats ========================
+
+# Writer-side statistics getters, read by MosaicWriter after mosaic_writer_close and
+# before mosaic_writer_free. The count / column-index / null-count getters return a
+# c_int status code (presumably 0 == success, as for mosaic_writer_close) and write
+# their result through the trailing out-pointer.
+lib.mosaic_writer_num_row_groups.argtypes = [c_void_p, POINTER(c_uint32)]
+lib.mosaic_writer_num_row_groups.restype = c_int
+
+lib.mosaic_writer_row_group_num_stats.argtypes = [c_void_p, c_uint32,
POINTER(c_uint32)]
+lib.mosaic_writer_row_group_num_stats.restype = c_int
+
+lib.mosaic_writer_row_group_stat_column_index.argtypes = [c_void_p, c_uint32,
c_uint32, POINTER(c_uint32)]
+lib.mosaic_writer_row_group_stat_column_index.restype = c_int
+
+lib.mosaic_writer_row_group_stat_null_count.argtypes = [c_void_p, c_uint32,
c_uint32, POINTER(c_uint64)]
+lib.mosaic_writer_row_group_stat_null_count.restype = c_int
+
+# min/max return a pointer to the encoded bytes (NULL when absent) and write the byte
+# length through the size_t out-pointer. Callers copy the bytes and never free the
+# pointer, so the buffer presumably remains owned by the native writer; confirm this.
+lib.mosaic_writer_row_group_stat_min.argtypes = [c_void_p, c_uint32, c_uint32,
POINTER(c_size_t)]
+lib.mosaic_writer_row_group_stat_min.restype = POINTER(c_uint8)
+
+lib.mosaic_writer_row_group_stat_max.argtypes = [c_void_p, c_uint32, c_uint32,
POINTER(c_size_t)]
+lib.mosaic_writer_row_group_stat_max.restype = POINTER(c_uint8)
+
# ======================== Reader ========================
lib.mosaic_reader_open.argtypes = [MosaicInputFile]
diff --git a/python/mosaic/mosaic.py b/python/mosaic/mosaic.py
index 5f00749..70fa4f0 100644
--- a/python/mosaic/mosaic.py
+++ b/python/mosaic/mosaic.py
@@ -111,6 +111,7 @@ class MosaicWriter:
self._stream = stream
self._closed = False
+ self._row_group_stats = None
self._write_callback = _ffi.WRITE_FN(self._on_write)
self._flush_callback = _ffi.FLUSH_FN(self._on_flush)
@@ -187,14 +188,55 @@ class MosaicWriter:
_check_error("estimated_file_size failed")
return out.value
+ @property
+ def num_row_groups(self):
+ if self._row_group_stats is None:
+ raise RuntimeError("writer is not closed yet")
+ return len(self._row_group_stats)
+
+ def get_row_group_statistics(self, rg_index):
+ if self._row_group_stats is None:
+ raise RuntimeError("writer is not closed yet")
+ return self._row_group_stats[rg_index]
+
def close(self):
if not self._closed and self._handle:
self._closed = True
rc = lib.mosaic_writer_close(self._handle)
- lib.mosaic_writer_free(self._handle)
- self._handle = None
if rc != 0:
+ lib.mosaic_writer_free(self._handle)
+ self._handle = None
_check_error("close failed")
+ self._collect_statistics()
+ lib.mosaic_writer_free(self._handle)
+ self._handle = None
+
+ def _collect_statistics(self):
+ n_rg = ctypes.c_uint32(0)
+ lib.mosaic_writer_num_row_groups(self._handle, ctypes.byref(n_rg))
+ all_stats = []
+ for rg in range(n_rg.value):
+ n_stats = ctypes.c_uint32(0)
+ lib.mosaic_writer_row_group_num_stats(self._handle, rg,
ctypes.byref(n_stats))
+ rg_stats = []
+ for i in range(n_stats.value):
+ col_idx = ctypes.c_uint32(0)
+ null_count = ctypes.c_uint64(0)
+ lib.mosaic_writer_row_group_stat_column_index(
+ self._handle, rg, i, ctypes.byref(col_idx))
+ lib.mosaic_writer_row_group_stat_null_count(
+ self._handle, rg, i, ctypes.byref(null_count))
+ min_len = ctypes.c_size_t(0)
+ max_len = ctypes.c_size_t(0)
+ min_ptr = lib.mosaic_writer_row_group_stat_min(
+ self._handle, rg, i, ctypes.byref(min_len))
+ max_ptr = lib.mosaic_writer_row_group_stat_max(
+ self._handle, rg, i, ctypes.byref(max_len))
+ min_val = bytes(min_ptr[:min_len.value]) if min_ptr and
min_len.value > 0 else None
+ max_val = bytes(max_ptr[:max_len.value]) if max_ptr and
max_len.value > 0 else None
+ rg_stats.append(ColumnStatistics(col_idx.value,
null_count.value, min_val, max_val))
+ all_stats.append(rg_stats)
+ self._row_group_stats = all_stats
def __enter__(self):
+        """Return the writer itself so it can be used in a ``with`` statement."""
return self
diff --git a/python/tests/test_mosaic.py b/python/tests/test_mosaic.py
index 6ec64fc..aff877e 100644
--- a/python/tests/test_mosaic.py
+++ b/python/tests/test_mosaic.py
@@ -588,3 +588,139 @@ class TestWriter:
with pytest.raises(RuntimeError, match="writer is closed"):
writer.write(batch)
+
+ def test_writer_stats_basic(self):
+        """Stats exposed after close() cover only the configured stats columns (0 and 2)."""
+ pa_schema = pa.schema(
+ [
+ pa.field("id", pa.int32()),
+ pa.field("name", pa.utf8()),
+ pa.field("score", pa.float64()),
+ ]
+ )
+
+ batch = pa.record_batch(
+ [
+ pa.array([i * 10 for i in range(10)], type=pa.int32()),
+ pa.array([f"item_{i}" for i in range(10)]),
+ pa.array([i * 1.1 for i in range(10)]),
+ ],
+ names=["id", "name", "score"],
+ )
+
+ opts = WriterOptions(stats_columns=[0, 2])
+ buf = io.BytesIO()
+ writer = MosaicWriter(buf, pa_schema, opts)
+ writer.write(batch)
+ writer.close()
+
+ assert writer.num_row_groups >= 1
+ stats = writer.get_row_group_statistics(0)
+ assert len(stats) > 0
+ for stat in stats:
+ assert isinstance(stat, ColumnStatistics)
+ assert stat.column_index in (0, 2)
+ assert stat.null_count == 0
+ assert stat.has_min_max
+ assert stat.min is not None
+ assert stat.max is not None
+
+        # Bounds are big-endian encoded, hence the ">i" struct format.
+ id_stat = next(s for s in stats if s.column_index == 0)
+ min_id = struct.unpack(">i", id_stat.min)[0]
+ max_id = struct.unpack(">i", id_stat.max)[0]
+ assert min_id == 0
+ assert max_id == 90
+
+ def test_writer_stats_with_nulls(self):
+        """Nulls are counted in null_count and excluded from min/max."""
+ pa_schema = pa.schema(
+ [pa.field("a", pa.int32()), pa.field("b", pa.int64())]
+ )
+
+ batch = pa.record_batch(
+ [
+ pa.array([10, None, 5, 20], type=pa.int32()),
+ pa.array([None, None, 100, 50], type=pa.int64()),
+ ],
+ names=["a", "b"],
+ )
+
+ opts = WriterOptions(stats_columns=[0, 1], num_buckets=1)
+ buf = io.BytesIO()
+ writer = MosaicWriter(buf, pa_schema, opts)
+ writer.write(batch)
+ writer.close()
+
+ assert writer.num_row_groups == 1
+ stats = writer.get_row_group_statistics(0)
+ assert len(stats) == 2
+
+ a_stat = next(s for s in stats if s.column_index == 0)
+ assert a_stat.null_count == 1
+ assert a_stat.has_min_max
+ min_a = struct.unpack(">i", a_stat.min)[0]
+ max_a = struct.unpack(">i", a_stat.max)[0]
+ assert min_a == 5
+ assert max_a == 20
+
+ b_stat = next(s for s in stats if s.column_index == 1)
+ assert b_stat.null_count == 2
+ assert b_stat.has_min_max
+ min_b = struct.unpack(">q", b_stat.min)[0]
+ max_b = struct.unpack(">q", b_stat.max)[0]
+ assert min_b == 50
+ assert max_b == 100
+
+ def test_writer_stats_all_null(self):
+        """An all-null column reports its null count; min and max are both None."""
+ pa_schema = pa.schema([pa.field("x", pa.int32())])
+
+ batch = pa.record_batch(
+ [pa.array([None, None, None], type=pa.int32())], names=["x"]
+ )
+
+ opts = WriterOptions(stats_columns=[0], num_buckets=1)
+ buf = io.BytesIO()
+ writer = MosaicWriter(buf, pa_schema, opts)
+ writer.write(batch)
+ writer.close()
+
+ assert writer.num_row_groups == 1
+ stats = writer.get_row_group_statistics(0)
+ assert len(stats) == 1
+ assert stats[0].null_count == 3
+ assert not stats[0].has_min_max
+ assert stats[0].min is None
+ assert stats[0].max is None
+
+ def test_writer_stats_matches_reader_stats(self):
+        """Stats cached by the writer on close() equal those read back from the written file."""
+ pa_schema = pa.schema(
+ [
+ pa.field("id", pa.int32()),
+ pa.field("score", pa.float64()),
+ ]
+ )
+
+ batch = pa.record_batch(
+ [
+ pa.array(list(range(50)), type=pa.int32()),
+ pa.array([i * 2.0 for i in range(50)]),
+ ],
+ names=["id", "score"],
+ )
+
+ opts = WriterOptions(stats_columns=[0, 1], num_buckets=1)
+ buf = io.BytesIO()
+ writer = MosaicWriter(buf, pa_schema, opts)
+ writer.write(batch)
+ writer.close()
+
+ data = buf.getvalue()
+ with _reader_from_bytes(data) as reader:
+            # The writer's handle is already freed; its stats come from the cache.
+ for rg in range(writer.num_row_groups):
+ w_stats = writer.get_row_group_statistics(rg)
+ r_stats = reader.get_row_group_statistics(rg)
+ assert len(w_stats) == len(r_stats)
+ for ws, rs in zip(w_stats, r_stats):
+ assert ws.column_index == rs.column_index
+ assert ws.null_count == rs.null_count
+ assert ws.has_min_max == rs.has_min_max
+ assert ws.min == rs.min
+ assert ws.max == rs.max