This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git
The following commit(s) were added to refs/heads/main by this push:
new 35b5bfb Add Java/JNI bindings and CI test job (#6)
35b5bfb is described below
commit 35b5bfbec6d6bcf05cbf63721187a0af49610a55
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue May 19 11:06:37 2026 +0800
Add Java/JNI bindings and CI test job (#6)
Enables Java applications to use MosaicWriter/MosaicReader via the Arrow C
Data Interface over JNI.
---
.github/workflows/ci.yml | 29 +
Cargo.toml | 2 +-
java/pom.xml | 73 ++
java/src/main/java/io/mosaic/ColumnStatistics.java | 55 ++
java/src/main/java/io/mosaic/InputFile.java | 33 +
java/src/main/java/io/mosaic/MosaicReader.java | 123 +++
java/src/main/java/io/mosaic/MosaicWriter.java | 87 +++
java/src/main/java/io/mosaic/NativeLib.java | 62 ++
java/src/main/java/io/mosaic/WriterOptions.java | 85 +++
.../test/java/io/mosaic/MosaicRoundtripTest.java | 670 +++++++++++++++++
Cargo.toml => jni/Cargo.toml | 18 +-
jni/src/lib.rs | 832 +++++++++++++++++++++
12 files changed, 2064 insertions(+), 5 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 70b4fec..53ee1d3 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -102,3 +102,32 @@ jobs:
run: cpp/build/test_mosaic
env:
LD_LIBRARY_PATH: ${{ github.workspace }}/target/release
+
+ # Builds the JNI library with Cargo, then runs the Java test suite against it.
+ java-test:
+   runs-on: ubuntu-latest
+   steps:
+     - uses: actions/checkout@v6
+
+     # Reuse Cargo downloads and build output between runs.
+     - name: Rust Cache
+       uses: actions/cache@v5
+       with:
+         path: |
+           ~/.cargo/registry
+           ~/.cargo/git
+           target
+         key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
+         restore-keys: |
+           ${{ runner.os }}-cargo-
+
+     - name: Set up JDK 11
+       uses: actions/setup-java@v4
+       with:
+         java-version: '11'
+         distribution: 'temurin'
+
+     - name: Build JNI library
+       run: cargo build -p mosaic-jni --release
+
+     # java.library.path points the test JVM at the release build directory so that
+     # System.loadLibrary("mosaic_jni") can find the library built in the previous step.
+     - name: Run Java tests
+       working-directory: java
+       run: mvn test "-DargLine=-Djava.library.path=${{ github.workspace }}/target/release"
diff --git a/Cargo.toml b/Cargo.toml
index 39c103c..240d079 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,7 +16,7 @@
# under the License.
[workspace]
-members = ["core", "ffi"]
+members = ["core", "ffi", "jni"]
resolver = "2"
[profile.release]
diff --git a/java/pom.xml b/java/pom.xml
new file mode 100644
index 0000000..f295fcc
--- /dev/null
+++ b/java/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <!-- Project coordinates -->
+    <groupId>io.mosaic</groupId>
+    <artifactId>mosaic-writer</artifactId>
+    <version>0.1.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <name>Mosaic Writer</name>
+    <description>Mosaic file format writer for Java (backed by Rust via JNI)</description>
+
+    <properties>
+        <!-- Compile for Java 11 -->
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <arrow.version>18.1.0</arrow.version>
+    </properties>
+
+    <!-- Import the Arrow BOM so the Arrow artifacts below need no explicit version. -->
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.arrow</groupId>
+                <artifactId>arrow-bom</artifactId>
+                <version>${arrow.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-vector</artifactId>
+        </dependency>
+        <!-- Only needed at run time, not for compilation. -->
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-memory-netty</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <!-- Arrow C Data Interface classes (ArrowArray, ArrowSchema, Data) used for the JNI hand-off. -->
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-c-data</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13.2</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/java/src/main/java/io/mosaic/ColumnStatistics.java
b/java/src/main/java/io/mosaic/ColumnStatistics.java
new file mode 100644
index 0000000..e4733d7
--- /dev/null
+++ b/java/src/main/java/io/mosaic/ColumnStatistics.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.mosaic;
+
+/**
+ * Statistics of one column within a row group, as returned by
+ * {@link MosaicReader#getRowGroupStatistics(int)}.
+ *
+ * <p>The minimum and maximum are exposed as the raw bytes handed over by the native
+ * library; this class does not interpret them. Instances are immutable: the byte arrays
+ * returned by the getters are copies.
+ */
+public class ColumnStatistics {
+
+    private final int columnIndex;
+    private final long nullCount;
+    private final byte[] min;
+    private final byte[] max;
+
+    /**
+     * @param columnIndex index of the column these statistics describe
+     * @param nullCount number of null values recorded for the column
+     * @param min raw minimum value, or {@code null} when absent
+     * @param max raw maximum value, or {@code null} when absent
+     */
+    ColumnStatistics(int columnIndex, long nullCount, byte[] min, byte[] max) {
+        this.columnIndex = columnIndex;
+        this.nullCount = nullCount;
+        this.min = min;
+        this.max = max;
+    }
+
+    /** Returns the index of the column these statistics describe. */
+    public int getColumnIndex() {
+        return columnIndex;
+    }
+
+    /** Returns the number of null values recorded for the column. */
+    public long getNullCount() {
+        return nullCount;
+    }
+
+    /**
+     * Returns {@code true} only when both a minimum and a maximum are present, so a caller
+     * that sees {@code true} can rely on {@link #getMin()} and {@link #getMax()} being
+     * non-null.
+     */
+    public boolean hasMinMax() {
+        return min != null && max != null;
+    }
+
+    /** Returns a copy of the raw minimum value, or {@code null} when absent. */
+    public byte[] getMin() {
+        // Copy so callers cannot modify the stored statistics through the returned array.
+        return min == null ? null : min.clone();
+    }
+
+    /** Returns a copy of the raw maximum value, or {@code null} when absent. */
+    public byte[] getMax() {
+        return max == null ? null : max.clone();
+    }
+}
diff --git a/java/src/main/java/io/mosaic/InputFile.java
b/java/src/main/java/io/mosaic/InputFile.java
new file mode 100644
index 0000000..9808d1f
--- /dev/null
+++ b/java/src/main/java/io/mosaic/InputFile.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.mosaic;
+
+import java.io.IOException;
+
+/**
+ * Positioned-read byte source handed to {@link MosaicReader#open}.
+ */
+public interface InputFile {
+
+    /**
+     * Reads {@code length} bytes starting at {@code position} into {@code buffer}.
+     *
+     * <p>Implementations must be thread-safe: the reader may call this method concurrently
+     * from multiple threads to perform parallel IO.
+     *
+     * @param position position in the file of the first byte to read
+     * @param buffer destination array
+     * @param offset index in {@code buffer} at which the first byte is stored
+     * @param length number of bytes to read
+     * @throws IOException if the bytes cannot be read
+     */
+    void readFully(long position, byte[] buffer, int offset, int length) throws IOException;
+}
diff --git a/java/src/main/java/io/mosaic/MosaicReader.java
b/java/src/main/java/io/mosaic/MosaicReader.java
new file mode 100644
index 0000000..95d8f47
--- /dev/null
+++ b/java/src/main/java/io/mosaic/MosaicReader.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.mosaic;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+public class MosaicReader implements AutoCloseable {
+
+ private long handle;
+ private final Schema schema;
+
+ private MosaicReader(long handle, BufferAllocator allocator) {
+ this.handle = handle;
+ try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator)) {
+ int rc = NativeLib.nativeReaderExportSchema(handle,
cSchema.memoryAddress());
+ if (rc != 0) {
+ throw new RuntimeException("failed to export schema");
+ }
+ this.schema = Data.importSchema(allocator, cSchema, null);
+ }
+ }
+
+ public static MosaicReader open(InputFile inputFile, long fileLength,
BufferAllocator allocator) {
+ long h = NativeLib.nativeReaderOpen(inputFile, fileLength);
+ if (h == 0) {
+ throw new RuntimeException("failed to open reader");
+ }
+ return new MosaicReader(h, allocator);
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ public int numRowGroups() {
+ return NativeLib.nativeReaderNumRowGroups(handle);
+ }
+
+ public VectorSchemaRoot readRowGroup(int rgIndex, BufferAllocator
allocator) {
+ long rgHandle = NativeLib.nativeReaderOpenRowGroup(handle, rgIndex);
+ if (rgHandle == 0) {
+ throw new RuntimeException("failed to open row group " + rgIndex);
+ }
+ try {
+ return readRowGroupHandle(rgHandle, allocator);
+ } finally {
+ NativeLib.nativeRowGroupReaderFree(rgHandle);
+ }
+ }
+
+ public VectorSchemaRoot readRowGroup(int rgIndex, int[] columns,
BufferAllocator allocator) {
+ long rgHandle = NativeLib.nativeReaderOpenRowGroupProjected(handle,
rgIndex, columns);
+ if (rgHandle == 0) {
+ throw new RuntimeException("failed to open row group " + rgIndex);
+ }
+ try {
+ return readRowGroupHandle(rgHandle, allocator);
+ } finally {
+ NativeLib.nativeRowGroupReaderFree(rgHandle);
+ }
+ }
+
+ private VectorSchemaRoot readRowGroupHandle(long rgHandle, BufferAllocator
allocator) {
+ try (ArrowArray arrowArray = ArrowArray.allocateNew(allocator);
+ ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator)) {
+ int rc = NativeLib.nativeRowGroupReaderReadColumns(
+ rgHandle, arrowArray.memoryAddress(),
arrowSchema.memoryAddress());
+ if (rc != 0) {
+ throw new RuntimeException("readColumns failed");
+ }
+ return Data.importVectorSchemaRoot(allocator, arrowArray,
arrowSchema, null);
+ }
+ }
+
+ public List<ColumnStatistics> getRowGroupStatistics(int rgIndex) {
+ int n = NativeLib.nativeReaderRowGroupNumStats(handle, rgIndex);
+ if (n < 0) {
+ throw new RuntimeException("failed to get row group statistics for
index " + rgIndex);
+ }
+ List<ColumnStatistics> result = new ArrayList<>(n);
+ for (int i = 0; i < n; i++) {
+ result.add(new ColumnStatistics(
+ NativeLib.nativeReaderRowGroupStatColumnIndex(handle,
rgIndex, i),
+ NativeLib.nativeReaderRowGroupStatNullCount(handle,
rgIndex, i),
+ NativeLib.nativeReaderRowGroupStatMin(handle, rgIndex, i),
+ NativeLib.nativeReaderRowGroupStatMax(handle, rgIndex,
i)));
+ }
+ return result;
+ }
+
+ @Override
+ public void close() {
+ if (handle != 0) {
+ NativeLib.nativeReaderFree(handle);
+ handle = 0;
+ }
+ }
+}
diff --git a/java/src/main/java/io/mosaic/MosaicWriter.java
b/java/src/main/java/io/mosaic/MosaicWriter.java
new file mode 100644
index 0000000..09f6a3a
--- /dev/null
+++ b/java/src/main/java/io/mosaic/MosaicWriter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.mosaic;
+
+import java.io.OutputStream;
+
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/**
+ * Writes Arrow batches to an {@link OutputStream} in the Mosaic format through the native
+ * {@code mosaic_jni} library.
+ *
+ * <p>The writer owns a native handle that {@link #close()} frees. {@link #write} and
+ * {@link #estimatedFileSize()} throw {@link IllegalStateException} once the writer is
+ * closed, instead of passing a stale handle to native code.
+ */
+public class MosaicWriter implements AutoCloseable {
+
+    /** Native writer handle; {@code 0} after {@link #close()}. */
+    private long handle;
+    private boolean closed;
+    private final BufferAllocator allocator;
+
+    /** Creates a writer with default {@link WriterOptions}. */
+    public MosaicWriter(OutputStream outputStream, Schema arrowSchema,
+            BufferAllocator allocator) {
+        this(outputStream, arrowSchema, new WriterOptions(), allocator);
+    }
+
+    /**
+     * Creates a writer.
+     *
+     * @param outputStream stream handed to the native writer as the output destination
+     * @param arrowSchema schema of the batches that will be written
+     * @param options writer configuration
+     * @param allocator allocator used for the C Data Interface structs
+     * @throws RuntimeException if the native writer cannot be opened
+     */
+    public MosaicWriter(OutputStream outputStream, Schema arrowSchema, WriterOptions options,
+            BufferAllocator allocator) {
+        this.allocator = allocator;
+        try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator)) {
+            Data.exportSchema(allocator, arrowSchema, null, cSchema);
+            try {
+                this.handle = NativeLib.nativeWriterOpen(
+                        outputStream,
+                        cSchema.memoryAddress(),
+                        options.getNumBuckets(),
+                        options.getCompression(),
+                        options.getZstdLevel(),
+                        options.getRowGroupMaxSize(),
+                        options.getMaxDictTotalBytes(),
+                        options.getMaxDictEntries(),
+                        options.getStatsColumns(),
+                        options.getPageSizeThreshold());
+            } finally {
+                // Release the exported schema even when the native call throws, so the
+                // resources attached to the export are not leaked.
+                cSchema.release();
+            }
+        }
+        if (this.handle == 0) {
+            throw new RuntimeException("failed to open writer");
+        }
+    }
+
+    /**
+     * Exports {@code root} through the Arrow C Data Interface and passes it to the native
+     * writer.
+     *
+     * @throws IllegalStateException if the writer has been closed
+     */
+    public void write(VectorSchemaRoot root) {
+        ensureOpen();
+        try (ArrowArray arrowArray = ArrowArray.allocateNew(allocator);
+                ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator)) {
+            Data.exportVectorSchemaRoot(allocator, root, null, arrowArray, arrowSchema);
+            NativeLib.nativeWriterWriteBatch(
+                    handle, arrowArray.memoryAddress(), arrowSchema.memoryAddress());
+        }
+    }
+
+    /**
+     * Returns the size estimate reported by the native writer.
+     *
+     * @throws IllegalStateException if the writer has been closed
+     */
+    public long estimatedFileSize() {
+        ensureOpen();
+        return NativeLib.nativeWriterEstimatedSize(handle);
+    }
+
+    /** Guards native calls against a handle that has already been freed. */
+    private void ensureOpen() {
+        if (closed || handle == 0) {
+            throw new IllegalStateException("writer is closed");
+        }
+    }
+
+    /**
+     * Closes the native writer and then frees it; the free happens even if closing throws.
+     * Calling this method again has no effect.
+     */
+    @Override
+    public void close() {
+        if (!closed && handle != 0) {
+            closed = true;
+            try {
+                NativeLib.nativeWriterClose(handle);
+            } finally {
+                NativeLib.nativeWriterFree(handle);
+                handle = 0;
+            }
+        }
+    }
+}
diff --git a/java/src/main/java/io/mosaic/NativeLib.java
b/java/src/main/java/io/mosaic/NativeLib.java
new file mode 100644
index 0000000..1805269
--- /dev/null
+++ b/java/src/main/java/io/mosaic/NativeLib.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.mosaic;
+
+import java.io.OutputStream;
+
+/**
+ * JNI entry points implemented by the {@code mosaic_jni} native library.
+ *
+ * <p>Handles are opaque {@code long} values. The callers in this package treat a returned
+ * handle of {@code 0} as failure, a non-zero {@code int} status as failure, and a negative
+ * statistics count as failure.
+ */
+final class NativeLib {
+
+    static {
+        System.loadLibrary("mosaic_jni");
+    }
+
+    private NativeLib() {}
+
+    // ---- Writer ----
+
+    /** Opens a native writer; the arguments after the schema address come from WriterOptions. */
+    static native long nativeWriterOpen(
+            OutputStream stream,
+            long arrowSchemaAddr,
+            int numBuckets,
+            int compression,
+            int zstdLevel,
+            long rowGroupMaxSize,
+            int maxDictTotalBytes,
+            int maxDictEntries,
+            int[] statsColumns,
+            int pageSizeThreshold);
+
+    static native void nativeWriterClose(long handle);
+
+    static native void nativeWriterFree(long handle);
+
+    static native long nativeWriterEstimatedSize(long handle);
+
+    /** Takes the addresses of an exported ArrowArray and ArrowSchema struct. */
+    static native void nativeWriterWriteBatch(long writerHandle, long arrayAddr, long schemaAddr);
+
+    // ---- Reader ----
+
+    /** {@code inputFile} is an {@link InputFile} when called from {@link MosaicReader#open}. */
+    static native long nativeReaderOpen(Object inputFile, long fileLength);
+
+    static native void nativeReaderFree(long handle);
+
+    /** Exports the file schema into the ArrowSchema struct at {@code schemaAddr}. */
+    static native int nativeReaderExportSchema(long handle, long schemaAddr);
+
+    static native int nativeReaderNumRowGroups(long handle);
+
+    static native long nativeReaderOpenRowGroup(long handle, int rgIndex);
+
+    static native long nativeReaderOpenRowGroupProjected(long handle, int rgIndex, int[] columns);
+
+    // ---- RowGroupReader ----
+
+    static native int nativeRowGroupReaderNumRows(long handle);
+
+    /** Fills the ArrowArray and ArrowSchema structs at the given addresses. */
+    static native int nativeRowGroupReaderReadColumns(long handle, long arrayAddr, long schemaAddr);
+
+    static native void nativeRowGroupReaderFree(long handle);
+
+    // ---- Row group stats ----
+
+    static native int nativeReaderRowGroupNumStats(long handle, int rgIndex);
+
+    static native int nativeReaderRowGroupStatColumnIndex(long handle, int rgIndex, int statIndex);
+
+    static native long nativeReaderRowGroupStatNullCount(long handle, int rgIndex, int statIndex);
+
+    static native byte[] nativeReaderRowGroupStatMin(long handle, int rgIndex, int statIndex);
+
+    static native byte[] nativeReaderRowGroupStatMax(long handle, int rgIndex, int statIndex);
+}
diff --git a/java/src/main/java/io/mosaic/WriterOptions.java
b/java/src/main/java/io/mosaic/WriterOptions.java
new file mode 100644
index 0000000..8b8d687
--- /dev/null
+++ b/java/src/main/java/io/mosaic/WriterOptions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.mosaic;
+
+/**
+ * Fluent configuration for {@link MosaicWriter}. Every setter stores its argument and
+ * returns {@code this} so calls can be chained.
+ */
+public class WriterOptions {
+
+    /** Compression code for Zstandard; also the default compression. */
+    public static final int COMPRESSION_ZSTD = 1;
+
+    // Defaults used when the corresponding setter is never called.
+    private int compression = COMPRESSION_ZSTD;
+    private int zstdLevel = 1;
+    private int numBuckets = 0;
+    private long rowGroupMaxSize = 256L * 1024 * 1024; // 256 MiB
+    private int maxDictTotalBytes = 32 * 1024; // 32 KiB
+    private int maxDictEntries = 255;
+    private int[] statsColumns = new int[0];
+    private int pageSizeThreshold = 32 * 1024; // 32 KiB
+
+    public WriterOptions() {}
+
+    /** Sets the compression code, e.g. {@link #COMPRESSION_ZSTD}. */
+    public WriterOptions compression(int compression) {
+        this.compression = compression;
+        return this;
+    }
+
+    /** Sets the Zstandard level (default 1). */
+    public WriterOptions zstdLevel(int level) {
+        this.zstdLevel = level;
+        return this;
+    }
+
+    /** Sets the number of buckets (default 0). */
+    public WriterOptions numBuckets(int numBuckets) {
+        this.numBuckets = numBuckets;
+        return this;
+    }
+
+    /** Sets the maximum row group size (default 256 MiB). */
+    public WriterOptions rowGroupMaxSize(long size) {
+        this.rowGroupMaxSize = size;
+        return this;
+    }
+
+    /** Sets the dictionary byte budget (default 32 KiB). */
+    public WriterOptions maxDictTotalBytes(int bytes) {
+        this.maxDictTotalBytes = bytes;
+        return this;
+    }
+
+    /** Sets the dictionary entry limit (default 255). */
+    public WriterOptions maxDictEntries(int entries) {
+        this.maxDictEntries = entries;
+        return this;
+    }
+
+    /** Sets the columns for which statistics are requested; the array is copied. */
+    public WriterOptions statsColumns(int... columns) {
+        this.statsColumns = columns.clone();
+        return this;
+    }
+
+    /** Sets the page size threshold (default 32 KiB). */
+    public WriterOptions pageSizeThreshold(int threshold) {
+        this.pageSizeThreshold = threshold;
+        return this;
+    }
+
+    // Package-private accessors read by MosaicWriter.
+
+    int getCompression() {
+        return compression;
+    }
+
+    int getZstdLevel() {
+        return zstdLevel;
+    }
+
+    int getNumBuckets() {
+        return numBuckets;
+    }
+
+    long getRowGroupMaxSize() {
+        return rowGroupMaxSize;
+    }
+
+    int getMaxDictTotalBytes() {
+        return maxDictTotalBytes;
+    }
+
+    int getMaxDictEntries() {
+        return maxDictEntries;
+    }
+
+    int[] getStatsColumns() {
+        return statsColumns;
+    }
+
+    int getPageSizeThreshold() {
+        return pageSizeThreshold;
+    }
+}
diff --git a/java/src/test/java/io/mosaic/MosaicRoundtripTest.java
b/java/src/test/java/io/mosaic/MosaicRoundtripTest.java
new file mode 100644
index 0000000..7fd777a
--- /dev/null
+++ b/java/src/test/java/io/mosaic/MosaicRoundtripTest.java
@@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.mosaic;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class MosaicRoundtripTest {
+
+ private BufferAllocator allocator;
+
+ /** Creates a fresh root allocator before each test. */
+ @Before
+ public void setUp() {
+     this.allocator = new RootAllocator();
+ }
+
+ /** Closes the per-test allocator after each test. */
+ @After
+ public void tearDown() {
+     this.allocator.close();
+ }
+
+ /** Writes with default {@link WriterOptions} and returns the bytes produced. */
+ private byte[] writeToBytes(Schema schema,
+         java.util.function.Consumer<MosaicWriter> writeFn) {
+     WriterOptions defaults = new WriterOptions();
+     return writeToBytes(schema, defaults, writeFn);
+ }
+
+ /**
+  * Opens a writer over an in-memory stream, lets {@code writeFn} write to it, closes the
+  * writer, and returns everything written to the stream.
+  */
+ private byte[] writeToBytes(Schema schema, WriterOptions opts,
+         java.util.function.Consumer<MosaicWriter> writeFn) {
+     ByteArrayOutputStream sink = new ByteArrayOutputStream();
+     try (MosaicWriter writer = new MosaicWriter(sink, schema, opts, allocator)) {
+         writeFn.accept(writer);
+     }
+     // Read the stream only after the writer has been closed.
+     return sink.toByteArray();
+ }
+
+ /**
+  * Opens a reader over an in-memory file image.
+  *
+  * <p>The {@link InputFile} rejects reads outside {@code data} with an
+  * {@link java.io.IOException}, so an out-of-range {@code position} can never be narrowed
+  * by the {@code int} cast into a valid index.
+  */
+ private MosaicReader readerFromBytes(byte[] data) {
+     InputFile inputFile = (position, buffer, offset, length) -> {
+         if (position < 0 || length < 0 || position > data.length - length) {
+             throw new java.io.IOException("read of " + length + " bytes at position "
+                     + position + " is outside the " + data.length + "-byte buffer");
+         }
+         // Safe narrowing: position <= data.length was checked above.
+         System.arraycopy(data, (int) position, buffer, offset, length);
+     };
+     return MosaicReader.open(inputFile, data.length, allocator);
+ }
+
+ /**
+  * Writes 50 rows of (id, name, score) with two buckets, checks that the file ends with
+  * the bytes "MOSA", then reads every row group back and checks each row against its id.
+  */
+ @Test
+ public void testBasicRoundtrip() {
+     // Arrow Utf8 data is UTF-8, so encode and decode explicitly instead of relying on
+     // the platform default charset.
+     final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+
+     Schema arrowSchema = new Schema(Arrays.asList(
+             Field.notNullable("id", new ArrowType.Int(32, true)),
+             Field.nullable("name", ArrowType.Utf8.INSTANCE),
+             Field.nullable("score",
+                     new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))));
+
+     byte[] data;
+     try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+         IntVector ids = (IntVector) root.getVector("id");
+         VarCharVector names = (VarCharVector) root.getVector("name");
+         Float8Vector scores = (Float8Vector) root.getVector("score");
+
+         ids.allocateNew(50);
+         names.allocateNew(50);
+         scores.allocateNew(50);
+
+         for (int i = 0; i < 50; i++) {
+             ids.set(i, i);
+             names.setSafe(i, ("user_" + i).getBytes(utf8));
+             scores.set(i, i * 1.5);
+         }
+         root.setRowCount(50);
+
+         data = writeToBytes(arrowSchema, new WriterOptions().numBuckets(2),
+                 writer -> writer.write(root));
+     }
+
+     assertTrue(data.length > 32);
+     assertEquals('M', data[data.length - 4]);
+     assertEquals('O', data[data.length - 3]);
+     assertEquals('S', data[data.length - 2]);
+     assertEquals('A', data[data.length - 1]);
+
+     try (MosaicReader reader = readerFromBytes(data)) {
+         Schema readSchema = reader.getSchema();
+         List<Field> fields = readSchema.getFields();
+         assertEquals(3, fields.size());
+         assertTrue(reader.numRowGroups() >= 1);
+
+         // Look columns up by name rather than assuming the written order is kept.
+         int idCol = fields.indexOf(readSchema.findField("id"));
+         int nameCol = fields.indexOf(readSchema.findField("name"));
+         int scoreCol = fields.indexOf(readSchema.findField("score"));
+         assertTrue(idCol >= 0);
+         assertTrue(nameCol >= 0);
+         assertTrue(scoreCol >= 0);
+
+         int totalRows = 0;
+         for (int rg = 0; rg < reader.numRowGroups(); rg++) {
+             try (VectorSchemaRoot batch = reader.readRowGroup(rg, allocator)) {
+                 int rows = batch.getRowCount();
+                 totalRows += rows;
+
+                 IntVector readIds = (IntVector) batch.getVector(idCol);
+                 VarCharVector readNames = (VarCharVector) batch.getVector(nameCol);
+                 Float8Vector readScores = (Float8Vector) batch.getVector(scoreCol);
+
+                 // Each row is validated against its own id, independent of row order.
+                 for (int i = 0; i < rows; i++) {
+                     int id = readIds.get(i);
+                     String name = new String(readNames.get(i), utf8);
+                     double score = readScores.get(i);
+                     assertEquals("user_" + id, name);
+                     assertEquals(id * 1.5, score, 1e-9);
+                 }
+             }
+         }
+         assertEquals(50, totalRows);
+     }
+ }
+
+ /**
+  * Writes three rows with a null name in row 1 and a null value in row 2, then checks
+  * that the null flags and the non-null names are read back unchanged.
+  */
+ @Test
+ public void testNullValues() {
+     // Arrow Utf8 data is UTF-8, so encode and decode explicitly instead of relying on
+     // the platform default charset.
+     final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+
+     Schema arrowSchema = new Schema(Arrays.asList(
+             Field.nullable("id", new ArrowType.Int(32, true)),
+             Field.nullable("name", ArrowType.Utf8.INSTANCE),
+             Field.nullable("value",
+                     new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))));
+
+     byte[] data;
+     try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+         IntVector ids = (IntVector) root.getVector("id");
+         VarCharVector names = (VarCharVector) root.getVector("name");
+         Float8Vector values = (Float8Vector) root.getVector("value");
+
+         ids.allocateNew(3);
+         names.allocateNew(3);
+         values.allocateNew(3);
+
+         // Row 0: all values present.
+         ids.set(0, 1);
+         names.setSafe(0, "hello".getBytes(utf8));
+         values.set(0, 1.0);
+
+         // Row 1: null name.
+         ids.set(1, 2);
+         names.setNull(1);
+         values.set(1, 2.0);
+
+         // Row 2: null value.
+         ids.set(2, 3);
+         names.setSafe(2, "world".getBytes(utf8));
+         values.setNull(2);
+
+         root.setRowCount(3);
+         data = writeToBytes(arrowSchema, writer -> writer.write(root));
+     }
+
+     try (MosaicReader reader = readerFromBytes(data)) {
+         Schema readSchema = reader.getSchema();
+         List<Field> fields = readSchema.getFields();
+         int nameCol = fields.indexOf(readSchema.findField("name"));
+         int valueCol = fields.indexOf(readSchema.findField("value"));
+
+         for (int rg = 0; rg < reader.numRowGroups(); rg++) {
+             try (VectorSchemaRoot batch = reader.readRowGroup(rg, allocator)) {
+                 assertEquals(3, batch.getRowCount());
+
+                 VarCharVector readNames = (VarCharVector) batch.getVector(nameCol);
+                 Float8Vector readValues = (Float8Vector) batch.getVector(valueCol);
+
+                 assertFalse(readNames.isNull(0));
+                 assertEquals("hello", new String(readNames.get(0), utf8));
+
+                 assertTrue(readNames.isNull(1));
+
+                 assertFalse(readNames.isNull(2));
+                 assertEquals("world", new String(readNames.get(2), utf8));
+
+                 assertFalse(readValues.isNull(0));
+                 assertTrue(readValues.isNull(2));
+             }
+         }
+     }
+ }
+
+ /**
+  * Writes 20 rows with four columns, reads back only columns "a" and "b", and checks
+  * that each batch has exactly two vectors and that all rows are returned.
+  */
+ @Test
+ public void testProjection() {
+     // Arrow Utf8 data is UTF-8, so encode explicitly instead of relying on the platform
+     // default charset.
+     final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+
+     Schema arrowSchema = new Schema(Arrays.asList(
+             Field.nullable("a", new ArrowType.Int(32, true)),
+             Field.nullable("b", ArrowType.Utf8.INSTANCE),
+             Field.nullable("c", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)),
+             Field.nullable("d", ArrowType.Utf8.INSTANCE)));
+
+     byte[] data;
+     try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+         IntVector aVec = (IntVector) root.getVector("a");
+         VarCharVector bVec = (VarCharVector) root.getVector("b");
+         Float8Vector cVec = (Float8Vector) root.getVector("c");
+         VarCharVector dVec = (VarCharVector) root.getVector("d");
+
+         int n = 20;
+         aVec.allocateNew(n);
+         bVec.allocateNew(n);
+         cVec.allocateNew(n);
+         dVec.allocateNew(n);
+
+         for (int i = 0; i < n; i++) {
+             aVec.set(i, i);
+             bVec.setSafe(i, ("val_" + i).getBytes(utf8));
+             cVec.set(i, (double) i);
+             dVec.setSafe(i, ("extra_" + i).getBytes(utf8));
+         }
+         root.setRowCount(n);
+         data = writeToBytes(arrowSchema, new WriterOptions().numBuckets(2),
+                 writer -> writer.write(root));
+     }
+
+     try (MosaicReader reader = readerFromBytes(data)) {
+         Schema readSchema = reader.getSchema();
+         List<Field> fields = readSchema.getFields();
+         int aCol = fields.indexOf(readSchema.findField("a"));
+         int bCol = fields.indexOf(readSchema.findField("b"));
+         int[] projected = {aCol, bCol};
+
+         int totalRows = 0;
+         for (int rg = 0; rg < reader.numRowGroups(); rg++) {
+             try (VectorSchemaRoot batch = reader.readRowGroup(rg, projected, allocator)) {
+                 totalRows += batch.getRowCount();
+                 assertEquals(2, batch.getFieldVectors().size());
+             }
+         }
+         assertEquals(20, totalRows);
+     }
+ }
+
+ /**
+  * Requests statistics for columns 0 and 2, writes ten non-null rows, and checks that
+  * every statistic read back belongs to one of those columns, has a null count of zero,
+  * and carries non-empty min and max values.
+  */
+ @Test
+ public void testStats() {
+     // Arrow Utf8 data is UTF-8, so encode explicitly instead of relying on the platform
+     // default charset.
+     final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+
+     Schema arrowSchema = new Schema(Arrays.asList(
+             Field.nullable("id", new ArrowType.Int(32, true)),
+             Field.nullable("name", ArrowType.Utf8.INSTANCE),
+             Field.nullable("value",
+                     new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))));
+
+     WriterOptions opts = new WriterOptions().statsColumns(0, 2);
+
+     byte[] data;
+     try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+         IntVector ids = (IntVector) root.getVector("id");
+         VarCharVector names = (VarCharVector) root.getVector("name");
+         Float8Vector values = (Float8Vector) root.getVector("value");
+
+         int n = 10;
+         ids.allocateNew(n);
+         names.allocateNew(n);
+         values.allocateNew(n);
+
+         for (int i = 0; i < n; i++) {
+             ids.set(i, i * 10);
+             names.setSafe(i, ("item_" + i).getBytes(utf8));
+             values.set(i, i * 1.1);
+         }
+         root.setRowCount(n);
+         data = writeToBytes(arrowSchema, opts, writer -> writer.write(root));
+     }
+
+     try (MosaicReader reader = readerFromBytes(data)) {
+         for (int rg = 0; rg < reader.numRowGroups(); rg++) {
+             List<ColumnStatistics> stats = reader.getRowGroupStatistics(rg);
+             assertTrue(stats.size() > 0);
+             for (ColumnStatistics stat : stats) {
+                 assertTrue(stat.getColumnIndex() == 0 || stat.getColumnIndex() == 2);
+                 assertEquals(0, stat.getNullCount());
+                 assertTrue(stat.hasMinMax());
+                 assertNotNull(stat.getMin());
+                 assertNotNull(stat.getMax());
+                 assertTrue(stat.getMin().length > 0);
+                 assertTrue(stat.getMax().length > 0);
+             }
+         }
+     }
+ }
+
+    /**
+     * Round-trips two rows through one column of each supported primitive type
+     * (bool, int8/16/32/64, float32/64, utf8, binary) and checks every value read back.
+     */
+    @Test
+    public void testAllTypes() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+            Field.nullable("f_bool", ArrowType.Bool.INSTANCE),
+            Field.nullable("f_int8", new ArrowType.Int(8, true)),
+            Field.nullable("f_int16", new ArrowType.Int(16, true)),
+            Field.nullable("f_int32", new ArrowType.Int(32, true)),
+            Field.nullable("f_int64", new ArrowType.Int(64, true)),
+            Field.nullable("f_float32", new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)),
+            Field.nullable("f_float64", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)),
+            Field.nullable("f_utf8", ArrowType.Utf8.INSTANCE),
+            Field.nullable("f_binary", ArrowType.Binary.INSTANCE)
+        ));
+
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            BitVector boolVec = (BitVector) root.getVector("f_bool");
+            TinyIntVector int8Vec = (TinyIntVector) root.getVector("f_int8");
+            SmallIntVector int16Vec = (SmallIntVector) root.getVector("f_int16");
+            IntVector int32Vec = (IntVector) root.getVector("f_int32");
+            BigIntVector int64Vec = (BigIntVector) root.getVector("f_int64");
+            Float4Vector f32Vec = (Float4Vector) root.getVector("f_float32");
+            Float8Vector f64Vec = (Float8Vector) root.getVector("f_float64");
+            VarCharVector utf8Vec = (VarCharVector) root.getVector("f_utf8");
+            VarBinaryVector binVec = (VarBinaryVector) root.getVector("f_binary");
+
+            int n = 2;
+            boolVec.allocateNew(n);
+            int8Vec.allocateNew(n);
+            int16Vec.allocateNew(n);
+            int32Vec.allocateNew(n);
+            int64Vec.allocateNew(n);
+            f32Vec.allocateNew(n);
+            f64Vec.allocateNew(n);
+            utf8Vec.allocateNew(n);
+            binVec.allocateNew(n);
+
+            boolVec.set(0, 1);
+            boolVec.set(1, 0);
+            int8Vec.set(0, 42);
+            int8Vec.set(1, -1);
+            int16Vec.set(0, 1234);
+            int16Vec.set(1, -5678);
+            int32Vec.set(0, 100000);
+            int32Vec.set(1, -200000);
+            // Values outside the 32-bit range, so a truncating path would be caught.
+            int64Vec.set(0, 9999999999L);
+            int64Vec.set(1, -9999999999L);
+            f32Vec.set(0, 3.14f);
+            f32Vec.set(1, -2.71f);
+            f64Vec.set(0, 2.718281828);
+            f64Vec.set(1, -3.141592653);
+            // Explicit charset: encoding must not depend on the JVM default.
+            utf8Vec.setSafe(0, "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            utf8Vec.setSafe(1, "world".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            binVec.setSafe(0, new byte[]{1, 2, 3});
+            binVec.setSafe(1, new byte[]{(byte) 0xff, 0});
+
+            root.setRowCount(n);
+            data = writeToBytes(arrowSchema, writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            try (VectorSchemaRoot batch = reader.readRowGroup(0, allocator)) {
+                assertEquals(2, batch.getRowCount());
+
+                BitVector boolOut = (BitVector) batch.getVector("f_bool");
+                assertEquals(1, boolOut.get(0));
+                assertEquals(0, boolOut.get(1));
+
+                TinyIntVector int8Out = (TinyIntVector) batch.getVector("f_int8");
+                assertEquals(42, int8Out.get(0));
+                assertEquals(-1, int8Out.get(1));
+
+                SmallIntVector int16Out = (SmallIntVector) batch.getVector("f_int16");
+                assertEquals(1234, int16Out.get(0));
+                assertEquals(-5678, int16Out.get(1));
+
+                IntVector int32Out = (IntVector) batch.getVector("f_int32");
+                assertEquals(100000, int32Out.get(0));
+                assertEquals(-200000, int32Out.get(1));
+
+                BigIntVector int64Out = (BigIntVector) batch.getVector("f_int64");
+                assertEquals(9999999999L, int64Out.get(0));
+                assertEquals(-9999999999L, int64Out.get(1));
+
+                Float4Vector f32Out = (Float4Vector) batch.getVector("f_float32");
+                assertEquals(3.14f, f32Out.get(0), 1e-5f);
+                assertEquals(-2.71f, f32Out.get(1), 1e-5f);
+
+                Float8Vector f64Out = (Float8Vector) batch.getVector("f_float64");
+                assertEquals(2.718281828, f64Out.get(0), 1e-9);
+                assertEquals(-3.141592653, f64Out.get(1), 1e-9);
+
+                VarCharVector utf8Out = (VarCharVector) batch.getVector("f_utf8");
+                assertEquals("hello", new String(utf8Out.get(0), java.nio.charset.StandardCharsets.UTF_8));
+                assertEquals("world", new String(utf8Out.get(1), java.nio.charset.StandardCharsets.UTF_8));
+
+                VarBinaryVector binOut = (VarBinaryVector) batch.getVector("f_binary");
+                assertArrayEquals(new byte[]{1, 2, 3}, binOut.get(0));
+                assertArrayEquals(new byte[]{(byte) 0xff, 0}, binOut.get(1));
+            }
+        }
+    }
+
+    /**
+     * Writes twenty rows with {@code compression(0)} and checks that both the integer
+     * column and the string column read back unchanged.
+     */
+    @Test
+    public void testCompressionNone() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+            Field.nullable("x", new ArrowType.Int(32, true)),
+            Field.nullable("y", ArrowType.Utf8.INSTANCE)
+        ));
+
+        final int n = 20;
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector xVec = (IntVector) root.getVector("x");
+            VarCharVector yVec = (VarCharVector) root.getVector("y");
+            xVec.allocateNew(n);
+            yVec.allocateNew(n);
+            for (int i = 0; i < n; i++) {
+                xVec.set(i, i);
+                // Explicit charset: encoding must not depend on the JVM default.
+                yVec.setSafe(i, ("v_" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            }
+            root.setRowCount(n);
+            data = writeToBytes(arrowSchema, new WriterOptions().compression(0), writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            try (VectorSchemaRoot batch = reader.readRowGroup(0, allocator)) {
+                assertEquals(n, batch.getRowCount());
+                IntVector xOut = (IntVector) batch.getVector("x");
+                VarCharVector yOut = (VarCharVector) batch.getVector("y");
+                for (int i = 0; i < n; i++) {
+                    assertEquals(i, xOut.get(i));
+                    // The string column was written too; verify it rather than only "x".
+                    assertEquals("v_" + i, new String(yOut.get(i), java.nio.charset.StandardCharsets.UTF_8));
+                }
+            }
+        }
+    }
+
+    /**
+     * Writes 500 rows in batches of ten with a small {@code rowGroupMaxSize} so the file
+     * contains more than one row group, then reads the groups in order and checks that
+     * the ids and derived values form one continuous sequence.
+     */
+    @Test
+    public void testMultipleRowGroups() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+            Field.nullable("id", new ArrowType.Int(32, true)),
+            Field.nullable("data", new ArrowType.Int(64, true))
+        ));
+
+        WriterOptions opts = new WriterOptions()
+            .compression(0)
+            .numBuckets(1)
+            .rowGroupMaxSize(200);
+
+        final int totalRows = 500;
+        final int batchSize = 10;
+        byte[] data = writeToBytes(arrowSchema, opts, writer -> {
+            int start = 0;
+            while (start < totalRows) {
+                try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+                    IntVector idVec = (IntVector) root.getVector("id");
+                    BigIntVector dataVec = (BigIntVector) root.getVector("data");
+                    idVec.allocateNew(batchSize);
+                    dataVec.allocateNew(batchSize);
+                    for (int i = 0; i < batchSize; i++) {
+                        int value = start + i;
+                        idVec.set(i, value);
+                        dataVec.set(i, (long) value * 3);
+                    }
+                    root.setRowCount(batchSize);
+                    writer.write(root);
+                }
+                start += batchSize;
+            }
+        });
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            assertTrue(reader.numRowGroups() > 1);
+            // Running count of rows seen so far; doubles as the expected id of the next row.
+            int seen = 0;
+            for (int rg = 0; rg < reader.numRowGroups(); rg++) {
+                try (VectorSchemaRoot batch = reader.readRowGroup(rg, allocator)) {
+                    IntVector ids = (IntVector) batch.getVector("id");
+                    BigIntVector datas = (BigIntVector) batch.getVector("data");
+                    int rows = batch.getRowCount();
+                    for (int i = 0; i < rows; i++) {
+                        int expected = seen + i;
+                        assertEquals(expected, ids.get(i));
+                        assertEquals((long) expected * 3, datas.get(i));
+                    }
+                    seen += rows;
+                }
+            }
+            assertEquals(500, seen);
+        }
+    }
+
+    /**
+     * Calls {@code write} three times on the same writer (ten rows each) and checks that
+     * the row groups of the resulting file add up to thirty rows.
+     */
+    @Test
+    public void testMultipleWrites() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+            Field.nullable("x", new ArrowType.Int(32, true))
+        ));
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (MosaicWriter writer = new MosaicWriter(baos, arrowSchema, allocator)) {
+            int start = 0;
+            while (start < 30) {
+                try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+                    IntVector xVec = (IntVector) root.getVector("x");
+                    xVec.allocateNew(10);
+                    for (int row = 0; row < 10; row++) {
+                        xVec.set(row, start + row);
+                    }
+                    root.setRowCount(10);
+                    writer.write(root);
+                }
+                start += 10;
+            }
+        }
+        // The stream is only read after the writer has been closed by try-with-resources.
+        byte[] data = baos.toByteArray();
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            int rowsRead = 0;
+            int groups = reader.numRowGroups();
+            for (int rg = 0; rg < groups; rg++) {
+                try (VectorSchemaRoot batch = reader.readRowGroup(rg, allocator)) {
+                    rowsRead += batch.getRowCount();
+                }
+            }
+            assertEquals(30, rowsRead);
+        }
+    }
+
+ @Test
+ public void testSingleRow() {
+ Schema arrowSchema = new Schema(Arrays.asList(
+ Field.nullable("v", new ArrowType.Int(32, true))
+ ));
+
+ byte[] data;
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema,
allocator)) {
+ IntVector vVec = (IntVector) root.getVector("v");
+ vVec.allocateNew(1);
+ vVec.set(0, 42);
+ root.setRowCount(1);
+ data = writeToBytes(arrowSchema, writer -> writer.write(root));
+ }
+
+ try (MosaicReader reader = readerFromBytes(data)) {
+ try (VectorSchemaRoot batch = reader.readRowGroup(0, allocator)) {
+ assertEquals(1, batch.getRowCount());
+ assertEquals(42, ((IntVector) batch.getVector("v")).get(0));
+ }
+ }
+ }
+
+ @Test
+ public void testZeroRows() {
+ Schema arrowSchema = new Schema(Arrays.asList(
+ Field.nullable("v", new ArrowType.Int(32, true))
+ ));
+
+ byte[] data;
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema,
allocator)) {
+ root.getVector("v").allocateNew();
+ root.setRowCount(0);
+ data = writeToBytes(arrowSchema, writer -> writer.write(root));
+ }
+
+ try (MosaicReader reader = readerFromBytes(data)) {
+ assertEquals(0, reader.numRowGroups());
+ }
+ }
+
+    /**
+     * Checks statistics for columns that mix nulls and values: the null count must match the
+     * number of nulls written, and min/max must be computed over the non-null values only.
+     * Min/max payloads are decoded as big-endian integers.
+     */
+    @Test
+    public void testStatsWithNulls() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+            Field.nullable("a", new ArrowType.Int(32, true)),
+            Field.nullable("b", new ArrowType.Int(64, true))
+        ));
+
+        WriterOptions opts = new WriterOptions().statsColumns(0, 1).numBuckets(1);
+
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector aVec = (IntVector) root.getVector("a");
+            BigIntVector bVec = (BigIntVector) root.getVector("b");
+            aVec.allocateNew(4);
+            bVec.allocateNew(4);
+
+            // a = [10, null, 5, 20] -> 1 null, min 5, max 20
+            aVec.set(0, 10);
+            aVec.setNull(1);
+            aVec.set(2, 5);
+            aVec.set(3, 20);
+
+            // b = [null, null, 100, 50] -> 2 nulls, min 50, max 100
+            bVec.setNull(0);
+            bVec.setNull(1);
+            bVec.set(2, 100);
+            bVec.set(3, 50);
+
+            root.setRowCount(4);
+            data = writeToBytes(arrowSchema, opts, writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            List<ColumnStatistics> stats = reader.getRowGroupStatistics(0);
+            assertEquals(2, stats.size());
+
+            // orElseThrow with a message makes a missing column fail with a readable
+            // assertion instead of a bare NoSuchElementException from Optional.get().
+            ColumnStatistics aStat = stats.stream()
+                .filter(s -> s.getColumnIndex() == 0)
+                .findFirst()
+                .orElseThrow(() -> new AssertionError("no statistics reported for column 0"));
+            assertEquals(1, aStat.getNullCount());
+            assertTrue(aStat.hasMinMax());
+            int minA = ByteBuffer.wrap(aStat.getMin()).order(ByteOrder.BIG_ENDIAN).getInt();
+            int maxA = ByteBuffer.wrap(aStat.getMax()).order(ByteOrder.BIG_ENDIAN).getInt();
+            assertEquals(5, minA);
+            assertEquals(20, maxA);
+
+            ColumnStatistics bStat = stats.stream()
+                .filter(s -> s.getColumnIndex() == 1)
+                .findFirst()
+                .orElseThrow(() -> new AssertionError("no statistics reported for column 1"));
+            assertEquals(2, bStat.getNullCount());
+            assertTrue(bStat.hasMinMax());
+            long minB = ByteBuffer.wrap(bStat.getMin()).order(ByteOrder.BIG_ENDIAN).getLong();
+            long maxB = ByteBuffer.wrap(bStat.getMax()).order(ByteOrder.BIG_ENDIAN).getLong();
+            assertEquals(50, minB);
+            assertEquals(100, maxB);
+        }
+    }
+
+    /**
+     * A column that contains only nulls must report its null count and must not
+     * report a min/max pair.
+     */
+    @Test
+    public void testStatsAllNull() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+            Field.nullable("x", new ArrowType.Int(32, true))
+        ));
+
+        WriterOptions opts = new WriterOptions().statsColumns(0).numBuckets(1);
+
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector xVec = (IntVector) root.getVector("x");
+            xVec.allocateNew(3);
+            for (int row = 0; row < 3; row++) {
+                xVec.setNull(row);
+            }
+            root.setRowCount(3);
+            data = writeToBytes(arrowSchema, opts, writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            List<ColumnStatistics> stats = reader.getRowGroupStatistics(0);
+            assertEquals(1, stats.size());
+            ColumnStatistics only = stats.get(0);
+            assertEquals(3, only.getNullCount());
+            assertFalse(only.hasMinMax());
+        }
+    }
+
+    /**
+     * After one hundred rows have been written, the writer must report a positive
+     * estimated file size while it is still open.
+     */
+    @Test
+    public void testEstimatedFileSize() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+            Field.nullable("x", new ArrowType.Int(32, true)),
+            Field.nullable("y", ArrowType.Utf8.INSTANCE)
+        ));
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (MosaicWriter writer = new MosaicWriter(baos, arrowSchema, allocator)) {
+            try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+                IntVector xVec = (IntVector) root.getVector("x");
+                VarCharVector yVec = (VarCharVector) root.getVector("y");
+                int n = 100;
+                xVec.allocateNew(n);
+                yVec.allocateNew(n);
+                for (int i = 0; i < n; i++) {
+                    xVec.set(i, i);
+                    // Explicit charset: encoding must not depend on the JVM default.
+                    yVec.setSafe(i, ("value_" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+                }
+                root.setRowCount(n);
+                writer.write(root);
+            }
+            // Queried before the writer is closed.
+            assertTrue(writer.estimatedFileSize() > 0);
+        }
+    }
+
+    /**
+     * Checks that the schema read back from a file keeps field count, field order,
+     * field names and per-field nullability.
+     */
+    @Test
+    public void testSchemaRoundtrip() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+            Field.notNullable("id", new ArrowType.Int(32, true)),
+            Field.nullable("name", ArrowType.Utf8.INSTANCE),
+            Field.nullable("score", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
+        ));
+
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector ids = (IntVector) root.getVector("id");
+            VarCharVector names = (VarCharVector) root.getVector("name");
+            Float8Vector scores = (Float8Vector) root.getVector("score");
+            ids.allocateNew(1);
+            names.allocateNew(1);
+            scores.allocateNew(1);
+            ids.set(0, 1);
+            // Explicit charset: encoding must not depend on the JVM default.
+            names.setSafe(0, "x".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            scores.set(0, 1.0);
+            root.setRowCount(1);
+            data = writeToBytes(arrowSchema, writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            Schema readSchema = reader.getSchema();
+            List<Field> fields = readSchema.getFields();
+            assertEquals(3, fields.size());
+            assertEquals("id", fields.get(0).getName());
+            assertEquals("name", fields.get(1).getName());
+            assertEquals("score", fields.get(2).getName());
+            assertFalse(fields.get(0).isNullable());
+            assertTrue(fields.get(1).isNullable());
+            // "score" was declared nullable as well; cover it like the other two fields.
+            assertTrue(fields.get(2).isNullable());
+        }
+    }
+}
diff --git a/Cargo.toml b/jni/Cargo.toml
similarity index 71%
copy from Cargo.toml
copy to jni/Cargo.toml
index 39c103c..1cf0142 100644
--- a/Cargo.toml
+++ b/jni/Cargo.toml
@@ -15,8 +15,18 @@
# specific language governing permissions and limitations
# under the License.
-[workspace]
-members = ["core", "ffi"]
-resolver = "2"
+[package]
+name = "mosaic-jni"
+version = "0.1.0"
+edition = "2021"
+description = "Mosaic file format — JNI bindings for Java"
+license = "Apache-2.0"
-[profile.release]
+[lib]
+crate-type = ["cdylib"]
+
+[dependencies]
+mosaic-core = { path = "../core" }
+jni = "0.21"
+arrow-schema = "58"
+arrow-array = { version = "58", features = ["ffi"] }
diff --git a/jni/src/lib.rs b/jni/src/lib.rs
new file mode 100644
index 0000000..2c845c6
--- /dev/null
+++ b/jni/src/lib.rs
@@ -0,0 +1,832 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::io;
+use std::panic::{self, AssertUnwindSafe};
+use std::ptr;
+use std::sync::Arc;
+
+use jni::objects::{GlobalRef, JByteArray, JClass, JIntArray, JMethodID,
JObject, JValue};
+use jni::sys::{jint, jlong};
+use jni::JNIEnv;
+use jni::JavaVM;
+
+use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+use arrow_array::{RecordBatch, StructArray};
+use arrow_schema::Schema;
+
+use mosaic_core::reader::{InputFile, MosaicReader, ReaderAccess,
RowGroupReader};
+use mosaic_core::spec::*;
+use mosaic_core::writer::{MosaicWriter, OutputFile, WriterOptions};
+
+/// Renders a caught panic payload as text for a Java exception message.
+///
+/// `String` and `&str` payloads are prefixed with `native panic: `; any other payload
+/// type yields `native panic: unknown`.
+fn panic_message(e: &Box<dyn std::any::Any + Send>) -> String {
+    let detail: Option<&str> = e
+        .downcast_ref::<String>()
+        .map(String::as_str)
+        .or_else(|| e.downcast_ref::<&str>().copied());
+    match detail {
+        Some(text) => format!("native panic: {}", text),
+        None => "native panic: unknown".to_string(),
+    }
+}
+
+/// `OutputFile` implementation that forwards writes to a Java `java.io.OutputStream`.
+struct JniOutputFile {
+ // VM handle used to attach the calling thread before every JNI call.
+ jvm: Arc<JavaVM>,
+ // Global reference to the Java stream object that receives the bytes.
+ stream_ref: GlobalRef,
+ // Method IDs for the stream's `write` and `flush`, looked up once by
+ // whoever constructs this struct so each call can skip the name lookup.
+ write_mid: JMethodID,
+ flush_mid: JMethodID,
+ // Total number of bytes handed to the stream so far; returned by `pos()`.
+ pos: u64,
+ // Java `byte[]` kept between writes and reused while it is large enough.
+ cached_array: Option<GlobalRef>,
+ // Length in bytes of `cached_array` (0 while no array is cached).
+ cached_array_len: usize,
+}
+
+// NOTE(review): asserts the struct may move between threads; presumably sound because
+// it only holds a JavaVM handle, global references and method IDs — confirm.
+unsafe impl Send for JniOutputFile {}
+
+impl OutputFile for JniOutputFile {
+    /// Copies `data` into a Java `byte[]` and passes it to the stream's
+    /// `write(byte[], int, int)` method, then advances the tracked position.
+    ///
+    /// The Java array is cached and only reallocated when `data` is larger than the
+    /// cached array.
+    ///
+    /// # Errors
+    /// Returns `InvalidInput` when `data` is longer than a Java array can be, and an
+    /// `io::Error` wrapping the JNI error text when attaching the thread, allocating
+    /// the array, copying into it, or calling the Java method fails.
+    fn write(&mut self, data: &[u8]) -> io::Result<()> {
+        // Java array lengths are signed 32-bit; reject larger writes explicitly instead
+        // of letting an `as` cast wrap to a negative or truncated length.
+        let len = i32::try_from(data.len()).map_err(|_| {
+            io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!(
+                    "write of {} bytes exceeds the maximum Java array length",
+                    data.len()
+                ),
+            )
+        })?;
+
+        let mut env = self
+            .jvm
+            .attach_current_thread()
+            .map_err(|e| io::Error::other(e.to_string()))?;
+
+        let need_new = match &self.cached_array {
+            Some(_) => data.len() > self.cached_array_len,
+            None => true,
+        };
+
+        if need_new {
+            let byte_array = env
+                .new_byte_array(len)
+                .map_err(|e| io::Error::other(e.to_string()))?;
+            let global = env
+                .new_global_ref(&byte_array)
+                .map_err(|e| io::Error::other(e.to_string()))?;
+            self.cached_array = Some(global);
+            self.cached_array_len = data.len();
+        }
+
+        let raw = self
+            .cached_array
+            .as_ref()
+            .expect("cached_array is populated above whenever it was None")
+            .as_raw();
+        // SAFETY: `raw` comes from the global reference stored in `self.cached_array`,
+        // which stays alive for the rest of this call. The wrapper is only a typed view
+        // and is forgotten below rather than treated as an owner.
+        let byte_array = unsafe { JByteArray::from_raw(raw) };
+
+        env.set_byte_array_region(&byte_array, 0, bytemuck_cast(data))
+            .map_err(|e| io::Error::other(e.to_string()))?;
+
+        // SAFETY: `write_mid` was resolved with signature `([BII)V`; the three arguments
+        // below match it (byte[], int offset, int length) and the return type is void.
+        unsafe {
+            env.call_method_unchecked(
+                &self.stream_ref,
+                self.write_mid,
+                jni::signature::ReturnType::Primitive(jni::signature::Primitive::Void),
+                &[
+                    jni::sys::jvalue { l: raw },
+                    jni::sys::jvalue { i: 0 },
+                    // Only the first `len` bytes are valid when a larger cached array is reused.
+                    jni::sys::jvalue { i: len },
+                ],
+            )
+            .map_err(|e| io::Error::other(e.to_string()))?;
+        }
+        #[allow(clippy::forget_non_drop)]
+        std::mem::forget(byte_array);
+        self.pos += data.len() as u64;
+        Ok(())
+    }
+
+    /// Calls the Java stream's `flush()` method.
+    ///
+    /// # Errors
+    /// Returns an `io::Error` wrapping the JNI error text when attaching the thread
+    /// or the Java call fails.
+    fn flush(&mut self) -> io::Result<()> {
+        let mut env = self
+            .jvm
+            .attach_current_thread()
+            .map_err(|e| io::Error::other(e.to_string()))?;
+        // SAFETY: `flush_mid` was resolved with signature `()V`: no arguments, void return.
+        unsafe {
+            env.call_method_unchecked(
+                &self.stream_ref,
+                self.flush_mid,
+                jni::signature::ReturnType::Primitive(jni::signature::Primitive::Void),
+                &[],
+            )
+            .map_err(|e| io::Error::other(e.to_string()))?;
+        }
+        Ok(())
+    }
+
+    /// Returns the number of bytes successfully written so far.
+    fn pos(&self) -> u64 {
+        self.pos
+    }
+}
+
+// ======================== JniInputFile ========================
+
+/// `InputFile` implementation that reads through a Java object exposing
+/// `readFully(long, byte[], int, int)`.
+struct JniInputFile {
+ // VM handle used to attach the calling thread before every JNI call.
+ jvm: Arc<JavaVM>,
+ // Global reference to the Java input-file object that serves the reads.
+ input_file_ref: GlobalRef,
+}
+
+// NOTE(review): asserts the struct may be moved to and shared between threads;
+// presumably sound because it only holds a JavaVM handle and a global reference,
+// but the Java object itself must tolerate concurrent `readFully` calls — confirm.
+unsafe impl Send for JniInputFile {}
+unsafe impl Sync for JniInputFile {}
+
+impl InputFile for JniInputFile {
+    /// Fills `buf` with `buf.len()` bytes starting at `offset`, by calling the Java
+    /// object's `readFully(long, byte[], int, int)` and copying the array back.
+    ///
+    /// An empty `buf` returns immediately without touching the JVM.
+    ///
+    /// # Errors
+    /// Returns `InvalidInput` when `buf` is longer than a Java array can be or `offset`
+    /// does not fit a Java `long`, and an `io::Error` wrapping the JNI error text when
+    /// attaching the thread, allocating the array, the Java call, or the copy fails.
+    fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
+        if buf.is_empty() {
+            return Ok(());
+        }
+        // Java array lengths are `int` and offsets are `long`; convert with checks so
+        // out-of-range values fail loudly instead of wrapping.
+        let len = jint::try_from(buf.len()).map_err(|_| {
+            io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!(
+                    "read of {} bytes exceeds the maximum Java array length",
+                    buf.len()
+                ),
+            )
+        })?;
+        let java_offset = jlong::try_from(offset).map_err(|_| {
+            io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("read offset {} does not fit a Java long", offset),
+            )
+        })?;
+
+        let mut env = self
+            .jvm
+            .attach_current_thread()
+            .map_err(|e| io::Error::other(e.to_string()))?;
+
+        let java_buf = env
+            .new_byte_array(len)
+            .map_err(|e| io::Error::other(e.to_string()))?;
+
+        let filled = env.call_method(
+            &self.input_file_ref,
+            "readFully",
+            "(J[BII)V",
+            &[
+                JValue::Long(java_offset),
+                JValue::Object(&java_buf),
+                JValue::Int(0),
+                JValue::Int(len),
+            ],
+        );
+
+        let outcome = match filled {
+            Ok(_) => {
+                // SAFETY: `u8` and `i8` have identical size and alignment, the pointer and
+                // length come from the exclusive borrow `buf`, and `buf` is not used again
+                // while `i8_buf` is alive.
+                let i8_buf: &mut [i8] = unsafe {
+                    std::slice::from_raw_parts_mut(buf.as_mut_ptr().cast::<i8>(), buf.len())
+                };
+                env.get_byte_array_region(&java_buf, 0, i8_buf)
+            }
+            Err(e) => Err(e),
+        };
+
+        // One native call may issue many reads. Release the temporary array now, on both
+        // the success and the failure path, so local references (and the arrays they pin)
+        // do not pile up until the native method returns.
+        let _ = env.delete_local_ref(java_buf);
+
+        outcome.map_err(|e| io::Error::other(e.to_string()))
+    }
+}
+
+/// Heap-allocated state behind the opaque `jlong` reader handle passed to Java.
+struct ReaderHandle {
+ // Type-erased reader that the native reader entry points call into.
+ reader: Box<dyn ReaderAccess>,
+ // Never read (leading underscore); presumably held so the Java input-file
+ // object stays referenced for as long as the handle exists — confirm.
+ _input_file_ref: Option<GlobalRef>,
+}
+
+/// Reinterprets a byte slice as a slice of signed bytes (Java's `byte`) without copying.
+fn bytemuck_cast(data: &[u8]) -> &[i8] {
+    let (start, count) = (data.as_ptr().cast::<i8>(), data.len());
+    // SAFETY: `u8` and `i8` have the same size and alignment and every bit pattern is
+    // valid for both; pointer and length come from a live shared borrow whose lifetime
+    // the returned slice inherits.
+    unsafe { std::slice::from_raw_parts(start, count) }
+}
+
+/// Raises a `java.lang.RuntimeException` carrying `msg` in the calling Java thread.
+///
+/// A failure to raise the exception is deliberately ignored: there is nothing more
+/// the native side can do at that point.
+fn throw(env: &mut JNIEnv, msg: &str) {
+    env.throw_new("java/lang/RuntimeException", msg).ok();
+}
+
+/// Heap-allocated state behind the opaque `jlong` writer handle passed to Java.
+struct WriterHandle {
+ // The writer that the native writer entry points call into.
+ inner: MosaicWriter<JniOutputFile>,
+ // Never read (leading underscore); presumably held so the Java stream
+ // object stays referenced for as long as the handle exists — confirm.
+ _stream_ref: GlobalRef,
+}
+
+// ======================== Writer ========================
+
+/// JNI entry point: creates a writer that streams into the Java `OutputStream` `stream`.
+///
+/// `arrow_schema_addr` is the address of an Arrow C Data Interface schema struct; it is
+/// only borrowed to build the Rust schema. `num_buckets <= 0` selects
+/// `DEFAULT_NUM_BUCKETS`. `stats_columns` lists the column indices to collect statistics
+/// for; a null or empty array means none.
+///
+/// Returns an opaque handle (a boxed `WriterHandle` pointer), or `0` after raising a
+/// Java `RuntimeException` on any failure, including a caught Rust panic.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeWriterOpen(
+    mut env: JNIEnv,
+    _class: JClass,
+    stream: JObject,
+    arrow_schema_addr: jlong,
+    num_buckets: jint,
+    compression: jint,
+    zstd_level: jint,
+    row_group_max_size: jlong,
+    max_dict_total_bytes: jint,
+    max_dict_entries: jint,
+    stats_columns: jni::objects::JIntArray,
+    page_size_threshold: jint,
+) -> jlong {
+    // Kept so an exception can still be raised if the closure below panics while it
+    // holds the mutable borrow of `env`.
+    let raw_env = env.get_raw();
+    let result = panic::catch_unwind(AssertUnwindSafe(|| {
+        if arrow_schema_addr == 0 {
+            throw(&mut env, "null Arrow schema address");
+            return 0;
+        }
+
+        // SAFETY: the address was checked to be non-null; the Java caller must pass the
+        // address of a live, initialized ArrowSchema struct. It is only read here.
+        let ffi_schema = unsafe { &*(arrow_schema_addr as *const FFI_ArrowSchema) };
+        let arrow_schema = match Schema::try_from(ffi_schema) {
+            Ok(s) => s,
+            Err(e) => {
+                throw(&mut env, &format!("Arrow schema import failed: {}", e));
+                return 0;
+            }
+        };
+
+        let stream_global = match env.new_global_ref(&stream) {
+            Ok(g) => g,
+            Err(e) => {
+                throw(&mut env, &format!("failed to create global ref: {}", e));
+                return 0;
+            }
+        };
+
+        let write_mid = match env.get_method_id("java/io/OutputStream", "write", "([BII)V") {
+            Ok(m) => m,
+            Err(e) => {
+                throw(&mut env, &format!("cannot find OutputStream.write: {}", e));
+                return 0;
+            }
+        };
+        let flush_mid = match env.get_method_id("java/io/OutputStream", "flush", "()V") {
+            Ok(m) => m,
+            Err(e) => {
+                throw(&mut env, &format!("cannot find OutputStream.flush: {}", e));
+                return 0;
+            }
+        };
+
+        let jvm = match env.get_java_vm() {
+            Ok(vm) => Arc::new(vm),
+            Err(e) => {
+                throw(&mut env, &format!("cannot get JavaVM: {}", e));
+                return 0;
+            }
+        };
+
+        let jni_stream = JniOutputFile {
+            jvm,
+            stream_ref: stream_global.clone(),
+            write_mid,
+            flush_mid,
+            pos: 0,
+            cached_array: None,
+            cached_array_len: 0,
+        };
+
+        let stats_cols: Vec<usize> = match env.get_array_length(&stats_columns) {
+            Ok(len) if len > 0 => {
+                let mut buf = vec![0i32; len as usize];
+                // A failed read must not silently disable statistics the caller asked for;
+                // report it, as the projected row-group reader does for its column list.
+                if env
+                    .get_int_array_region(&stats_columns, 0, &mut buf)
+                    .is_err()
+                {
+                    throw(&mut env, "failed to read stats columns array");
+                    return 0;
+                }
+                // Checked conversion: a negative index would otherwise wrap to a huge usize.
+                match buf
+                    .iter()
+                    .map(|&v| usize::try_from(v))
+                    .collect::<Result<Vec<usize>, _>>()
+                {
+                    Ok(cols) => cols,
+                    Err(_) => {
+                        throw(&mut env, "stats column indices must be non-negative");
+                        return 0;
+                    }
+                }
+            }
+            // Null or empty array: no statistics requested.
+            _ => Vec::new(),
+        };
+
+        let buckets = if num_buckets <= 0 {
+            DEFAULT_NUM_BUCKETS
+        } else {
+            num_buckets as usize
+        };
+
+        let opts = WriterOptions {
+            compression: compression as u8,
+            zstd_level,
+            num_buckets: buckets,
+            row_group_max_size: row_group_max_size as u64,
+            max_dict_total_bytes: max_dict_total_bytes as usize,
+            max_dict_entries: max_dict_entries as usize,
+            stats_columns: stats_cols,
+            page_size_threshold: page_size_threshold as usize,
+        };
+
+        let writer = match MosaicWriter::new(jni_stream, &arrow_schema, opts) {
+            Ok(w) => w,
+            Err(e) => {
+                throw(&mut env, &format!("writer open failed: {}", e));
+                return 0;
+            }
+        };
+        let handle = Box::new(WriterHandle {
+            inner: writer,
+            _stream_ref: stream_global,
+        });
+        // Ownership moves to Java; nativeWriterFree reclaims it.
+        Box::into_raw(handle) as jlong
+    }));
+    match result {
+        Ok(val) => val,
+        Err(e) => {
+            // SAFETY: `raw_env` was obtained from the valid `env` of this very call.
+            let mut env = unsafe { JNIEnv::from_raw(raw_env).unwrap() };
+            throw(&mut env, &panic_message(&e));
+            0
+        }
+    }
+}
+
+/// JNI entry point: closes the writer behind `handle`.
+///
+/// A zero handle is a no-op. A failed close or a caught panic is reported to Java as a
+/// `RuntimeException`. The handle itself is not freed here.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeWriterClose(
+    mut env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+) {
+    let raw_env = env.get_raw();
+    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
+        if handle != 0 {
+            // SAFETY: a non-zero handle is expected to be a live `WriterHandle` pointer
+            // previously returned to Java by this library.
+            let state = unsafe { &mut *(handle as *mut WriterHandle) };
+            match state.inner.close() {
+                Ok(_) => {}
+                Err(err) => throw(&mut env, &format!("close failed: {}", err)),
+            }
+        }
+    }));
+    if let Err(payload) = outcome {
+        // SAFETY: `raw_env` was obtained from the valid `env` of this very call.
+        let mut panic_env = unsafe { JNIEnv::from_raw(raw_env).unwrap() };
+        throw(&mut panic_env, &panic_message(&payload));
+    }
+}
+
+/// JNI entry point: releases the `WriterHandle` behind `handle`. A zero handle is a no-op.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeWriterFree(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+) {
+    if handle == 0 {
+        return;
+    }
+    // SAFETY: a non-zero handle is expected to be a `WriterHandle` pointer produced by
+    // `Box::into_raw` in this library and not freed before.
+    let owned = unsafe { Box::from_raw(handle as *mut WriterHandle) };
+    drop(owned);
+}
+
+// ======================== Writer.estimatedSize ========================
+
+/// JNI entry point: returns the writer's current estimated file size, or `0` for a zero handle.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeWriterEstimatedSize(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+) -> jlong {
+    match handle {
+        0 => 0,
+        addr => {
+            // SAFETY: a non-zero handle is expected to be a live `WriterHandle` pointer
+            // previously returned to Java by this library.
+            let state = unsafe { &*(addr as *const WriterHandle) };
+            state.inner.estimated_file_size() as jlong
+        }
+    }
+}
+
+// ======================== Writer.writeBatch (Arrow C Data Interface)
========================
+
+/// JNI entry point: imports one Arrow record batch through the C Data Interface and
+/// appends it to the writer behind `writer_handle`.
+///
+/// `array_addr` / `schema_addr` are the addresses of the caller's ArrowArray and
+/// ArrowSchema structs. Ownership of both is taken unconditionally: the caller's structs
+/// are zeroed (marked as released) before the import is attempted, so they are released
+/// exactly once by the Rust side whether or not the import or the write succeeds.
+///
+/// Any failure, including a caught panic, is reported to Java as a `RuntimeException`.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeWriterWriteBatch(
+    mut env: JNIEnv,
+    _class: JClass,
+    writer_handle: jlong,
+    array_addr: jlong,
+    schema_addr: jlong,
+) {
+    let raw_env = env.get_raw();
+    let result = panic::catch_unwind(AssertUnwindSafe(|| {
+        if writer_handle == 0 {
+            throw(&mut env, "null writer handle");
+            return;
+        }
+        if array_addr == 0 || schema_addr == 0 {
+            throw(&mut env, "null ArrowArray or ArrowSchema address");
+            return;
+        }
+        // SAFETY: a non-zero handle is expected to be a live `WriterHandle` pointer
+        // previously returned to Java by this library.
+        let writer = unsafe { &mut *(writer_handle as *mut WriterHandle) };
+
+        // Move the structs out and zero the originals in one step. Previously the
+        // originals were only zeroed after a successful import, so on failure the copies
+        // were dropped (running the release callbacks) while the caller's structs still
+        // carried non-null `release` pointers, allowing a second release from Java.
+        //
+        // SAFETY: both addresses were checked to be non-null and the Java caller must pass
+        // addresses of live, initialized C Data Interface structs. All-zero bytes are the
+        // "released" state of both structs.
+        let (ffi_array, ffi_schema) = unsafe {
+            let array_ptr = array_addr as *mut FFI_ArrowArray;
+            let schema_ptr = schema_addr as *mut FFI_ArrowSchema;
+            let taken = (ptr::read(array_ptr), ptr::read(schema_ptr));
+            ptr::write(array_ptr, std::mem::zeroed());
+            ptr::write(schema_ptr, std::mem::zeroed());
+            taken
+        };
+
+        // SAFETY: `ffi_array` and `ffi_schema` were exported together by the caller and
+        // are now exclusively owned here.
+        let arr_data = match unsafe { arrow_array::ffi::from_ffi(ffi_array, &ffi_schema) } {
+            Ok(d) => d,
+            Err(e) => {
+                throw(&mut env, &format!("Arrow import failed: {}", e));
+                return;
+            }
+        };
+
+        let struct_array = StructArray::from(arr_data);
+        let batch = RecordBatch::from(struct_array);
+        if let Err(e) = writer.inner.write_batch(&batch) {
+            throw(&mut env, &format!("write_batch failed: {}", e));
+        }
+    }));
+    if let Err(e) = result {
+        // SAFETY: `raw_env` was obtained from the valid `env` of this very call.
+        let mut env = unsafe { JNIEnv::from_raw(raw_env).unwrap() };
+        throw(&mut env, &panic_message(&e));
+    }
+}
+
+// ======================== Reader ========================
+
+/// Heap-allocated state behind the opaque `jlong` row-group reader handle passed to Java.
+struct RowGroupReaderHandle {
+ // The row-group reader that the native row-group entry points call into.
+ inner: RowGroupReader,
+}
+
+/// JNI entry point: opens a reader over the Java input-file object `input_file`, whose
+/// total size in bytes is `file_length`.
+///
+/// Returns an opaque handle (a boxed `ReaderHandle` pointer), or `0` after raising a
+/// Java `RuntimeException` on any failure, including a negative `file_length` and a
+/// caught Rust panic.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderOpen(
+    mut env: JNIEnv,
+    _class: JClass,
+    input_file: JObject,
+    file_length: jlong,
+) -> jlong {
+    let raw_env = env.get_raw();
+    let result = panic::catch_unwind(AssertUnwindSafe(|| {
+        // Checked conversion: a negative length would otherwise wrap to an enormous
+        // unsigned size and surface later as a confusing read error.
+        let length = match u64::try_from(file_length) {
+            Ok(len) => len,
+            Err(_) => {
+                throw(&mut env, &format!("invalid file length: {}", file_length));
+                return 0;
+            }
+        };
+
+        let global = match env.new_global_ref(&input_file) {
+            Ok(g) => g,
+            Err(e) => {
+                throw(&mut env, &format!("failed to create global ref: {}", e));
+                return 0;
+            }
+        };
+
+        let jvm = match env.get_java_vm() {
+            Ok(vm) => Arc::new(vm),
+            Err(e) => {
+                throw(&mut env, &format!("cannot get JavaVM: {}", e));
+                return 0;
+            }
+        };
+
+        let input = JniInputFile {
+            jvm,
+            input_file_ref: global.clone(),
+        };
+
+        match MosaicReader::new(input, length) {
+            Ok(reader) => {
+                let rh = ReaderHandle {
+                    reader: Box::new(reader),
+                    _input_file_ref: Some(global),
+                };
+                // Ownership moves to Java; nativeReaderFree reclaims it.
+                Box::into_raw(Box::new(rh)) as jlong
+            }
+            Err(e) => {
+                throw(&mut env, &format!("open failed: {}", e));
+                0
+            }
+        }
+    }));
+    match result {
+        Ok(val) => val,
+        Err(e) => {
+            // SAFETY: `raw_env` was obtained from the valid `env` of this very call.
+            let mut env = unsafe { JNIEnv::from_raw(raw_env).unwrap() };
+            throw(&mut env, &panic_message(&e));
+            0
+        }
+    }
+}
+
+/// JNI entry point: releases the `ReaderHandle` behind `handle`. A zero handle is a no-op.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderFree(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+) {
+    if handle == 0 {
+        return;
+    }
+    // SAFETY: a non-zero handle is expected to be a `ReaderHandle` pointer produced by
+    // `Box::into_raw` in this library and not freed before.
+    let owned = unsafe { Box::from_raw(handle as *mut ReaderHandle) };
+    drop(owned);
+}
+
+/// JNI entry point: exports the reader's schema into the ArrowSchema struct at `schema_addr`.
+///
+/// Returns `0` on success and `-1` when either argument is zero, the schema cannot be
+/// converted, or a panic was caught. No Java exception is raised by this function.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderExportSchema(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    schema_addr: jlong,
+) -> jint {
+    if handle == 0 || schema_addr == 0 {
+        return -1;
+    }
+    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
+        // SAFETY: a non-zero handle is expected to be a live `ReaderHandle` pointer
+        // previously returned to Java by this library.
+        let state = unsafe { &*(handle as *const ReaderHandle) };
+        let file_schema = state.reader.schema();
+
+        let mut fields: Vec<arrow_schema::Field> = Vec::new();
+        for column in file_schema.columns.iter() {
+            fields.push(arrow_schema::Field::new(
+                &column.name,
+                column.data_type.clone(),
+                column.nullable,
+            ));
+        }
+        let arrow_schema = Schema::new(fields);
+
+        let Ok(exported) = FFI_ArrowSchema::try_from(&arrow_schema) else {
+            return -1;
+        };
+        // SAFETY: `schema_addr` was checked to be non-zero; the Java caller must pass the
+        // address of writable memory sized for an ArrowSchema struct.
+        unsafe {
+            ptr::write(schema_addr as *mut FFI_ArrowSchema, exported);
+        }
+        0
+    }));
+    match outcome {
+        Ok(code) => code,
+        Err(_) => -1,
+    }
+}
+
+/// JNI entry point: returns the number of row groups in the file, or `0` for a zero handle.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderNumRowGroups(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+) -> jint {
+    match handle {
+        0 => 0,
+        addr => {
+            // SAFETY: a non-zero handle is expected to be a live `ReaderHandle` pointer
+            // previously returned to Java by this library.
+            let state = unsafe { &*(addr as *const ReaderHandle) };
+            state.reader.num_row_groups() as jint
+        }
+    }
+}
+
+/// JNI entry point: opens a reader over all columns of row group `rg_index`.
+///
+/// Returns an opaque handle (a boxed `RowGroupReaderHandle` pointer), or `0` after
+/// raising a Java `RuntimeException` on a zero handle, a reader error, or a caught panic.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderOpenRowGroup(
+    mut env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    rg_index: jint,
+) -> jlong {
+    let raw_env = env.get_raw();
+    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
+        if handle == 0 {
+            throw(&mut env, "null reader handle");
+            return 0;
+        }
+        // SAFETY: a non-zero handle is expected to be a live `ReaderHandle` pointer
+        // previously returned to Java by this library.
+        let state = unsafe { &*(handle as *const ReaderHandle) };
+        let opened = state.reader.row_group_reader(rg_index as usize);
+        let row_group = match opened {
+            Ok(rg) => rg,
+            Err(err) => {
+                throw(&mut env, &format!("open row group failed: {}", err));
+                return 0;
+            }
+        };
+        // Ownership moves to Java; nativeRowGroupReaderFree reclaims it.
+        Box::into_raw(Box::new(RowGroupReaderHandle { inner: row_group })) as jlong
+    }));
+    outcome.unwrap_or_else(|payload| {
+        // SAFETY: `raw_env` was obtained from the valid `env` of this very call.
+        let mut panic_env = unsafe { JNIEnv::from_raw(raw_env).unwrap() };
+        throw(&mut panic_env, &panic_message(&payload));
+        0
+    })
+}
+
+/// JNI entry point: opens a reader over row group `rg_index`, restricted to the column
+/// indices listed in `columns`. A null or empty `columns` array is passed on as an empty
+/// projection.
+///
+/// Returns an opaque handle (a boxed `RowGroupReaderHandle` pointer), or `0` after
+/// raising a Java `RuntimeException` on a zero handle, an unreadable or negative column
+/// index, a reader error, or a caught panic.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderOpenRowGroupProjected(
+    mut env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    rg_index: jint,
+    columns: JIntArray,
+) -> jlong {
+    let raw_env = env.get_raw();
+    let result = panic::catch_unwind(AssertUnwindSafe(|| {
+        if handle == 0 {
+            throw(&mut env, "null reader handle");
+            return 0;
+        }
+        // SAFETY: a non-zero handle is expected to be a live `ReaderHandle` pointer
+        // previously returned to Java by this library.
+        let rh = unsafe { &*(handle as *const ReaderHandle) };
+        let col_indices: Vec<usize> = match env.get_array_length(&columns) {
+            Ok(len) if len > 0 => {
+                let mut buf = vec![0i32; len as usize];
+                if env.get_int_array_region(&columns, 0, &mut buf).is_err() {
+                    throw(&mut env, "failed to read columns array");
+                    return 0;
+                }
+                // Checked conversion: a negative index would otherwise wrap to a huge usize.
+                match buf
+                    .iter()
+                    .map(|&v| usize::try_from(v))
+                    .collect::<Result<Vec<usize>, _>>()
+                {
+                    Ok(cols) => cols,
+                    Err(_) => {
+                        throw(&mut env, "column indices must be non-negative");
+                        return 0;
+                    }
+                }
+            }
+            _ => Vec::new(),
+        };
+        match rh
+            .reader
+            .row_group_reader_projected(rg_index as usize, &col_indices)
+        {
+            Ok(rg) => {
+                let rg_handle = Box::new(RowGroupReaderHandle { inner: rg });
+                // Ownership moves to Java; nativeRowGroupReaderFree reclaims it.
+                Box::into_raw(rg_handle) as jlong
+            }
+            Err(e) => {
+                throw(&mut env, &format!("open row group projected failed: {}", e));
+                0
+            }
+        }
+    }));
+    match result {
+        Ok(val) => val,
+        Err(e) => {
+            // SAFETY: `raw_env` was obtained from the valid `env` of this very call.
+            let mut env = unsafe { JNIEnv::from_raw(raw_env).unwrap() };
+            throw(&mut env, &panic_message(&e));
+            0
+        }
+    }
+}
+
+// ======================== RowGroupReader ========================
+
+/// JNI entry point: returns the number of rows in the opened row group, or `0` for a zero handle.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeRowGroupReaderNumRows(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+) -> jint {
+    match handle {
+        0 => 0,
+        addr => {
+            // SAFETY: a non-zero handle is expected to be a live `RowGroupReaderHandle`
+            // pointer previously returned to Java by this library.
+            let state = unsafe { &*(addr as *const RowGroupReaderHandle) };
+            state.inner.num_rows() as jint
+        }
+    }
+}
+
+/// JNI entry point: releases the `RowGroupReaderHandle` behind `handle`. A zero handle is a no-op.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeRowGroupReaderFree(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+) {
+    if handle == 0 {
+        return;
+    }
+    // SAFETY: a non-zero handle is expected to be a `RowGroupReaderHandle` pointer
+    // produced by `Box::into_raw` in this library and not freed before.
+    let owned = unsafe { Box::from_raw(handle as *mut RowGroupReaderHandle) };
+    drop(owned);
+}
+
+// ======================== Row Group Stats ========================
+
+/// JNI entry point: returns how many column-statistics entries row group `rg_index` has.
+///
+/// Returns `0` for a zero handle and `-1` when the statistics cannot be obtained.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderRowGroupNumStats(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    rg_index: jint,
+) -> jint {
+    if handle == 0 {
+        return 0;
+    }
+    // SAFETY: a non-zero handle is expected to be a live `ReaderHandle` pointer
+    // previously returned to Java by this library.
+    let state = unsafe { &*(handle as *const ReaderHandle) };
+    let lookup = state.reader.row_group_stats(rg_index as usize);
+    if let Ok(entries) = lookup {
+        entries.len() as jint
+    } else {
+        -1
+    }
+}
+
+/// JNI entry point: returns the column index that statistics entry `stat_index` of row
+/// group `rg_index` describes.
+///
+/// Returns `-1` for a zero handle, when the statistics cannot be obtained, or when
+/// `stat_index` is out of range.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderRowGroupStatColumnIndex(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    rg_index: jint,
+    stat_index: jint,
+) -> jint {
+    if handle == 0 {
+        return -1;
+    }
+    // SAFETY: a non-zero handle is expected to be a live `ReaderHandle` pointer
+    // previously returned to Java by this library.
+    let state = unsafe { &*(handle as *const ReaderHandle) };
+    let Ok(entries) = state.reader.row_group_stats(rg_index as usize) else {
+        return -1;
+    };
+    // A negative `stat_index` becomes a huge value here and falls into the range check.
+    let position = stat_index as usize;
+    if position < entries.len() {
+        entries[position].column_index as jint
+    } else {
+        -1
+    }
+}
+
+/// JNI entry point: returns the null count recorded in statistics entry `stat_index` of
+/// row group `rg_index`.
+///
+/// Returns `0` for a zero handle or an out-of-range `stat_index`, and `-1` when the
+/// statistics cannot be obtained.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderRowGroupStatNullCount(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    rg_index: jint,
+    stat_index: jint,
+) -> jlong {
+    if handle == 0 {
+        return 0;
+    }
+    // SAFETY: a non-zero handle is expected to be a live `ReaderHandle` pointer
+    // previously returned to Java by this library.
+    let state = unsafe { &*(handle as *const ReaderHandle) };
+    let Ok(entries) = state.reader.row_group_stats(rg_index as usize) else {
+        return -1;
+    };
+    // A negative `stat_index` becomes a huge value here and falls into the range check.
+    let position = stat_index as usize;
+    if position < entries.len() {
+        entries[position].null_count as jlong
+    } else {
+        0
+    }
+}
+
+/// JNI entry point: returns the minimum value of statistics entry `stat_index` of row
+/// group `rg_index` as a Java `byte[]` holding the result of the value's `to_be_bytes()`.
+///
+/// Returns a null array for a zero handle, when the statistics cannot be obtained, when
+/// `stat_index` is out of range, when no minimum is recorded, when the encoded value is
+/// empty, or when the Java array cannot be created.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderRowGroupStatMin<'a>(
+    env: JNIEnv<'a>,
+    _class: JClass<'a>,
+    handle: jlong,
+    rg_index: jint,
+    stat_index: jint,
+) -> JByteArray<'a> {
+    if handle == 0 {
+        return JByteArray::default();
+    }
+    // SAFETY: a non-zero handle is expected to be a live `ReaderHandle` pointer
+    // previously returned to Java by this library.
+    let state = unsafe { &*(handle as *const ReaderHandle) };
+    let Ok(entries) = state.reader.row_group_stats(rg_index as usize) else {
+        return JByteArray::default();
+    };
+    // A negative `stat_index` becomes a huge value here and falls into the range check.
+    let position = stat_index as usize;
+    if position >= entries.len() {
+        return JByteArray::default();
+    }
+    let Some(lowest) = &entries[position].min else {
+        return JByteArray::default();
+    };
+    let encoded = lowest.to_be_bytes();
+    if encoded.is_empty() {
+        JByteArray::default()
+    } else {
+        env.byte_array_from_slice(&encoded).unwrap_or_default()
+    }
+}
+
+/// JNI entry point: returns the maximum value of one statistics entry as a Java byte array.
+///
+/// `handle` is reinterpreted as a pointer to a `ReaderHandle`; `rg_index` selects the row
+/// group and `stat_index` selects the entry inside that row group's statistics. The array
+/// holds whatever the stored value's `to_be_bytes()` produces.
+///
+/// Returns `JByteArray::default()` when `handle` is zero, when `row_group_stats` reports an
+/// error, when `stat_index` does not address an existing entry, when the entry has no
+/// maximum, when the produced bytes are empty, or when creating the Java array fails.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderRowGroupStatMax<'a>(
+    env: JNIEnv<'a>,
+    _class: JClass<'a>,
+    handle: jlong,
+    rg_index: jint,
+    stat_index: jint,
+) -> JByteArray<'a> {
+    if handle == 0 {
+        return JByteArray::default();
+    }
+    // SAFETY: only sound if `handle` is the address of a live `ReaderHandle`; nothing beyond
+    // the zero check is verified here — presumably the Java side guarantees it (confirm).
+    let reader_handle = unsafe { &*(handle as *const ReaderHandle) };
+    let entries = match reader_handle.reader.row_group_stats(rg_index as usize) {
+        Ok(entries) => entries,
+        Err(_) => return JByteArray::default(),
+    };
+    // An out-of-range index and a missing maximum both end up in the `None` arm.
+    let buf = match entries.get(stat_index as usize).and_then(|entry| entry.max.as_ref()) {
+        Some(value) => value.to_be_bytes(),
+        None => return JByteArray::default(),
+    };
+    if buf.is_empty() {
+        JByteArray::default()
+    } else {
+        env.byte_array_from_slice(&buf).unwrap_or_default()
+    }
+}
+
+// ======================== Columnar Read (Arrow C Data Interface) ========================
+
+/// JNI entry point: reads a row group's columns and exports them through the Arrow C Data
+/// Interface.
+///
+/// `handle` is reinterpreted as a pointer to a `RowGroupReaderHandle`. `array_addr` and
+/// `schema_addr` are reinterpreted as pointers to an `FFI_ArrowArray` and an
+/// `FFI_ArrowSchema`; on success both locations are overwritten with the exported batch.
+///
+/// Returns `0` on success. On failure — a zero handle, a zero address, a `read_columns`
+/// error, an export error, or a panic caught inside the body — the problem is reported
+/// through `throw` and `-1` is returned.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeRowGroupReaderReadColumns(
+    mut env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    array_addr: jlong,
+    schema_addr: jlong,
+) -> jint {
+    // Grabbed up front so an environment can be rebuilt in the panic fallback below.
+    let raw_env = env.get_raw();
+    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
+        if handle == 0 {
+            throw(&mut env, "null handle");
+            return -1;
+        }
+        if array_addr == 0 || schema_addr == 0 {
+            throw(&mut env, "null ArrowArray or ArrowSchema address");
+            return -1;
+        }
+        // SAFETY: only sound if `handle` is the address of a live, unaliased
+        // `RowGroupReaderHandle`; just the zero check is done here — presumably the Java
+        // side guarantees the rest (confirm).
+        let row_group = unsafe { &mut *(handle as *mut RowGroupReaderHandle) };
+
+        // Read, then export; either failure becomes the message handed to `throw`.
+        let exported = row_group
+            .inner
+            .read_columns()
+            .map_err(|e| format!("read_columns failed: {}", e))
+            .and_then(|batch| {
+                arrow_array::ffi::to_ffi(&StructArray::from(batch).into())
+                    .map_err(|e| format!("Arrow export failed: {}", e))
+            });
+
+        match exported {
+            Ok((ffi_array, ffi_schema)) => {
+                // SAFETY: only sound if both addresses point to writable, suitably aligned
+                // storage for the respective FFI structs; just the zero check is done here —
+                // presumably the caller allocated them (confirm). `ptr::write` does not
+                // drop whatever was stored there before.
+                unsafe {
+                    ptr::write(array_addr as *mut FFI_ArrowArray, ffi_array);
+                    ptr::write(schema_addr as *mut FFI_ArrowSchema, ffi_schema);
+                }
+                0
+            }
+            Err(message) => {
+                throw(&mut env, &message);
+                -1
+            }
+        }
+    }));
+    outcome.unwrap_or_else(|payload| {
+        // SAFETY: `raw_env` was obtained from the `JNIEnv` passed to this very call.
+        let mut env = unsafe { JNIEnv::from_raw(raw_env).unwrap() };
+        throw(&mut env, &panic_message(&payload));
+        -1
+    })
+}