This is an automated email from the ASF dual-hosted git repository. JingsongLi pushed a commit to branch write_stats in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git
commit 593c6433949dbcde5f8868e1002c8f7f1c349181
Author: JingsongLi <[email protected]>
AuthorDate: Wed May 20 15:57:36 2026 +0800

    Add C++ writer stats API and tests for all 4 languages

    - Add num_row_groups() and get_row_group_statistics() to C++ Writer
      class in mosaic.hpp, matching the documented API
    - Restructure C++ Writer lifecycle: close() no longer frees the handle,
      allowing stats access after close (destructor frees)
    - Move ColumnStatistics struct before Writer class to resolve ordering
    - Add writer stats tests for Rust, Java, Python, and C++

    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 core/src/reader_tests.rs                           | 173 +++++++++++++++
 cpp/test_mosaic.cpp                                | 235 ++++++++++++++++++++-
 include/mosaic.hpp                                 |  60 ++++--
 .../apache/paimon/mosaic/MosaicRoundtripTest.java  | 175 +++++++++++++++
 python/tests/test_mosaic.py                        | 136 ++++++++++++
 5 files changed, 764 insertions(+), 15 deletions(-)

diff --git a/core/src/reader_tests.rs b/core/src/reader_tests.rs
index b519379..d716b08 100644
--- a/core/src/reader_tests.rs
+++ b/core/src/reader_tests.rs
@@ -3457,3 +3457,176 @@ fn test_file_open_single_io() {
         "file open should use only 1 IO when metadata fits in tail prefetch"
     );
 }
+
+/// Writer-side stats with `num_buckets: 2`: only the requested `stats_columns`
+/// (0 and 2) are reported, in that order, with exact min/max values.
+#[test]
+fn test_writer_stats_basic() {
+    let columns = vec![
+        ("id".to_string(), DataType::Int32, true),
+        ("name".to_string(), DataType::Utf8, true),
+        ("score".to_string(), DataType::Float64, true),
+    ];
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &columns_to_arrow_schema(&columns),
+        WriterOptions {
+            compression: COMPRESSION_NONE,
+            stats_columns: vec![0, 2],
+            num_buckets: 2,
+            ..Default::default()
+        },
+    )
+    .unwrap();
+
+    let rows: Vec<Vec<Value>> = (0..100)
+        .map(|i| {
+            vec![
+                Value::Integer(i * 2),
+                Value::String(format!("row_{}", i).into_bytes()),
+                Value::Double(i as f64 * 0.5),
+            ]
+        })
+        .collect();
+    write_values(&mut writer, &columns, &rows);
+    writer.close().unwrap();
+
+    assert_eq!(writer.num_row_groups(), 1);
+    let stats = writer.row_group_stats(0);
+    assert_eq!(stats.len(), 2);
+
+    assert_eq!(stats[0].column_index, 0);
+    assert_eq!(stats[0].null_count, 0);
+    assert!(matches!(&stats[0].min, Some(Value::Integer(0))));
+    assert!(matches!(&stats[0].max, Some(Value::Integer(198))));
+
+    assert_eq!(stats[1].column_index, 2);
+    assert_eq!(stats[1].null_count, 0);
+    match &stats[1].min {
+        Some(Value::Double(v)) => assert!(v.abs() < 1e-10),
+        other => panic!("expected Double(0.0), got {:?}", other),
+    }
+    match &stats[1].max {
+        Some(Value::Double(v)) => assert!((*v - 49.5).abs() < 1e-10),
+        other => panic!("expected Double(49.5), got {:?}", other),
+    }
+}
+
+/// Nulls are counted in `null_count` and excluded from min/max.
+#[test]
+fn test_writer_stats_with_nulls() {
+    let columns = vec![
+        ("a".to_string(), DataType::Int32, true),
+        ("b".to_string(), DataType::Int64, true),
+    ];
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &columns_to_arrow_schema(&columns),
+        WriterOptions {
+            compression: COMPRESSION_NONE,
+            stats_columns: vec![0, 1],
+            num_buckets: 1,
+            ..Default::default()
+        },
+    )
+    .unwrap();
+
+    let rows = vec![
+        vec![Value::Integer(10), Value::Null],
+        vec![Value::Null, Value::Null],
+        vec![Value::Integer(5), Value::BigInt(100)],
+        vec![Value::Integer(20), Value::BigInt(50)],
+    ];
+    write_values(&mut writer, &columns, &rows);
+    writer.close().unwrap();
+
+    assert_eq!(writer.num_row_groups(), 1);
+    let stats = writer.row_group_stats(0);
+    assert_eq!(stats.len(), 2);
+
+    assert_eq!(stats[0].column_index, 0);
+    assert_eq!(stats[0].null_count, 1);
+    assert!(matches!(&stats[0].min, Some(Value::Integer(5))));
+    assert!(matches!(&stats[0].max, Some(Value::Integer(20))));
+
+    assert_eq!(stats[1].column_index, 1);
+    assert_eq!(stats[1].null_count, 2);
+    assert!(matches!(&stats[1].min, Some(Value::BigInt(50))));
+    assert!(matches!(&stats[1].max, Some(Value::BigInt(100))));
+}
+
+/// An all-null column reports its null count and no min/max.
+#[test]
+fn test_writer_stats_all_null() {
+    let columns = vec![("x".to_string(), DataType::Int32, true)];
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &columns_to_arrow_schema(&columns),
+        WriterOptions {
+            compression: COMPRESSION_NONE,
+            stats_columns: vec![0],
+            num_buckets: 1,
+            ..Default::default()
+        },
+    )
+    .unwrap();
+
+    let rows: Vec<Vec<Value>> = (0..10).map(|_| vec![Value::Null]).collect();
+    write_values(&mut writer, &columns, &rows);
+    writer.close().unwrap();
+
+    assert_eq!(writer.num_row_groups(), 1);
+    let stats = writer.row_group_stats(0);
+    assert_eq!(stats.len(), 1);
+    assert_eq!(stats[0].null_count, 10);
+    assert!(stats[0].min.is_none());
+    assert!(stats[0].max.is_none());
+}
+
+/// Stats exposed by the writer after `close()` must equal the stats a reader
+/// decodes from the written bytes.
+#[test]
+fn test_writer_stats_matches_reader_stats() {
+    let columns = vec![
+        ("id".to_string(), DataType::Int32, true),
+        ("score".to_string(), DataType::Float64, true),
+    ];
+    let out = MemOutputFile::new();
+    let mut writer = MosaicWriter::new(
+        out,
+        &columns_to_arrow_schema(&columns),
+        WriterOptions {
+            compression: COMPRESSION_NONE,
+            stats_columns: vec![0, 1],
+            num_buckets: 1,
+            ..Default::default()
+        },
+    )
+    .unwrap();
+
+    let rows: Vec<Vec<Value>> = (0..50)
+        .map(|i| vec![Value::Integer(i), Value::Double(i as f64 * 2.0)])
+        .collect();
+    write_values(&mut writer, &columns, &rows);
+    writer.close().unwrap();
+
+    // Only row group 0 is compared below, so pin the layout to one group.
+    assert_eq!(writer.num_row_groups(), 1);
+    let writer_stats = writer.row_group_stats(0);
+
+    let data = writer.output().buf.clone();
+    let len = data.len() as u64;
+    let reader = MosaicReader::new(ByteArrayInputFile::new(data), len).unwrap();
+    let reader_stats = reader.row_group_stats(0).unwrap();
+
+    assert_eq!(writer_stats.len(), reader_stats.len());
+    for (ws, rs) in writer_stats.iter().zip(reader_stats.iter()) {
+        assert_eq!(ws.column_index, rs.column_index);
+        assert_eq!(ws.null_count, rs.null_count);
+        assert_eq!(format!("{:?}", ws.min), format!("{:?}", rs.min));
+        assert_eq!(format!("{:?}", ws.max), format!("{:?}", rs.max));
+    }
+}
diff --git a/cpp/test_mosaic.cpp b/cpp/test_mosaic.cpp
index 4de8cad..eacdfcf 100644
--- a/cpp/test_mosaic.cpp
+++ b/cpp/test_mosaic.cpp
@@ -466,6 +466,235
@@ static void test_multiple_row_groups() {
     printf(" PASS test_multiple_row_groups\n");
 }
 
+static void test_writer_stats() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32()),
+        arrow::field("name", arrow::utf8()),
+        arrow::field("score", arrow::float64()),
+    });
+
+    arrow::Int32Builder id_b;
+    arrow::StringBuilder name_b;
+    arrow::DoubleBuilder score_b;
+    for (int i = 0; i < 10; i++) {
+        ASSERT_TRUE(id_b.Append(i * 10).ok());
+        ASSERT_TRUE(name_b.Append("item_" + std::to_string(i)).ok());
+        ASSERT_TRUE(score_b.Append(i * 1.1).ok());
+    }
+    auto batch = arrow::RecordBatch::Make(schema, 10, {
+        id_b.Finish().ValueUnsafe(), name_b.Finish().ValueUnsafe(),
+        score_b.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    uint32_t stats_cols[] = {0, 2};
+    opts.stats_columns = stats_cols;
+    opts.num_stats_columns = 2;
+
+    MemBuffer write_buf;
+    struct ArrowSchema c_schema;
+    auto st = arrow::ExportSchema(*schema, &c_schema);
+    ASSERT_TRUE(st.ok());
+    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+    struct ArrowArray c_array;
+    struct ArrowSchema c_batch_schema;
+    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+    ASSERT_TRUE(st.ok());
+    writer.write(&c_array, &c_batch_schema);
+    writer.close();
+
+    ASSERT_EQ(writer.num_row_groups(), 1u);
+    auto stats = writer.get_row_group_statistics(0);
+    ASSERT_TRUE(stats.size() > 0);
+
+    for (auto& s : stats) {
+        ASSERT_TRUE(s.column_index == 0 || s.column_index == 2);
+        ASSERT_EQ(s.null_count, 0u);
+        ASSERT_TRUE(s.has_min_max());
+    }
+
+    auto id_stat = std::find_if(stats.begin(), stats.end(),
+        [](const mosaic::ColumnStatistics& s) { return s.column_index == 0; });
+    ASSERT_TRUE(id_stat != stats.end());
+    ASSERT_EQ(id_stat->min_value.size(), 4u);
+    ASSERT_EQ(id_stat->max_value.size(), 4u);
+    // Stats bytes are big-endian; decode byte-wise so the check is
+    // independent of host endianness and of GCC/Clang-only builtins.
+    int32_t min_id = 0, max_id = 0;
+    for (int k = 0; k < 4; k++) min_id = (int32_t)(((uint32_t)min_id << 8) | id_stat->min_value[k]);
+    for (int k = 0; k < 4; k++) max_id = (int32_t)(((uint32_t)max_id << 8) | id_stat->max_value[k]);
+    ASSERT_EQ(min_id, 0);
+    ASSERT_EQ(max_id, 90);
+    printf(" PASS test_writer_stats\n");
+}
+
+static void test_writer_stats_with_nulls() {
+    auto schema = arrow::schema({
+        arrow::field("a", arrow::int32()),
+        arrow::field("b", arrow::int64()),
+    });
+
+    arrow::Int32Builder a_b;
+    ASSERT_TRUE(a_b.Append(10).ok());
+    ASSERT_TRUE(a_b.AppendNull().ok());
+    ASSERT_TRUE(a_b.Append(5).ok());
+    ASSERT_TRUE(a_b.Append(20).ok());
+
+    arrow::Int64Builder b_b;
+    ASSERT_TRUE(b_b.AppendNull().ok());
+    ASSERT_TRUE(b_b.AppendNull().ok());
+    ASSERT_TRUE(b_b.Append(100).ok());
+    ASSERT_TRUE(b_b.Append(50).ok());
+
+    auto batch = arrow::RecordBatch::Make(schema, 4, {
+        a_b.Finish().ValueUnsafe(), b_b.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    opts.num_buckets = 1;
+    uint32_t stats_cols[] = {0, 1};
+    opts.stats_columns = stats_cols;
+    opts.num_stats_columns = 2;
+
+    MemBuffer write_buf;
+    struct ArrowSchema c_schema;
+    auto st = arrow::ExportSchema(*schema, &c_schema);
+    ASSERT_TRUE(st.ok());
+    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+    struct ArrowArray c_array;
+    struct ArrowSchema c_batch_schema;
+    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+    ASSERT_TRUE(st.ok());
+    writer.write(&c_array, &c_batch_schema);
+    writer.close();
+
+    ASSERT_EQ(writer.num_row_groups(), 1u);
+    auto stats = writer.get_row_group_statistics(0);
+    ASSERT_EQ(stats.size(), 2u);
+
+    auto a_stat = std::find_if(stats.begin(), stats.end(),
+        [](const mosaic::ColumnStatistics& s) { return s.column_index == 0; });
+    ASSERT_TRUE(a_stat != stats.end());
+    ASSERT_EQ(a_stat->null_count, 1u);
+    ASSERT_TRUE(a_stat->has_min_max());
+    ASSERT_EQ(a_stat->min_value.size(), 4u);
+    ASSERT_EQ(a_stat->max_value.size(), 4u);
+    int32_t min_a = 0, max_a = 0;  // big-endian encoded; decoded byte-wise below
+    for (int k = 0; k < 4; k++) min_a = (int32_t)(((uint32_t)min_a << 8) | a_stat->min_value[k]);
+    for (int k = 0; k < 4; k++) max_a = (int32_t)(((uint32_t)max_a << 8) | a_stat->max_value[k]);
+    ASSERT_EQ(min_a, 5);
+    ASSERT_EQ(max_a, 20);
+
+    auto b_stat = std::find_if(stats.begin(), stats.end(),
+        [](const mosaic::ColumnStatistics& s) { return s.column_index == 1; });
+    ASSERT_TRUE(b_stat != stats.end());
+    ASSERT_EQ(b_stat->null_count, 2u);
+    ASSERT_TRUE(b_stat->has_min_max());
+    ASSERT_EQ(b_stat->min_value.size(), 8u);
+    ASSERT_EQ(b_stat->max_value.size(), 8u);
+    int64_t min_b = 0, max_b = 0;  // big-endian encoded; decoded byte-wise below
+    for (int k = 0; k < 8; k++) min_b = (int64_t)(((uint64_t)min_b << 8) | b_stat->min_value[k]);
+    for (int k = 0; k < 8; k++) max_b = (int64_t)(((uint64_t)max_b << 8) | b_stat->max_value[k]);
+    ASSERT_EQ(min_b, 50);
+    ASSERT_EQ(max_b, 100);
+    printf(" PASS test_writer_stats_with_nulls\n");
+}
+
+static void test_writer_stats_all_null() {
+    auto schema = arrow::schema({
+        arrow::field("x", arrow::int32()),
+    });
+
+    arrow::Int32Builder x_b;
+    ASSERT_TRUE(x_b.AppendNull().ok());
+    ASSERT_TRUE(x_b.AppendNull().ok());
+    ASSERT_TRUE(x_b.AppendNull().ok());
+
+    auto batch = arrow::RecordBatch::Make(schema, 3, {
+        x_b.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    opts.num_buckets = 1;
+    uint32_t stats_cols[] = {0};
+    opts.stats_columns = stats_cols;
+    opts.num_stats_columns = 1;
+
+    MemBuffer write_buf;
+    struct ArrowSchema c_schema;
+    auto st = arrow::ExportSchema(*schema, &c_schema);
+    ASSERT_TRUE(st.ok());
+    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+    struct ArrowArray c_array;
+    struct ArrowSchema c_batch_schema;
+    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+    ASSERT_TRUE(st.ok());
+    writer.write(&c_array, &c_batch_schema);
+    writer.close();
+
+    ASSERT_EQ(writer.num_row_groups(), 1u);
+    auto stats = writer.get_row_group_statistics(0);
+    ASSERT_EQ(stats.size(), 1u);
+    ASSERT_EQ(stats[0].null_count, 3u);
+    ASSERT_TRUE(!stats[0].has_min_max());
+    printf(" PASS test_writer_stats_all_null\n");
+}
+
+static void test_writer_stats_matches_reader() {
+    auto schema = arrow::schema({
+        arrow::field("id", arrow::int32()),
+        arrow::field("value", arrow::float64()),
+    });
+
+    arrow::Int32Builder id_b;
+    arrow::DoubleBuilder val_b;
+    for (int i = 0; i < 20; i++) {
+        ASSERT_TRUE(id_b.Append(i * 5).ok());
+        ASSERT_TRUE(val_b.Append(i * 2.5).ok());
+    }
+    auto batch = arrow::RecordBatch::Make(schema, 20, {
+        id_b.Finish().ValueUnsafe(), val_b.Finish().ValueUnsafe(),
+    });
+
+    mosaic::WriterOptions opts;
+    opts.num_buckets = 1;
+    uint32_t stats_cols[] = {0, 1};
+    opts.stats_columns = stats_cols;
+    opts.num_stats_columns = 2;
+
+    MemBuffer write_buf;
+    struct ArrowSchema c_schema;
+    auto st = arrow::ExportSchema(*schema, &c_schema);
+    ASSERT_TRUE(st.ok());
+    mosaic::Writer writer(make_output(write_buf), &c_schema, opts);
+
+    struct ArrowArray c_array;
+    struct ArrowSchema c_batch_schema;
+    st = arrow::ExportRecordBatch(*batch, &c_array, &c_batch_schema);
+    ASSERT_TRUE(st.ok());
+    writer.write(&c_array, &c_batch_schema);
+    writer.close();
+
+    auto writer_stats = writer.get_row_group_statistics(0);
+
+    MemBuffer buf;
+    buf.data = write_buf.data;
+    auto reader = mosaic::make_reader(make_input(buf), buf.data.size());
+    auto reader_stats = reader.get_row_group_statistics(0);
+
+    ASSERT_EQ(writer_stats.size(), reader_stats.size());
+    for (size_t i = 0; i < writer_stats.size(); i++) {
+        ASSERT_EQ(writer_stats[i].column_index, reader_stats[i].column_index);
+        ASSERT_EQ(writer_stats[i].null_count, reader_stats[i].null_count);
+        ASSERT_EQ(writer_stats[i].min_value, reader_stats[i].min_value);
+        ASSERT_EQ(writer_stats[i].max_value, reader_stats[i].max_value);
+    }
+    printf(" PASS test_writer_stats_matches_reader\n");
+}
+
 int main() {
     printf("Running Mosaic C++ tests...\n");
     test_basic_roundtrip();
@@ -476,6 +705,10 @@ int main() {
     test_compression_zstd();
     test_schema_roundtrip();
     test_multiple_row_groups();
-    printf("All %d tests passed.\n", 8);
+    test_writer_stats();
+    test_writer_stats_with_nulls();
+    test_writer_stats_all_null();
+    test_writer_stats_matches_reader();
+    printf("All %d tests passed.\n", 12);
     return 0;
 }
diff --git a/include/mosaic.hpp b/include/mosaic.hpp
index 45c8fd1..68ce2e7 100644
--- a/include/mosaic.hpp
+++ b/include/mosaic.hpp
@@ -94,6 +94,16 @@ struct WriterOptions {
     uint32_t page_size_threshold = 32 * 1024;
 };
 
+// ======================== Statistics ========================
+
+struct ColumnStatistics {
+    uint32_t column_index = 0;        // schema index of the column these stats describe
+    uint64_t null_count = 0;          // number of nulls in the row group
+    std::vector<uint8_t> min_value;   // encoded min value bytes; empty when absent
+    std::vector<uint8_t> max_value;   // encoded max value bytes; empty when absent
+    bool has_min_max() const { return !min_value.empty(); }
+};
+
 class Writer {
 public:
     /// Construct a writer. `arrow_schema` is a pointer to an ArrowSchema (Arrow C Data Interface).
@@ -123,17 +133,18 @@ public:
     Writer(const Writer&) = delete;
     Writer& operator=(const Writer&) = delete;
     Writer(Writer&& other) noexcept
-        : callbacks_(std::move(other.callbacks_)), handle_(other.handle_) {
+        : callbacks_(std::move(other.callbacks_)), handle_(other.handle_), closed_(other.closed_) {
         other.handle_ = nullptr;
     }
     Writer& operator=(Writer&& other) noexcept {
         if (this != &other) {
             if (handle_) {
-                mosaic_writer_close(handle_);
+                if (!closed_) mosaic_writer_close(handle_);
                 mosaic_writer_free(handle_);
             }
             callbacks_ = std::move(other.callbacks_);
             handle_ = other.handle_;
+            closed_ = other.closed_;
             other.handle_ = nullptr;
         }
         return *this;
@@ -141,7 +152,7 @@ public:
 
     ~Writer() {
         if (handle_) {
-            mosaic_writer_close(handle_);
+            if (!closed_) mosaic_writer_close(handle_);
             mosaic_writer_free(handle_);
         }
     }
@@ -160,22 +171,43 @@ public:
     }
 
     void close() {
-        check(mosaic_writer_close(handle_));
+        if (!closed_) {  // no-op once closed; the handle itself is freed by ~Writer()
+            check(mosaic_writer_close(handle_));
+            closed_ = true;
+        }
+    }
+
+    uint32_t num_row_groups() const {
+        uint32_t out = 0;
+        check(mosaic_writer_num_row_groups(handle_, &out));
+        return out;
+    }
+
+    std::vector<ColumnStatistics> get_row_group_statistics(uint32_t rg_index) const {
+        uint32_t n = 0;
+        check(mosaic_writer_row_group_num_stats(handle_, rg_index, &n));
+        std::vector<ColumnStatistics> result;
+        result.reserve(n);
+        for (uint32_t i = 0; i < n; i++) {
+            ColumnStatistics s;
+            check(mosaic_writer_row_group_stat_column_index(handle_, rg_index, i, &s.column_index));
+            check(mosaic_writer_row_group_stat_null_count(handle_, rg_index, i, &s.null_count));
+            size_t min_len = 0, max_len = 0;
+            const uint8_t* min_ptr = mosaic_writer_row_group_stat_min(handle_, rg_index, i, &min_len);
+            const uint8_t* max_ptr = mosaic_writer_row_group_stat_max(handle_, rg_index, i, &max_len);
+            if (min_ptr && min_len > 0)
+                s.min_value.assign(min_ptr, min_ptr + min_len);
+            if (max_ptr && max_len > 0)
+                s.max_value.assign(max_ptr, max_ptr + max_len);
+            result.push_back(std::move(s));
+        }
+        return result;
     }
 
 private:
     std::shared_ptr<OutputFile> callbacks_;
     MosaicWriterHandle* handle_ = nullptr;
-};
-
-// ======================== Statistics ========================
-
-struct ColumnStatistics {
-    uint32_t column_index;
-    uint64_t null_count;
-    std::vector<uint8_t> min_value;
-    std::vector<uint8_t> max_value;
-    bool has_min_max() const { return !min_value.empty(); }
+    bool closed_ = false;
 };
 
 // ======================== Reader ========================
diff --git a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
index 523c55e..1aaf9bb 100644
--- a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
+++ b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
@@ -727,4 +727,179 @@ public class MosaicRoundtripTest {
             assertTrue(readSchema.getFields().get(1).isNullable());
         }
     }
+
+    @Test
+    public void testWriterStats() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("id", new ArrowType.Int(32, true)),
+                Field.nullable("name", ArrowType.Utf8.INSTANCE),
+                Field.nullable("score", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
+        ));
+
+        WriterOptions opts = new WriterOptions().statsColumns(0, 2);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        MosaicWriter writer = new MosaicWriter(baos, arrowSchema, opts, allocator);
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector ids = (IntVector) root.getVector("id");
+            VarCharVector names = (VarCharVector) root.getVector("name");
+            Float8Vector scores = (Float8Vector) root.getVector("score");
+
+            int n = 10;
+            ids.allocateNew(n);
+            names.allocateNew(n);
+            scores.allocateNew(n);
+
+            for (int i = 0; i < n; i++) {
+                ids.set(i, i * 10);
+                names.setSafe(i, ("item_" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+                scores.set(i, i * 1.1);
+            }
+            root.setRowCount(n);
+            writer.write(root);
+        }
+        writer.close();
+
+        assertEquals(1, writer.numRowGroups());
+        List<ColumnStatistics> stats = writer.getRowGroupStatistics(0);
+        assertTrue(stats.size() > 0);
+        for (ColumnStatistics stat : stats) {
+            assertTrue(stat.getColumnIndex() == 0 || stat.getColumnIndex() == 2);
+            assertEquals(0, stat.getNullCount());
+            assertTrue(stat.hasMinMax());
+            assertNotNull(stat.getMin());
+            assertNotNull(stat.getMax());
+        }
+
+        ColumnStatistics idStat = stats.stream().filter(s -> s.getColumnIndex() == 0).findFirst().get();
+        int minId = ByteBuffer.wrap(idStat.getMin()).order(ByteOrder.BIG_ENDIAN).getInt();
+        int maxId = ByteBuffer.wrap(idStat.getMax()).order(ByteOrder.BIG_ENDIAN).getInt();
+        assertEquals(0, minId);
+        assertEquals(90, maxId);
+    }
+
+    @Test
+    public void testWriterStatsWithNulls() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("a", new ArrowType.Int(32, true)),
+                Field.nullable("b", new ArrowType.Int(64, true))
+        ));
+
+        WriterOptions opts = new WriterOptions().statsColumns(0, 1).numBuckets(1);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        MosaicWriter writer = new MosaicWriter(baos, arrowSchema, opts, allocator);
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector aVec = (IntVector) root.getVector("a");
+            BigIntVector bVec = (BigIntVector) root.getVector("b");
+            aVec.allocateNew(4);
+            bVec.allocateNew(4);
+
+            aVec.set(0, 10);
+            aVec.setNull(1);
+            aVec.set(2, 5);
+            aVec.set(3, 20);
+
+            bVec.setNull(0);
+            bVec.setNull(1);
+            bVec.set(2, 100);
+            bVec.set(3, 50);
+
+            root.setRowCount(4);
+            writer.write(root);
+        }
+        writer.close();
+
+        assertEquals(1, writer.numRowGroups());
+        List<ColumnStatistics> stats = writer.getRowGroupStatistics(0);
+        assertEquals(2, stats.size());
+
+        ColumnStatistics aStat = stats.stream().filter(s -> s.getColumnIndex() == 0).findFirst().get();
+        assertEquals(1, aStat.getNullCount());
+        assertTrue(aStat.hasMinMax());
+        int minA = ByteBuffer.wrap(aStat.getMin()).order(ByteOrder.BIG_ENDIAN).getInt();
+        int maxA = ByteBuffer.wrap(aStat.getMax()).order(ByteOrder.BIG_ENDIAN).getInt();
+        assertEquals(5, minA);
+        assertEquals(20, maxA);
+
+        ColumnStatistics bStat = stats.stream().filter(s -> s.getColumnIndex() == 1).findFirst().get();
+        assertEquals(2, bStat.getNullCount());
+        assertTrue(bStat.hasMinMax());
+        long minB = ByteBuffer.wrap(bStat.getMin()).order(ByteOrder.BIG_ENDIAN).getLong();
+        long maxB = ByteBuffer.wrap(bStat.getMax()).order(ByteOrder.BIG_ENDIAN).getLong();
+        assertEquals(50, minB);
+        assertEquals(100, maxB);
+    }
+
+    @Test
+    public void testWriterStatsAllNull() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("x", new ArrowType.Int(32, true))
+        ));
+
+        WriterOptions opts = new WriterOptions().statsColumns(0).numBuckets(1);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        MosaicWriter writer = new MosaicWriter(baos, arrowSchema, opts, allocator);
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector xVec = (IntVector) root.getVector("x");
+            xVec.allocateNew(3);
+            xVec.setNull(0);
+            xVec.setNull(1);
+            xVec.setNull(2);
+            root.setRowCount(3);
+            writer.write(root);
+        }
+        writer.close();
+
+        assertEquals(1, writer.numRowGroups());
+        List<ColumnStatistics> stats = writer.getRowGroupStatistics(0);
+        assertEquals(1, stats.size());
+        assertEquals(3, stats.get(0).getNullCount());
+        assertFalse(stats.get(0).hasMinMax());
+    }
+
+    @Test
+    public void testWriterStatsMatchesReaderStats() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("id", new ArrowType.Int(32, true)),
+                Field.nullable("value", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
+        ));
+
+        WriterOptions opts = new WriterOptions().statsColumns(0, 1).numBuckets(1);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        MosaicWriter writer = new MosaicWriter(baos, arrowSchema, opts, allocator);
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector ids = (IntVector) root.getVector("id");
+            Float8Vector values = (Float8Vector) root.getVector("value");
+            int n = 20;
+            ids.allocateNew(n);
+            values.allocateNew(n);
+            for (int i = 0; i < n; i++) {
+                ids.set(i, i * 5);
+                values.set(i, i * 2.5);
+            }
+            root.setRowCount(n);
+            writer.write(root);
+        }
+        writer.close();
+
+        byte[] data = baos.toByteArray();
+        try (MosaicReader reader = readerFromBytes(data)) {
+            List<ColumnStatistics> writerStats = writer.getRowGroupStatistics(0);
+            List<ColumnStatistics> readerStats = reader.getRowGroupStatistics(0);
+
+            assertEquals(writerStats.size(), readerStats.size());
+            for (int i = 0; i < writerStats.size(); i++) {
+                ColumnStatistics ws = writerStats.get(i);
+                ColumnStatistics rs = readerStats.get(i);
+                assertEquals(ws.getColumnIndex(), rs.getColumnIndex());
+                assertEquals(ws.getNullCount(), rs.getNullCount());
+                assertEquals(ws.hasMinMax(), rs.hasMinMax());
+                assertArrayEquals(ws.getMin(), rs.getMin());
+                assertArrayEquals(ws.getMax(), rs.getMax());
+            }
+        }
+    }
 }
diff --git a/python/tests/test_mosaic.py b/python/tests/test_mosaic.py
index 6ec64fc..aff877e 100644
--- a/python/tests/test_mosaic.py
+++ b/python/tests/test_mosaic.py
@@ -588,3 +588,139 @@ class TestWriter:
 
         with pytest.raises(RuntimeError, match="writer is closed"):
             writer.write(batch)
+
+    def test_writer_stats_basic(self):
+        pa_schema = pa.schema(
+            [
+                pa.field("id", pa.int32()),
+                pa.field("name", pa.utf8()),
+                pa.field("score", pa.float64()),
+            ]
+        )
+
+        batch = pa.record_batch(
+            [
+                pa.array([i * 10 for i in range(10)], type=pa.int32()),
+                pa.array([f"item_{i}" for i in range(10)]),
+                pa.array([i * 1.1 for i in range(10)]),
+            ],
+            names=["id", "name", "score"],
+        )
+
+        opts = WriterOptions(stats_columns=[0, 2])
+        buf = io.BytesIO()
+        writer = MosaicWriter(buf, pa_schema, opts)
+        writer.write(batch)
+        writer.close()
+
+        assert writer.num_row_groups >= 1
+        stats = writer.get_row_group_statistics(0)
+        assert len(stats) > 0
+        for stat in stats:
+            assert isinstance(stat, ColumnStatistics)
+            assert stat.column_index in (0, 2)
+            assert stat.null_count == 0
+            assert stat.has_min_max
+            assert stat.min is not None
+            assert stat.max is not None
+
+        id_stat = next(s for s in stats if s.column_index == 0)
+        min_id = struct.unpack(">i", id_stat.min)[0]
+        max_id = struct.unpack(">i", id_stat.max)[0]
+        assert min_id == 0
+        assert max_id == 90
+
+    def test_writer_stats_with_nulls(self):
+        pa_schema = pa.schema(
+            [pa.field("a", pa.int32()), pa.field("b", pa.int64())]
+        )
+
+        batch = pa.record_batch(
+            [
+                pa.array([10, None, 5, 20], type=pa.int32()),
+                pa.array([None, None, 100, 50], type=pa.int64()),
+            ],
+            names=["a", "b"],
+        )
+
+        opts = WriterOptions(stats_columns=[0, 1], num_buckets=1)
+        buf = io.BytesIO()
+        writer = MosaicWriter(buf, pa_schema, opts)
+        writer.write(batch)
+        writer.close()
+
+        assert writer.num_row_groups == 1
+        stats = writer.get_row_group_statistics(0)
+        assert len(stats) == 2
+
+        a_stat = next(s for s in stats if s.column_index == 0)
+        assert a_stat.null_count == 1
+        assert a_stat.has_min_max
+        min_a = struct.unpack(">i", a_stat.min)[0]
+        max_a = struct.unpack(">i", a_stat.max)[0]
+        assert min_a == 5
+        assert max_a == 20
+
+        b_stat = next(s for s in stats if s.column_index == 1)
+        assert b_stat.null_count == 2
+        assert b_stat.has_min_max
+        min_b = struct.unpack(">q", b_stat.min)[0]
+        max_b = struct.unpack(">q", b_stat.max)[0]
+        assert min_b == 50
+        assert max_b == 100
+
+    def test_writer_stats_all_null(self):
+        pa_schema = pa.schema([pa.field("x", pa.int32())])
+
+        batch = pa.record_batch(
+            [pa.array([None, None, None], type=pa.int32())], names=["x"]
+        )
+
+        opts = WriterOptions(stats_columns=[0], num_buckets=1)
+        buf = io.BytesIO()
+        writer = MosaicWriter(buf, pa_schema, opts)
+        writer.write(batch)
+        writer.close()
+
+        assert writer.num_row_groups == 1
+        stats = writer.get_row_group_statistics(0)
+        assert len(stats) == 1
+        assert stats[0].null_count == 3
+        assert not stats[0].has_min_max
+        assert stats[0].min is None
+        assert stats[0].max is None
+
+    def test_writer_stats_matches_reader_stats(self):
+        pa_schema = pa.schema(
+            [
+                pa.field("id", pa.int32()),
+                pa.field("score", pa.float64()),
+            ]
+        )
+
+        batch = pa.record_batch(
+            [
+                pa.array(list(range(50)), type=pa.int32()),
+                pa.array([i * 2.0 for i in range(50)]),
+            ],
+            names=["id", "score"],
+        )
+
+        opts = WriterOptions(stats_columns=[0, 1], num_buckets=1)
+        buf = io.BytesIO()
+        writer = MosaicWriter(buf, pa_schema, opts)
+        writer.write(batch)
+        writer.close()
+
+        data = buf.getvalue()
+        with _reader_from_bytes(data) as reader:
+            for rg in range(writer.num_row_groups):
+                w_stats = writer.get_row_group_statistics(rg)
+                r_stats = reader.get_row_group_statistics(rg)
+                assert len(w_stats) == len(r_stats)
+                for ws, rs in zip(w_stats, r_stats):
+                    assert ws.column_index == rs.column_index
+                    assert ws.null_count == rs.null_count
+                    assert ws.has_min_max == rs.has_min_max
+                    assert ws.min == rs.min
+                    assert ws.max == rs.max
