This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git


The following commit(s) were added to refs/heads/main by this push:
     new 35b5bfb  Add Java/JNI bindings and CI test job (#6)
35b5bfb is described below

commit 35b5bfbec6d6bcf05cbf63721187a0af49610a55
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue May 19 11:06:37 2026 +0800

    Add Java/JNI bindings and CI test job (#6)
    
    Enabling Java applications to use MosaicWriter/MosaicReader via Arrow C 
Data Interface over JNI.
---
 .github/workflows/ci.yml                           |  29 +
 Cargo.toml                                         |   2 +-
 java/pom.xml                                       |  73 ++
 java/src/main/java/io/mosaic/ColumnStatistics.java |  55 ++
 java/src/main/java/io/mosaic/InputFile.java        |  33 +
 java/src/main/java/io/mosaic/MosaicReader.java     | 123 +++
 java/src/main/java/io/mosaic/MosaicWriter.java     |  87 +++
 java/src/main/java/io/mosaic/NativeLib.java        |  62 ++
 java/src/main/java/io/mosaic/WriterOptions.java    |  85 +++
 .../test/java/io/mosaic/MosaicRoundtripTest.java   | 670 +++++++++++++++++
 Cargo.toml => jni/Cargo.toml                       |  18 +-
 jni/src/lib.rs                                     | 832 +++++++++++++++++++++
 12 files changed, 2064 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 70b4fec..53ee1d3 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -102,3 +102,32 @@ jobs:
         run: cpp/build/test_mosaic
         env:
           LD_LIBRARY_PATH: ${{ github.workspace }}/target/release
+
+  java-test:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v6
+
+      - name: Rust Cache
+        uses: actions/cache@v5
+        with:
+          path: |
+            ~/.cargo/registry
+            ~/.cargo/git
+            target
+          key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
+          restore-keys: |
+            ${{ runner.os }}-cargo-
+
+      - name: Set up JDK 11
+        uses: actions/setup-java@v4
+        with:
+          java-version: '11'
+          distribution: 'temurin'
+          # Cache the local Maven repository so Arrow/JUnit are not re-downloaded every run.
+          cache: 'maven'
+
+      - name: Build JNI library
+        run: cargo build -p mosaic-jni --release
+
+      - name: Run Java tests
+        working-directory: java
+        # -B: non-interactive batch mode (no download progress noise in CI logs).
+        # java.library.path points System.loadLibrary("mosaic_jni") at the cargo release output.
+        run: mvn -B test "-DargLine=-Djava.library.path=${{ github.workspace }}/target/release"
diff --git a/Cargo.toml b/Cargo.toml
index 39c103c..240d079 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,7 +16,7 @@
 # under the License.
 
 [workspace]
-members = ["core", "ffi"]
+members = ["core", "ffi", "jni"]  # jni: JNI bindings crate (mosaic-jni) backing the Java API in java/
 resolver = "2"
 
 [profile.release]
diff --git a/java/pom.xml b/java/pom.xml
new file mode 100644
index 0000000..f295fcc
--- /dev/null
+++ b/java/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>io.mosaic</groupId>
+    <artifactId>mosaic-writer</artifactId>
+    <version>0.1.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <name>Mosaic Writer</name>
+    <description>Mosaic file format reader and writer for Java (backed by Rust via JNI)</description>
+
+    <properties>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <arrow.version>18.1.0</arrow.version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <!-- Aligns every org.apache.arrow artifact below on arrow.version. -->
+            <dependency>
+                <groupId>org.apache.arrow</groupId>
+                <artifactId>arrow-bom</artifactId>
+                <version>${arrow.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-vector</artifactId>
+        </dependency>
+        <!-- Allocation backend for Arrow's BufferAllocator; only needed at runtime. -->
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-memory-netty</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <!-- Arrow C Data Interface (ArrowArray/ArrowSchema) used to pass data across JNI. -->
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-c-data</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13.2</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/java/src/main/java/io/mosaic/ColumnStatistics.java 
b/java/src/main/java/io/mosaic/ColumnStatistics.java
new file mode 100644
index 0000000..e4733d7
--- /dev/null
+++ b/java/src/main/java/io/mosaic/ColumnStatistics.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.mosaic;
+
+/**
+ * Statistics for one column of one row group, as returned by
+ * {@link MosaicReader#getRowGroupStatistics(int)}.
+ *
+ * <p>Min/max are the raw byte encodings handed back by the native library; how
+ * to decode them depends on the column type (NOTE(review): document the encoding).
+ * The getters return copies so callers cannot mutate this object's state.
+ */
+public class ColumnStatistics {
+
+    private final int columnIndex;
+    private final long nullCount;
+    private final byte[] min;
+    private final byte[] max;
+
+    ColumnStatistics(int columnIndex, long nullCount, byte[] min, byte[] max) {
+        this.columnIndex = columnIndex;
+        this.nullCount = nullCount;
+        this.min = min;
+        this.max = max;
+    }
+
+    /** Column index reported by the native reader (presumably its position in the file schema). */
+    public int getColumnIndex() {
+        return columnIndex;
+    }
+
+    /** Number of null values reported for this column in the row group. */
+    public long getNullCount() {
+        return nullCount;
+    }
+
+    /**
+     * Returns {@code true} if a minimum value was recorded. Only {@code min} is
+     * checked; min and max are assumed to be present together.
+     */
+    public boolean hasMinMax() {
+        return min != null;
+    }
+
+    /** Returns a copy of the encoded minimum value, or {@code null} if none was recorded. */
+    public byte[] getMin() {
+        return copyOrNull(min);
+    }
+
+    /** Returns a copy of the encoded maximum value, or {@code null} if none was recorded. */
+    public byte[] getMax() {
+        return copyOrNull(max);
+    }
+
+    /** Defensive copy so the internal arrays are never exposed to callers. */
+    private static byte[] copyOrNull(byte[] bytes) {
+        return bytes == null ? null : bytes.clone();
+    }
+}
diff --git a/java/src/main/java/io/mosaic/InputFile.java 
b/java/src/main/java/io/mosaic/InputFile.java
new file mode 100644
index 0000000..9808d1f
--- /dev/null
+++ b/java/src/main/java/io/mosaic/InputFile.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.mosaic;
+
+import java.io.IOException;
+
+/**
+ * Random-access byte source from which {@link MosaicReader} reads a Mosaic file.
+ */
+public interface InputFile {
+
+    /**
+     * Read {@code length} bytes starting at {@code position} into {@code buffer}.
+     *
+     * <p>This method must be thread-safe: the reader may call it concurrently
+     * from multiple threads to perform parallel IO.
+     *
+     * @param position byte offset within the file at which reading starts
+     * @param buffer   destination array
+     * @param offset   index in {@code buffer} where the first byte is stored
+     * @param length   number of bytes to read
+     * @throws IOException if the bytes cannot be read
+     */
+    void readFully(long position, byte[] buffer, int offset, int length) throws IOException;
+}
diff --git a/java/src/main/java/io/mosaic/MosaicReader.java 
b/java/src/main/java/io/mosaic/MosaicReader.java
new file mode 100644
index 0000000..95d8f47
--- /dev/null
+++ b/java/src/main/java/io/mosaic/MosaicReader.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.mosaic;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+public class MosaicReader implements AutoCloseable {
+
+    private long handle;
+    private final Schema schema;
+
+    private MosaicReader(long handle, BufferAllocator allocator) {
+        this.handle = handle;
+        try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator)) {
+            int rc = NativeLib.nativeReaderExportSchema(handle, 
cSchema.memoryAddress());
+            if (rc != 0) {
+                throw new RuntimeException("failed to export schema");
+            }
+            this.schema = Data.importSchema(allocator, cSchema, null);
+        }
+    }
+
+    public static MosaicReader open(InputFile inputFile, long fileLength, 
BufferAllocator allocator) {
+        long h = NativeLib.nativeReaderOpen(inputFile, fileLength);
+        if (h == 0) {
+            throw new RuntimeException("failed to open reader");
+        }
+        return new MosaicReader(h, allocator);
+    }
+
+    public Schema getSchema() {
+        return schema;
+    }
+
+    public int numRowGroups() {
+        return NativeLib.nativeReaderNumRowGroups(handle);
+    }
+
+    public VectorSchemaRoot readRowGroup(int rgIndex, BufferAllocator 
allocator) {
+        long rgHandle = NativeLib.nativeReaderOpenRowGroup(handle, rgIndex);
+        if (rgHandle == 0) {
+            throw new RuntimeException("failed to open row group " + rgIndex);
+        }
+        try {
+            return readRowGroupHandle(rgHandle, allocator);
+        } finally {
+            NativeLib.nativeRowGroupReaderFree(rgHandle);
+        }
+    }
+
+    public VectorSchemaRoot readRowGroup(int rgIndex, int[] columns, 
BufferAllocator allocator) {
+        long rgHandle = NativeLib.nativeReaderOpenRowGroupProjected(handle, 
rgIndex, columns);
+        if (rgHandle == 0) {
+            throw new RuntimeException("failed to open row group " + rgIndex);
+        }
+        try {
+            return readRowGroupHandle(rgHandle, allocator);
+        } finally {
+            NativeLib.nativeRowGroupReaderFree(rgHandle);
+        }
+    }
+
+    private VectorSchemaRoot readRowGroupHandle(long rgHandle, BufferAllocator 
allocator) {
+        try (ArrowArray arrowArray = ArrowArray.allocateNew(allocator);
+             ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator)) {
+            int rc = NativeLib.nativeRowGroupReaderReadColumns(
+                    rgHandle, arrowArray.memoryAddress(), 
arrowSchema.memoryAddress());
+            if (rc != 0) {
+                throw new RuntimeException("readColumns failed");
+            }
+            return Data.importVectorSchemaRoot(allocator, arrowArray, 
arrowSchema, null);
+        }
+    }
+
+    public List<ColumnStatistics> getRowGroupStatistics(int rgIndex) {
+        int n = NativeLib.nativeReaderRowGroupNumStats(handle, rgIndex);
+        if (n < 0) {
+            throw new RuntimeException("failed to get row group statistics for 
index " + rgIndex);
+        }
+        List<ColumnStatistics> result = new ArrayList<>(n);
+        for (int i = 0; i < n; i++) {
+            result.add(new ColumnStatistics(
+                    NativeLib.nativeReaderRowGroupStatColumnIndex(handle, 
rgIndex, i),
+                    NativeLib.nativeReaderRowGroupStatNullCount(handle, 
rgIndex, i),
+                    NativeLib.nativeReaderRowGroupStatMin(handle, rgIndex, i),
+                    NativeLib.nativeReaderRowGroupStatMax(handle, rgIndex, 
i)));
+        }
+        return result;
+    }
+
+    @Override
+    public void close() {
+        if (handle != 0) {
+            NativeLib.nativeReaderFree(handle);
+            handle = 0;
+        }
+    }
+}
diff --git a/java/src/main/java/io/mosaic/MosaicWriter.java 
b/java/src/main/java/io/mosaic/MosaicWriter.java
new file mode 100644
index 0000000..09f6a3a
--- /dev/null
+++ b/java/src/main/java/io/mosaic/MosaicWriter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.mosaic;
+
+import java.io.OutputStream;
+
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+public class MosaicWriter implements AutoCloseable {
+
+    private long handle;
+    private boolean closed;
+    private final BufferAllocator allocator;
+
+    public MosaicWriter(OutputStream outputStream, Schema arrowSchema, 
BufferAllocator allocator) {
+        this(outputStream, arrowSchema, new WriterOptions(), allocator);
+    }
+
+    public MosaicWriter(OutputStream outputStream, Schema arrowSchema, 
WriterOptions options, BufferAllocator allocator) {
+        this.allocator = allocator;
+        try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator)) {
+            Data.exportSchema(allocator, arrowSchema, null, cSchema);
+            this.handle = NativeLib.nativeWriterOpen(
+                    outputStream,
+                    cSchema.memoryAddress(),
+                    options.getNumBuckets(),
+                    options.getCompression(),
+                    options.getZstdLevel(),
+                    options.getRowGroupMaxSize(),
+                    options.getMaxDictTotalBytes(),
+                    options.getMaxDictEntries(),
+                    options.getStatsColumns(),
+                    options.getPageSizeThreshold());
+            cSchema.release();
+        }
+        if (this.handle == 0) {
+            throw new RuntimeException("failed to open writer");
+        }
+    }
+
+    public void write(VectorSchemaRoot root) {
+        try (ArrowArray arrowArray = ArrowArray.allocateNew(allocator);
+             ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator)) {
+            Data.exportVectorSchemaRoot(allocator, root, null, arrowArray, 
arrowSchema);
+            NativeLib.nativeWriterWriteBatch(handle, 
arrowArray.memoryAddress(), arrowSchema.memoryAddress());
+        }
+    }
+
+    public long estimatedFileSize() {
+        return NativeLib.nativeWriterEstimatedSize(handle);
+    }
+
+    @Override
+    public void close() {
+        if (!closed && handle != 0) {
+            closed = true;
+            try {
+                NativeLib.nativeWriterClose(handle);
+            } finally {
+                NativeLib.nativeWriterFree(handle);
+                handle = 0;
+            }
+        }
+    }
+}
diff --git a/java/src/main/java/io/mosaic/NativeLib.java 
b/java/src/main/java/io/mosaic/NativeLib.java
new file mode 100644
index 0000000..1805269
--- /dev/null
+++ b/java/src/main/java/io/mosaic/NativeLib.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.mosaic;
+
+import java.io.OutputStream;
+
+/**
+ * JNI entry points implemented by the {@code mosaic_jni} native library.
+ *
+ * <p>Conventions relied on by the wrappers in this package: a {@code long}
+ * handle of {@code 0} means the native call failed; {@code int} status codes
+ * from the export/read calls are {@code 0} on success; every handle must be
+ * released with its matching {@code *Free} method.
+ */
+final class NativeLib {
+
+    static {
+        // Resolved from java.library.path (the CI job points it at target/release).
+        System.loadLibrary("mosaic_jni");
+    }
+
+    private NativeLib() {}
+
+    // Writer
+    // nativeWriterOpen: arrowSchemaAddr is an exported C ArrowSchema; the remaining
+    // arguments are WriterOptions values. Returns 0 on failure.
+    static native long nativeWriterOpen(OutputStream stream, long arrowSchemaAddr,
+                                        int numBuckets, int compression, int zstdLevel,
+                                        long rowGroupMaxSize, int maxDictTotalBytes,
+                                        int maxDictEntries, int[] statsColumns,
+                                        int pageSizeThreshold);
+    // MosaicWriter.close() calls nativeWriterClose and then always nativeWriterFree.
+    static native void nativeWriterClose(long handle);
+    static native void nativeWriterFree(long handle);
+    static native long nativeWriterEstimatedSize(long handle);
+    // arrayAddr/schemaAddr come from Data.exportVectorSchemaRoot in MosaicWriter.write().
+    static native void nativeWriterWriteBatch(long writerHandle, long arrayAddr, long schemaAddr);
+
+    // Reader
+    // inputFile is an InputFile (MosaicReader.open passes one) even though it is typed Object.
+    // Returns 0 on failure.
+    static native long nativeReaderOpen(Object inputFile, long fileLength);
+    static native void nativeReaderFree(long handle);
+    // Fills the C ArrowSchema at schemaAddr; returns 0 on success.
+    static native int nativeReaderExportSchema(long handle, long schemaAddr);
+    static native int nativeReaderNumRowGroups(long handle);
+    // Both OpenRowGroup variants return 0 on failure; free the result with nativeRowGroupReaderFree.
+    static native long nativeReaderOpenRowGroup(long handle, int rgIndex);
+    static native long nativeReaderOpenRowGroupProjected(long handle, int rgIndex, int[] columns);
+
+    // RowGroupReader
+    // Not called by MosaicReader.
+    static native int nativeRowGroupReaderNumRows(long handle);
+    // Fills the C ArrowArray/ArrowSchema at the given addresses; returns 0 on success.
+    static native int nativeRowGroupReaderReadColumns(long handle, long arrayAddr, long schemaAddr);
+    static native void nativeRowGroupReaderFree(long handle);
+
+    // Row group stats
+    // NumStats returns a negative value on failure; StatMin/StatMax may return null.
+    static native int nativeReaderRowGroupNumStats(long handle, int rgIndex);
+    static native int nativeReaderRowGroupStatColumnIndex(long handle, int rgIndex, int statIndex);
+    static native long nativeReaderRowGroupStatNullCount(long handle, int rgIndex, int statIndex);
+    static native byte[] nativeReaderRowGroupStatMin(long handle, int rgIndex, int statIndex);
+    static native byte[] nativeReaderRowGroupStatMax(long handle, int rgIndex, int statIndex);
+}
diff --git a/java/src/main/java/io/mosaic/WriterOptions.java 
b/java/src/main/java/io/mosaic/WriterOptions.java
new file mode 100644
index 0000000..8b8d687
--- /dev/null
+++ b/java/src/main/java/io/mosaic/WriterOptions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.mosaic;
+
+/**
+ * Fluent settings for {@link MosaicWriter}. Each setter stores its argument
+ * unchanged and returns this instance; the values are handed to the native
+ * writer when a {@link MosaicWriter} is constructed.
+ */
+public class WriterOptions {
+
+    /** Codec id for Zstandard compression (the default). */
+    public static final int COMPRESSION_ZSTD = 1;
+
+    private int compression;
+    private int zstdLevel;
+    private int numBuckets;
+    private long rowGroupMaxSize;
+    private int maxDictTotalBytes;
+    private int maxDictEntries;
+    private int[] statsColumns;
+    private int pageSizeThreshold;
+
+    /** Creates options holding the default settings. */
+    public WriterOptions() {
+        this.compression = COMPRESSION_ZSTD;
+        this.zstdLevel = 1;
+        this.numBuckets = 0;
+        this.rowGroupMaxSize = 256L << 20;   // 256 * 1024 * 1024
+        this.maxDictTotalBytes = 32 << 10;   // 32 * 1024
+        this.maxDictEntries = 255;
+        this.statsColumns = new int[0];
+        this.pageSizeThreshold = 32 << 10;   // 32 * 1024
+    }
+
+    /** Sets the compression codec id, e.g. {@link #COMPRESSION_ZSTD}. */
+    public WriterOptions compression(int compression) {
+        this.compression = compression;
+        return this;
+    }
+
+    /** Sets the Zstandard compression level (default 1). */
+    public WriterOptions zstdLevel(int level) {
+        this.zstdLevel = level;
+        return this;
+    }
+
+    /** Sets the number of buckets (default 0). */
+    public WriterOptions numBuckets(int numBuckets) {
+        this.numBuckets = numBuckets;
+        return this;
+    }
+
+    /** Sets the maximum row-group size (default 256 * 1024 * 1024). */
+    public WriterOptions rowGroupMaxSize(long size) {
+        this.rowGroupMaxSize = size;
+        return this;
+    }
+
+    /** Sets the total dictionary byte budget (default 32 * 1024). */
+    public WriterOptions maxDictTotalBytes(int bytes) {
+        this.maxDictTotalBytes = bytes;
+        return this;
+    }
+
+    /** Sets the maximum number of dictionary entries (default 255). */
+    public WriterOptions maxDictEntries(int entries) {
+        this.maxDictEntries = entries;
+        return this;
+    }
+
+    /**
+     * Sets the stats column indices (presumably the columns whose row-group
+     * statistics are recorded). The array is copied.
+     */
+    public WriterOptions statsColumns(int... columns) {
+        this.statsColumns = columns.clone();
+        return this;
+    }
+
+    /** Sets the page size threshold (default 32 * 1024). */
+    public WriterOptions pageSizeThreshold(int threshold) {
+        this.pageSizeThreshold = threshold;
+        return this;
+    }
+
+    int getCompression() {
+        return compression;
+    }
+
+    int getZstdLevel() {
+        return zstdLevel;
+    }
+
+    int getNumBuckets() {
+        return numBuckets;
+    }
+
+    long getRowGroupMaxSize() {
+        return rowGroupMaxSize;
+    }
+
+    int getMaxDictTotalBytes() {
+        return maxDictTotalBytes;
+    }
+
+    int getMaxDictEntries() {
+        return maxDictEntries;
+    }
+
+    int[] getStatsColumns() {
+        return statsColumns;
+    }
+
+    int getPageSizeThreshold() {
+        return pageSizeThreshold;
+    }
+}
diff --git a/java/src/test/java/io/mosaic/MosaicRoundtripTest.java 
b/java/src/test/java/io/mosaic/MosaicRoundtripTest.java
new file mode 100644
index 0000000..7fd777a
--- /dev/null
+++ b/java/src/test/java/io/mosaic/MosaicRoundtripTest.java
@@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.mosaic;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class MosaicRoundtripTest {
+
+    private BufferAllocator allocator;
+
+    /** Fresh root allocator per test. */
+    @Before
+    public void setUp() {
+        this.allocator = new RootAllocator();
+    }
+
+    /** Closing the root allocator fails the test if any Arrow buffer was leaked. */
+    @After
+    public void tearDown() {
+        this.allocator.close();
+    }
+
+    /** Same as the three-argument overload, using default {@link WriterOptions}. */
+    private byte[] writeToBytes(Schema schema, java.util.function.Consumer<MosaicWriter> writeFn) {
+        WriterOptions defaults = new WriterOptions();
+        return writeToBytes(schema, defaults, writeFn);
+    }
+
+    /**
+     * Opens a {@link MosaicWriter} over an in-memory buffer, lets {@code writeFn}
+     * write batches, closes the writer, and returns everything that was written.
+     */
+    private byte[] writeToBytes(Schema schema, WriterOptions opts,
+                                java.util.function.Consumer<MosaicWriter> writeFn) {
+        ByteArrayOutputStream sink = new ByteArrayOutputStream();
+        try (MosaicWriter w = new MosaicWriter(sink, schema, opts, allocator)) {
+            writeFn.accept(w);
+        }
+        return sink.toByteArray();
+    }
+
+    /** Opens a {@link MosaicReader} whose {@link InputFile} copies straight out of {@code data}. */
+    private MosaicReader readerFromBytes(byte[] data) {
+        InputFile source = (pos, dst, dstOffset, len) ->
+                System.arraycopy(data, (int) pos, dst, dstOffset, len);
+        return MosaicReader.open(source, data.length, allocator);
+    }
+
+    /**
+     * Writes 50 rows with two buckets, then checks the trailing magic and that every
+     * id comes back exactly once with its matching name and score.
+     */
+    @Test
+    public void testBasicRoundtrip() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.notNullable("id", new ArrowType.Int(32, true)),
+                Field.nullable("name", ArrowType.Utf8.INSTANCE),
+                Field.nullable("score", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
+        ));
+        final int numRows = 50;
+
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector ids = (IntVector) root.getVector("id");
+            VarCharVector names = (VarCharVector) root.getVector("name");
+            Float8Vector scores = (Float8Vector) root.getVector("score");
+
+            ids.allocateNew(numRows);
+            names.allocateNew(numRows);
+            scores.allocateNew(numRows);
+
+            for (int i = 0; i < numRows; i++) {
+                ids.set(i, i);
+                names.setSafe(i, ("user_" + i).getBytes(StandardCharsets.UTF_8));
+                scores.set(i, i * 1.5);
+            }
+            root.setRowCount(numRows);
+
+            data = writeToBytes(arrowSchema, new WriterOptions().numBuckets(2), writer -> writer.write(root));
+        }
+
+        // The file must end with the 4-byte magic "MOSA".
+        assertTrue(data.length > 32);
+        assertEquals('M', data[data.length - 4]);
+        assertEquals('O', data[data.length - 3]);
+        assertEquals('S', data[data.length - 2]);
+        assertEquals('A', data[data.length - 1]);
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            Schema readSchema = reader.getSchema();
+            assertEquals(3, readSchema.getFields().size());
+            assertTrue(reader.numRowGroups() >= 1);
+
+            int idCol = readSchema.getFields().indexOf(readSchema.findField("id"));
+            int nameCol = readSchema.getFields().indexOf(readSchema.findField("name"));
+            int scoreCol = readSchema.getFields().indexOf(readSchema.findField("score"));
+            assertTrue(idCol >= 0);
+            assertTrue(nameCol >= 0);
+            assertTrue(scoreCol >= 0);
+
+            // Row order is not asserted; instead require each id to appear exactly once.
+            boolean[] seen = new boolean[numRows];
+            int totalRows = 0;
+            for (int rg = 0; rg < reader.numRowGroups(); rg++) {
+                try (VectorSchemaRoot batch = reader.readRowGroup(rg, allocator)) {
+                    int rows = batch.getRowCount();
+                    totalRows += rows;
+
+                    IntVector readIds = (IntVector) batch.getVector(idCol);
+                    VarCharVector readNames = (VarCharVector) batch.getVector(nameCol);
+                    Float8Vector readScores = (Float8Vector) batch.getVector(scoreCol);
+
+                    for (int i = 0; i < rows; i++) {
+                        int id = readIds.get(i);
+                        assertTrue("unexpected id " + id, id >= 0 && id < numRows);
+                        assertFalse("duplicate id " + id, seen[id]);
+                        seen[id] = true;
+                        assertEquals("user_" + id, new String(readNames.get(i), StandardCharsets.UTF_8));
+                        assertEquals(id * 1.5, readScores.get(i), 1e-9);
+                    }
+                }
+            }
+            // With no duplicates and all ids in range, this also proves every id was seen.
+            assertEquals(numRows, totalRows);
+        }
+    }
+
+    /** Round-trips a null string and a null double and checks every cell, null or not. */
+    @Test
+    public void testNullValues() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("id", new ArrowType.Int(32, true)),
+                Field.nullable("name", ArrowType.Utf8.INSTANCE),
+                Field.nullable("value", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
+        ));
+
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector ids = (IntVector) root.getVector("id");
+            VarCharVector names = (VarCharVector) root.getVector("name");
+            Float8Vector values = (Float8Vector) root.getVector("value");
+
+            ids.allocateNew(3);
+            names.allocateNew(3);
+            values.allocateNew(3);
+
+            // Row 0: no nulls.
+            ids.set(0, 1);
+            names.setSafe(0, "hello".getBytes(StandardCharsets.UTF_8));
+            values.set(0, 1.0);
+
+            // Row 1: null string.
+            ids.set(1, 2);
+            names.setNull(1);
+            values.set(1, 2.0);
+
+            // Row 2: null double.
+            ids.set(2, 3);
+            names.setSafe(2, "world".getBytes(StandardCharsets.UTF_8));
+            values.setNull(2);
+
+            root.setRowCount(3);
+            data = writeToBytes(arrowSchema, writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            Schema readSchema = reader.getSchema();
+            int idCol = readSchema.getFields().indexOf(readSchema.findField("id"));
+            int nameCol = readSchema.getFields().indexOf(readSchema.findField("name"));
+            int valueCol = readSchema.getFields().indexOf(readSchema.findField("value"));
+
+            // All 3 rows are expected in a single row group; requiring exactly one also
+            // keeps the checks below from passing vacuously when no row group is produced.
+            assertEquals(1, reader.numRowGroups());
+
+            try (VectorSchemaRoot batch = reader.readRowGroup(0, allocator)) {
+                assertEquals(3, batch.getRowCount());
+
+                IntVector readIds = (IntVector) batch.getVector(idCol);
+                VarCharVector readNames = (VarCharVector) batch.getVector(nameCol);
+                Float8Vector readValues = (Float8Vector) batch.getVector(valueCol);
+
+                // The assertions below rely on rows coming back in write order (default options).
+                assertEquals(1, readIds.get(0));
+                assertEquals(2, readIds.get(1));
+                assertEquals(3, readIds.get(2));
+
+                assertFalse(readNames.isNull(0));
+                assertEquals("hello", new String(readNames.get(0), StandardCharsets.UTF_8));
+                assertTrue(readNames.isNull(1));
+                assertFalse(readNames.isNull(2));
+                assertEquals("world", new String(readNames.get(2), StandardCharsets.UTF_8));
+
+                assertFalse(readValues.isNull(0));
+                assertEquals(1.0, readValues.get(0), 0.0);
+                assertFalse(readValues.isNull(1));
+                assertEquals(2.0, readValues.get(1), 0.0);
+                assertTrue(readValues.isNull(2));
+            }
+        }
+    }
+
+    @Test
+    public void testProjection() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("a", new ArrowType.Int(32, true)),
+                Field.nullable("b", ArrowType.Utf8.INSTANCE),
+                Field.nullable("c", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)),
+                Field.nullable("d", ArrowType.Utf8.INSTANCE)
+        ));
+
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector aVec = (IntVector) root.getVector("a");
+            VarCharVector bVec = (VarCharVector) root.getVector("b");
+            Float8Vector cVec = (Float8Vector) root.getVector("c");
+            VarCharVector dVec = (VarCharVector) root.getVector("d");
+
+            int n = 20;
+            aVec.allocateNew(n);
+            bVec.allocateNew(n);
+            cVec.allocateNew(n);
+            dVec.allocateNew(n);
+
+            for (int i = 0; i < n; i++) {
+                aVec.set(i, i);
+                bVec.setSafe(i, ("val_" + i).getBytes());
+                cVec.set(i, (double) i);
+                dVec.setSafe(i, ("extra_" + i).getBytes());
+            }
+            root.setRowCount(n);
+            data = writeToBytes(arrowSchema, new WriterOptions().numBuckets(2), writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            int aCol = reader.getSchema().getFields().indexOf(reader.getSchema().findField("a"));
+            int bCol = reader.getSchema().getFields().indexOf(reader.getSchema().findField("b"));
+            int[] projected = {aCol, bCol};
+
+            int totalRows = 0;
+            for (int rg = 0; rg < reader.numRowGroups(); rg++) {
+                try (VectorSchemaRoot batch = reader.readRowGroup(rg, projected, allocator)) {
+                    assertEquals(2, batch.getFieldVectors().size());
+
+                    // The two returned vectors must be the projected columns, and each row
+                    // must still pair a with its matching b ("val_" + a).
+                    IntVector readA = (IntVector) batch.getVector("a");
+                    VarCharVector readB = (VarCharVector) batch.getVector("b");
+                    assertNotNull(readA);
+                    assertNotNull(readB);
+                    for (int i = 0; i < batch.getRowCount(); i++) {
+                        assertEquals("val_" + readA.get(i), new String(readB.get(i)));
+                    }
+
+                    totalRows += batch.getRowCount();
+                }
+            }
+            assertEquals(20, totalRows);
+        }
+    }
+
+    @Test
+    public void testStats() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("id", new ArrowType.Int(32, true)),
+                Field.nullable("name", ArrowType.Utf8.INSTANCE),
+                Field.nullable("value", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
+        ));
+
+        // Collect statistics only for "id" (0) and "value" (2); "name" must be skipped.
+        WriterOptions statsOnIdAndValue = new WriterOptions().statsColumns(0, 2);
+
+        final int rowCount = 10;
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector idVector = (IntVector) root.getVector("id");
+            VarCharVector nameVector = (VarCharVector) root.getVector("name");
+            Float8Vector valueVector = (Float8Vector) root.getVector("value");
+
+            idVector.allocateNew(rowCount);
+            nameVector.allocateNew(rowCount);
+            valueVector.allocateNew(rowCount);
+
+            for (int row = 0; row < rowCount; row++) {
+                idVector.set(row, row * 10);
+                nameVector.setSafe(row, ("item_" + row).getBytes());
+                valueVector.set(row, row * 1.1);
+            }
+            root.setRowCount(rowCount);
+            data = writeToBytes(arrowSchema, statsOnIdAndValue, writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            for (int group = 0; group < reader.numRowGroups(); group++) {
+                List<ColumnStatistics> groupStats = reader.getRowGroupStatistics(group);
+                assertFalse(groupStats.isEmpty());
+                for (ColumnStatistics columnStats : groupStats) {
+                    int column = columnStats.getColumnIndex();
+                    assertTrue(column == 0 || column == 2);
+                    // No nulls were written, so every tracked column has a min/max.
+                    assertEquals(0, columnStats.getNullCount());
+                    assertTrue(columnStats.hasMinMax());
+                    byte[] min = columnStats.getMin();
+                    byte[] max = columnStats.getMax();
+                    assertNotNull(min);
+                    assertNotNull(max);
+                    assertTrue(min.length > 0);
+                    assertTrue(max.length > 0);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testAllTypes() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("f_bool", ArrowType.Bool.INSTANCE),
+                Field.nullable("f_int8", new ArrowType.Int(8, true)),
+                Field.nullable("f_int16", new ArrowType.Int(16, true)),
+                Field.nullable("f_int32", new ArrowType.Int(32, true)),
+                Field.nullable("f_int64", new ArrowType.Int(64, true)),
+                Field.nullable("f_float32", new 
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)),
+                Field.nullable("f_float64", new 
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)),
+                Field.nullable("f_utf8", ArrowType.Utf8.INSTANCE),
+                Field.nullable("f_binary", ArrowType.Binary.INSTANCE)
+        ));
+
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, 
allocator)) {
+            BitVector boolVec = (BitVector) root.getVector("f_bool");
+            TinyIntVector int8Vec = (TinyIntVector) root.getVector("f_int8");
+            SmallIntVector int16Vec = (SmallIntVector) 
root.getVector("f_int16");
+            IntVector int32Vec = (IntVector) root.getVector("f_int32");
+            BigIntVector int64Vec = (BigIntVector) root.getVector("f_int64");
+            Float4Vector f32Vec = (Float4Vector) root.getVector("f_float32");
+            Float8Vector f64Vec = (Float8Vector) root.getVector("f_float64");
+            VarCharVector utf8Vec = (VarCharVector) root.getVector("f_utf8");
+            VarBinaryVector binVec = (VarBinaryVector) 
root.getVector("f_binary");
+
+            int n = 2;
+            boolVec.allocateNew(n);
+            int8Vec.allocateNew(n);
+            int16Vec.allocateNew(n);
+            int32Vec.allocateNew(n);
+            int64Vec.allocateNew(n);
+            f32Vec.allocateNew(n);
+            f64Vec.allocateNew(n);
+            utf8Vec.allocateNew(n);
+            binVec.allocateNew(n);
+
+            boolVec.set(0, 1); boolVec.set(1, 0);
+            int8Vec.set(0, 42); int8Vec.set(1, -1);
+            int16Vec.set(0, 1234); int16Vec.set(1, -5678);
+            int32Vec.set(0, 100000); int32Vec.set(1, -200000);
+            int64Vec.set(0, 9999999999L); int64Vec.set(1, -9999999999L);
+            f32Vec.set(0, 3.14f); f32Vec.set(1, -2.71f);
+            f64Vec.set(0, 2.718281828); f64Vec.set(1, -3.141592653);
+            utf8Vec.setSafe(0, "hello".getBytes()); utf8Vec.setSafe(1, 
"world".getBytes());
+            binVec.setSafe(0, new byte[]{1, 2, 3}); binVec.setSafe(1, new 
byte[]{(byte) 0xff, 0});
+
+            root.setRowCount(n);
+            data = writeToBytes(arrowSchema, writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            try (VectorSchemaRoot batch = reader.readRowGroup(0, allocator)) {
+                assertEquals(2, batch.getRowCount());
+                assertEquals(1, ((BitVector) 
batch.getVector("f_bool")).get(0));
+                assertEquals(0, ((BitVector) 
batch.getVector("f_bool")).get(1));
+                assertEquals(42, ((TinyIntVector) 
batch.getVector("f_int8")).get(0));
+                assertEquals(-1, ((TinyIntVector) 
batch.getVector("f_int8")).get(1));
+                assertEquals(1234, ((SmallIntVector) 
batch.getVector("f_int16")).get(0));
+                assertEquals(-5678, ((SmallIntVector) 
batch.getVector("f_int16")).get(1));
+                assertEquals(100000, ((IntVector) 
batch.getVector("f_int32")).get(0));
+                assertEquals(-200000, ((IntVector) 
batch.getVector("f_int32")).get(1));
+                assertEquals(9999999999L, ((BigIntVector) 
batch.getVector("f_int64")).get(0));
+                assertEquals(-9999999999L, ((BigIntVector) 
batch.getVector("f_int64")).get(1));
+                assertEquals(3.14f, ((Float4Vector) 
batch.getVector("f_float32")).get(0), 1e-5f);
+                assertEquals(-2.71f, ((Float4Vector) 
batch.getVector("f_float32")).get(1), 1e-5f);
+                assertEquals(2.718281828, ((Float8Vector) 
batch.getVector("f_float64")).get(0), 1e-9);
+                assertEquals(-3.141592653, ((Float8Vector) 
batch.getVector("f_float64")).get(1), 1e-9);
+                assertEquals("hello", new String(((VarCharVector) 
batch.getVector("f_utf8")).get(0)));
+                assertEquals("world", new String(((VarCharVector) 
batch.getVector("f_utf8")).get(1)));
+                assertArrayEquals(new byte[]{1, 2, 3}, ((VarBinaryVector) 
batch.getVector("f_binary")).get(0));
+                assertArrayEquals(new byte[]{(byte) 0xff, 0}, 
((VarBinaryVector) batch.getVector("f_binary")).get(1));
+            }
+        }
+    }
+
+    @Test
+    public void testCompressionNone() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("x", new ArrowType.Int(32, true)),
+                Field.nullable("y", ArrowType.Utf8.INSTANCE)
+        ));
+
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector xVec = (IntVector) root.getVector("x");
+            VarCharVector yVec = (VarCharVector) root.getVector("y");
+            int n = 20;
+            xVec.allocateNew(n);
+            yVec.allocateNew(n);
+            for (int i = 0; i < n; i++) {
+                xVec.set(i, i);
+                yVec.setSafe(i, ("v_" + i).getBytes());
+            }
+            root.setRowCount(n);
+            data = writeToBytes(arrowSchema, new WriterOptions().compression(0), writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            try (VectorSchemaRoot batch = reader.readRowGroup(0, allocator)) {
+                assertEquals(20, batch.getRowCount());
+                IntVector xs = (IntVector) batch.getVector("x");
+                VarCharVector ys = (VarCharVector) batch.getVector("y");
+                // Verify both the fixed-width and the variable-length column round-trip.
+                for (int i = 0; i < 20; i++) {
+                    assertEquals(i, xs.get(i));
+                    assertEquals("v_" + i, new String(ys.get(i)));
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testMultipleRowGroups() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("id", new ArrowType.Int(32, true)),
+                Field.nullable("data", new ArrowType.Int(64, true))
+        ));
+
+        WriterOptions opts = new 
WriterOptions().compression(0).numBuckets(1).rowGroupMaxSize(200);
+
+        byte[] data;
+        int totalRows = 500;
+        int batchSize = 10;
+        data = writeToBytes(arrowSchema, opts, writer -> {
+            for (int start = 0; start < totalRows; start += batchSize) {
+                try (VectorSchemaRoot root = 
VectorSchemaRoot.create(arrowSchema, allocator)) {
+                    IntVector idVec = (IntVector) root.getVector("id");
+                    BigIntVector dataVec = (BigIntVector) 
root.getVector("data");
+                    idVec.allocateNew(batchSize);
+                    dataVec.allocateNew(batchSize);
+                    for (int i = 0; i < batchSize; i++) {
+                        idVec.set(i, start + i);
+                        dataVec.set(i, (long) (start + i) * 3);
+                    }
+                    root.setRowCount(batchSize);
+                    writer.write(root);
+                }
+            }
+        });
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            assertTrue(reader.numRowGroups() > 1);
+            int offset = 0;
+            for (int rg = 0; rg < reader.numRowGroups(); rg++) {
+                try (VectorSchemaRoot batch = reader.readRowGroup(rg, 
allocator)) {
+                    IntVector ids = (IntVector) batch.getVector("id");
+                    BigIntVector datas = (BigIntVector) 
batch.getVector("data");
+                    for (int i = 0; i < batch.getRowCount(); i++) {
+                        assertEquals(offset + i, ids.get(i));
+                        assertEquals((long) (offset + i) * 3, datas.get(i));
+                    }
+                    offset += batch.getRowCount();
+                }
+            }
+            assertEquals(500, offset);
+        }
+    }
+
+    @Test
+    public void testMultipleWrites() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("x", new ArrowType.Int(32, true))
+        ));
+
+        // Three separate write() calls of 10 rows each into one writer.
+        ByteArrayOutputStream sink = new ByteArrayOutputStream();
+        try (MosaicWriter writer = new MosaicWriter(sink, arrowSchema, allocator)) {
+            for (int batchNo = 0; batchNo < 3; batchNo++) {
+                int base = batchNo * 10;
+                try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+                    IntVector column = (IntVector) root.getVector("x");
+                    column.allocateNew(10);
+                    for (int i = 0; i < 10; i++) {
+                        column.set(i, base + i);
+                    }
+                    root.setRowCount(10);
+                    writer.write(root);
+                }
+            }
+        }
+
+        // The sink is complete only after the writer has been closed above.
+        try (MosaicReader reader = readerFromBytes(sink.toByteArray())) {
+            int rowsRead = 0;
+            for (int rg = 0; rg < reader.numRowGroups(); rg++) {
+                try (VectorSchemaRoot batch = reader.readRowGroup(rg, allocator)) {
+                    rowsRead += batch.getRowCount();
+                }
+            }
+            assertEquals(30, rowsRead);
+        }
+    }
+
+    @Test
+    public void testSingleRow() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("v", new ArrowType.Int(32, true))
+        ));
+
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, 
allocator)) {
+            IntVector vVec = (IntVector) root.getVector("v");
+            vVec.allocateNew(1);
+            vVec.set(0, 42);
+            root.setRowCount(1);
+            data = writeToBytes(arrowSchema, writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            try (VectorSchemaRoot batch = reader.readRowGroup(0, allocator)) {
+                assertEquals(1, batch.getRowCount());
+                assertEquals(42, ((IntVector) batch.getVector("v")).get(0));
+            }
+        }
+    }
+
+    @Test
+    public void testZeroRows() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("v", new ArrowType.Int(32, true))
+        ));
+
+        byte[] data;
+        try (VectorSchemaRoot emptyBatch = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            emptyBatch.getVector("v").allocateNew();
+            emptyBatch.setRowCount(0);
+            data = writeToBytes(arrowSchema, writer -> writer.write(emptyBatch));
+        }
+
+        // An empty batch must not produce any row group.
+        try (MosaicReader reader = readerFromBytes(data)) {
+            assertEquals(0, reader.numRowGroups());
+        }
+    }
+
+    @Test
+    public void testStatsWithNulls() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("a", new ArrowType.Int(32, true)),
+                Field.nullable("b", new ArrowType.Int(64, true))
+        ));
+
+        WriterOptions opts = new WriterOptions().statsColumns(0, 1).numBuckets(1);
+
+        // a = [10, null, 5, 20], b = [null, null, 100, 50]
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector aColumn = (IntVector) root.getVector("a");
+            BigIntVector bColumn = (BigIntVector) root.getVector("b");
+            aColumn.allocateNew(4);
+            bColumn.allocateNew(4);
+
+            aColumn.set(0, 10);
+            aColumn.setNull(1);
+            aColumn.set(2, 5);
+            aColumn.set(3, 20);
+
+            bColumn.setNull(0);
+            bColumn.setNull(1);
+            bColumn.set(2, 100);
+            bColumn.set(3, 50);
+
+            root.setRowCount(4);
+            data = writeToBytes(arrowSchema, opts, writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            List<ColumnStatistics> stats = reader.getRowGroupStatistics(0);
+            assertEquals(2, stats.size());
+            java.util.function.IntFunction<ColumnStatistics> statFor = column -> stats.stream()
+                    .filter(s -> s.getColumnIndex() == column)
+                    .findFirst()
+                    .get();
+
+            // Min/max bytes are decoded as big-endian values of the column's width.
+            ColumnStatistics aStat = statFor.apply(0);
+            assertEquals(1, aStat.getNullCount());
+            assertTrue(aStat.hasMinMax());
+            int aMin = ByteBuffer.wrap(aStat.getMin()).order(ByteOrder.BIG_ENDIAN).getInt();
+            int aMax = ByteBuffer.wrap(aStat.getMax()).order(ByteOrder.BIG_ENDIAN).getInt();
+            assertEquals(5, aMin);
+            assertEquals(20, aMax);
+
+            ColumnStatistics bStat = statFor.apply(1);
+            assertEquals(2, bStat.getNullCount());
+            assertTrue(bStat.hasMinMax());
+            long bMin = ByteBuffer.wrap(bStat.getMin()).order(ByteOrder.BIG_ENDIAN).getLong();
+            long bMax = ByteBuffer.wrap(bStat.getMax()).order(ByteOrder.BIG_ENDIAN).getLong();
+            assertEquals(50, bMin);
+            assertEquals(100, bMax);
+        }
+    }
+
+    @Test
+    public void testStatsAllNull() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("x", new ArrowType.Int(32, true))
+        ));
+
+        WriterOptions opts = new WriterOptions().statsColumns(0).numBuckets(1);
+
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector column = (IntVector) root.getVector("x");
+            column.allocateNew(3);
+            for (int i = 0; i < 3; i++) {
+                column.setNull(i);
+            }
+            root.setRowCount(3);
+            data = writeToBytes(arrowSchema, opts, writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            List<ColumnStatistics> stats = reader.getRowGroupStatistics(0);
+            assertEquals(1, stats.size());
+            ColumnStatistics only = stats.get(0);
+            assertEquals(3, only.getNullCount());
+            // No non-null values, so no min/max can be recorded.
+            assertFalse(only.hasMinMax());
+        }
+    }
+
+    @Test
+    public void testEstimatedFileSize() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("x", new ArrowType.Int(32, true)),
+                Field.nullable("y", ArrowType.Utf8.INSTANCE)
+        ));
+
+        final int rowCount = 100;
+        ByteArrayOutputStream sink = new ByteArrayOutputStream();
+        try (MosaicWriter writer = new MosaicWriter(sink, arrowSchema, allocator)) {
+            try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+                IntVector xs = (IntVector) root.getVector("x");
+                VarCharVector ys = (VarCharVector) root.getVector("y");
+                xs.allocateNew(rowCount);
+                ys.allocateNew(rowCount);
+                for (int row = 0; row < rowCount; row++) {
+                    xs.set(row, row);
+                    ys.setSafe(row, ("value_" + row).getBytes());
+                }
+                root.setRowCount(rowCount);
+                writer.write(root);
+            }
+            // Checked before close: the estimate must already reflect buffered data.
+            assertTrue(writer.estimatedFileSize() > 0);
+        }
+    }
+
+    @Test
+    public void testSchemaRoundtrip() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.notNullable("id", new ArrowType.Int(32, true)),
+                Field.nullable("name", ArrowType.Utf8.INSTANCE),
+                Field.nullable("score", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
+        ));
+
+        byte[] data;
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+            IntVector ids = (IntVector) root.getVector("id");
+            VarCharVector names = (VarCharVector) root.getVector("name");
+            Float8Vector scores = (Float8Vector) root.getVector("score");
+            ids.allocateNew(1); names.allocateNew(1); scores.allocateNew(1);
+            ids.set(0, 1); names.setSafe(0, "x".getBytes()); scores.set(0, 1.0);
+            root.setRowCount(1);
+            data = writeToBytes(arrowSchema, writer -> writer.write(root));
+        }
+
+        try (MosaicReader reader = readerFromBytes(data)) {
+            Schema readSchema = reader.getSchema();
+            assertEquals(3, readSchema.getFields().size());
+            Field id = readSchema.getFields().get(0);
+            Field name = readSchema.getFields().get(1);
+            Field score = readSchema.getFields().get(2);
+
+            // Field order, names, nullability and types must all survive the round trip.
+            assertEquals("id", id.getName());
+            assertEquals("name", name.getName());
+            assertEquals("score", score.getName());
+            assertFalse(id.isNullable());
+            assertTrue(name.isNullable());
+            assertTrue(score.isNullable());
+            assertEquals(new ArrowType.Int(32, true), id.getType());
+            assertEquals(ArrowType.Utf8.INSTANCE, name.getType());
+            assertEquals(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), score.getType());
+        }
+    }
+}
diff --git a/Cargo.toml b/jni/Cargo.toml
similarity index 71%
copy from Cargo.toml
copy to jni/Cargo.toml
index 39c103c..1cf0142 100644
--- a/Cargo.toml
+++ b/jni/Cargo.toml
@@ -15,8 +15,18 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[workspace]
-members = ["core", "ffi"]
-resolver = "2"
+[package]
+name = "mosaic-jni"
+version = "0.1.0"
+edition = "2021"
+description = "Mosaic file format — JNI bindings for Java"
+license = "Apache-2.0"
 
-[profile.release]
+[lib]
+crate-type = ["cdylib"]
+
+[dependencies]
+mosaic-core = { path = "../core" }
+jni = "0.21"
+arrow-schema = "58"
+arrow-array = { version = "58", features = ["ffi"] }
diff --git a/jni/src/lib.rs b/jni/src/lib.rs
new file mode 100644
index 0000000..2c845c6
--- /dev/null
+++ b/jni/src/lib.rs
@@ -0,0 +1,832 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::io;
+use std::panic::{self, AssertUnwindSafe};
+use std::ptr;
+use std::sync::Arc;
+
+use jni::objects::{GlobalRef, JByteArray, JClass, JIntArray, JMethodID, 
JObject, JValue};
+use jni::sys::{jint, jlong};
+use jni::JNIEnv;
+use jni::JavaVM;
+
+use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+use arrow_array::{RecordBatch, StructArray};
+use arrow_schema::Schema;
+
+use mosaic_core::reader::{InputFile, MosaicReader, ReaderAccess, 
RowGroupReader};
+use mosaic_core::spec::*;
+use mosaic_core::writer::{MosaicWriter, OutputFile, WriterOptions};
+
+/// Renders a `catch_unwind` payload as `"native panic: <message>"`.
+///
+/// Payloads that are a `String` or a `&str` contribute their text; any other payload
+/// type is reported as `unknown`.
+fn panic_message(e: &Box<dyn std::any::Any + Send>) -> String {
+    let detail = e
+        .downcast_ref::<String>()
+        .map(String::as_str)
+        .or_else(|| e.downcast_ref::<&str>().copied())
+        .unwrap_or("unknown");
+    format!("native panic: {}", detail)
+}
+
+/// `OutputFile` that forwards bytes to a Java `java.io.OutputStream`.
+///
+/// The stream is held through a global reference, and its `write([BII)V` / `flush()V`
+/// method IDs are supplied by the caller. A Java `byte[]` staging buffer is cached
+/// across writes and only reallocated when a write is larger than the current buffer.
+struct JniOutputFile {
+    jvm: Arc<JavaVM>,
+    stream_ref: GlobalRef,
+    write_mid: JMethodID,
+    flush_mid: JMethodID,
+    /// Total bytes written so far; reported by `pos()`.
+    pos: u64,
+    /// Reusable Java `byte[]`, held as a global ref so it stays valid across native calls.
+    cached_array: Option<GlobalRef>,
+    /// Length in bytes of `cached_array`.
+    cached_array_len: usize,
+}
+
+// NOTE(review): the writer may be driven from whichever Java thread calls into it; every
+// JNI access below goes through `attach_current_thread`, which is what this impl relies on.
+unsafe impl Send for JniOutputFile {}
+
+impl OutputFile for JniOutputFile {
+    /// Copies `data` into the staging `byte[]` and calls `OutputStream.write(buf, 0, len)`.
+    ///
+    /// # Errors
+    /// Fails if `data` is longer than a Java array can hold, or if any JNI call fails
+    /// (including the Java stream throwing, in which case that exception stays pending).
+    fn write(&mut self, data: &[u8]) -> io::Result<()> {
+        if data.is_empty() {
+            return Ok(());
+        }
+        // Java array lengths are `jint`; refuse oversized writes instead of truncating.
+        let len = jint::try_from(data.len()).map_err(|_| {
+            io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("write of {} bytes exceeds the Java array size limit", data.len()),
+            )
+        })?;
+
+        let mut env = self
+            .jvm
+            .attach_current_thread()
+            .map_err(|e| io::Error::other(e.to_string()))?;
+
+        if self.cached_array.is_none() || data.len() > self.cached_array_len {
+            let local = env
+                .new_byte_array(len)
+                .map_err(|e| io::Error::other(e.to_string()))?;
+            let global = env.new_global_ref(&local);
+            // Release the local ref right away: writes can happen many times inside one
+            // native call, and local refs are otherwise only freed when that call returns.
+            let _ = env.delete_local_ref(local);
+            self.cached_array = Some(global.map_err(|e| io::Error::other(e.to_string()))?);
+            self.cached_array_len = data.len();
+        }
+
+        let raw = self
+            .cached_array
+            .as_ref()
+            .expect("staging array is allocated above")
+            .as_raw();
+        // SAFETY: `raw` is a live global reference to a Java `byte[]` owned by `cached_array`;
+        // this wrapper only borrows it and is forgotten below rather than deleted.
+        let byte_array = unsafe { JByteArray::from_raw(raw) };
+
+        env.set_byte_array_region(&byte_array, 0, bytemuck_cast(data))
+            .map_err(|e| io::Error::other(e.to_string()))?;
+
+        // SAFETY: `write_mid` was resolved on java/io/OutputStream as `write([BII)V`, and the
+        // arguments below match that signature (byte[], offset 0, length `len`).
+        unsafe {
+            env.call_method_unchecked(
+                &self.stream_ref,
+                self.write_mid,
+                jni::signature::ReturnType::Primitive(jni::signature::Primitive::Void),
+                &[
+                    jni::sys::jvalue { l: raw },
+                    jni::sys::jvalue { i: 0 },
+                    jni::sys::jvalue { i: len },
+                ],
+            )
+            .map_err(|e| io::Error::other(e.to_string()))?;
+        }
+        #[allow(clippy::forget_non_drop)]
+        std::mem::forget(byte_array);
+        self.pos += data.len() as u64;
+        Ok(())
+    }
+
+    /// Calls `OutputStream.flush()` on the wrapped stream.
+    fn flush(&mut self) -> io::Result<()> {
+        let mut env = self
+            .jvm
+            .attach_current_thread()
+            .map_err(|e| io::Error::other(e.to_string()))?;
+        // SAFETY: `flush_mid` was resolved on java/io/OutputStream as `flush()V`; no arguments.
+        unsafe {
+            env.call_method_unchecked(
+                &self.stream_ref,
+                self.flush_mid,
+                jni::signature::ReturnType::Primitive(jni::signature::Primitive::Void),
+                &[],
+            )
+            .map_err(|e| io::Error::other(e.to_string()))?;
+        }
+        Ok(())
+    }
+
+    /// Number of bytes successfully handed to the Java stream so far.
+    fn pos(&self) -> u64 {
+        self.pos
+    }
+}
+
+// ======================== JniInputFile ========================
+
+/// `InputFile` backed by a Java object exposing `readFully(long, byte[], int, int)`.
+///
+/// Each read allocates a temporary Java `byte[]`, asks the Java side to fill it, and
+/// copies the bytes back into the Rust buffer.
+struct JniInputFile {
+    jvm: Arc<JavaVM>,
+    input_file_ref: GlobalRef,
+}
+
+// NOTE(review): reads may arrive from any thread; each one attaches via
+// `attach_current_thread` and reaches Java only through the global ref. Confirm the
+// Java `readFully` implementation is itself safe for concurrent calls.
+unsafe impl Send for JniInputFile {}
+unsafe impl Sync for JniInputFile {}
+
+impl InputFile for JniInputFile {
+    /// Fills `buf` with the bytes starting at `offset` via `readFully(offset, arr, 0, len)`.
+    ///
+    /// # Errors
+    /// Fails if `buf`/`offset` do not fit Java's `int`/`long`, or if any JNI call fails
+    /// (including `readFully` throwing, in which case that exception stays pending).
+    fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
+        if buf.is_empty() {
+            return Ok(());
+        }
+        // Reject values that the unchecked `as` casts would silently wrap.
+        let len = jint::try_from(buf.len()).map_err(|_| {
+            io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("read of {} bytes exceeds the Java array size limit", buf.len()),
+            )
+        })?;
+        let offset = jlong::try_from(offset).map_err(|_| {
+            io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("read offset {} exceeds the Java long range", offset),
+            )
+        })?;
+
+        let mut env = self
+            .jvm
+            .attach_current_thread()
+            .map_err(|e| io::Error::other(e.to_string()))?;
+
+        let java_buf = env
+            .new_byte_array(len)
+            .map_err(|e| io::Error::other(e.to_string()))?;
+
+        let filled = env
+            .call_method(
+                &self.input_file_ref,
+                "readFully",
+                "(J[BII)V",
+                &[
+                    JValue::Long(offset),
+                    JValue::Object(&java_buf),
+                    JValue::Int(0),
+                    JValue::Int(len),
+                ],
+            )
+            .map_err(|e| io::Error::other(e.to_string()))
+            .and_then(|_| {
+                // SAFETY: u8 and i8 share size and alignment; the view covers exactly `buf`.
+                let dst: &mut [i8] = unsafe {
+                    std::slice::from_raw_parts_mut(buf.as_mut_ptr().cast::<i8>(), buf.len())
+                };
+                env.get_byte_array_region(&java_buf, 0, dst)
+                    .map_err(|e| io::Error::other(e.to_string()))
+            });
+
+        // Release the temporary array on every path: many reads can happen inside a single
+        // native call, and un-deleted local refs would pile up until that call returns.
+        let _ = env.delete_local_ref(java_buf);
+        filled
+    }
+}
+
+/// Reader state kept alive on the Rust side between JNI calls.
+///
+/// NOTE(review): presumably boxed and handed to Java as a `long` handle by the reader-open
+/// entry point, which is not shown here. Fields drop in declaration order, so `reader` is
+/// dropped before `_input_file_ref`; keep that order.
+struct ReaderHandle {
+    reader: Box<dyn ReaderAccess>,
+    /// Global ref retained only for its lifetime (never read); presumably keeps the Java
+    /// input object reachable while `reader` may still call into it — confirm at the open site.
+    _input_file_ref: Option<GlobalRef>,
+}
+
+/// Reinterprets a byte slice as `&[i8]` (Java's `jbyte`) without copying.
+fn bytemuck_cast(data: &[u8]) -> &[i8] {
+    let (start, count) = (data.as_ptr().cast::<i8>(), data.len());
+    // SAFETY: u8 and i8 have identical size and alignment, and the result borrows `data`,
+    // so it cannot outlive the source bytes.
+    unsafe { std::slice::from_raw_parts(start, count) }
+}
+
+/// Raises a `java.lang.RuntimeException` carrying `msg` on the calling Java thread.
+///
+/// JNI does not allow `ThrowNew` while another exception is pending. That happens
+/// whenever a Java callback (e.g. `OutputStream.write`) or a JNI lookup has already
+/// thrown. In that case the pending exception is cleared and attached as the *cause*
+/// of the new `RuntimeException`, so neither `msg` nor the original error is lost.
+/// Failures while throwing are ignored: the native method is about to return anyway.
+fn throw(env: &mut JNIEnv, msg: &str) {
+    if !env.exception_check().unwrap_or(false) {
+        let _ = env.throw_new("java/lang/RuntimeException", msg);
+        return;
+    }
+
+    let cause = match env.exception_occurred() {
+        Ok(cause) => cause,
+        // Cannot fetch it: leave the original exception pending for Java to see.
+        Err(_) => return,
+    };
+    let _ = env.exception_clear();
+
+    let wrapped = env.new_string(msg).and_then(|jmsg| {
+        env.new_object(
+            "java/lang/RuntimeException",
+            "(Ljava/lang/String;Ljava/lang/Throwable;)V",
+            &[JValue::Object(&jmsg), JValue::Object(&cause)],
+        )
+    });
+    match wrapped {
+        Ok(exception) => {
+            let _ = env.throw(jni::objects::JThrowable::from(exception));
+        }
+        Err(_) => {
+            // Building the wrapper failed (and may itself have thrown, e.g. OOM); re-raise
+            // the original exception only if nothing else is pending now.
+            if !env.exception_check().unwrap_or(true) {
+                let _ = env.throw(cause);
+            }
+        }
+    }
+}
+
+/// Writer state boxed by `nativeWriterOpen`; its raw pointer is returned to Java as the
+/// `long` handle and reclaimed by `nativeWriterFree`.
+///
+/// Fields drop in declaration order: `inner` (which owns its own clone of the stream
+/// global ref) is dropped before `_stream_ref`. Keep that order.
+struct WriterHandle {
+    inner: MosaicWriter<JniOutputFile>,
+    /// Extra global ref to the Java `OutputStream`; held only for its lifetime, never read.
+    _stream_ref: GlobalRef,
+}
+
+// ======================== Writer ========================
+
+/// Reads the optional `statsColumns` Java `int[]` into zero-based column indices.
+///
+/// A null or empty array yields no statistics columns. Negative entries are rejected
+/// instead of being cast to huge `usize` values, and JNI read failures are reported
+/// instead of silently disabling statistics.
+fn read_stats_columns(env: &JNIEnv, stats_columns: &JIntArray) -> Result<Vec<usize>, String> {
+    if stats_columns.is_null() {
+        return Ok(Vec::new());
+    }
+    let len = env
+        .get_array_length(stats_columns)
+        .map_err(|e| format!("cannot read stats columns: {}", e))?;
+    let mut raw = vec![0; usize::try_from(len).unwrap_or(0)];
+    if !raw.is_empty() {
+        env.get_int_array_region(stats_columns, 0, &mut raw)
+            .map_err(|e| format!("cannot read stats columns: {}", e))?;
+    }
+    raw.into_iter()
+        .map(|v| usize::try_from(v).map_err(|_| format!("invalid stats column index: {}", v)))
+        .collect()
+}
+
+/// JNI entry point for `NativeLib.nativeWriterOpen`: creates a Mosaic writer that streams
+/// into the given `java.io.OutputStream`.
+///
+/// `arrow_schema_addr` is the address of an Arrow C Data Interface `FFI_ArrowSchema`; it
+/// is only borrowed here, not consumed. `num_buckets <= 0` selects `DEFAULT_NUM_BUCKETS`;
+/// the other numeric options are passed through to `WriterOptions` unchanged.
+///
+/// Returns an opaque handle (a leaked `Box<WriterHandle>`, released by `nativeWriterFree`),
+/// or 0 after throwing a `RuntimeException` on failure or panic.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeWriterOpen(
+    mut env: JNIEnv,
+    _class: JClass,
+    stream: JObject,
+    arrow_schema_addr: jlong,
+    num_buckets: jint,
+    compression: jint,
+    zstd_level: jint,
+    row_group_max_size: jlong,
+    max_dict_total_bytes: jint,
+    max_dict_entries: jint,
+    stats_columns: JIntArray,
+    page_size_threshold: jint,
+) -> jlong {
+    // Raw env pointer so the panic path can rebuild an env once the closure is gone.
+    let raw_env = env.get_raw();
+    let result = panic::catch_unwind(AssertUnwindSafe(|| {
+        if arrow_schema_addr == 0 {
+            throw(&mut env, "null Arrow schema address");
+            return 0;
+        }
+
+        // SAFETY: assumes the Java side passes the address of a live, exported
+        // FFI_ArrowSchema that outlives this call; it is only read through a shared ref.
+        let ffi_schema = unsafe { &*(arrow_schema_addr as *const FFI_ArrowSchema) };
+        let arrow_schema = match Schema::try_from(ffi_schema) {
+            Ok(s) => s,
+            Err(e) => {
+                throw(&mut env, &format!("Arrow schema import failed: {}", e));
+                return 0;
+            }
+        };
+
+        // Validate the stats column list before acquiring any JNI references.
+        let stats_cols = match read_stats_columns(&env, &stats_columns) {
+            Ok(cols) => cols,
+            Err(msg) => {
+                throw(&mut env, &msg);
+                return 0;
+            }
+        };
+
+        let stream_global = match env.new_global_ref(&stream) {
+            Ok(g) => g,
+            Err(e) => {
+                throw(&mut env, &format!("failed to create global ref: {}", e));
+                return 0;
+            }
+        };
+
+        let write_mid = match env.get_method_id("java/io/OutputStream", "write", "([BII)V") {
+            Ok(m) => m,
+            Err(e) => {
+                throw(&mut env, &format!("cannot find OutputStream.write: {}", e));
+                return 0;
+            }
+        };
+        let flush_mid = match env.get_method_id("java/io/OutputStream", "flush", "()V") {
+            Ok(m) => m,
+            Err(e) => {
+                throw(&mut env, &format!("cannot find OutputStream.flush: {}", e));
+                return 0;
+            }
+        };
+
+        let jvm = match env.get_java_vm() {
+            Ok(vm) => Arc::new(vm),
+            Err(e) => {
+                throw(&mut env, &format!("cannot get JavaVM: {}", e));
+                return 0;
+            }
+        };
+
+        let jni_stream = JniOutputFile {
+            jvm,
+            stream_ref: stream_global.clone(),
+            write_mid,
+            flush_mid,
+            pos: 0,
+            cached_array: None,
+            cached_array_len: 0,
+        };
+
+        let buckets = if num_buckets <= 0 {
+            DEFAULT_NUM_BUCKETS
+        } else {
+            num_buckets as usize
+        };
+
+        let opts = WriterOptions {
+            compression: compression as u8,
+            zstd_level,
+            num_buckets: buckets,
+            row_group_max_size: row_group_max_size as u64,
+            max_dict_total_bytes: max_dict_total_bytes as usize,
+            max_dict_entries: max_dict_entries as usize,
+            stats_columns: stats_cols,
+            page_size_threshold: page_size_threshold as usize,
+        };
+
+        let writer = match MosaicWriter::new(jni_stream, &arrow_schema, opts) {
+            Ok(w) => w,
+            Err(e) => {
+                throw(&mut env, &format!("writer open failed: {}", e));
+                return 0;
+            }
+        };
+        let handle = Box::new(WriterHandle {
+            inner: writer,
+            _stream_ref: stream_global,
+        });
+        Box::into_raw(handle) as jlong
+    }));
+    match result {
+        Ok(val) => val,
+        Err(e) => {
+            // SAFETY: `raw_env` is the env this native call was invoked with, still valid here.
+            let mut env = unsafe { JNIEnv::from_raw(raw_env).unwrap() };
+            throw(&mut env, &panic_message(&e));
+            0
+        }
+    }
+}
+
+/// Calls `close()` on the writer behind `handle`.
+///
+/// A zero handle is a no-op. A close error or a panic is rethrown as a Java
+/// exception. The handle itself is not freed here.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeWriterClose(
+    mut env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+) {
+    // Remember the raw env so a fresh JNIEnv can be rebuilt if the closure
+    // panics while it still holds `env` mutably borrowed.
+    let raw_env = env.get_raw();
+    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
+        if handle != 0 {
+            let handle_ref = unsafe { &mut *(handle as *mut WriterHandle) };
+            match handle_ref.inner.close() {
+                Ok(_) => {}
+                Err(err) => {
+                    throw(&mut env, &format!("close failed: {}", err));
+                }
+            }
+        }
+    }));
+    if let Err(payload) = outcome {
+        let mut fallback_env = unsafe { JNIEnv::from_raw(raw_env).unwrap() };
+        throw(&mut fallback_env, &panic_message(&payload));
+    }
+}
+
+/// Drops the boxed `WriterHandle` whose raw pointer was handed to Java as
+/// `handle`. A zero handle is ignored; the handle must not be used afterwards.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeWriterFree(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+) {
+    if handle == 0 {
+        return;
+    }
+    // SAFETY: a non-zero handle is expected to be a pointer produced by
+    // `Box::into_raw` on a `WriterHandle` and not yet freed.
+    let owned: Box<WriterHandle> = unsafe { Box::from_raw(handle as *mut WriterHandle) };
+    drop(owned);
+}
+
+// ======================== Writer.estimatedSize ========================
+
+/// Returns the writer's `estimated_file_size()` as a `jlong`, or 0 for a
+/// zero handle.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeWriterEstimatedSize(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+) -> jlong {
+    match handle {
+        0 => 0,
+        addr => {
+            let handle_ref = unsafe { &*(addr as *const WriterHandle) };
+            handle_ref.inner.estimated_file_size() as jlong
+        }
+    }
+}
+
+// ======================== Writer.writeBatch (Arrow C Data Interface) ========================
+
+/// Imports one record batch exported by Java through the Arrow C Data
+/// Interface and writes it with the writer behind `writer_handle`.
+///
+/// `array_addr` / `schema_addr` point at caller-allocated `ArrowArray` /
+/// `ArrowSchema` structs. Once both addresses pass the null checks, ownership
+/// of both structs is taken unconditionally. Their contents are moved out and
+/// the originals are zeroed (release = NULL) *before* the import is attempted.
+/// This way the Java side never runs their release callbacks a second time,
+/// even when the import fails or panics.
+///
+/// Throws a Java exception for a null handle or address, an import failure,
+/// a write error, or a panic.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeWriterWriteBatch(
+    mut env: JNIEnv,
+    _class: JClass,
+    writer_handle: jlong,
+    array_addr: jlong,
+    schema_addr: jlong,
+) {
+    let raw_env = env.get_raw();
+    let result = panic::catch_unwind(AssertUnwindSafe(|| {
+        if writer_handle == 0 {
+            throw(&mut env, "null writer handle");
+            return;
+        }
+        if array_addr == 0 || schema_addr == 0 {
+            throw(&mut env, "null ArrowArray or ArrowSchema address");
+            return;
+        }
+        let writer = unsafe { &mut *(writer_handle as *mut WriterHandle) };
+
+        let ffi_array = array_addr as *mut FFI_ArrowArray;
+        let ffi_schema = schema_addr as *mut FFI_ArrowSchema;
+
+        // Move both structs out of Java memory, leaving zeroed placeholders.
+        // The moved values are dropped on the Rust side on every path,
+        // including the import-error path, and dropping runs their release
+        // callbacks. So the originals must stop carrying a live release
+        // callback before anything can fail. Zeroing only after a successful
+        // import (as before) caused a double release on the error path.
+        let (array, schema) = unsafe {
+            (
+                ptr::replace(ffi_array, std::mem::zeroed()),
+                ptr::replace(ffi_schema, std::mem::zeroed()),
+            )
+        };
+
+        let arr_data = match unsafe { arrow_array::ffi::from_ffi(array, &schema) } {
+            Ok(d) => d,
+            Err(e) => {
+                throw(&mut env, &format!("Arrow import failed: {}", e));
+                return;
+            }
+        };
+
+        let struct_array = StructArray::from(arr_data);
+        let batch = RecordBatch::from(struct_array);
+        if let Err(e) = writer.inner.write_batch(&batch) {
+            throw(&mut env, &format!("write_batch failed: {}", e));
+        }
+    }));
+    if let Err(e) = result {
+        let mut env = unsafe { JNIEnv::from_raw(raw_env).unwrap() };
+        throw(&mut env, &panic_message(&e));
+    }
+}
+
+// ======================== Reader ========================
+
+/// Heap wrapper around a `RowGroupReader`. A boxed instance's raw pointer is
+/// what the row-group JNI functions hand to Java as a `jlong` handle.
+struct RowGroupReaderHandle {
+    inner: RowGroupReader,
+}
+
+/// Opens a Mosaic file that is read through the Java `input_file` object and
+/// returns a reader handle. On failure it returns 0 with a Java exception
+/// pending.
+///
+/// `file_length` is passed to `MosaicReader::new` as the file length. A
+/// negative value is rejected instead of being cast to a huge `u64`. The
+/// returned handle keeps a global reference to `input_file` and is released
+/// with `nativeReaderFree`.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderOpen(
+    mut env: JNIEnv,
+    _class: JClass,
+    input_file: JObject,
+    file_length: jlong,
+) -> jlong {
+    let raw_env = env.get_raw();
+    let result = panic::catch_unwind(AssertUnwindSafe(|| {
+        // `as u64` on a negative jlong wraps to an enormous length; reject it
+        // up front so the error names the real problem.
+        if file_length < 0 {
+            throw(&mut env, &format!("invalid file length: {}", file_length));
+            return 0;
+        }
+        let length = file_length as u64;
+
+        let global = match env.new_global_ref(&input_file) {
+            Ok(g) => g,
+            Err(e) => {
+                throw(&mut env, &format!("failed to create global ref: {}", e));
+                return 0;
+            }
+        };
+
+        let jvm = match env.get_java_vm() {
+            Ok(vm) => Arc::new(vm),
+            Err(e) => {
+                throw(&mut env, &format!("cannot get JavaVM: {}", e));
+                return 0;
+            }
+        };
+
+        let input = JniInputFile {
+            jvm,
+            input_file_ref: global.clone(),
+        };
+
+        match MosaicReader::new(input, length) {
+            Ok(reader) => {
+                // The handle also holds the global ref alongside the reader.
+                let rh = ReaderHandle {
+                    reader: Box::new(reader),
+                    _input_file_ref: Some(global),
+                };
+                Box::into_raw(Box::new(rh)) as jlong
+            }
+            Err(e) => {
+                throw(&mut env, &format!("open failed: {}", e));
+                0
+            }
+        }
+    }));
+    match result {
+        Ok(val) => val,
+        Err(e) => {
+            let mut env = unsafe { JNIEnv::from_raw(raw_env).unwrap() };
+            throw(&mut env, &panic_message(&e));
+            0
+        }
+    }
+}
+
+/// Drops the boxed `ReaderHandle` behind `handle`, together with everything
+/// it owns. A zero handle is ignored; the handle must not be used afterwards.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderFree(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+) {
+    if handle == 0 {
+        return;
+    }
+    // SAFETY: a non-zero handle is expected to come from `Box::into_raw` on a
+    // `ReaderHandle` and not to have been freed already.
+    let owned: Box<ReaderHandle> = unsafe { Box::from_raw(handle as *mut ReaderHandle) };
+    drop(owned);
+}
+
+/// Builds an Arrow schema from the reader's columns (name, data type,
+/// nullability) and writes it into the caller-provided `ArrowSchema` struct at
+/// `schema_addr`.
+///
+/// Returns 0 on success. Returns -1 for a null handle or address, an export
+/// failure, or a panic; no Java exception is thrown.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderExportSchema(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    schema_addr: jlong,
+) -> jint {
+    const FAILURE: jint = -1;
+    if handle == 0 || schema_addr == 0 {
+        return FAILURE;
+    }
+    panic::catch_unwind(AssertUnwindSafe(|| {
+        let reader_handle = unsafe { &*(handle as *const ReaderHandle) };
+        let schema = reader_handle.reader.schema();
+
+        let mut fields: Vec<arrow_schema::Field> = Vec::new();
+        for col in schema.columns.iter() {
+            fields.push(arrow_schema::Field::new(
+                &col.name,
+                col.data_type.clone(),
+                col.nullable,
+            ));
+        }
+
+        let exported = match FFI_ArrowSchema::try_from(&Schema::new(fields)) {
+            Ok(s) => s,
+            Err(_) => return FAILURE,
+        };
+        // SAFETY: schema_addr is non-zero and is expected to point at a
+        // writable ArrowSchema struct owned by the Java caller.
+        unsafe { ptr::write(schema_addr as *mut FFI_ArrowSchema, exported) };
+        0
+    }))
+    .unwrap_or(FAILURE)
+}
+
+/// Returns the reader's `num_row_groups()` as a `jint`, or 0 for a zero handle.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderNumRowGroups(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+) -> jint {
+    if handle == 0 {
+        0
+    } else {
+        let reader_handle = unsafe { &*(handle as *const ReaderHandle) };
+        let count = reader_handle.reader.num_row_groups();
+        count as jint
+    }
+}
+
+/// Opens row group `rg_index` (all columns) and returns a boxed
+/// `RowGroupReaderHandle` as a `jlong`. Returns 0 with a Java exception
+/// pending on a null handle, an open error, or a panic.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderOpenRowGroup(
+    mut env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    rg_index: jint,
+) -> jlong {
+    let raw_env = env.get_raw();
+    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
+        if handle == 0 {
+            throw(&mut env, "null reader handle");
+            return 0;
+        }
+        let reader_handle = unsafe { &*(handle as *const ReaderHandle) };
+        let opened = reader_handle.reader.row_group_reader(rg_index as usize);
+        match opened {
+            Ok(inner) => Box::into_raw(Box::new(RowGroupReaderHandle { inner })) as jlong,
+            Err(err) => {
+                throw(&mut env, &format!("open row group failed: {}", err));
+                0
+            }
+        }
+    }));
+    outcome.unwrap_or_else(|payload| {
+        let mut fallback_env = unsafe { JNIEnv::from_raw(raw_env).unwrap() };
+        throw(&mut fallback_env, &panic_message(&payload));
+        0
+    })
+}
+
+/// Opens row group `rg_index` reading only the column indices listed in
+/// `columns`, and returns a boxed `RowGroupReaderHandle` as a `jlong`.
+///
+/// A zero-length array, or one whose length cannot be read, yields an empty
+/// index list. A failure copying the elements throws. Returns 0 with a Java
+/// exception pending on any error or panic.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderOpenRowGroupProjected(
+    mut env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    rg_index: jint,
+    columns: JIntArray,
+) -> jlong {
+    let raw_env = env.get_raw();
+    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
+        if handle == 0 {
+            throw(&mut env, "null reader handle");
+            return 0;
+        }
+        let reader_handle = unsafe { &*(handle as *const ReaderHandle) };
+
+        // Copy the Java int[] into a Vec<usize> of column indices.
+        let projection: Vec<usize> = match env.get_array_length(&columns) {
+            Ok(count) if count > 0 => {
+                let mut raw = vec![0i32; count as usize];
+                if env.get_int_array_region(&columns, 0, &mut raw).is_err() {
+                    throw(&mut env, "failed to read columns array");
+                    return 0;
+                }
+                raw.into_iter().map(|idx| idx as usize).collect()
+            }
+            _ => Vec::new(),
+        };
+
+        match reader_handle
+            .reader
+            .row_group_reader_projected(rg_index as usize, &projection)
+        {
+            Ok(inner) => Box::into_raw(Box::new(RowGroupReaderHandle { inner })) as jlong,
+            Err(err) => {
+                throw(&mut env, &format!("open row group projected failed: {}", err));
+                0
+            }
+        }
+    }));
+    outcome.unwrap_or_else(|payload| {
+        let mut fallback_env = unsafe { JNIEnv::from_raw(raw_env).unwrap() };
+        throw(&mut fallback_env, &panic_message(&payload));
+        0
+    })
+}
+
+// ======================== RowGroupReader ========================
+
+/// Returns the row group's `num_rows()` as a `jint`, or 0 for a zero handle.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeRowGroupReaderNumRows(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+) -> jint {
+    match handle {
+        0 => 0,
+        addr => {
+            let row_group = unsafe { &*(addr as *const RowGroupReaderHandle) };
+            row_group.inner.num_rows() as jint
+        }
+    }
+}
+
+/// Drops the boxed `RowGroupReaderHandle` behind `handle`. A zero handle is
+/// ignored; the handle must not be used afterwards.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeRowGroupReaderFree(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+) {
+    if handle == 0 {
+        return;
+    }
+    // SAFETY: a non-zero handle is expected to come from `Box::into_raw` on a
+    // `RowGroupReaderHandle` and not to have been freed already.
+    let owned: Box<RowGroupReaderHandle> =
+        unsafe { Box::from_raw(handle as *mut RowGroupReaderHandle) };
+    drop(owned);
+}
+
+// ======================== Row Group Stats ========================
+
+/// Returns how many statistics entries row group `rg_index` has. Returns 0
+/// for a zero handle and -1 when the statistics cannot be loaded.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderRowGroupNumStats(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    rg_index: jint,
+) -> jint {
+    if handle == 0 {
+        return 0;
+    }
+    let reader_handle = unsafe { &*(handle as *const ReaderHandle) };
+    reader_handle
+        .reader
+        .row_group_stats(rg_index as usize)
+        .map_or(-1, |stats| stats.len() as jint)
+}
+
+/// Returns the column index of statistics entry `stat_index` in row group
+/// `rg_index`. Returns -1 for a zero handle, unloadable statistics, or an
+/// out-of-range `stat_index`.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderRowGroupStatColumnIndex(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    rg_index: jint,
+    stat_index: jint,
+) -> jint {
+    if handle == 0 {
+        return -1;
+    }
+    let reader_handle = unsafe { &*(handle as *const ReaderHandle) };
+    match reader_handle.reader.row_group_stats(rg_index as usize) {
+        Ok(stats) => match stats.get(stat_index as usize) {
+            Some(stat) => stat.column_index as jint,
+            None => -1,
+        },
+        Err(_) => -1,
+    }
+}
+
+/// Returns the null count of statistics entry `stat_index` in row group
+/// `rg_index`.
+///
+/// Returns -1 for a zero handle, a row group whose statistics cannot be
+/// loaded, or an out-of-range `stat_index`. This keeps an invalid lookup from
+/// being mistaken for a real null count of 0.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderRowGroupStatNullCount(
+    _env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    rg_index: jint,
+    stat_index: jint,
+) -> jlong {
+    if handle == 0 {
+        return -1;
+    }
+    let rh = unsafe { &*(handle as *const ReaderHandle) };
+    let stats = match rh.reader.row_group_stats(rg_index as usize) {
+        Ok(s) => s,
+        Err(_) => return -1,
+    };
+    // A negative stat_index wraps to a huge usize and lands in the None arm.
+    match stats.get(stat_index as usize) {
+        Some(stat) => stat.null_count as jlong,
+        None => -1,
+    }
+}
+
+/// Returns the `to_be_bytes()` encoding of the minimum value in statistics
+/// entry `stat_index` of row group `rg_index`.
+///
+/// Returns a null `byte[]` for a zero handle, unloadable statistics, an
+/// out-of-range index, a missing minimum, an empty encoding, or an array
+/// allocation failure.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderRowGroupStatMin<'a>(
+    env: JNIEnv<'a>,
+    _class: JClass<'a>,
+    handle: jlong,
+    rg_index: jint,
+    stat_index: jint,
+) -> JByteArray<'a> {
+    if handle == 0 {
+        return JByteArray::default();
+    }
+    let reader_handle = unsafe { &*(handle as *const ReaderHandle) };
+    let stats = match reader_handle.reader.row_group_stats(rg_index as usize) {
+        Ok(s) => s,
+        Err(_) => return JByteArray::default(),
+    };
+    let min_bytes = match stats
+        .get(stat_index as usize)
+        .and_then(|stat| stat.min.as_ref())
+    {
+        Some(value) => value.to_be_bytes(),
+        None => return JByteArray::default(),
+    };
+    if min_bytes.is_empty() {
+        JByteArray::default()
+    } else {
+        env.byte_array_from_slice(&min_bytes).unwrap_or_default()
+    }
+}
+
+/// Returns the `to_be_bytes()` encoding of the maximum value in statistics
+/// entry `stat_index` of row group `rg_index`.
+///
+/// Returns a null `byte[]` for a zero handle, unloadable statistics, an
+/// out-of-range index, a missing maximum, an empty encoding, or an array
+/// allocation failure.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeReaderRowGroupStatMax<'a>(
+    env: JNIEnv<'a>,
+    _class: JClass<'a>,
+    handle: jlong,
+    rg_index: jint,
+    stat_index: jint,
+) -> JByteArray<'a> {
+    if handle == 0 {
+        return JByteArray::default();
+    }
+    let reader_handle = unsafe { &*(handle as *const ReaderHandle) };
+    let stats = match reader_handle.reader.row_group_stats(rg_index as usize) {
+        Ok(s) => s,
+        Err(_) => return JByteArray::default(),
+    };
+    let max_bytes = match stats
+        .get(stat_index as usize)
+        .and_then(|stat| stat.max.as_ref())
+    {
+        Some(value) => value.to_be_bytes(),
+        None => return JByteArray::default(),
+    };
+    if max_bytes.is_empty() {
+        JByteArray::default()
+    } else {
+        env.byte_array_from_slice(&max_bytes).unwrap_or_default()
+    }
+}
+
+// ======================== Columnar Read (Arrow C Data Interface) ========================
+
+/// Reads the row group's columns and exports them through the Arrow C Data
+/// Interface into the caller-provided `ArrowArray` / `ArrowSchema` structs.
+///
+/// Returns 0 on success. Returns -1 with a Java exception pending on a null
+/// handle or address, a read error, an export error, or a panic.
+#[no_mangle]
+pub extern "system" fn Java_io_mosaic_NativeLib_nativeRowGroupReaderReadColumns(
+    mut env: JNIEnv,
+    _class: JClass,
+    handle: jlong,
+    array_addr: jlong,
+    schema_addr: jlong,
+) -> jint {
+    let raw_env = env.get_raw();
+    let outcome = panic::catch_unwind(AssertUnwindSafe(|| -> jint {
+        if handle == 0 {
+            throw(&mut env, "null handle");
+            return -1;
+        }
+        if array_addr == 0 || schema_addr == 0 {
+            throw(&mut env, "null ArrowArray or ArrowSchema address");
+            return -1;
+        }
+        let row_group = unsafe { &mut *(handle as *mut RowGroupReaderHandle) };
+        let batch = match row_group.inner.read_columns() {
+            Ok(batch) => batch,
+            Err(err) => {
+                throw(&mut env, &format!("read_columns failed: {}", err));
+                return -1;
+            }
+        };
+
+        let (ffi_array, ffi_schema) =
+            match arrow_array::ffi::to_ffi(&StructArray::from(batch).into()) {
+                Ok(pair) => pair,
+                Err(err) => {
+                    throw(&mut env, &format!("Arrow export failed: {}", err));
+                    return -1;
+                }
+            };
+        // SAFETY: both addresses are non-zero and are expected to point at
+        // writable ArrowArray / ArrowSchema structs allocated by the caller.
+        unsafe {
+            ptr::write(array_addr as *mut FFI_ArrowArray, ffi_array);
+            ptr::write(schema_addr as *mut FFI_ArrowSchema, ffi_schema);
+        }
+        0
+    }));
+    match outcome {
+        Ok(code) => code,
+        Err(payload) => {
+            let mut fallback_env = unsafe { JNIEnv::from_raw(raw_env).unwrap() };
+            throw(&mut fallback_env, &panic_message(&payload));
+            -1
+        }
+    }
+}

Reply via email to