This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git
The following commit(s) were added to refs/heads/main by this push:
new 3f50e0b Expose row_group_num_rows from MosaicReader for all language bindings (#17)
3f50e0b is described below
commit 3f50e0b9d6f6398ff299fb2fd0557e05e8090c67
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed May 20 18:08:10 2026 +0800
Expose row_group_num_rows from MosaicReader for all language bindings (#17)
Add a lightweight API to get the row count of a specific row group
directly from file metadata without opening a full RowGroupReader.
---
core/src/reader.rs | 15 +++++
core/src/reader_tests.rs | 66 ++++++++++++++++++++++
docs/cpp-api.html | 1 +
docs/java-api.html | 1 +
docs/python-api.html | 1 +
docs/rust-api.html | 6 ++
ffi/src/lib.rs | 25 ++++++++
include/mosaic.hpp | 6 ++
.../org/apache/paimon/mosaic/MosaicReader.java | 8 +++
.../java/org/apache/paimon/mosaic/NativeLib.java | 3 +
.../apache/paimon/mosaic/MosaicRoundtripTest.java | 66 ++++++++++++++++++++++
jni/src/lib.rs | 19 +++++++
python/mosaic/_ffi.py | 5 ++
python/mosaic/mosaic.py | 7 +++
python/tests/test_mosaic.py | 48 ++++++++++++++++
15 files changed, 277 insertions(+)
diff --git a/core/src/reader.rs b/core/src/reader.rs
index a759ae5..d408519 100644
--- a/core/src/reader.rs
+++ b/core/src/reader.rs
@@ -197,6 +197,7 @@ pub trait ReaderAccess {
columns: &[usize],
) -> io::Result<RowGroupReader>;
fn row_group_stats(&self, rg_index: usize) -> io::Result<&[ColumnStats]>;
+ fn row_group_num_rows(&self, rg_index: usize) -> io::Result<usize>; // row count of row group `rg_index`, without opening a RowGroupReader
}
pub struct MosaicReader<I: InputFile> {
@@ -478,6 +479,20 @@ impl<I: InputFile> ReaderAccess for MosaicReader<I> {
Ok(&self.row_group_metas[rg_index].stats)
}
+ fn row_group_num_rows(&self, rg_index: usize) -> io::Result<usize> { // answered from row_group_metas only; no column data is read
+ if rg_index >= self.row_group_metas.len() { // out-of-range index -> InvalidInput error rather than an indexing panic below
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ format!(
+ "row group index {} out of range (num_row_groups={})",
+ rg_index,
+ self.row_group_metas.len()
+ ),
+ ));
+ }
+ Ok(self.row_group_metas[rg_index].num_rows)
+ }
+
fn row_group_reader(&self, rg_index: usize) -> io::Result<RowGroupReader> {
let all_columns: Vec<usize> = (0..self.schema.columns.len()).collect();
self.row_group_reader_projected(rg_index, &all_columns)
diff --git a/core/src/reader_tests.rs b/core/src/reader_tests.rs
index d716b08..cdbce0c 100644
--- a/core/src/reader_tests.rs
+++ b/core/src/reader_tests.rs
@@ -3622,3 +3622,69 @@ fn test_writer_stats_matches_reader_stats() {
assert_eq!(format!("{:?}", ws.max), format!("{:?}", rs.max));
}
}
+
+#[test]
+fn test_row_group_num_rows() {
+ let columns = vec![
+ ("id".to_string(), DataType::Int32, false),
+ ("val".to_string(), DataType::Int64, true),
+ ];
+ let out = MemOutputFile::new();
+ let mut writer = MosaicWriter::new(
+ out,
+ &columns_to_arrow_schema(&columns),
+ WriterOptions {
+ compression: COMPRESSION_NONE,
+ row_group_max_size: 200,
+ num_buckets: 1,
+ ..Default::default()
+ },
+ )
+ .unwrap();
+
+ let total_rows = 500;
+ let rows: Vec<Vec<Value>> = (0..total_rows)
+ .map(|i| vec![Value::Integer(i), Value::BigInt(i as i64 * 2)])
+ .collect();
+ write_values(&mut writer, &columns, &rows);
+ writer.close().unwrap();
+ let data = writer.output().buf.clone();
+ let len = data.len() as u64;
+ let reader = MosaicReader::new(ByteArrayInputFile::new(data), len).unwrap();
+
+ assert!(reader.num_row_groups() > 1);
+
+ let mut total = 0usize;
+ for rg_idx in 0..reader.num_row_groups() {
+ let num_rows = reader.row_group_num_rows(rg_idx).unwrap();
+ assert!(num_rows > 0);
+ let mut rg = reader.row_group_reader(rg_idx).unwrap();
+ let batch = rg.read_columns().unwrap();
+ assert_eq!(num_rows, batch.num_rows());
+ total += num_rows;
+ }
+ assert_eq!(total, total_rows as usize);
+}
+
+#[test]
+fn test_row_group_num_rows_out_of_range() {
+ let columns = vec![("x".to_string(), DataType::Int32, false)];
+ let out = MemOutputFile::new();
+ let mut writer = MosaicWriter::new(
+ out,
+ &columns_to_arrow_schema(&columns),
+ WriterOptions::default(),
+ )
+ .unwrap();
+ let rows: Vec<Vec<Value>> = (0..10).map(|i| vec![Value::Integer(i)]).collect();
+ write_values(&mut writer, &columns, &rows);
+ writer.close().unwrap();
+ let data = writer.output().buf.clone();
+ let len = data.len() as u64;
+ let reader = MosaicReader::new(ByteArrayInputFile::new(data), len).unwrap();
+
+ assert_eq!(reader.num_row_groups(), 1);
+ assert_eq!(reader.row_group_num_rows(0).unwrap(), 10);
+ assert!(reader.row_group_num_rows(1).is_err());
+ assert!(reader.row_group_num_rows(999).is_err());
+}
diff --git a/docs/cpp-api.html b/docs/cpp-api.html
index 79f2203..60ab6a4 100644
--- a/docs/cpp-api.html
+++ b/docs/cpp-api.html
@@ -213,6 +213,7 @@ reader.export_schema(&ffi_schema);
</thead>
<tbody>
<tr><td><code>num_row_groups()</code></td><td><code>uint32_t</code></td><td>Row group count</td></tr>
+ <tr><td><code>row_group_num_rows(rg)</code></td><td><code>uint32_t</code></td><td>Number of rows in a specific row group</td></tr>
<tr><td><code>export_schema(&ffi_schema)</code></td><td><code>void</code></td><td>Export schema via Arrow C Data Interface</td></tr>
<tr><td><code>read_row_group(rg, &array, &schema)</code></td><td><code>void</code></td><td>Read all columns of a row group</td></tr>
<tr><td><code>read_row_group(rg, cols, n, &array, &schema)</code></td><td><code>void</code></td><td>Read projected columns of a row group</td></tr>
diff --git a/docs/java-api.html b/docs/java-api.html
index 8e3446c..ebbac87 100644
--- a/docs/java-api.html
+++ b/docs/java-api.html
@@ -222,6 +222,7 @@
<tbody>
<tr><td><code>getSchema()</code></td><td><code>Schema</code></td><td>Arrow Schema for the file</td></tr>
<tr><td><code>numRowGroups()</code></td><td><code>int</code></td><td>Row group count</td></tr>
+ <tr><td><code>rowGroupNumRows(rg)</code></td><td><code>int</code></td><td>Number of rows in a specific row group</td></tr>
<tr><td><code>readRowGroup(rg, allocator)</code></td><td><code>VectorSchemaRoot</code></td><td>Read all columns of a row group</td></tr>
<tr><td><code>readRowGroup(rg, cols, allocator)</code></td><td><code>VectorSchemaRoot</code></td><td>Read projected columns of a row group</td></tr>
<tr><td><code>getRowGroupStatistics(rg)</code></td><td><code>List<ColumnStatistics></code></td><td>Column statistics for a row group</td></tr>
diff --git a/docs/python-api.html b/docs/python-api.html
index b66761b..cdf844f 100644
--- a/docs/python-api.html
+++ b/docs/python-api.html
@@ -189,6 +189,7 @@ score_col = reader.schema.get_field_index(<span class="str">"score"</span>)</cod
<tbody>
<tr><td><code>schema</code></td><td><code>pa.Schema</code></td><td>Arrow Schema for the file</td></tr>
<tr><td><code>num_row_groups</code></td><td><code>int</code></td><td>Row group count</td></tr>
+ <tr><td><code>row_group_num_rows(rg)</code></td><td><code>int</code></td><td>Number of rows in a specific row group</td></tr>
<tr><td><code>read_row_group(rg, columns=None)</code></td><td><code>pa.RecordBatch</code></td><td>Read a row group (optionally with projection)</td></tr>
<tr><td><code>read_all(columns=None)</code></td><td><code>pa.Table</code></td><td>Read entire file as a Table</td></tr>
<tr><td><code>get_row_group_statistics(rg)</code></td><td><code>list[ColumnStatistics]</code></td><td>Column statistics for a row group</td></tr>
diff --git a/docs/rust-api.html b/docs/rust-api.html
index bd82f7f..0ca9070 100644
--- a/docs/rust-api.html
+++ b/docs/rust-api.html
@@ -238,6 +238,12 @@ writer.close().unwrap();
}
}</code></pre>
+ <h3>Getting Row Group Row Count</h3>
+<pre><code><span class="kw">for</span> rg_idx <span class="kw">in</span> <span class="num">0</span>..reader.num_row_groups() {
+ <span class="kw">let</span> num_rows = reader.row_group_num_rows(rg_idx)?;
+ println!(<span class="str">"row group {} has {} rows"</span>, rg_idx, num_rows);
+}</code></pre>
+
<h3>Reading Stats from Reader</h3>
<pre><code><span class="kw">for</span> rg_idx <span class="kw">in</span> <span class="num">0</span>..reader.num_row_groups() {
<span class="kw">let</span> stats = reader.row_group_stats(rg_idx)?;
diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs
index e2a708a..abff8a4 100644
--- a/ffi/src/lib.rs
+++ b/ffi/src/lib.rs
@@ -703,6 +703,31 @@ pub unsafe extern "C" fn mosaic_row_group_reader_num_rows(
// ======================== Row Group Stats ========================
+/// Get number of rows in a row group. Returns 0 on success, -1 on error (bad index, or a count above u32::MAX). Writes result to `out`.
+#[no_mangle]
+pub unsafe extern "C" fn mosaic_reader_row_group_num_rows(
+ handle: *const MosaicReaderHandle,
+ rg_index: u32,
+ out: *mut u32,
+) -> c_int {
+ if handle.is_null() || out.is_null() {
+ set_error("null pointer".into());
+ return -1;
+ }
+ let h = &*handle; // SAFETY: non-null checked above; the caller must pass a live MosaicReaderHandle.
+ match h.reader.row_group_num_rows(rg_index as usize) {
+ Ok(n) if n <= u32::MAX as usize => {
+ *out = n as u32; // lossless: the guard above bounds n to u32 range
+ 0
+ }
+ Ok(n) => {
+ set_error(format!("row group {} has {} rows, which exceeds u32::MAX", rg_index, n));
+ -1
+ }
+ Err(e) => { set_error(e.to_string()); -1 }
+ }
+}
+
/// Get number of stats entries for a row group.
/// Returns 0 on success, -1 on error. Writes result to `out`.
#[no_mangle]
diff --git a/include/mosaic.hpp b/include/mosaic.hpp
index 68ce2e7..a3379e3 100644
--- a/include/mosaic.hpp
+++ b/include/mosaic.hpp
@@ -289,6 +289,12 @@ public:
if (rc != 0) throw Error("record_batch_export failed");
}
+ uint32_t row_group_num_rows(uint32_t rg_index) const {
+ uint32_t out = 0;
+ check(mosaic_reader_row_group_num_rows(handle_, rg_index, &out));
+ return out;
+ }
+
std::vector<ColumnStatistics> get_row_group_statistics(uint32_t rg_index) const {
uint32_t n = 0;
check(mosaic_reader_row_group_num_stats(handle_, rg_index, &n));
diff --git a/java/src/main/java/org/apache/paimon/mosaic/MosaicReader.java b/java/src/main/java/org/apache/paimon/mosaic/MosaicReader.java
index 0b85c9b..f24822e 100644
--- a/java/src/main/java/org/apache/paimon/mosaic/MosaicReader.java
+++ b/java/src/main/java/org/apache/paimon/mosaic/MosaicReader.java
@@ -102,6 +102,14 @@ public class MosaicReader implements AutoCloseable {
}
}
+ public int rowGroupNumRows(int rgIndex) {
+ int result = NativeLib.nativeReaderRowGroupNumRows(handle, rgIndex);
+ if (result < 0) {
+ throw new RuntimeException("failed to get row group num rows for index " + rgIndex);
+ }
+ return result;
+ }
+
public List<ColumnStatistics> getRowGroupStatistics(int rgIndex) {
int n = NativeLib.nativeReaderRowGroupNumStats(handle, rgIndex);
if (n < 0) {
diff --git a/java/src/main/java/org/apache/paimon/mosaic/NativeLib.java b/java/src/main/java/org/apache/paimon/mosaic/NativeLib.java
index 695173f..fc8b77e 100644
--- a/java/src/main/java/org/apache/paimon/mosaic/NativeLib.java
+++ b/java/src/main/java/org/apache/paimon/mosaic/NativeLib.java
@@ -130,6 +130,9 @@ final class NativeLib {
static native int nativeRowGroupReaderReadColumns(long handle, long arrayAddr, long schemaAddr);
static native void nativeRowGroupReaderFree(long handle);
+ // Row group num rows: returns the row count, or -1 on error
+ static native int nativeReaderRowGroupNumRows(long handle, int rgIndex);
+
// Row group stats
static native int nativeReaderRowGroupNumStats(long handle, int rgIndex);
static native int nativeReaderRowGroupStatColumnIndex(long handle, int rgIndex, int statIndex);
diff --git a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
index 1aaf9bb..25e36b7 100644
--- a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
+++ b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
@@ -902,4 +902,70 @@ public class MosaicRoundtripTest {
}
}
}
+
+ @Test
+ public void testRowGroupNumRows() {
+ Schema arrowSchema = new Schema(Arrays.asList(
+ Field.nullable("id", new ArrowType.Int(32, true)),
+ Field.nullable("data", new ArrowType.Int(64, true))
+ ));
+
+ WriterOptions opts = new WriterOptions().compression(0).numBuckets(1).rowGroupMaxSize(200);
+
+ int totalRows = 500;
+ int batchSize = 10;
+ byte[] data = writeToBytes(arrowSchema, opts, writer -> {
+ for (int start = 0; start < totalRows; start += batchSize) {
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+ IntVector idVec = (IntVector) root.getVector("id");
+ BigIntVector dataVec = (BigIntVector) root.getVector("data");
+ idVec.allocateNew(batchSize);
+ dataVec.allocateNew(batchSize);
+ for (int i = 0; i < batchSize; i++) {
+ idVec.set(i, start + i);
+ dataVec.set(i, (long) (start + i) * 2);
+ }
+ root.setRowCount(batchSize);
+ writer.write(root);
+ }
+ }
+ });
+
+ try (MosaicReader reader = readerFromBytes(data)) {
+ assertTrue(reader.numRowGroups() > 1);
+ int sum = 0;
+ for (int rg = 0; rg < reader.numRowGroups(); rg++) {
+ int numRows = reader.rowGroupNumRows(rg);
+ assertTrue(numRows > 0);
+ try (VectorSchemaRoot batch = reader.readRowGroup(rg, allocator)) {
+ assertEquals(numRows, batch.getRowCount());
+ }
+ sum += numRows;
+ }
+ assertEquals(totalRows, sum);
+ }
+ }
+
+ @Test
+ public void testRowGroupNumRowsSingleRowGroup() {
+ Schema arrowSchema = new Schema(Arrays.asList(
+ Field.nullable("x", new ArrowType.Int(32, true))
+ ));
+
+ byte[] data;
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+ IntVector xVec = (IntVector) root.getVector("x");
+ xVec.allocateNew(10);
+ for (int i = 0; i < 10; i++) {
+ xVec.set(i, i);
+ }
+ root.setRowCount(10);
+ data = writeToBytes(arrowSchema, writer -> writer.write(root));
+ }
+
+ try (MosaicReader reader = readerFromBytes(data)) {
+ assertEquals(1, reader.numRowGroups());
+ assertEquals(10, reader.rowGroupNumRows(0));
+ }
+ }
}
diff --git a/jni/src/lib.rs b/jni/src/lib.rs
index 00d7f6e..c82c819 100644
--- a/jni/src/lib.rs
+++ b/jni/src/lib.rs
@@ -791,6 +791,25 @@ pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeRowGroupRea
}
}
+// ======================== Row Group Num Rows ========================
+
+#[no_mangle]
+pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeReaderRowGroupNumRows(
+ _env: JNIEnv,
+ _class: JClass,
+ handle: jlong,
+ rg_index: jint,
+) -> jint {
+ if handle == 0 || rg_index < 0 {
+ return -1;
+ }
+ let rh = unsafe { &*(handle as *const ReaderHandle) }; // SAFETY: non-zero checked above; assumes `handle` is a live ReaderHandle pointer handed out by this library.
+ match rh.reader.row_group_num_rows(rg_index as usize) {
+ Ok(n) if n <= jint::MAX as usize => n as jint,
+ Ok(_) | Err(_) => -1, // count too large for jint, or lookup failed (e.g. index out of range)
+ }
+}
+
// ======================== Row Group Stats ========================
#[no_mangle]
diff --git a/python/mosaic/_ffi.py b/python/mosaic/_ffi.py
index d7b0e56..8b54ae2 100644
--- a/python/mosaic/_ffi.py
+++ b/python/mosaic/_ffi.py
@@ -213,6 +213,11 @@ lib.mosaic_record_batch_export.restype = c_int
lib.mosaic_record_batch_free.argtypes = [c_void_p]
lib.mosaic_record_batch_free.restype = None
+# ======================== Row Group Num Rows ========================
+
+lib.mosaic_reader_row_group_num_rows.argtypes = [c_void_p, c_uint32, POINTER(c_uint32)]
+lib.mosaic_reader_row_group_num_rows.restype = c_int
+
# ======================== Row Group Stats ========================
lib.mosaic_reader_row_group_num_stats.argtypes = [c_void_p, c_uint32, POINTER(c_uint32)]
diff --git a/python/mosaic/mosaic.py b/python/mosaic/mosaic.py
index 70fa4f0..66178a8 100644
--- a/python/mosaic/mosaic.py
+++ b/python/mosaic/mosaic.py
@@ -357,6 +357,13 @@ class MosaicReader:
schema = pa.schema([self._schema.field(i) for i in columns])
return pa.Table.from_batches([], schema=schema)
+ def row_group_num_rows(self, rg_index):
+ out = ctypes.c_uint32(0)
+ rc = lib.mosaic_reader_row_group_num_rows(self._handle, rg_index, ctypes.byref(out))
+ if rc != 0:
+ _check_error("row_group_num_rows failed")
+ return out.value
+
def get_row_group_statistics(self, rg_index):
n_out = ctypes.c_uint32(0)
rc = lib.mosaic_reader_row_group_num_stats(self._handle, rg_index, ctypes.byref(n_out))
diff --git a/python/tests/test_mosaic.py b/python/tests/test_mosaic.py
index aff877e..8373965 100644
--- a/python/tests/test_mosaic.py
+++ b/python/tests/test_mosaic.py
@@ -724,3 +724,51 @@ class TestWriter:
assert ws.has_min_max == rs.has_min_max
assert ws.min == rs.min
assert ws.max == rs.max
+
+ def test_row_group_num_rows(self):
+ pa_schema = pa.schema(
+ [pa.field("id", pa.int32()), pa.field("data", pa.int64())]
+ )
+
+ opts = WriterOptions(
+ compression=WriterOptions.COMPRESSION_NONE,
+ num_buckets=1,
+ row_group_max_size=200,
+ )
+ buf = io.BytesIO()
+ with MosaicWriter(buf, pa_schema, opts) as writer:
+ for start in range(0, 500, 50):
+ batch = pa.record_batch(
+ [
+ pa.array(list(range(start, start + 50)), type=pa.int32()),
+ pa.array(
+ [i * 2 for i in range(start, start + 50)],
+ type=pa.int64(),
+ ),
+ ],
+ names=["id", "data"],
+ )
+ writer.write(batch)
+ data = buf.getvalue()
+
+ with _reader_from_bytes(data) as reader:
+ assert reader.num_row_groups > 1
+ total = 0
+ for rg in range(reader.num_row_groups):
+ num_rows = reader.row_group_num_rows(rg)
+ assert num_rows > 0
+ rb = reader.read_row_group(rg)
+ assert num_rows == rb.num_rows
+ total += num_rows
+ assert total == 500
+
+ def test_row_group_num_rows_single(self):
+ pa_schema = pa.schema([pa.field("x", pa.int32())])
+ batch = pa.record_batch(
+ [pa.array(list(range(10)), type=pa.int32())], names=["x"]
+ )
+ data = _write_to_bytes(pa_schema, batch)
+
+ with _reader_from_bytes(data) as reader:
+ assert reader.num_row_groups == 1
+ assert reader.row_group_num_rows(0) == 10