This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a8e333c467 [format] Add paimon-mosaic module with reader and writer (#7917)
a8e333c467 is described below

commit a8e333c4676f2d726dc749cf8dadd2c8b954a3ac
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jun 3 13:09:24 2026 +0800

    [format] Add paimon-mosaic module with reader and writer (#7917)
    
    See https://paimon.apache.org/docs/mosaic/
    
    Introduces the Mosaic file format integration for Paimon with:
    - MosaicRecordsReader: row-group level predicate filtering using
    statistics, column projection, and correct returnedPosition tracking
    - MosaicRecordsWriter: BundleFormatWriter with writerMetadata() support
    for in-memory stats capture (avoids re-reading files on object stores)
    - MosaicSimpleStatsExtractor: stats extraction from file or
    writerMetadata, with SimpleColStatsCollector integration
    - MosaicObjects: byte[] to Paimon object conversion for all supported
    types
    - Comprehensive test suite (6 test classes covering unit and integration
    tests)
---
 paimon-mosaic/pom.xml                              |  86 +++++
 .../paimon/format/mosaic/MosaicFileFormat.java     | 238 +++++++++++++
 .../format/mosaic/MosaicFileFormatFactory.java     |  38 ++
 .../format/mosaic/MosaicInputFileAdapter.java      |  79 +++++
 .../apache/paimon/format/mosaic/MosaicObjects.java | 103 ++++++
 .../paimon/format/mosaic/MosaicReaderFactory.java  |  60 ++++
 .../paimon/format/mosaic/MosaicRecordsReader.java  | 214 ++++++++++++
 .../paimon/format/mosaic/MosaicRecordsWriter.java  | 199 +++++++++++
 .../format/mosaic/MosaicSimpleStatsExtractor.java  | 180 ++++++++++
 .../paimon/format/mosaic/MosaicWriterFactory.java  |  75 ++++
 .../paimon/format/mosaic/MosaicWriterMetadata.java |  53 +++
 .../org.apache.paimon.format.FileFormatFactory     |  16 +
 .../paimon/format/mosaic/MosaicFileFormatTest.java | 124 +++++++
 .../format/mosaic/MosaicFormatReadWriteTest.java   | 137 ++++++++
 .../paimon/format/mosaic/MosaicObjectsTest.java    | 209 +++++++++++
 .../format/mosaic/MosaicReaderWriterTest.java      | 361 +++++++++++++++++++
 .../mosaic/MosaicSimpleStatsExtractorTest.java     | 212 +++++++++++
 .../format/mosaic/MosaicWriterMetadataTest.java    | 386 +++++++++++++++++++++
 pom.xml                                            |   1 +
 19 files changed, 2771 insertions(+)

diff --git a/paimon-mosaic/pom.xml b/paimon-mosaic/pom.xml
new file mode 100644
index 0000000000..6e0d1eae28
--- /dev/null
+++ b/paimon-mosaic/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>paimon-parent</artifactId>
+        <groupId>org.apache.paimon</groupId>
+        <version>1.5-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>paimon-mosaic</artifactId>
+    <name>Paimon : Mosaic Format</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>mosaic</artifactId>
+            <version>0.1.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-arrow</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormat.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormat.java
new file mode 100644
index 0000000000..bff850a4e0
--- /dev/null
+++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormat.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SimpleStatsExtractor;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BlobType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.types.VariantType;
+import org.apache.paimon.types.VectorType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/** Mosaic {@link FileFormat}. */
+public class MosaicFileFormat extends FileFormat {
+
+    public static final ConfigOption<String> STATS_COLUMNS =
+            ConfigOptions.key("mosaic.stats-columns")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription(
+                            "Comma-separated list of column names to collect statistics for. "
+                                    + "Empty means no statistics collection.");
+
+    public static final ConfigOption<Integer> NUM_BUCKETS =
+            ConfigOptions.key("mosaic.num-buckets")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("Number of column buckets for parallel IO.");
+
+    static {
+        System.setProperty("arrow.enable_unsafe_memory_access", "true");
+    }
+
+    private final FileFormatFactory.FormatContext formatContext;
+
+    public MosaicFileFormat(FileFormatFactory.FormatContext formatContext) {
+        super("mosaic");
+        this.formatContext = formatContext;
+    }
+
+    @Override
+    public FormatReaderFactory createReaderFactory(
+            RowType dataSchemaRowType,
+            RowType projectedRowType,
+            @Nullable List<Predicate> predicates) {
+        return new MosaicReaderFactory(dataSchemaRowType, projectedRowType, predicates);
+    }
+
+    @Override
+    public FormatWriterFactory createWriterFactory(RowType type) {
+        return new MosaicWriterFactory(type, formatContext);
+    }
+
+    @Override
+    public void validateDataFields(RowType rowType) {
+        MosaicRowTypeVisitor visitor = new MosaicRowTypeVisitor();
+        for (DataType fieldType : rowType.getFieldTypes()) {
+            fieldType.accept(visitor);
+        }
+    }
+
+    @Override
+    public Optional<SimpleStatsExtractor> createStatsExtractor(
+            RowType type, SimpleColStatsCollector.Factory[] statsCollectors) {
+        return Optional.of(new MosaicSimpleStatsExtractor(type, statsCollectors));
+    }
+
+    static class MosaicRowTypeVisitor implements DataTypeVisitor<Void> {
+
+        @Override
+        public Void visit(CharType charType) {
+            return null;
+        }
+
+        @Override
+        public Void visit(VarCharType varCharType) {
+            return null;
+        }
+
+        @Override
+        public Void visit(BooleanType booleanType) {
+            return null;
+        }
+
+        @Override
+        public Void visit(BinaryType binaryType) {
+            return null;
+        }
+
+        @Override
+        public Void visit(VarBinaryType varBinaryType) {
+            return null;
+        }
+
+        @Override
+        public Void visit(DecimalType decimalType) {
+            return null;
+        }
+
+        @Override
+        public Void visit(TinyIntType tinyIntType) {
+            return null;
+        }
+
+        @Override
+        public Void visit(SmallIntType smallIntType) {
+            return null;
+        }
+
+        @Override
+        public Void visit(IntType intType) {
+            return null;
+        }
+
+        @Override
+        public Void visit(BigIntType bigIntType) {
+            return null;
+        }
+
+        @Override
+        public Void visit(FloatType floatType) {
+            return null;
+        }
+
+        @Override
+        public Void visit(DoubleType doubleType) {
+            return null;
+        }
+
+        @Override
+        public Void visit(DateType dateType) {
+            return null;
+        }
+
+        @Override
+        public Void visit(TimeType timeType) {
+            return null;
+        }
+
+        @Override
+        public Void visit(TimestampType timestampType) {
+            return null;
+        }
+
+        @Override
+        public Void visit(LocalZonedTimestampType localZonedTimestampType) {
+            return null;
+        }
+
+        @Override
+        public Void visit(VariantType variantType) {
+            throw new UnsupportedOperationException(
+                    "Mosaic file format does not support type VARIANT");
+        }
+
+        @Override
+        public Void visit(BlobType blobType) {
+            throw new UnsupportedOperationException(
+                    "Mosaic file format does not support type BLOB");
+        }
+
+        @Override
+        public Void visit(ArrayType arrayType) {
+            throw new UnsupportedOperationException(
+                    "Mosaic file format does not support type ARRAY");
+        }
+
+        @Override
+        public Void visit(VectorType vectorType) {
+            throw new UnsupportedOperationException(
+                    "Mosaic file format does not support type VECTOR");
+        }
+
+        @Override
+        public Void visit(MultisetType multisetType) {
+            throw new UnsupportedOperationException(
+                    "Mosaic file format does not support type MULTISET");
+        }
+
+        @Override
+        public Void visit(MapType mapType) {
+            throw new UnsupportedOperationException("Mosaic file format does not support type MAP");
+        }
+
+        @Override
+        public Void visit(RowType rowType) {
+            throw new UnsupportedOperationException("Mosaic file format does not support type ROW");
+        }
+    }
+}
diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormatFactory.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormatFactory.java
new file mode 100644
index 0000000000..782faba3e8
--- /dev/null
+++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormatFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+
+/** Factory to create {@link MosaicFileFormat}. */
+public class MosaicFileFormatFactory implements FileFormatFactory {
+
+    public static final String IDENTIFIER = "mosaic";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public FileFormat create(FormatContext formatContext) {
+        return new MosaicFileFormat(formatContext);
+    }
+}
diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicInputFileAdapter.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicInputFileAdapter.java
new file mode 100644
index 0000000000..3a307ea0f2
--- /dev/null
+++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicInputFileAdapter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.VectoredReadable;
+import org.apache.paimon.mosaic.InputFile;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ * Adapts Paimon's {@link FileIO} to Mosaic's {@link InputFile} interface.
+ *
+ * <p>Maintains a single {@link SeekableInputStream}. If the stream implements {@link
+ * VectoredReadable}, reads use {@link VectoredReadable#preadFully} which is thread-safe. Otherwise,
+ * reads are synchronized to protect seek+read sequences.
+ */
+public class MosaicInputFileAdapter implements InputFile, Closeable {
+
+    private final Path path;
+    private final SeekableInputStream in;
+    private final VectoredReadable vectoredReadable;
+
+    public MosaicInputFileAdapter(FileIO fileIO, Path path) throws IOException {
+        this.path = path;
+        this.in = fileIO.newInputStream(path);
+        this.vectoredReadable = in instanceof VectoredReadable ? (VectoredReadable) in : null;
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+        if (vectoredReadable != null) {
+            vectoredReadable.preadFully(position, buffer, offset, length);
+        } else {
+            synchronized (in) {
+                in.seek(position);
+                int remaining = length;
+                int off = offset;
+                while (remaining > 0) {
+                    int read = in.read(buffer, off, remaining);
+                    if (read < 0) {
+                        throw new EOFException(
+                                "Reached end of file while reading "
+                                        + path
+                                        + " at position "
+                                        + position);
+                    }
+                    off += read;
+                    remaining -= read;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+}
diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicObjects.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicObjects.java
new file mode 100644
index 0000000000..54d15c43c0
--- /dev/null
+++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicObjects.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.TimestampType;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+/** Converts Mosaic's byte[] statistics to Paimon objects. */
+public class MosaicObjects {
+
+    @Nullable
+    public static Object convertStatsValue(byte[] bytes, DataType dataType) {
+        if (bytes == null) {
+            return null;
+        }
+        switch (dataType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                return BinaryString.fromBytes(bytes);
+            case BINARY:
+            case VARBINARY:
+                return bytes;
+            default:
+                break;
+        }
+        if (bytes.length == 0) {
+            return null;
+        }
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+        switch (dataType.getTypeRoot()) {
+            case BOOLEAN:
+                return bytes[0] != 0;
+            case TINYINT:
+                return bytes[0];
+            case SMALLINT:
+                return buf.getShort();
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+                return buf.getInt();
+            case BIGINT:
+                return buf.getLong();
+            case FLOAT:
+                return buf.getFloat();
+            case DOUBLE:
+                return buf.getDouble();
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) dataType;
+                BigInteger unscaled = new BigInteger(bytes);
+                BigDecimal decimal = new BigDecimal(unscaled, decimalType.getScale());
+                return Decimal.fromBigDecimal(
+                        decimal, decimalType.getPrecision(), decimalType.getScale());
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return convertTimestamp(buf, ((TimestampType) dataType).getPrecision());
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return convertTimestamp(buf, ((LocalZonedTimestampType) dataType).getPrecision());
+            default:
+                return null;
+        }
+    }
+
+    private static Timestamp convertTimestamp(ByteBuffer buf, int precision) {
+        if (precision <= 3) {
+            return Timestamp.fromEpochMillis(buf.getLong());
+        } else if (precision <= 6) {
+            return Timestamp.fromMicros(buf.getLong());
+        } else {
+            // precision 7-9: 12 bytes = i64 millis (BE) + i32 nanos_of_milli (BE)
+            long millis = buf.getLong();
+            int nanosOfMilli = buf.getInt();
+            return Timestamp.fromEpochMillis(millis, nanosOfMilli);
+        }
+    }
+
+    private MosaicObjects() {}
+}
diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicReaderFactory.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicReaderFactory.java
new file mode 100644
index 0000000000..5b39c867e2
--- /dev/null
+++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicReaderFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+/** A factory to create Mosaic reader. */
+public class MosaicReaderFactory implements FormatReaderFactory {
+
+    private final RowType dataSchemaRowType;
+    private final RowType projectedRowType;
+    @Nullable private final List<Predicate> predicates;
+
+    public MosaicReaderFactory(
+            RowType dataSchemaRowType,
+            RowType projectedRowType,
+            @Nullable List<Predicate> predicates) {
+        this.dataSchemaRowType = dataSchemaRowType;
+        this.projectedRowType = projectedRowType;
+        this.predicates = predicates;
+    }
+
+    @Override
+    public FileRecordReader<InternalRow> createReader(Context context) throws IOException {
+        MosaicInputFileAdapter inputFile =
+                new MosaicInputFileAdapter(context.fileIO(), context.filePath());
+        return new MosaicRecordsReader(
+                inputFile,
+                context.fileSize(),
+                dataSchemaRowType,
+                projectedRowType,
+                predicates,
+                context.filePath());
+    }
+}
diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
new file mode 100644
index 0000000000..24cdcbf05c
--- /dev/null
+++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.arrow.reader.ArrowBatchReader;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.mosaic.ColumnStatistics;
+import org.apache.paimon.mosaic.MosaicReader;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.paimon.format.mosaic.MosaicObjects.convertStatsValue;
+
+/** File reader for Mosaic format. */
+public class MosaicRecordsReader implements FileRecordReader<InternalRow> {
+
+    private final MosaicInputFileAdapter inputFileAdapter;
+    private final MosaicReader reader;
+    private final ArrowBatchReader arrowBatchReader;
+    private final Path filePath;
+    private final BufferAllocator allocator;
+    private final int numRowGroups;
+    private final RowType dataSchemaRowType;
+    @Nullable private final List<Predicate> predicates;
+
+    private int currentRowGroup;
+    private long returnedPosition = -1;
+    private VectorSchemaRoot currentVsr;
+
+    public MosaicRecordsReader(
+            MosaicInputFileAdapter inputFileAdapter,
+            long fileSize,
+            RowType dataSchemaRowType,
+            RowType projectedRowType,
+            @Nullable List<Predicate> predicates,
+            Path filePath) {
+        this.filePath = filePath;
+        this.inputFileAdapter = inputFileAdapter;
+        this.dataSchemaRowType = dataSchemaRowType;
+        this.predicates = predicates;
+        this.allocator = new RootAllocator();
+
+        try {
+            this.reader = MosaicReader.open(inputFileAdapter, fileSize, allocator);
+        } catch (Exception e) {
+            allocator.close();
+            throw e;
+        }
+
+        Schema fileSchema = reader.getSchema();
+        Set<String> fileColumnNames = new HashSet<>();
+        for (Field field : fileSchema.getFields()) {
+            fileColumnNames.add(field.getName());
+        }
+        List<String> projectedNames = projectedRowType.getFieldNames();
+        List<String> existingColumns = new ArrayList<>();
+        for (String name : projectedNames) {
+            if (fileColumnNames.contains(name)) {
+                existingColumns.add(name);
+            }
+        }
+        if (!existingColumns.isEmpty()) {
+            reader.project(existingColumns.toArray(new String[0]));
+        }
+
+        this.numRowGroups = reader.numRowGroups();
+        this.currentRowGroup = 0;
+        this.arrowBatchReader = new ArrowBatchReader(projectedRowType, true);
+    }
+
+    @Nullable
+    @Override
+    public FileRecordIterator<InternalRow> readBatch() throws IOException {
+        while (currentRowGroup < numRowGroups) {
+            int numRows = reader.rowGroupNumRows(currentRowGroup);
+            if (!matchesRowGroup(currentRowGroup, numRows)) {
+                returnedPosition += numRows;
+                currentRowGroup++;
+                continue;
+            }
+
+            releaseCurrentVsr();
+
+            VectorSchemaRoot vsr = reader.readRowGroup(currentRowGroup, allocator);
+            currentRowGroup++;
+            this.currentVsr = vsr;
+
+            Iterator<InternalRow> rows = arrowBatchReader.readBatch(vsr).iterator();
+
+            return new FileRecordIterator<InternalRow>() {
+                @Override
+                public long returnedPosition() {
+                    return returnedPosition;
+                }
+
+                @Override
+                public Path filePath() {
+                    return filePath;
+                }
+
+                @Nullable
+                @Override
+                public InternalRow next() {
+                    if (rows.hasNext()) {
+                        returnedPosition++;
+                        return rows.next();
+                    }
+                    return null;
+                }
+
+                @Override
+                public void releaseBatch() {
+                    releaseCurrentVsr();
+                }
+            };
+        }
+        return null;
+    }
+
+    private boolean matchesRowGroup(int rowGroupIndex, long rowCount) {
+        if (predicates == null || predicates.isEmpty()) {
+            return true;
+        }
+
+        Map<String, ColumnStatistics> statsMap = reader.getRowGroupStatistics(rowGroupIndex);
+        if (statsMap.isEmpty()) {
+            return true;
+        }
+
+        int fieldCount = dataSchemaRowType.getFieldCount();
+        GenericRow minValues = new GenericRow(fieldCount);
+        GenericRow maxValues = new GenericRow(fieldCount);
+        long[] nullCounts = new long[fieldCount];
+
+        List<DataField> fields = dataSchemaRowType.getFields();
+        for (int i = 0; i < fieldCount; i++) {
+            String colName = fields.get(i).name();
+            ColumnStatistics stats = statsMap.get(colName);
+            if (stats == null) {
+                continue;
+            }
+
+            nullCounts[i] = stats.getNullCount();
+            if (stats.hasMinMax()) {
+                DataType dataType = fields.get(i).type();
+                Object min = convertStatsValue(stats.getMin(), dataType);
+                Object max = convertStatsValue(stats.getMax(), dataType);
+                minValues.setField(i, min);
+                maxValues.setField(i, max);
+            }
+        }
+
+        for (Predicate predicate : predicates) {
+            if (!predicate.test(rowCount, minValues, maxValues, new GenericArray(nullCounts))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void releaseCurrentVsr() {
+        if (currentVsr != null) {
+            currentVsr.close();
+            currentVsr = null;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        releaseCurrentVsr();
+        reader.close();
+        allocator.close();
+        inputFileAdapter.close();
+    }
+}
diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsWriter.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsWriter.java
new file mode 100644
index 0000000000..fdef0eb365
--- /dev/null
+++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsWriter.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.arrow.ArrowBundleRecords;
+import org.apache.paimon.arrow.vector.ArrowFormatWriter;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.BundleFormatWriter;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.io.BundleRecords;
+import org.apache.paimon.mosaic.ColumnStatistics;
+import org.apache.paimon.mosaic.MosaicWriter;
+import org.apache.paimon.mosaic.WriterOptions;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.types.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link BundleFormatWriter} that buffers rows in an {@link ArrowFormatWriter} and passes the
+ * resulting Arrow batches to a {@link MosaicWriter}.
+ *
+ * <p>Row-group statistics are collected from the {@link MosaicWriter} during {@link #close()} and
+ * are available from {@link #writerMetadata()} afterwards.
+ */
+public class MosaicRecordsWriter implements BundleFormatWriter {
+
+    private final ArrowFormatWriter arrowFormatWriter;
+    private final MosaicWriter nativeWriter;
+    private final BufferAllocator allocator;
+    private final List<String> statsColumnNames;
+    @Nullable private MosaicWriterMetadata metadata;
+
+    /**
+     * Creates the Arrow buffer and the Mosaic writer on top of {@code outputStream}.
+     *
+     * <p>If any construction step fails, the Arrow resources allocated so far are closed before
+     * the failure is propagated, since no caller would be able to release them.
+     *
+     * @param outputStream stream handed to the {@link MosaicWriter}
+     * @param rowType row type of the records; the Arrow schema is derived from it
+     * @param formatContext supplies the write batch size and memory, the zstd level and the
+     *     optional block size (used as the maximum row-group size)
+     * @param statsColumnNames columns configured as stats columns on the writer options when the
+     *     list is non-empty; also recorded in the writer metadata
+     * @param numBuckets optional bucket count forwarded to the writer options
+     */
+    public MosaicRecordsWriter(
+            OutputStream outputStream,
+            RowType rowType,
+            FileFormatFactory.FormatContext formatContext,
+            List<String> statsColumnNames,
+            @Nullable Integer numBuckets) {
+        this.statsColumnNames = statsColumnNames;
+
+        BufferAllocator rootAllocator = new RootAllocator();
+        ArrowFormatWriter arrowWriter = null;
+        MosaicWriter mosaicWriter;
+        try {
+            int writeBatchSize = formatContext.writeBatchSize();
+            long writeBatchMemory = formatContext.writeBatchMemory().getBytes();
+            arrowWriter =
+                    new ArrowFormatWriter(
+                            rowType, writeBatchSize, true, rootAllocator, writeBatchMemory);
+
+            WriterOptions options = new WriterOptions().zstdLevel(formatContext.zstdLevel());
+            if (numBuckets != null) {
+                options = options.numBuckets(numBuckets);
+            }
+            MemorySize blockSize = formatContext.blockSize();
+            if (blockSize != null) {
+                options = options.rowGroupMaxSize(blockSize.getBytes());
+            }
+            if (!statsColumnNames.isEmpty()) {
+                // Keep the returned options like the setters above, so the stats columns are
+                // not lost if the setter returns a new instance instead of mutating in place.
+                options = options.statsColumns(statsColumnNames.toArray(new String[0]));
+            }
+
+            Schema arrowSchema = arrowWriter.getVectorSchemaRoot().getSchema();
+            mosaicWriter = new MosaicWriter(outputStream, arrowSchema, options, rootAllocator);
+        } catch (Throwable t) {
+            // Construction failed half-way: release what was allocated, then propagate.
+            if (arrowWriter != null) {
+                try {
+                    arrowWriter.close();
+                } catch (Throwable suppressed) {
+                    t.addSuppressed(suppressed);
+                }
+            }
+            try {
+                rootAllocator.close();
+            } catch (Throwable suppressed) {
+                t.addSuppressed(suppressed);
+            }
+            throw t;
+        }
+
+        this.allocator = rootAllocator;
+        this.arrowFormatWriter = arrowWriter;
+        this.nativeWriter = mosaicWriter;
+    }
+
+    /**
+     * Buffers one row. When the Arrow writer refuses the row, the buffered batch is flushed to the
+     * Mosaic writer and the row is written again into the reset buffer.
+     *
+     * @throws RuntimeException if the row is still refused after the flush
+     */
+    @Override
+    public void addElement(InternalRow internalRow) {
+        if (arrowFormatWriter.write(internalRow)) {
+            return;
+        }
+        flush();
+        if (!arrowFormatWriter.write(internalRow)) {
+            throw new RuntimeException("Failed to write row to Mosaic file");
+        }
+    }
+
+    /**
+     * Writes a bundle of records.
+     *
+     * <p>An {@link ArrowBundleRecords} is passed to the Mosaic writer directly, after flushing any
+     * rows buffered so far so that they are written ahead of the bundle. Any other bundle is
+     * written row by row through {@link #addElement(InternalRow)}.
+     */
+    @Override
+    public void writeBundle(BundleRecords bundleRecords) {
+        if (bundleRecords instanceof ArrowBundleRecords) {
+            flush();
+            nativeWriter.write(((ArrowBundleRecords) bundleRecords).getVectorSchemaRoot());
+        } else {
+            for (InternalRow row : bundleRecords) {
+                addElement(row);
+            }
+        }
+    }
+
+    /**
+     * Compares the Mosaic writer's estimated file size with {@code targetSize}.
+     *
+     * @return {@code false} without querying the writer when {@code suggestedCheck} is {@code
+     *     false}; otherwise whether the estimated size has reached {@code targetSize}
+     */
+    @Override
+    public boolean reachTargetSize(boolean suggestedCheck, long targetSize) {
+        if (!suggestedCheck) {
+            return false;
+        }
+        return nativeWriter.estimatedFileSize() >= targetSize;
+    }
+
+    /**
+     * Flushes buffered rows, closes the Mosaic writer, collects the writer metadata, and closes
+     * the Arrow writer and the allocator, in that order.
+     *
+     * <p>Every step is attempted even when an earlier one fails. The first failure is rethrown
+     * after all steps have run, with later failures attached as suppressed exceptions.
+     */
+    @Override
+    public void close() throws IOException {
+        Throwable throwable = null;
+
+        try {
+            flush();
+        } catch (Throwable t) {
+            throwable = t;
+        }
+
+        try {
+            nativeWriter.close();
+        } catch (Throwable t) {
+            throwable = addSuppressed(throwable, t);
+        }
+
+        // Metadata is read from the Mosaic writer only after it has been closed.
+        try {
+            collectMetadata();
+        } catch (Throwable t) {
+            throwable = addSuppressed(throwable, t);
+        }
+
+        try {
+            arrowFormatWriter.close();
+        } catch (Throwable t) {
+            throwable = addSuppressed(throwable, t);
+        }
+
+        try {
+            allocator.close();
+        } catch (Throwable t) {
+            throwable = addSuppressed(throwable, t);
+        }
+
+        if (throwable != null) {
+            rethrow(throwable);
+        }
+    }
+
+    /**
+     * Returns the metadata collected during {@link #close()}.
+     *
+     * @return a {@link MosaicWriterMetadata}, or {@code null} if none has been collected
+     */
+    @Nullable
+    @Override
+    public Object writerMetadata() {
+        return metadata;
+    }
+
+    /** Copies the statistics of every row group from the Mosaic writer into {@link #metadata}. */
+    private void collectMetadata() {
+        int numRowGroups = nativeWriter.numRowGroups();
+        List<Map<String, ColumnStatistics>> allStats = new ArrayList<>(numRowGroups);
+        for (int i = 0; i < numRowGroups; i++) {
+            allStats.add(nativeWriter.getRowGroupStatistics(i));
+        }
+        this.metadata = new MosaicWriterMetadata(numRowGroups, allStats, statsColumnNames);
+    }
+
+    /** Passes the buffered Arrow batch, if it is not empty, to the Mosaic writer and resets it. */
+    private void flush() {
+        arrowFormatWriter.flush();
+        if (!arrowFormatWriter.empty()) {
+            VectorSchemaRoot vsr = arrowFormatWriter.getVectorSchemaRoot();
+            nativeWriter.write(vsr);
+        }
+        arrowFormatWriter.reset();
+    }
+
+    /**
+     * Chains failures: returns {@code suppressed} when there is no earlier failure, otherwise
+     * attaches it to {@code throwable} and returns {@code throwable}.
+     */
+    private static Throwable addSuppressed(Throwable throwable, Throwable suppressed) {
+        if (throwable == null) {
+            return suppressed;
+        }
+        throwable.addSuppressed(suppressed);
+        return throwable;
+    }
+
+    /**
+     * Rethrows {@code throwable} with its own type when it is an {@link IOException}, a {@link
+     * RuntimeException} or an {@link Error}; any other throwable is wrapped in an {@link
+     * IOException}.
+     */
+    private static void rethrow(Throwable throwable) throws IOException {
+        if (throwable instanceof IOException) {
+            throw (IOException) throwable;
+        }
+        if (throwable instanceof RuntimeException) {
+            throw (RuntimeException) throwable;
+        }
+        if (throwable instanceof Error) {
+            throw (Error) throwable;
+        }
+        throw new IOException(throwable);
+    }
+}
diff --git 
a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractor.java
 
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractor.java
new file mode 100644
index 0000000000..f426b27cfa
--- /dev/null
+++ 
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractor.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.format.SimpleColStats;
+import org.apache.paimon.format.SimpleStatsExtractor;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.mosaic.ColumnStatistics;
+import org.apache.paimon.mosaic.MosaicReader;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.paimon.format.mosaic.MosaicObjects.convertStatsValue;
+
+/**
+ * A {@link SimpleStatsExtractor} that builds per-column statistics by merging the row-group
+ * statistics of a Mosaic file, taken either from the file or from a {@link MosaicWriterMetadata}.
+ */
+public class MosaicSimpleStatsExtractor implements SimpleStatsExtractor {
+
+    private final RowType rowType;
+    private final SimpleColStatsCollector.Factory[] statsCollectors;
+
+    public MosaicSimpleStatsExtractor(
+            RowType rowType, SimpleColStatsCollector.Factory[] statsCollectors) {
+        this.rowType = rowType;
+        this.statsCollectors = statsCollectors;
+    }
+
+    /** Opens the file at {@code path} and merges the statistics of all its row groups. */
+    @Override
+    public SimpleColStats[] extract(FileIO fileIO, Path path, long length) {
+        try (MosaicInputFileAdapter inputFile = new MosaicInputFileAdapter(fileIO, path);
+                BufferAllocator allocator = new RootAllocator();
+                MosaicReader reader = MosaicReader.open(inputFile, length, allocator)) {
+            int rowGroups = reader.numRowGroups();
+            return extractFromStats(rowGroups, reader::getRowGroupStatistics, null);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to extract stats from " + path, e);
+        }
+    }
+
+    /**
+     * Merges the statistics held by {@code writerMetadata} when it is a {@link
+     * MosaicWriterMetadata}; any other value falls back to reading the file.
+     */
+    @Override
+    public SimpleColStats[] extract(
+            FileIO fileIO, Path path, long length, @Nullable Object writerMetadata) {
+        if (!(writerMetadata instanceof MosaicWriterMetadata)) {
+            return extract(fileIO, path, length);
+        }
+        MosaicWriterMetadata meta = (MosaicWriterMetadata) writerMetadata;
+        Set<Integer> statsFieldIndices = resolveStatsFieldIndices(meta.statsColumnNames());
+        return extractFromStats(
+                meta.numRowGroups(), meta::getRowGroupStatistics, statsFieldIndices);
+    }
+
+    /** Like {@link #extract(FileIO, Path, long)}, plus the row count summed over all row groups. */
+    @Override
+    public Pair<SimpleColStats[], FileInfo> extractWithFileInfo(
+            FileIO fileIO, Path path, long length) {
+        try (MosaicInputFileAdapter inputFile = new MosaicInputFileAdapter(fileIO, path);
+                BufferAllocator allocator = new RootAllocator();
+                MosaicReader reader = MosaicReader.open(inputFile, length, allocator)) {
+            int numRowGroups = reader.numRowGroups();
+            SimpleColStats[] stats =
+                    extractFromStats(numRowGroups, reader::getRowGroupStatistics, null);
+            long totalRows = 0;
+            for (int rg = 0; rg < numRowGroups; rg++) {
+                totalRows += reader.rowGroupNumRows(rg);
+            }
+            return Pair.of(stats, new FileInfo(totalRows));
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to extract stats from " + path, e);
+        }
+    }
+
+    /**
+     * Merges the statistics of {@code numRowGroups} row groups into one entry per field of {@code
+     * rowType}.
+     *
+     * <p>Null counts are summed, and the smallest min and largest max are kept. Statistics for
+     * column names that are not fields of {@code rowType} are ignored.
+     *
+     * @param statsFieldIndices field indices to report; when {@code null}, every field that had
+     *     statistics in at least one row group is reported
+     * @return one converted {@link SimpleColStats} per field; fields that are not reported or had
+     *     no statistics are converted from an all-{@code null} {@link SimpleColStats}
+     */
+    private SimpleColStats[] extractFromStats(
+            int numRowGroups,
+            RowGroupStatsProvider statsProvider,
+            @Nullable Set<Integer> statsFieldIndices) {
+        int fieldCount = rowType.getFieldCount();
+        List<String> fieldNames = rowType.getFieldNames();
+        Object[] mins = new Object[fieldCount];
+        Object[] maxs = new Object[fieldCount];
+        long[] nullCounts = new long[fieldCount];
+        Set<Integer> seenColumns = new HashSet<>();
+
+        for (int rg = 0; rg < numRowGroups; rg++) {
+            Map<String, ColumnStatistics> rowGroupStats = statsProvider.getRowGroupStatistics(rg);
+            for (Map.Entry<String, ColumnStatistics> entry : rowGroupStats.entrySet()) {
+                int col = fieldNames.indexOf(entry.getKey());
+                if (col < 0) {
+                    continue;
+                }
+
+                ColumnStatistics stat = entry.getValue();
+                seenColumns.add(col);
+                nullCounts[col] += stat.getNullCount();
+                if (!stat.hasMinMax()) {
+                    continue;
+                }
+
+                DataType dataType = rowType.getFields().get(col).type();
+                Object min = convertStatsValue(stat.getMin(), dataType);
+                Object max = convertStatsValue(stat.getMax(), dataType);
+                mins[col] = mergeBound(mins[col], min, true);
+                maxs[col] = mergeBound(maxs[col], max, false);
+            }
+        }
+
+        Set<Integer> tracked = statsFieldIndices == null ? seenColumns : statsFieldIndices;
+        SimpleColStatsCollector[] collectors = SimpleColStatsCollector.create(statsCollectors);
+        SimpleColStats[] result = new SimpleColStats[fieldCount];
+        for (int i = 0; i < fieldCount; i++) {
+            SimpleColStats raw;
+            if (tracked.contains(i) && seenColumns.contains(i)) {
+                raw = new SimpleColStats(mins[i], maxs[i], nullCounts[i]);
+            } else {
+                raw = new SimpleColStats(null, null, null);
+            }
+            result[i] = collectors[i].convert(raw);
+        }
+        return result;
+    }
+
+    /**
+     * Returns the bound to keep after seeing {@code candidate}.
+     *
+     * <p>A candidate that is not {@link Comparable} (including {@code null}) leaves {@code
+     * current} untouched. Otherwise the candidate wins when there is no current bound, or when it
+     * is strictly smaller ({@code keepSmaller}) or strictly larger than the current bound.
+     */
+    @SuppressWarnings("unchecked")
+    private static Object mergeBound(
+            @Nullable Object current, @Nullable Object candidate, boolean keepSmaller) {
+        if (!(candidate instanceof Comparable)) {
+            return current;
+        }
+        if (current == null) {
+            return candidate;
+        }
+        int cmp = ((Comparable<Object>) candidate).compareTo(current);
+        boolean replace = keepSmaller ? cmp < 0 : cmp > 0;
+        return replace ? candidate : current;
+    }
+
+    /** Maps column names to field indices of {@code rowType}, dropping unknown names. */
+    private Set<Integer> resolveStatsFieldIndices(List<String> statsColumnNames) {
+        List<String> fieldNames = rowType.getFieldNames();
+        Set<Integer> resolved = new HashSet<>();
+        for (String column : statsColumnNames) {
+            int position = fieldNames.indexOf(column);
+            if (position < 0) {
+                continue;
+            }
+            resolved.add(position);
+        }
+        return resolved;
+    }
+
+    /** Source of per-row-group column statistics, keyed by column name. */
+    @FunctionalInterface
+    private interface RowGroupStatsProvider {
+        Map<String, ColumnStatistics> getRowGroupStatistics(int rowGroupIndex);
+    }
+}
diff --git 
a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterFactory.java
 
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterFactory.java
new file mode 100644
index 0000000000..ca67647f8c
--- /dev/null
+++ 
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+/** A factory to create Mosaic {@link FormatWriter}. */
+public class MosaicWriterFactory implements FormatWriterFactory {
+
+    private final RowType rowType;
+    private final FileFormatFactory.FormatContext formatContext;
+    private final List<String> statsColumnNames;
+    private final @Nullable Integer numBuckets;
+
+    public MosaicWriterFactory(RowType rowType, 
FileFormatFactory.FormatContext formatContext) {
+        this.rowType = rowType;
+        this.formatContext = formatContext;
+        String statsColumnsValue = 
formatContext.options().get(MosaicFileFormat.STATS_COLUMNS);
+        if (statsColumnsValue == null || statsColumnsValue.trim().isEmpty()) {
+            this.statsColumnNames = new ArrayList<>();
+        } else {
+            this.statsColumnNames =
+                    Arrays.stream(statsColumnsValue.split(","))
+                            .map(String::trim)
+                            .filter(s -> !s.isEmpty())
+                            .collect(Collectors.toList());
+        }
+        this.numBuckets = 
formatContext.options().get(MosaicFileFormat.NUM_BUCKETS);
+    }
+
+    @Override
+    public FormatWriter create(PositionOutputStream out, String compression) {
+        validateCompression(compression);
+        return new MosaicRecordsWriter(out, rowType, formatContext, 
statsColumnNames, numBuckets);
+    }
+
+    private static void validateCompression(String compression) {
+        if (compression == null) {
+            return;
+        }
+        String normalized = compression.toLowerCase(Locale.ROOT);
+        if (!normalized.equals("zstd")) {
+            throw new UnsupportedOperationException(
+                    "Mosaic format only supports zstd compression, but got: " 
+ compression);
+        }
+    }
+}
diff --git 
a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterMetadata.java
 
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterMetadata.java
new file mode 100644
index 0000000000..cd3149fd44
--- /dev/null
+++ 
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterMetadata.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.mosaic.ColumnStatistics;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Holder for the row-group statistics and stats column names captured from a Mosaic writer, kept
+ * in memory so they can be consumed without reading the written file again.
+ */
+public class MosaicWriterMetadata {
+
+    private final int numRowGroups;
+    private final List<Map<String, ColumnStatistics>> rowGroupStats;
+    private final List<String> statsColumnNames;
+
+    /**
+     * @param numRowGroups number of row groups
+     * @param rowGroupStats per-row-group statistics keyed by column name, indexed by row group;
+     *     stored by reference, not copied
+     * @param statsColumnNames names of the configured stats columns; stored by reference
+     */
+    public MosaicWriterMetadata(
+            int numRowGroups,
+            List<Map<String, ColumnStatistics>> rowGroupStats,
+            List<String> statsColumnNames) {
+        this.statsColumnNames = statsColumnNames;
+        this.rowGroupStats = rowGroupStats;
+        this.numRowGroups = numRowGroups;
+    }
+
+    /** Returns the number of row groups given at construction. */
+    public int numRowGroups() {
+        return this.numRowGroups;
+    }
+
+    /** Returns the statistics stored at position {@code rowGroupIndex}, keyed by column name. */
+    public Map<String, ColumnStatistics> getRowGroupStatistics(int rowGroupIndex) {
+        Map<String, ColumnStatistics> stats = rowGroupStats.get(rowGroupIndex);
+        return stats;
+    }
+
+    /** Returns the stats column names given at construction. */
+    public List<String> statsColumnNames() {
+        return this.statsColumnNames;
+    }
+}
diff --git 
a/paimon-mosaic/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
 
b/paimon-mosaic/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
new file mode 100644
index 0000000000..bc955c4935
--- /dev/null
+++ 
b/paimon-mosaic/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.format.mosaic.MosaicFileFormatFactory
diff --git 
a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFileFormatTest.java
 
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFileFormatTest.java
new file mode 100644
index 0000000000..8e53164e86
--- /dev/null
+++ 
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFileFormatTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit tests for {@link MosaicFileFormat} and {@link 
MosaicFileFormatFactory}. */
+class MosaicFileFormatTest {
+
+    @Test
+    void testFactoryIdentifier() {
+        MosaicFileFormatFactory factory = new MosaicFileFormatFactory();
+        assertThat(factory.identifier()).isEqualTo("mosaic");
+    }
+
+    @Test
+    void testFactoryCreate() {
+        MosaicFileFormatFactory factory = new MosaicFileFormatFactory();
+        FileFormatFactory.FormatContext context =
+                new FileFormatFactory.FormatContext(new Options(), 1024, 1024);
+        
assertThat(factory.create(context)).isInstanceOf(MosaicFileFormat.class);
+    }
+
+    @Test
+    void testCreateReaderFactory() {
+        MosaicFileFormat format = createFormat();
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        FormatReaderFactory readerFactory =
+                format.createReaderFactory(rowType, rowType, new 
ArrayList<>());
+        assertThat(readerFactory).isInstanceOf(MosaicReaderFactory.class);
+    }
+
+    @Test
+    void testCreateWriterFactory() {
+        MosaicFileFormat format = createFormat();
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        FormatWriterFactory writerFactory = 
format.createWriterFactory(rowType);
+        assertThat(writerFactory).isInstanceOf(MosaicWriterFactory.class);
+    }
+
+    @Test
+    void testValidateDataFieldsSupported() {
+        MosaicFileFormat format = createFormat();
+        RowType rowType =
+                DataTypes.ROW(
+                        DataTypes.INT(),
+                        DataTypes.BIGINT(),
+                        DataTypes.STRING(),
+                        DataTypes.DOUBLE(),
+                        DataTypes.FLOAT(),
+                        DataTypes.BOOLEAN(),
+                        DataTypes.DATE(),
+                        DataTypes.TIMESTAMP(3),
+                        DataTypes.DECIMAL(10, 2),
+                        DataTypes.BYTES());
+        format.validateDataFields(rowType);
+    }
+
+    @Test
+    void testValidateDataFieldsMapUnsupported() {
+        MosaicFileFormat format = createFormat();
+        RowType rowType = DataTypes.ROW(DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT()));
+        assertThatThrownBy(() -> format.validateDataFields(rowType))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("MAP");
+    }
+
+    @Test
+    void testValidateDataFieldsMultisetUnsupported() {
+        MosaicFileFormat format = createFormat();
+        RowType rowType = 
DataTypes.ROW(DataTypes.MULTISET(DataTypes.STRING()));
+        assertThatThrownBy(() -> format.validateDataFields(rowType))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("MULTISET");
+    }
+
+    @Test
+    void testCreateStatsExtractor() {
+        MosaicFileFormat format = createFormat();
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        assertThat(
+                        format.createStatsExtractor(
+                                rowType,
+                                new 
org.apache.paimon.statistics.SimpleColStatsCollector.Factory[] {
+                                    
org.apache.paimon.statistics.SimpleColStatsCollector.from(
+                                            "full"),
+                                    
org.apache.paimon.statistics.SimpleColStatsCollector.from(
+                                            "full")
+                                }))
+                .isPresent();
+    }
+
+    private static MosaicFileFormat createFormat() {
+        return new MosaicFileFormat(new FileFormatFactory.FormatContext(new 
Options(), 1024, 1024));
+    }
+}
diff --git 
a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFormatReadWriteTest.java
 
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFormatReadWriteTest.java
new file mode 100644
index 0000000000..41f632b3ee
--- /dev/null
+++ 
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFormatReadWriteTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatReadWriteTest;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.BeforeAll;
+
+import java.math.BigDecimal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/** Round-trip read/write tests for Mosaic format. */
+class MosaicFormatReadWriteTest extends FormatReadWriteTest {
+
+    MosaicFormatReadWriteTest() {
+        super("mosaic");
+    }
+
+    @BeforeAll
+    static void checkNativeLibrary() {
+        assumeTrue(isNativeAvailable(), "Mosaic native library not available");
+    }
+
+    @Override
+    protected FileFormat fileFormat() {
+        return new MosaicFileFormat(new FileFormatFactory.FormatContext(new 
Options(), 1024, 1024));
+    }
+
+    @Override
+    public String compression() {
+        return "zstd";
+    }
+
+    @Override
+    public boolean supportNestedReadPruning() {
+        return false;
+    }
+
+    @Override
+    protected RowType rowTypeForFullTypesTest() {
+        return RowType.builder()
+                .field("f_int", DataTypes.INT().notNull())
+                .field("f_string", DataTypes.STRING())
+                .field("f_double", DataTypes.DOUBLE().notNull())
+                .field("f_boolean", DataTypes.BOOLEAN())
+                .field("f_tinyint", DataTypes.TINYINT())
+                .field("f_smallint", DataTypes.SMALLINT())
+                .field("f_bigint", DataTypes.BIGINT())
+                .field("f_float", DataTypes.FLOAT())
+                .field("f_binary", DataTypes.BYTES())
+                .field("f_date", DataTypes.DATE())
+                .field("f_timestamp3", DataTypes.TIMESTAMP(3))
+                .field("f_timestamp6", DataTypes.TIMESTAMP(6))
+                .field("f_decimal_5_2", DataTypes.DECIMAL(5, 2))
+                .field("f_decimal_20_0", DataTypes.DECIMAL(20, 0))
+                .build();
+    }
+
+    @Override
+    protected GenericRow expectedRowForFullTypesTest() {
+        return GenericRow.of(
+                42,
+                BinaryString.fromString("hello mosaic"),
+                3.14d,
+                true,
+                (byte) 7,
+                (short) 256,
+                9876543210L,
+                1.5f,
+                new byte[] {1, 2, 3},
+                18000,
+                Timestamp.fromEpochMillis(1700000000000L),
+                Timestamp.fromMicros(1700000000000000L),
+                Decimal.fromBigDecimal(new BigDecimal("123.45"), 5, 2),
+                Decimal.fromBigDecimal(new BigDecimal("12345678901234567890"), 
20, 0));
+    }
+
+    @Override
+    protected void validateFullTypesResult(InternalRow actual, InternalRow 
expected) {
+        for (int i = 0; i < 14; i++) {
+            if (expected.isNullAt(i)) {
+                assertThat(actual.isNullAt(i)).isTrue();
+            }
+        }
+        assertThat(actual.getInt(0)).isEqualTo(expected.getInt(0));
+        assertThat(actual.getString(1)).isEqualTo(expected.getString(1));
+        assertThat(actual.getDouble(2)).isEqualTo(expected.getDouble(2));
+        assertThat(actual.getBoolean(3)).isEqualTo(expected.getBoolean(3));
+        assertThat(actual.getByte(4)).isEqualTo(expected.getByte(4));
+        assertThat(actual.getShort(5)).isEqualTo(expected.getShort(5));
+        assertThat(actual.getLong(6)).isEqualTo(expected.getLong(6));
+        assertThat(actual.getFloat(7)).isEqualTo(expected.getFloat(7));
+        assertThat(actual.getBinary(8)).isEqualTo(expected.getBinary(8));
+        assertThat(actual.getInt(9)).isEqualTo(expected.getInt(9));
+        assertThat(actual.getTimestamp(10, 
3)).isEqualTo(expected.getTimestamp(10, 3));
+        assertThat(actual.getTimestamp(11, 
6)).isEqualTo(expected.getTimestamp(11, 6));
+        assertThat(actual.getDecimal(12, 5, 
2)).isEqualTo(expected.getDecimal(12, 5, 2));
+        assertThat(actual.getDecimal(13, 20, 
0)).isEqualTo(expected.getDecimal(13, 20, 0));
+    }
+
+    private static boolean isNativeAvailable() {
+        try {
+            Class.forName("org.apache.paimon.mosaic.NativeLib");
+            return true;
+        } catch (Throwable t) {
+            return false;
+        }
+    }
+}
diff --git 
a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicObjectsTest.java
 
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicObjectsTest.java
new file mode 100644
index 0000000000..e05ed1709c
--- /dev/null
+++ 
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicObjectsTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link MosaicObjects}.
+ *
+ * <p>Each test hands {@link MosaicObjects#convertStatsValue} a hand-built byte array together with
+ * a Paimon data type and checks the converted object. Multi-byte numeric inputs are produced with
+ * {@link ByteBuffer} in its default (big-endian) byte order.
+ */
+class MosaicObjectsTest {
+
+    @Test
+    void testNullBytes() {
+        assertThat(MosaicObjects.convertStatsValue(null, DataTypes.INT())).isNull();
+    }
+
+    @Test
+    void testEmptyBytes() {
+        // An empty payload for a fixed-width type is reported as "no value".
+        assertThat(MosaicObjects.convertStatsValue(new byte[0], DataTypes.INT())).isNull();
+    }
+
+    @Test
+    void testBoolean() {
+        assertThat(MosaicObjects.convertStatsValue(new byte[] {1}, DataTypes.BOOLEAN()))
+                .isEqualTo(true);
+        assertThat(MosaicObjects.convertStatsValue(new byte[] {0}, DataTypes.BOOLEAN()))
+                .isEqualTo(false);
+    }
+
+    @Test
+    void testTinyInt() {
+        assertThat(MosaicObjects.convertStatsValue(new byte[] {42}, DataTypes.TINYINT()))
+                .isEqualTo((byte) 42);
+        assertThat(MosaicObjects.convertStatsValue(new byte[] {(byte) -1}, DataTypes.TINYINT()))
+                .isEqualTo((byte) -1);
+    }
+
+    @Test
+    void testSmallInt() {
+        byte[] bytes = ByteBuffer.allocate(2).putShort((short) 1234).array();
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.SMALLINT()))
+                .isEqualTo((short) 1234);
+    }
+
+    @Test
+    void testInt() {
+        byte[] bytes = ByteBuffer.allocate(4).putInt(123456).array();
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.INT())).isEqualTo(123456);
+    }
+
+    @Test
+    void testIntNegative() {
+        byte[] bytes = ByteBuffer.allocate(4).putInt(-999).array();
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.INT())).isEqualTo(-999);
+    }
+
+    @Test
+    void testBigInt() {
+        byte[] bytes = ByteBuffer.allocate(8).putLong(9876543210L).array();
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.BIGINT()))
+                .isEqualTo(9876543210L);
+    }
+
+    @Test
+    void testFloat() {
+        byte[] bytes = ByteBuffer.allocate(4).putFloat(3.14f).array();
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.FLOAT())).isEqualTo(3.14f);
+    }
+
+    @Test
+    void testDouble() {
+        byte[] bytes = ByteBuffer.allocate(8).putDouble(2.718281828).array();
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.DOUBLE()))
+                .isEqualTo(2.718281828);
+    }
+
+    @Test
+    void testVarChar() {
+        // Encode explicitly as UTF-8 so the input bytes do not depend on the platform charset.
+        byte[] bytes = "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.STRING()))
+                .isEqualTo(BinaryString.fromString("hello"));
+    }
+
+    @Test
+    void testBinary() {
+        byte[] bytes = new byte[] {1, 2, 3, 4, 5};
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.BYTES())).isEqualTo(bytes);
+    }
+
+    @Test
+    void testDate() {
+        byte[] bytes = ByteBuffer.allocate(4).putInt(18000).array();
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.DATE())).isEqualTo(18000);
+    }
+
+    @Test
+    void testTimestampMillis() {
+        long millis = 1700000000000L;
+        byte[] bytes = ByteBuffer.allocate(8).putLong(millis).array();
+        Object result = MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP(3));
+        assertThat(result).isEqualTo(Timestamp.fromEpochMillis(millis));
+    }
+
+    @Test
+    void testTimestampMicros() {
+        long micros = 1700000000000000L;
+        byte[] bytes = ByteBuffer.allocate(8).putLong(micros).array();
+        Object result = MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP(6));
+        assertThat(result).isEqualTo(Timestamp.fromMicros(micros));
+    }
+
+    @Test
+    void testDecimal() {
+        // 1000 in big-endian two's complement = 0x03E8; at scale 2 that is exactly 10.00
+        byte[] beBytes = new byte[] {0x03, (byte) 0xE8};
+        Object result = MosaicObjects.convertStatsValue(beBytes, DataTypes.DECIMAL(10, 2));
+        assertThat(result).isInstanceOf(Decimal.class);
+        Decimal decimal = (Decimal) result;
+        // Compare the whole value: intValue() truncates and would also accept e.g. 10.99.
+        assertThat(decimal.toBigDecimal()).isEqualByComparingTo("10.00");
+    }
+
+    @Test
+    void testTimestampNanos() {
+        // 12-byte layout: 8 bytes of epoch millis followed by 4 bytes of nanos-of-milli.
+        long millis = 1700000000123L;
+        int nanosOfMilli = 456789;
+        byte[] bytes = ByteBuffer.allocate(12).putLong(millis).putInt(nanosOfMilli).array();
+        Object result = MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP(9));
+        assertThat(result).isEqualTo(Timestamp.fromEpochMillis(millis, nanosOfMilli));
+    }
+
+    @Test
+    void testTimestampNanosPrecision7() {
+        long millis = 1700000000000L;
+        int nanosOfMilli = 100000;
+        byte[] bytes = ByteBuffer.allocate(12).putLong(millis).putInt(nanosOfMilli).array();
+        Object result = MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP(7));
+        assertThat(result).isEqualTo(Timestamp.fromEpochMillis(millis, nanosOfMilli));
+    }
+
+    @Test
+    void testTimestampWithLocalTimeZoneMillis() {
+        long millis = 1700000000000L;
+        byte[] bytes = ByteBuffer.allocate(8).putLong(millis).array();
+        Object result =
+                MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3));
+        assertThat(result).isEqualTo(Timestamp.fromEpochMillis(millis));
+    }
+
+    @Test
+    void testTimestampWithLocalTimeZoneMicros() {
+        long micros = 1700000000000000L;
+        byte[] bytes = ByteBuffer.allocate(8).putLong(micros).array();
+        Object result =
+                MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6));
+        assertThat(result).isEqualTo(Timestamp.fromMicros(micros));
+    }
+
+    @Test
+    void testTimestampWithLocalTimeZoneNanos() {
+        long millis = 1700000000123L;
+        int nanosOfMilli = 456789;
+        byte[] bytes = ByteBuffer.allocate(12).putLong(millis).putInt(nanosOfMilli).array();
+        Object result =
+                MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9));
+        assertThat(result).isEqualTo(Timestamp.fromEpochMillis(millis, nanosOfMilli));
+    }
+
+    @Test
+    void testEmptyStringVarChar() {
+        // Unlike fixed-width types, an empty payload is a legitimate value for strings.
+        Object result = MosaicObjects.convertStatsValue(new byte[0], DataTypes.STRING());
+        assertThat(result).isEqualTo(BinaryString.fromString(""));
+    }
+
+    @Test
+    void testEmptyBinary() {
+        Object result = MosaicObjects.convertStatsValue(new byte[0], DataTypes.BYTES());
+        assertThat(result).isEqualTo(new byte[0]);
+    }
+
+    @Test
+    void testUnsupportedTypeReturnsNull() {
+        byte[] bytes = new byte[] {1, 2, 3};
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.ARRAY(DataTypes.INT())))
+                .isNull();
+    }
+}
diff --git 
a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicReaderWriterTest.java
 
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicReaderWriterTest.java
new file mode 100644
index 0000000000..60efceed08
--- /dev/null
+++ 
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicReaderWriterTest.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Integration tests for Mosaic reader and writer.
+ *
+ * <p>Every test writes a file under a JUnit-managed temporary directory through {@link
+ * MosaicFileFormat} and reads it back. The whole class is skipped when the Mosaic native library
+ * class cannot be loaded. Streams, writers and readers are opened with try-with-resources so a
+ * failing assertion does not leak file handles or native resources.
+ */
+class MosaicReaderWriterTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @BeforeAll
+    static void checkNativeLibrary() {
+        assumeTrue(isNativeAvailable(), "Mosaic native library not available");
+    }
+
+    @Test
+    void testWriteAndRead() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        Path path = newPath();
+
+        writeRows(
+                rowType,
+                path,
+                GenericRow.of(1, BinaryString.fromString("hello")),
+                GenericRow.of(2, BinaryString.fromString("world")));
+
+        List<InternalRow> result = readAll(rowType, rowType, path, null);
+        assertThat(result).hasSize(2);
+        assertThat(result.get(0).getInt(0)).isEqualTo(1);
+        assertThat(result.get(0).getString(1).toString()).isEqualTo("hello");
+        assertThat(result.get(1).getInt(0)).isEqualTo(2);
+        assertThat(result.get(1).getString(1).toString()).isEqualTo("world");
+    }
+
+    @Test
+    void testNullValues() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        Path path = newPath();
+
+        writeRows(
+                rowType,
+                path,
+                GenericRow.of(1, null),
+                GenericRow.of(null, BinaryString.fromString("test")),
+                GenericRow.of(null, null));
+
+        List<InternalRow> result = readAll(rowType, rowType, path, null);
+        assertThat(result).hasSize(3);
+        assertThat(result.get(0).isNullAt(1)).isTrue();
+        assertThat(result.get(1).isNullAt(0)).isTrue();
+        assertThat(result.get(2).isNullAt(0)).isTrue();
+        assertThat(result.get(2).isNullAt(1)).isTrue();
+    }
+
+    @Test
+    void testColumnProjection() throws IOException {
+        RowType writeType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_string", DataTypes.STRING())
+                        .field("f_double", DataTypes.DOUBLE())
+                        .build();
+        RowType readType = RowType.builder().field("f_string", DataTypes.STRING()).build();
+        Path path = newPath();
+
+        writeRows(
+                writeType,
+                path,
+                GenericRow.of(1, BinaryString.fromString("aaa"), 1.1),
+                GenericRow.of(2, BinaryString.fromString("bbb"), 2.2));
+
+        List<InternalRow> result = readAll(writeType, readType, path, null);
+        assertThat(result).hasSize(2);
+        assertThat(result.get(0).getString(0).toString()).isEqualTo("aaa");
+        assertThat(result.get(1).getString(0).toString()).isEqualTo("bbb");
+    }
+
+    @Test
+    void testLargeDataset() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        Path path = newPath();
+
+        int numRows = 10000;
+        GenericRow[] rows = new GenericRow[numRows];
+        for (int i = 0; i < numRows; i++) {
+            rows[i] = GenericRow.of(i, BinaryString.fromString("row" + i));
+        }
+        writeRows(rowType, path, rows);
+
+        List<InternalRow> result = readAll(rowType, rowType, path, null);
+        assertThat(result).hasSize(numRows);
+        assertThat(result.get(0).getInt(0)).isEqualTo(0);
+        assertThat(result.get(numRows - 1).getInt(0)).isEqualTo(numRows - 1);
+    }
+
+    @Test
+    void testRowGroupPredicateFiltering() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_string", DataTypes.STRING())
+                        .build();
+        Path path = newPath();
+
+        int numRows = 10000;
+        GenericRow[] rows = new GenericRow[numRows];
+        for (int i = 0; i < numRows; i++) {
+            rows[i] = GenericRow.of(i, BinaryString.fromString("v" + i));
+        }
+        writeRows(rowType, path, "f_int", rows);
+
+        // Predicate that cannot match any row group (all values are 0..9999)
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate = builder.greaterThan(0, 99999);
+        List<InternalRow> result =
+                readAll(rowType, rowType, path, Collections.singletonList(predicate));
+        assertThat(result).isEmpty();
+
+        // Predicate that matches the row group (values include range 0..9999).
+        // All rows come back: the expected size shows no per-row filtering is applied here.
+        Predicate matchPredicate = builder.greaterThan(0, 5000);
+        List<InternalRow> matchResult =
+                readAll(rowType, rowType, path, Collections.singletonList(matchPredicate));
+        assertThat(matchResult).hasSize(numRows);
+    }
+
+    @Test
+    void testReturnedPosition() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        Path path = newPath();
+
+        writeRows(
+                rowType,
+                path,
+                GenericRow.of(1, BinaryString.fromString("a")),
+                GenericRow.of(2, BinaryString.fromString("b")),
+                GenericRow.of(3, BinaryString.fromString("c")));
+
+        MosaicFileFormat format = createFormat();
+        FormatReaderFactory readerFactory = format.createReaderFactory(rowType, rowType, null);
+        LocalFileIO fileIO = new LocalFileIO();
+
+        // try-with-resources: the reader is closed even when one of the assertions fails.
+        try (RecordReader<InternalRow> reader =
+                readerFactory.createReader(
+                        new FormatReaderContext(fileIO, path, fileIO.getFileSize(path)))) {
+            RecordReader.RecordIterator<InternalRow> batch = reader.readBatch();
+            // Fail with a readable message instead of a ClassCastException on the cast below.
+            assertThat(batch).isInstanceOf(FileRecordIterator.class);
+            FileRecordIterator<InternalRow> fileIter = (FileRecordIterator<InternalRow>) batch;
+
+            // returnedPosition() reports the zero-based position of the row just returned.
+            assertThat(fileIter.next()).isNotNull();
+            assertThat(fileIter.returnedPosition()).isEqualTo(0);
+            assertThat(fileIter.next()).isNotNull();
+            assertThat(fileIter.returnedPosition()).isEqualTo(1);
+            assertThat(fileIter.next()).isNotNull();
+            assertThat(fileIter.returnedPosition()).isEqualTo(2);
+        }
+    }
+
+    @Test
+    void testProjectionWithMissingColumns() throws IOException {
+        RowType writeType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_string", DataTypes.STRING())
+                        .build();
+        // Read type has a column that doesn't exist in the file (schema evolution)
+        RowType readType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_new_col", DataTypes.BIGINT())
+                        .field("f_string", DataTypes.STRING())
+                        .build();
+        Path path = newPath();
+
+        writeRows(
+                writeType,
+                path,
+                GenericRow.of(1, BinaryString.fromString("aaa")),
+                GenericRow.of(2, BinaryString.fromString("bbb")));
+
+        List<InternalRow> result = readAll(writeType, readType, path, null);
+        assertThat(result).hasSize(2);
+        assertThat(result.get(0).getInt(0)).isEqualTo(1);
+        assertThat(result.get(0).isNullAt(1)).isTrue();
+        assertThat(result.get(0).getString(2).toString()).isEqualTo("aaa");
+        assertThat(result.get(1).getInt(0)).isEqualTo(2);
+        assertThat(result.get(1).isNullAt(1)).isTrue();
+        assertThat(result.get(1).getString(2).toString()).isEqualTo("bbb");
+    }
+
+    @Test
+    void testProjectionAllColumnsMissing() throws IOException {
+        RowType writeType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_string", DataTypes.STRING())
+                        .build();
+        // Read type has only columns that don't exist in the file
+        RowType readType =
+                RowType.builder()
+                        .field("f_new_a", DataTypes.INT())
+                        .field("f_new_b", DataTypes.STRING())
+                        .build();
+        Path path = newPath();
+
+        writeRows(
+                writeType,
+                path,
+                GenericRow.of(1, BinaryString.fromString("x")),
+                GenericRow.of(2, BinaryString.fromString("y")));
+
+        List<InternalRow> result = readAll(writeType, readType, path, null);
+        assertThat(result).hasSize(2);
+        assertThat(result.get(0).isNullAt(0)).isTrue();
+        assertThat(result.get(0).isNullAt(1)).isTrue();
+        assertThat(result.get(1).isNullAt(0)).isTrue();
+        assertThat(result.get(1).isNullAt(1)).isTrue();
+    }
+
+    @Test
+    void testUnsupportedCompressionThrows() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        Path path = newPath();
+        MosaicFileFormat format = createFormat();
+        FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
+        LocalFileIO fileIO = new LocalFileIO();
+
+        // The stream is opened outside the lambda so it is closed even though create() throws.
+        try (org.apache.paimon.fs.PositionOutputStream out = fileIO.newOutputStream(path, false)) {
+            assertThatThrownBy(() -> writerFactory.create(out, "lz4"))
+                    .isInstanceOf(UnsupportedOperationException.class)
+                    .hasMessageContaining("lz4");
+        }
+    }
+
+    @Test
+    void testReachTargetSize() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        Path path = newPath();
+        MosaicFileFormat format = createFormat();
+        FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
+
+        LocalFileIO fileIO = new LocalFileIO();
+
+        boolean reached = false;
+        // Resources close in reverse order: the writer first, then the underlying stream.
+        try (org.apache.paimon.fs.PositionOutputStream out = fileIO.newOutputStream(path, false);
+                FormatWriter writer = writerFactory.create(out, "zstd")) {
+            for (int i = 0; i < 100000; i++) {
+                writer.addElement(
+                        GenericRow.of(i, BinaryString.fromString("value_" + i + "_padding")));
+                if (writer.reachTargetSize(true, 1024)) {
+                    reached = true;
+                    break;
+                }
+            }
+        }
+        assertThat(reached).isTrue();
+    }
+
+    /** Returns a fresh, uniquely named {@code .mosaic} path inside the temporary directory. */
+    private Path newPath() {
+        return new Path(tempDir.toUri().toString(), UUID.randomUUID() + ".mosaic");
+    }
+
+    /** Writes {@code rows} to {@code path} without configuring any stats columns. */
+    private void writeRows(RowType rowType, Path path, GenericRow... rows) throws IOException {
+        writeRows(rowType, path, "", rows);
+    }
+
+    /**
+     * Writes {@code rows} to {@code path} using zstd compression.
+     *
+     * @param statsColumns value for {@link MosaicFileFormat#STATS_COLUMNS}; an empty string leaves
+     *     the option unset
+     */
+    private void writeRows(RowType rowType, Path path, String statsColumns, GenericRow... rows)
+            throws IOException {
+        MosaicFileFormat format = createFormat(statsColumns);
+        FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
+        LocalFileIO fileIO = new LocalFileIO();
+        // Resources close in reverse order: the writer first, then the underlying stream.
+        try (org.apache.paimon.fs.PositionOutputStream out = fileIO.newOutputStream(path, false);
+                FormatWriter writer = writerFactory.create(out, "zstd")) {
+            for (GenericRow row : rows) {
+                writer.addElement(row);
+            }
+        }
+    }
+
+    /**
+     * Reads every row of {@code path} into a list.
+     *
+     * <p>Rows are copied through an {@link InternalRowSerializer} before being stored, so the
+     * returned rows do not alias the instances handed out by the reader.
+     *
+     * @param dataType schema the file was written with
+     * @param readType projected schema to read
+     * @param predicates filters passed to the reader factory; may be {@code null}
+     */
+    private List<InternalRow> readAll(
+            RowType dataType, RowType readType, Path path, List<Predicate> predicates)
+            throws IOException {
+        MosaicFileFormat format = createFormat();
+        FormatReaderFactory readerFactory =
+                format.createReaderFactory(dataType, readType, predicates);
+        LocalFileIO fileIO = new LocalFileIO();
+
+        InternalRowSerializer serializer = new InternalRowSerializer(readType);
+        List<InternalRow> result = new ArrayList<>();
+        try (RecordReader<InternalRow> reader =
+                readerFactory.createReader(
+                        new FormatReaderContext(fileIO, path, fileIO.getFileSize(path)))) {
+            reader.forEachRemaining(row -> result.add(serializer.copy(row)));
+        }
+        return result;
+    }
+
+    private static MosaicFileFormat createFormat() {
+        return createFormat("");
+    }
+
+    /** Builds a format instance, setting the stats-columns option only when it is non-empty. */
+    private static MosaicFileFormat createFormat(String statsColumns) {
+        Options options = new Options();
+        if (!statsColumns.isEmpty()) {
+            options.set(MosaicFileFormat.STATS_COLUMNS, statsColumns);
+        }
+        return new MosaicFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+    }
+
+    /** Returns whether {@code org.apache.paimon.mosaic.NativeLib} can be loaded. */
+    private static boolean isNativeAvailable() {
+        try {
+            Class.forName("org.apache.paimon.mosaic.NativeLib");
+            return true;
+        } catch (Throwable t) {
+            // Deliberately broad: any failure, including Errors, is reported as "not available".
+            return false;
+        }
+    }
+}
diff --git 
a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractorTest.java
 
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractorTest.java
new file mode 100644
index 0000000000..8477c5b065
--- /dev/null
+++ 
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractorTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SimpleColStats;
+import org.apache.paimon.format.SimpleColStatsExtractorTest;
+import org.apache.paimon.format.SimpleStatsExtractor;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Tests for {@link MosaicSimpleStatsExtractor}.
+ *
+ * <p>Inherits the generic extractor checks from {@link SimpleColStatsExtractorTest} and adds
+ * Mosaic-specific cases. The whole class is skipped when the Mosaic native library class cannot be
+ * loaded. Output streams and writers are opened with try-with-resources so a failing write does
+ * not leak file handles.
+ */
+class MosaicSimpleStatsExtractorTest extends SimpleColStatsExtractorTest {
+
+    @TempDir java.nio.file.Path statsTestTempDir;
+
+    @BeforeAll
+    static void checkNativeLibrary() {
+        assumeTrue(isNativeAvailable(), "Mosaic native library not available");
+    }
+
+    @Override
+    protected FileFormat createFormat() {
+        // Track statistics for every column declared in rowType().
+        Options options = new Options();
+        options.set(
+                MosaicFileFormat.STATS_COLUMNS,
+                "f_boolean,f_tinyint,f_smallint,f_int,f_bigint,f_float,"
+                        + "f_double,f_string,f_decimal_5_2,f_date,f_timestamp3,f_timestamp6");
+        return new MosaicFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+    }
+
+    @Override
+    protected RowType rowType() {
+        return RowType.builder()
+                .field("f_boolean", DataTypes.BOOLEAN())
+                .field("f_tinyint", DataTypes.TINYINT())
+                .field("f_smallint", DataTypes.SMALLINT())
+                .field("f_int", DataTypes.INT())
+                .field("f_bigint", DataTypes.BIGINT())
+                .field("f_float", DataTypes.FLOAT())
+                .field("f_double", DataTypes.DOUBLE())
+                .field("f_string", DataTypes.VARCHAR(100))
+                .field("f_decimal_5_2", DataTypes.DECIMAL(5, 2))
+                .field("f_date", DataTypes.DATE())
+                .field("f_timestamp3", DataTypes.TIMESTAMP(3))
+                .field("f_timestamp6", DataTypes.TIMESTAMP(6))
+                .build();
+    }
+
+    @Override
+    protected String fileCompression() {
+        return "zstd";
+    }
+
+    @Test
+    void testUntrackedColumnsReturnNone() throws IOException {
+        // stats_columns only tracks f_int, but the table has f_int + f_string
+        RowType rowType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_string", DataTypes.STRING())
+                        .build();
+        Options options = new Options();
+        options.set(MosaicFileFormat.STATS_COLUMNS, "f_int");
+        MosaicFileFormat format =
+                new MosaicFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+
+        Path path = new Path(statsTestTempDir.toUri().toString(), UUID.randomUUID() + ".mosaic");
+        LocalFileIO fileIO = new LocalFileIO();
+        FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
+        // Resources close in reverse order: the writer first, then the underlying stream.
+        try (org.apache.paimon.fs.PositionOutputStream out = fileIO.newOutputStream(path, false);
+                FormatWriter writer = writerFactory.create(out, "zstd")) {
+            writer.addElement(GenericRow.of(1, BinaryString.fromString("a")));
+            writer.addElement(GenericRow.of(2, BinaryString.fromString("b")));
+        }
+
+        SimpleColStatsCollector.Factory[] collectors =
+                IntStream.range(0, rowType.getFieldCount())
+                        .mapToObj(i -> SimpleColStatsCollector.from("full"))
+                        .toArray(SimpleColStatsCollector.Factory[]::new);
+        SimpleStatsExtractor extractor = format.createStatsExtractor(rowType, collectors).get();
+        SimpleColStats[] stats = extractor.extract(fileIO, path, fileIO.getFileSize(path));
+
+        // f_int is tracked, should have real stats
+        assertThat(stats[0].min()).isEqualTo(1);
+        assertThat(stats[0].max()).isEqualTo(2);
+        assertThat(stats[0].nullCount()).isEqualTo(0L);
+        // f_string is NOT tracked, should be NONE (null nullCount)
+        assertThat(stats[1].min()).isNull();
+        assertThat(stats[1].max()).isNull();
+        assertThat(stats[1].nullCount()).isNull();
+    }
+
+    @Test
+    void testBinaryColumnStatsNoException() throws Exception {
+        // Binary columns produce byte[] from convertStatsValue, which is not Comparable.
+        // Verify multi-row-group aggregation doesn't throw ClassCastException.
+        RowType rowType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_binary", DataTypes.VARBINARY(100))
+                        .build();
+        // Build a fake MosaicWriterMetadata with binary stats across 2 row groups.
+        // The (long, byte[], byte[]) constructor is looked up reflectively and made accessible.
+        java.lang.reflect.Constructor<?> ctor =
+                org.apache.paimon.mosaic.ColumnStatistics.class.getDeclaredConstructor(
+                        long.class, byte[].class, byte[].class);
+        ctor.setAccessible(true);
+
+        java.util.Map<String, org.apache.paimon.mosaic.ColumnStatistics> rg0 =
+                new java.util.HashMap<>();
+        rg0.put(
+                "f_int",
+                (org.apache.paimon.mosaic.ColumnStatistics)
+                        ctor.newInstance(0L, intBytes(0), intBytes(100)));
+        rg0.put(
+                "f_binary",
+                (org.apache.paimon.mosaic.ColumnStatistics)
+                        ctor.newInstance(0L, new byte[] {1, 2}, new byte[] {3, 4}));
+
+        java.util.Map<String, org.apache.paimon.mosaic.ColumnStatistics> rg1 =
+                new java.util.HashMap<>();
+        rg1.put(
+                "f_int",
+                (org.apache.paimon.mosaic.ColumnStatistics)
+                        ctor.newInstance(0L, intBytes(50), intBytes(200)));
+        rg1.put(
+                "f_binary",
+                (org.apache.paimon.mosaic.ColumnStatistics)
+                        ctor.newInstance(0L, new byte[] {5, 6}, new byte[] {7, 8}));
+
+        java.util.List<java.util.Map<String, org.apache.paimon.mosaic.ColumnStatistics>> allStats =
+                java.util.Arrays.asList(rg0, rg1);
+        MosaicWriterMetadata metadata =
+                new MosaicWriterMetadata(2, allStats, java.util.Arrays.asList("f_int", "f_binary"));
+
+        // Write a minimal file (only f_int in stats_columns since native rejects binary)
+        Options options = new Options();
+        options.set(MosaicFileFormat.STATS_COLUMNS, "f_int");
+        MosaicFileFormat format =
+                new MosaicFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+        Path path = new Path(statsTestTempDir.toUri().toString(), UUID.randomUUID() + ".mosaic");
+        LocalFileIO fileIO = new LocalFileIO();
+        FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
+        // Resources close in reverse order: the writer first, then the underlying stream.
+        try (org.apache.paimon.fs.PositionOutputStream out = fileIO.newOutputStream(path, false);
+                FormatWriter writer = writerFactory.create(out, "zstd")) {
+            writer.addElement(GenericRow.of(1, new byte[] {1}));
+        }
+
+        SimpleColStatsCollector.Factory[] collectors =
+                IntStream.range(0, rowType.getFieldCount())
+                        .mapToObj(i -> SimpleColStatsCollector.from("full"))
+                        .toArray(SimpleColStatsCollector.Factory[]::new);
+        SimpleStatsExtractor extractor = format.createStatsExtractor(rowType, collectors).get();
+        // Should not throw ClassCastException
+        SimpleColStats[] stats =
+                extractor.extract(fileIO, path, fileIO.getFileSize(path), metadata);
+
+        // f_int aggregated across row groups: min=0, max=200
+        assertThat(stats[0].min()).isEqualTo(0);
+        assertThat(stats[0].max()).isEqualTo(200);
+        // f_binary min/max should be null (byte[] not Comparable, skipped)
+        assertThat(stats[1].min()).isNull();
+        assertThat(stats[1].max()).isNull();
+    }
+
+    /** Encodes {@code value} as 4 bytes in {@link java.nio.ByteBuffer}'s default byte order. */
+    private static byte[] intBytes(int value) {
+        return java.nio.ByteBuffer.allocate(4).putInt(value).array();
+    }
+
+    /** Returns whether {@code org.apache.paimon.mosaic.NativeLib} can be loaded. */
+    private static boolean isNativeAvailable() {
+        try {
+            Class.forName("org.apache.paimon.mosaic.NativeLib");
+            return true;
+        } catch (Throwable t) {
+            // Deliberately broad: any failure, including Errors, is reported as "not available".
+            return false;
+        }
+    }
+}
diff --git 
a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicWriterMetadataTest.java
 
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicWriterMetadataTest.java
new file mode 100644
index 0000000000..4caf65d661
--- /dev/null
+++ 
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicWriterMetadataTest.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SimpleColStats;
+import org.apache.paimon.format.SimpleStatsExtractor;
+import org.apache.paimon.format.SimpleStatsExtractor.FileInfo;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Tests for writer metadata based stats extraction in Mosaic format.
+ *
+ * <p>The whole class is gated on {@link #isNativeAvailable()} through a JUnit assumption, so the
+ * tests are not executed when the Mosaic native library class cannot be loaded.
+ */
+class MosaicWriterMetadataTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    /** File IO used both for writing the test files and for extracting stats from them. */
+    private final LocalFileIO fileIO = new LocalFileIO();
+
+    @BeforeAll
+    static void checkNativeLibrary() {
+        assumeTrue(isNativeAvailable(), "Mosaic native library not available");
+    }
+
+    /** A closed writer exposes {@link MosaicWriterMetadata} with at least one row group. */
+    @Test
+    void testWriterMetadataNotNull() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+
+        FormatWriter writer =
+                writeRows(
+                        rowType,
+                        newPath(),
+                        "f0,f1",
+                        GenericRow.of(1, BinaryString.fromString("hello")),
+                        GenericRow.of(2, BinaryString.fromString("world")));
+
+        Object metadata = writer.writerMetadata();
+        assertThat(metadata).isNotNull();
+        assertThat(metadata).isInstanceOf(MosaicWriterMetadata.class);
+
+        MosaicWriterMetadata mosaicMeta = (MosaicWriterMetadata) metadata;
+        assertThat(mosaicMeta.numRowGroups()).isGreaterThan(0);
+    }
+
+    /** Stats built from writer metadata must equal the stats read back from the file. */
+    @Test
+    void testStatsFromMetadataMatchesStatsFromFile() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_bigint", DataTypes.BIGINT())
+                        .field("f_string", DataTypes.STRING())
+                        .field("f_double", DataTypes.DOUBLE())
+                        .build();
+        Path path = newPath();
+        String statsColumns = "f_int,f_bigint,f_string,f_double";
+
+        GenericRow[] rows =
+                IntStream.range(0, 1000)
+                        .mapToObj(
+                                i ->
+                                        GenericRow.of(
+                                                i,
+                                                (long) i * 100,
+                                                BinaryString.fromString("val_" + i),
+                                                i * 1.1))
+                        .toArray(GenericRow[]::new);
+        FormatWriter writer = writeRows(rowType, path, statsColumns, rows);
+
+        Object metadata = writer.writerMetadata();
+        assertThat(metadata).isNotNull();
+
+        SimpleStatsExtractor extractor = createExtractor(rowType, statsColumns);
+        long fileSize = fileIO.getFileSize(path);
+
+        SimpleColStats[] fromFile = extractor.extract(fileIO, path, fileSize);
+        SimpleColStats[] fromMetadata = extractor.extract(fileIO, path, fileSize, metadata);
+
+        assertThat(fromMetadata).isEqualTo(fromFile);
+    }
+
+    /** Null counts are reported per column when stats come from writer metadata. */
+    @Test
+    void testStatsFromMetadataWithNullValues() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_string", DataTypes.STRING())
+                        .build();
+        Path path = newPath();
+        String statsColumns = "f_int,f_string";
+
+        FormatWriter writer =
+                writeRows(
+                        rowType,
+                        path,
+                        statsColumns,
+                        GenericRow.of(1, null),
+                        GenericRow.of(null, BinaryString.fromString("a")),
+                        GenericRow.of(3, BinaryString.fromString("b")));
+
+        Object metadata = writer.writerMetadata();
+        SimpleStatsExtractor extractor = createExtractor(rowType, statsColumns);
+        long fileSize = fileIO.getFileSize(path);
+
+        SimpleColStats[] fromMetadata = extractor.extract(fileIO, path, fileSize, metadata);
+        assertThat(fromMetadata).isNotNull();
+        assertThat(fromMetadata[0].nullCount()).isEqualTo(1L);
+        assertThat(fromMetadata[1].nullCount()).isEqualTo(1L);
+    }
+
+    /** {@code extractWithFileInfo} reports the written row count next to per-field stats. */
+    @Test
+    void testExtractWithFileInfoRowCount() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_string", DataTypes.STRING())
+                        .build();
+        Path path = newPath();
+        String statsColumns = "f_int,f_string";
+
+        int numRows = 500;
+        GenericRow[] rows =
+                IntStream.range(0, numRows)
+                        .mapToObj(i -> GenericRow.of(i, BinaryString.fromString("row_" + i)))
+                        .toArray(GenericRow[]::new);
+        writeRows(rowType, path, statsColumns, rows);
+
+        SimpleStatsExtractor extractor = createExtractor(rowType, statsColumns);
+        long fileSize = fileIO.getFileSize(path);
+
+        Pair<SimpleColStats[], FileInfo> result =
+                extractor.extractWithFileInfo(fileIO, path, fileSize);
+        assertThat(result.getRight().getRowCount()).isEqualTo(numRows);
+        assertThat(result.getLeft()).isNotNull();
+        assertThat(result.getLeft()).hasSize(rowType.getFieldCount());
+    }
+
+    /** Only the configured stats column (the first one) carries stats from metadata. */
+    @Test
+    void testPartialStatsColumnsFromMetadata() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_string", DataTypes.STRING())
+                        .field("f_double", DataTypes.DOUBLE())
+                        .build();
+        Path path = newPath();
+        String statsColumns = "f_int";
+
+        FormatWriter writer =
+                writeRows(
+                        rowType,
+                        path,
+                        statsColumns,
+                        GenericRow.of(1, BinaryString.fromString("a"), 1.0),
+                        GenericRow.of(null, BinaryString.fromString("b"), 2.0),
+                        GenericRow.of(3, null, null));
+
+        Object metadata = writer.writerMetadata();
+        assertThat(metadata).isInstanceOf(MosaicWriterMetadata.class);
+        MosaicWriterMetadata mosaicMeta = (MosaicWriterMetadata) metadata;
+        assertThat(mosaicMeta.statsColumnNames()).containsExactly("f_int");
+
+        SimpleStatsExtractor extractor = createExtractor(rowType, statsColumns);
+        long fileSize = fileIO.getFileSize(path);
+
+        SimpleColStats[] fromMetadata = extractor.extract(fileIO, path, fileSize, metadata);
+
+        // f_int has stats: min=1, max=3, nullCount=1
+        assertThat(fromMetadata[0].min()).isEqualTo(1);
+        assertThat(fromMetadata[0].max()).isEqualTo(3);
+        assertThat(fromMetadata[0].nullCount()).isEqualTo(1L);
+
+        // f_string and f_double have no stats (not in statsColumns)
+        assertThat(fromMetadata[1].min()).isNull();
+        assertThat(fromMetadata[1].max()).isNull();
+        assertThat(fromMetadata[1].nullCount()).isNull();
+        assertThat(fromMetadata[2].min()).isNull();
+        assertThat(fromMetadata[2].max()).isNull();
+        assertThat(fromMetadata[2].nullCount()).isNull();
+    }
+
+    /** A stats column in the middle of the schema maps to the matching result slot. */
+    @Test
+    void testStatsOnMiddleColumn() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_string", DataTypes.STRING())
+                        .field("f_double", DataTypes.DOUBLE())
+                        .build();
+        Path path = newPath();
+        String statsColumns = "f_string";
+
+        FormatWriter writer =
+                writeRows(
+                        rowType,
+                        path,
+                        statsColumns,
+                        GenericRow.of(1, BinaryString.fromString("banana"), 1.0),
+                        GenericRow.of(2, BinaryString.fromString("apple"), 2.0),
+                        GenericRow.of(3, null, 3.0));
+
+        Object metadata = writer.writerMetadata();
+        SimpleStatsExtractor extractor = createExtractor(rowType, statsColumns);
+        long fileSize = fileIO.getFileSize(path);
+
+        SimpleColStats[] fromMetadata = extractor.extract(fileIO, path, fileSize, metadata);
+
+        // f_int has no stats
+        assertThat(fromMetadata[0].min()).isNull();
+        assertThat(fromMetadata[0].max()).isNull();
+        assertThat(fromMetadata[0].nullCount()).isNull();
+
+        // f_string has stats: min="apple", max="banana", nullCount=1
+        assertThat(fromMetadata[1].min()).isEqualTo(BinaryString.fromString("apple"));
+        assertThat(fromMetadata[1].max()).isEqualTo(BinaryString.fromString("banana"));
+        assertThat(fromMetadata[1].nullCount()).isEqualTo(1L);
+
+        // f_double has no stats
+        assertThat(fromMetadata[2].min()).isNull();
+        assertThat(fromMetadata[2].max()).isNull();
+        assertThat(fromMetadata[2].nullCount()).isNull();
+    }
+
+    /** Same expectations as the middle-column case, but read from the file instead. */
+    @Test
+    void testPartialStatsColumnsFromFile() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_string", DataTypes.STRING())
+                        .field("f_double", DataTypes.DOUBLE())
+                        .build();
+        Path path = newPath();
+        String statsColumns = "f_string";
+
+        writeRows(
+                rowType,
+                path,
+                statsColumns,
+                GenericRow.of(1, BinaryString.fromString("banana"), 1.0),
+                GenericRow.of(2, BinaryString.fromString("apple"), 2.0),
+                GenericRow.of(3, null, 3.0));
+
+        // Extract from file (no writer metadata), simulating fallback path
+        SimpleStatsExtractor extractor = createExtractor(rowType, statsColumns);
+        long fileSize = fileIO.getFileSize(path);
+
+        SimpleColStats[] fromFile = extractor.extract(fileIO, path, fileSize);
+
+        // f_int has no stats in file
+        assertThat(fromFile[0].min()).isNull();
+        assertThat(fromFile[0].max()).isNull();
+        assertThat(fromFile[0].nullCount()).isNull();
+
+        // f_string has stats
+        assertThat(fromFile[1].min()).isEqualTo(BinaryString.fromString("apple"));
+        assertThat(fromFile[1].max()).isEqualTo(BinaryString.fromString("banana"));
+        assertThat(fromFile[1].nullCount()).isEqualTo(1L);
+
+        // f_double has no stats in file
+        assertThat(fromFile[2].min()).isNull();
+        assertThat(fromFile[2].max()).isNull();
+        assertThat(fromFile[2].nullCount()).isNull();
+    }
+
+    /** Passing {@code null} metadata must give the same stats as the file-only overload. */
+    @Test
+    void testFallbackToFileWhenMetadataIsNull() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        Path path = newPath();
+        String statsColumns = "f0,f1";
+
+        writeRows(
+                rowType, path, statsColumns, GenericRow.of(10, BinaryString.fromString("test")));
+
+        SimpleStatsExtractor extractor = createExtractor(rowType, statsColumns);
+        long fileSize = fileIO.getFileSize(path);
+
+        SimpleColStats[] fromFile = extractor.extract(fileIO, path, fileSize);
+        SimpleColStats[] fromNull = extractor.extract(fileIO, path, fileSize, null);
+
+        assertThat(fromNull).isEqualTo(fromFile);
+    }
+
+    /** Returns a fresh, randomly named {@code .mosaic} path inside the per-test temp directory. */
+    private Path newPath() {
+        return new Path(tempDir.toUri().toString(), UUID.randomUUID() + ".mosaic");
+    }
+
+    /**
+     * Writes {@code rows} to a new Mosaic file at {@code path} and closes the writer.
+     *
+     * <p>The writer is closed through try-with-resources, so it is closed even when adding a row
+     * throws. The closed writer is returned so that callers can still ask it for {@code
+     * writerMetadata()}.
+     *
+     * @param rowType schema of the rows
+     * @param path destination file
+     * @param statsColumns comma separated stats column names, or an empty string for none
+     * @param rows rows to append, in order
+     * @return the writer, already closed
+     * @throws IOException if creating, writing to or closing the writer fails
+     */
+    private FormatWriter writeRows(
+            RowType rowType, Path path, String statsColumns, GenericRow... rows)
+            throws IOException {
+        FormatWriter writer = createWriter(rowType, path, statsColumns);
+        try (FormatWriter closing = writer) {
+            for (GenericRow row : rows) {
+                closing.addElement(row);
+            }
+        }
+        return writer;
+    }
+
+    /** Opens a zstd-compressed Mosaic writer for {@code path}, refusing to overwrite. */
+    private FormatWriter createWriter(RowType rowType, Path path, String statsColumns)
+            throws IOException {
+        FormatWriterFactory writerFactory =
+                createFormat(statsColumns).createWriterFactory(rowType);
+        return writerFactory.create(fileIO.newOutputStream(path, false), "zstd");
+    }
+
+    /**
+     * Builds a stats extractor for {@code rowType} that uses a {@code "full"} collector for every
+     * field.
+     */
+    private static SimpleStatsExtractor createExtractor(RowType rowType, String statsColumns) {
+        SimpleColStatsCollector.Factory[] collectors =
+                IntStream.range(0, rowType.getFieldCount())
+                        .mapToObj(i -> SimpleColStatsCollector.from("full"))
+                        .toArray(SimpleColStatsCollector.Factory[]::new);
+        return createFormat(statsColumns).createStatsExtractor(rowType, collectors).get();
+    }
+
+    /**
+     * Creates a Mosaic file format; {@code STATS_COLUMNS} is only set when {@code statsColumns}
+     * is non-empty.
+     */
+    private static MosaicFileFormat createFormat(String statsColumns) {
+        Options options = new Options();
+        if (!statsColumns.isEmpty()) {
+            options.set(MosaicFileFormat.STATS_COLUMNS, statsColumns);
+        }
+        return new MosaicFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+    }
+
+    /**
+     * Reports whether the class {@code org.apache.paimon.mosaic.NativeLib} can be loaded.
+     *
+     * @return {@code true} when loading succeeds, {@code false} when it throws anything
+     */
+    private static boolean isNativeAvailable() {
+        try {
+            Class.forName("org.apache.paimon.mosaic.NativeLib");
+            return true;
+        } catch (Throwable ignored) {
+            // Deliberately broad: Class.forName can fail with LinkageError subclasses as well as
+            // ClassNotFoundException, and every such failure means the tests cannot run.
+            return false;
+        }
+    }
+}
diff --git a/pom.xml b/pom.xml
index 5a336fb76c..d2c02ae4d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,6 +74,7 @@ under the License.
         <module>paimon-api</module>
         <module>paimon-lumina</module>
         <module>paimon-vortex</module>
+        <module>paimon-mosaic</module>
         <module>paimon-tantivy</module>
     </modules>
 

Reply via email to