This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a8e333c467 [format] Add paimon-mosaic module with reader and writer
(#7917)
a8e333c467 is described below
commit a8e333c4676f2d726dc749cf8dadd2c8b954a3ac
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jun 3 13:09:24 2026 +0800
[format] Add paimon-mosaic module with reader and writer (#7917)
See https://paimon.apache.org/docs/mosaic/
Introduces the Mosaic file format integration for Paimon with:
- MosaicRecordsReader: row-group level predicate filtering using
statistics, column projection, and correct returnedPosition tracking
- MosaicRecordsWriter: BundleFormatWriter with writerMetadata() support
for in-memory stats capture (avoids re-reading files on object stores)
- MosaicSimpleStatsExtractor: stats extraction from file or
writerMetadata, with SimpleColStatsCollector integration
- MosaicObjects: byte[] to Paimon object conversion for all supported
types
- Comprehensive test suite (6 test classes covering unit and integration
tests)
---
paimon-mosaic/pom.xml | 86 +++++
.../paimon/format/mosaic/MosaicFileFormat.java | 238 +++++++++++++
.../format/mosaic/MosaicFileFormatFactory.java | 38 ++
.../format/mosaic/MosaicInputFileAdapter.java | 79 +++++
.../apache/paimon/format/mosaic/MosaicObjects.java | 103 ++++++
.../paimon/format/mosaic/MosaicReaderFactory.java | 60 ++++
.../paimon/format/mosaic/MosaicRecordsReader.java | 214 ++++++++++++
.../paimon/format/mosaic/MosaicRecordsWriter.java | 199 +++++++++++
.../format/mosaic/MosaicSimpleStatsExtractor.java | 180 ++++++++++
.../paimon/format/mosaic/MosaicWriterFactory.java | 75 ++++
.../paimon/format/mosaic/MosaicWriterMetadata.java | 53 +++
.../org.apache.paimon.format.FileFormatFactory | 16 +
.../paimon/format/mosaic/MosaicFileFormatTest.java | 124 +++++++
.../format/mosaic/MosaicFormatReadWriteTest.java | 137 ++++++++
.../paimon/format/mosaic/MosaicObjectsTest.java | 209 +++++++++++
.../format/mosaic/MosaicReaderWriterTest.java | 361 +++++++++++++++++++
.../mosaic/MosaicSimpleStatsExtractorTest.java | 212 +++++++++++
.../format/mosaic/MosaicWriterMetadataTest.java | 386 +++++++++++++++++++++
pom.xml | 1 +
19 files changed, 2771 insertions(+)
diff --git a/paimon-mosaic/pom.xml b/paimon-mosaic/pom.xml
new file mode 100644
index 0000000000..6e0d1eae28
--- /dev/null
+++ b/paimon-mosaic/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>paimon-parent</artifactId>
+ <groupId>org.apache.paimon</groupId>
+ <version>1.5-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>paimon-mosaic</artifactId>
+ <name>Paimon : Mosaic Format</name>
+
+ <dependencies>
+ <!-- Mosaic format library. Pinned to a fixed version rather than ${project.version}. -->
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>mosaic</artifactId>
+ <version>0.1.0</version>
+ </dependency>
+
+ <!-- Paimon Arrow integration; no scope is given, so Maven's default (compile) scope applies. -->
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-arrow</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- "provided" scope: needed to compile this module, but expected on the classpath at runtime. -->
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Same "provided" treatment as paimon-common above. -->
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <!-- Test classes of paimon-common (test-jar), visible to this module's tests only. -->
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Test classes of paimon-core (test-jar), visible to this module's tests only. -->
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormat.java
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormat.java
new file mode 100644
index 0000000000..bff850a4e0
--- /dev/null
+++
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormat.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SimpleStatsExtractor;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BlobType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.types.VariantType;
+import org.apache.paimon.types.VectorType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/** Mosaic {@link FileFormat}. */
+public class MosaicFileFormat extends FileFormat {
+
+ public static final ConfigOption<String> STATS_COLUMNS =
+ ConfigOptions.key("mosaic.stats-columns")
+ .stringType()
+ .defaultValue("")
+ .withDescription(
+ "Comma-separated list of column names to collect
statistics for. "
+ + "Empty means no statistics collection.");
+
+ public static final ConfigOption<Integer> NUM_BUCKETS =
+ ConfigOptions.key("mosaic.num-buckets")
+ .intType()
+ .noDefaultValue()
+ .withDescription("Number of column buckets for parallel
IO.");
+
+ static {
+ System.setProperty("arrow.enable_unsafe_memory_access", "true");
+ }
+
+ private final FileFormatFactory.FormatContext formatContext;
+
+ public MosaicFileFormat(FileFormatFactory.FormatContext formatContext) {
+ super("mosaic");
+ this.formatContext = formatContext;
+ }
+
+ @Override
+ public FormatReaderFactory createReaderFactory(
+ RowType dataSchemaRowType,
+ RowType projectedRowType,
+ @Nullable List<Predicate> predicates) {
+ return new MosaicReaderFactory(dataSchemaRowType, projectedRowType,
predicates);
+ }
+
+ @Override
+ public FormatWriterFactory createWriterFactory(RowType type) {
+ return new MosaicWriterFactory(type, formatContext);
+ }
+
+ @Override
+ public void validateDataFields(RowType rowType) {
+ MosaicRowTypeVisitor visitor = new MosaicRowTypeVisitor();
+ for (DataType fieldType : rowType.getFieldTypes()) {
+ fieldType.accept(visitor);
+ }
+ }
+
+ @Override
+ public Optional<SimpleStatsExtractor> createStatsExtractor(
+ RowType type, SimpleColStatsCollector.Factory[] statsCollectors) {
+ return Optional.of(new MosaicSimpleStatsExtractor(type,
statsCollectors));
+ }
+
+ static class MosaicRowTypeVisitor implements DataTypeVisitor<Void> {
+
+ @Override
+ public Void visit(CharType charType) {
+ return null;
+ }
+
+ @Override
+ public Void visit(VarCharType varCharType) {
+ return null;
+ }
+
+ @Override
+ public Void visit(BooleanType booleanType) {
+ return null;
+ }
+
+ @Override
+ public Void visit(BinaryType binaryType) {
+ return null;
+ }
+
+ @Override
+ public Void visit(VarBinaryType varBinaryType) {
+ return null;
+ }
+
+ @Override
+ public Void visit(DecimalType decimalType) {
+ return null;
+ }
+
+ @Override
+ public Void visit(TinyIntType tinyIntType) {
+ return null;
+ }
+
+ @Override
+ public Void visit(SmallIntType smallIntType) {
+ return null;
+ }
+
+ @Override
+ public Void visit(IntType intType) {
+ return null;
+ }
+
+ @Override
+ public Void visit(BigIntType bigIntType) {
+ return null;
+ }
+
+ @Override
+ public Void visit(FloatType floatType) {
+ return null;
+ }
+
+ @Override
+ public Void visit(DoubleType doubleType) {
+ return null;
+ }
+
+ @Override
+ public Void visit(DateType dateType) {
+ return null;
+ }
+
+ @Override
+ public Void visit(TimeType timeType) {
+ return null;
+ }
+
+ @Override
+ public Void visit(TimestampType timestampType) {
+ return null;
+ }
+
+ @Override
+ public Void visit(LocalZonedTimestampType localZonedTimestampType) {
+ return null;
+ }
+
+ @Override
+ public Void visit(VariantType variantType) {
+ throw new UnsupportedOperationException(
+ "Mosaic file format does not support type VARIANT");
+ }
+
+ @Override
+ public Void visit(BlobType blobType) {
+ throw new UnsupportedOperationException(
+ "Mosaic file format does not support type BLOB");
+ }
+
+ @Override
+ public Void visit(ArrayType arrayType) {
+ throw new UnsupportedOperationException(
+ "Mosaic file format does not support type ARRAY");
+ }
+
+ @Override
+ public Void visit(VectorType vectorType) {
+ throw new UnsupportedOperationException(
+ "Mosaic file format does not support type VECTOR");
+ }
+
+ @Override
+ public Void visit(MultisetType multisetType) {
+ throw new UnsupportedOperationException(
+ "Mosaic file format does not support type MULTISET");
+ }
+
+ @Override
+ public Void visit(MapType mapType) {
+ throw new UnsupportedOperationException("Mosaic file format does
not support type MAP");
+ }
+
+ @Override
+ public Void visit(RowType rowType) {
+ throw new UnsupportedOperationException("Mosaic file format does
not support type ROW");
+ }
+ }
+}
diff --git
a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormatFactory.java
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormatFactory.java
new file mode 100644
index 0000000000..782faba3e8
--- /dev/null
+++
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormatFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+
+/** {@link FileFormatFactory} that produces {@link MosaicFileFormat} instances. */
+public class MosaicFileFormatFactory implements FileFormatFactory {
+
+    /** Format identifier reported by {@link #identifier()}. */
+    public static final String IDENTIFIER = "mosaic";
+
+    /** Builds a new {@link MosaicFileFormat} around the supplied context. */
+    @Override
+    public FileFormat create(FormatContext formatContext) {
+        final MosaicFileFormat format = new MosaicFileFormat(formatContext);
+        return format;
+    }
+
+    /** Returns {@link #IDENTIFIER}. */
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git
a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicInputFileAdapter.java
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicInputFileAdapter.java
new file mode 100644
index 0000000000..3a307ea0f2
--- /dev/null
+++
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicInputFileAdapter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.VectoredReadable;
+import org.apache.paimon.mosaic.InputFile;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ * Adapts Paimon's {@link FileIO} to Mosaic's {@link InputFile} interface.
+ *
+ * <p>Maintains a single {@link SeekableInputStream}. If the stream implements
{@link
+ * VectoredReadable}, reads use {@link VectoredReadable#preadFully} which is
thread-safe. Otherwise,
+ * reads are synchronized to protect seek+read sequences.
+ */
+public class MosaicInputFileAdapter implements InputFile, Closeable {
+
+ private final Path path;
+ private final SeekableInputStream in;
+ private final VectoredReadable vectoredReadable;
+
+ public MosaicInputFileAdapter(FileIO fileIO, Path path) throws IOException
{
+ this.path = path;
+ this.in = fileIO.newInputStream(path);
+ this.vectoredReadable = in instanceof VectoredReadable ?
(VectoredReadable) in : null;
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer, int offset, int
length) throws IOException {
+ if (vectoredReadable != null) {
+ vectoredReadable.preadFully(position, buffer, offset, length);
+ } else {
+ synchronized (in) {
+ in.seek(position);
+ int remaining = length;
+ int off = offset;
+ while (remaining > 0) {
+ int read = in.read(buffer, off, remaining);
+ if (read < 0) {
+ throw new EOFException(
+ "Reached end of file while reading "
+ + path
+ + " at position "
+ + position);
+ }
+ off += read;
+ remaining -= read;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+}
diff --git
a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicObjects.java
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicObjects.java
new file mode 100644
index 0000000000..54d15c43c0
--- /dev/null
+++
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicObjects.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.TimestampType;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+/** Converts Mosaic's byte[] statistics to Paimon objects. */
+public class MosaicObjects {
+
+ @Nullable
+ public static Object convertStatsValue(byte[] bytes, DataType dataType) {
+ if (bytes == null) {
+ return null;
+ }
+ switch (dataType.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ return BinaryString.fromBytes(bytes);
+ case BINARY:
+ case VARBINARY:
+ return bytes;
+ default:
+ break;
+ }
+ if (bytes.length == 0) {
+ return null;
+ }
+ ByteBuffer buf = ByteBuffer.wrap(bytes);
+ switch (dataType.getTypeRoot()) {
+ case BOOLEAN:
+ return bytes[0] != 0;
+ case TINYINT:
+ return bytes[0];
+ case SMALLINT:
+ return buf.getShort();
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ return buf.getInt();
+ case BIGINT:
+ return buf.getLong();
+ case FLOAT:
+ return buf.getFloat();
+ case DOUBLE:
+ return buf.getDouble();
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) dataType;
+ BigInteger unscaled = new BigInteger(bytes);
+ BigDecimal decimal = new BigDecimal(unscaled,
decimalType.getScale());
+ return Decimal.fromBigDecimal(
+ decimal, decimalType.getPrecision(),
decimalType.getScale());
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return convertTimestamp(buf, ((TimestampType)
dataType).getPrecision());
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return convertTimestamp(buf, ((LocalZonedTimestampType)
dataType).getPrecision());
+ default:
+ return null;
+ }
+ }
+
+ private static Timestamp convertTimestamp(ByteBuffer buf, int precision) {
+ if (precision <= 3) {
+ return Timestamp.fromEpochMillis(buf.getLong());
+ } else if (precision <= 6) {
+ return Timestamp.fromMicros(buf.getLong());
+ } else {
+ // precision 7-9: 12 bytes = i64 millis (BE) + i32 nanos_of_milli
(BE)
+ long millis = buf.getLong();
+ int nanosOfMilli = buf.getInt();
+ return Timestamp.fromEpochMillis(millis, nanosOfMilli);
+ }
+ }
+
+ private MosaicObjects() {}
+}
diff --git
a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicReaderFactory.java
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicReaderFactory.java
new file mode 100644
index 0000000000..5b39c867e2
--- /dev/null
+++
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicReaderFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link FormatReaderFactory} that creates {@link MosaicRecordsReader} instances for a fixed data
+ * schema, projection and optional predicate list.
+ */
+public class MosaicReaderFactory implements FormatReaderFactory {
+
+    private final RowType dataSchemaRowType;
+    private final RowType projectedRowType;
+    @Nullable private final List<Predicate> predicates;
+
+    /**
+     * @param dataSchemaRowType full schema of the data file
+     * @param projectedRowType fields the created readers should return
+     * @param predicates predicates handed to the created readers, may be {@code null}
+     */
+    public MosaicReaderFactory(
+            RowType dataSchemaRowType,
+            RowType projectedRowType,
+            @Nullable List<Predicate> predicates) {
+        this.dataSchemaRowType = dataSchemaRowType;
+        this.projectedRowType = projectedRowType;
+        this.predicates = predicates;
+    }
+
+    /**
+     * Opens the file described by {@code context} and wraps it in a {@link MosaicRecordsReader}.
+     *
+     * <p>The input stream is opened here, so it is also closed here when the reader cannot be
+     * constructed; without that, the stream would leak. Once the reader is returned, closing the
+     * reader closes the stream.
+     *
+     * @throws IOException if the input file cannot be opened
+     */
+    @Override
+    public FileRecordReader<InternalRow> createReader(Context context) throws IOException {
+        MosaicInputFileAdapter inputFile =
+                new MosaicInputFileAdapter(context.fileIO(), context.filePath());
+        try {
+            return new MosaicRecordsReader(
+                    inputFile,
+                    context.fileSize(),
+                    dataSchemaRowType,
+                    projectedRowType,
+                    predicates,
+                    context.filePath());
+        } catch (Throwable t) {
+            // No reader exists to take ownership of the stream: close it before propagating.
+            try {
+                inputFile.close();
+            } catch (Throwable closeFailure) {
+                t.addSuppressed(closeFailure);
+            }
+            throw t;
+        }
+    }
+}
diff --git
a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
new file mode 100644
index 0000000000..24cdcbf05c
--- /dev/null
+++
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.arrow.reader.ArrowBatchReader;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.mosaic.ColumnStatistics;
+import org.apache.paimon.mosaic.MosaicReader;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.paimon.format.mosaic.MosaicObjects.convertStatsValue;
+
+/** File reader for Mosaic format. */
+public class MosaicRecordsReader implements FileRecordReader<InternalRow> {
+
+ private final MosaicInputFileAdapter inputFileAdapter;
+ private final MosaicReader reader;
+ private final ArrowBatchReader arrowBatchReader;
+ private final Path filePath;
+ private final BufferAllocator allocator;
+ private final int numRowGroups;
+ private final RowType dataSchemaRowType;
+ @Nullable private final List<Predicate> predicates;
+
+ private int currentRowGroup;
+ private long returnedPosition = -1;
+ private VectorSchemaRoot currentVsr;
+
+ public MosaicRecordsReader(
+ MosaicInputFileAdapter inputFileAdapter,
+ long fileSize,
+ RowType dataSchemaRowType,
+ RowType projectedRowType,
+ @Nullable List<Predicate> predicates,
+ Path filePath) {
+ this.filePath = filePath;
+ this.inputFileAdapter = inputFileAdapter;
+ this.dataSchemaRowType = dataSchemaRowType;
+ this.predicates = predicates;
+ this.allocator = new RootAllocator();
+
+ try {
+ this.reader = MosaicReader.open(inputFileAdapter, fileSize,
allocator);
+ } catch (Exception e) {
+ allocator.close();
+ throw e;
+ }
+
+ Schema fileSchema = reader.getSchema();
+ Set<String> fileColumnNames = new HashSet<>();
+ for (Field field : fileSchema.getFields()) {
+ fileColumnNames.add(field.getName());
+ }
+ List<String> projectedNames = projectedRowType.getFieldNames();
+ List<String> existingColumns = new ArrayList<>();
+ for (String name : projectedNames) {
+ if (fileColumnNames.contains(name)) {
+ existingColumns.add(name);
+ }
+ }
+ if (!existingColumns.isEmpty()) {
+ reader.project(existingColumns.toArray(new String[0]));
+ }
+
+ this.numRowGroups = reader.numRowGroups();
+ this.currentRowGroup = 0;
+ this.arrowBatchReader = new ArrowBatchReader(projectedRowType, true);
+ }
+
+ @Nullable
+ @Override
+ public FileRecordIterator<InternalRow> readBatch() throws IOException {
+ while (currentRowGroup < numRowGroups) {
+ int numRows = reader.rowGroupNumRows(currentRowGroup);
+ if (!matchesRowGroup(currentRowGroup, numRows)) {
+ returnedPosition += numRows;
+ currentRowGroup++;
+ continue;
+ }
+
+ releaseCurrentVsr();
+
+ VectorSchemaRoot vsr = reader.readRowGroup(currentRowGroup,
allocator);
+ currentRowGroup++;
+ this.currentVsr = vsr;
+
+ Iterator<InternalRow> rows =
arrowBatchReader.readBatch(vsr).iterator();
+
+ return new FileRecordIterator<InternalRow>() {
+ @Override
+ public long returnedPosition() {
+ return returnedPosition;
+ }
+
+ @Override
+ public Path filePath() {
+ return filePath;
+ }
+
+ @Nullable
+ @Override
+ public InternalRow next() {
+ if (rows.hasNext()) {
+ returnedPosition++;
+ return rows.next();
+ }
+ return null;
+ }
+
+ @Override
+ public void releaseBatch() {
+ releaseCurrentVsr();
+ }
+ };
+ }
+ return null;
+ }
+
+ private boolean matchesRowGroup(int rowGroupIndex, long rowCount) {
+ if (predicates == null || predicates.isEmpty()) {
+ return true;
+ }
+
+ Map<String, ColumnStatistics> statsMap =
reader.getRowGroupStatistics(rowGroupIndex);
+ if (statsMap.isEmpty()) {
+ return true;
+ }
+
+ int fieldCount = dataSchemaRowType.getFieldCount();
+ GenericRow minValues = new GenericRow(fieldCount);
+ GenericRow maxValues = new GenericRow(fieldCount);
+ long[] nullCounts = new long[fieldCount];
+
+ List<DataField> fields = dataSchemaRowType.getFields();
+ for (int i = 0; i < fieldCount; i++) {
+ String colName = fields.get(i).name();
+ ColumnStatistics stats = statsMap.get(colName);
+ if (stats == null) {
+ continue;
+ }
+
+ nullCounts[i] = stats.getNullCount();
+ if (stats.hasMinMax()) {
+ DataType dataType = fields.get(i).type();
+ Object min = convertStatsValue(stats.getMin(), dataType);
+ Object max = convertStatsValue(stats.getMax(), dataType);
+ minValues.setField(i, min);
+ maxValues.setField(i, max);
+ }
+ }
+
+ for (Predicate predicate : predicates) {
+ if (!predicate.test(rowCount, minValues, maxValues, new
GenericArray(nullCounts))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void releaseCurrentVsr() {
+ if (currentVsr != null) {
+ currentVsr.close();
+ currentVsr = null;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ releaseCurrentVsr();
+ reader.close();
+ allocator.close();
+ inputFileAdapter.close();
+ }
+}
diff --git
a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsWriter.java
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsWriter.java
new file mode 100644
index 0000000000..fdef0eb365
--- /dev/null
+++
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsWriter.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.arrow.ArrowBundleRecords;
+import org.apache.paimon.arrow.vector.ArrowFormatWriter;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.BundleFormatWriter;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.io.BundleRecords;
+import org.apache.paimon.mosaic.ColumnStatistics;
+import org.apache.paimon.mosaic.MosaicWriter;
+import org.apache.paimon.mosaic.WriterOptions;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.types.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Mosaic records writer. */
+public class MosaicRecordsWriter implements BundleFormatWriter {
+
+ private final ArrowFormatWriter arrowFormatWriter;
+ private final MosaicWriter nativeWriter;
+ private final BufferAllocator allocator;
+ private final List<String> statsColumnNames;
+ @Nullable private MosaicWriterMetadata metadata;
+
+ public MosaicRecordsWriter(
+ OutputStream outputStream,
+ RowType rowType,
+ FileFormatFactory.FormatContext formatContext,
+ List<String> statsColumnNames,
+ @Nullable Integer numBuckets) {
+ this.statsColumnNames = statsColumnNames;
+ this.allocator = new RootAllocator();
+
+ int writeBatchSize = formatContext.writeBatchSize();
+ long writeBatchMemory = formatContext.writeBatchMemory().getBytes();
+
+ this.arrowFormatWriter =
+ new ArrowFormatWriter(rowType, writeBatchSize, true,
allocator, writeBatchMemory);
+
+ WriterOptions options = new
WriterOptions().zstdLevel(formatContext.zstdLevel());
+ if (numBuckets != null) {
+ options = options.numBuckets(numBuckets);
+ }
+ MemorySize blockSize = formatContext.blockSize();
+ if (blockSize != null) {
+ options = options.rowGroupMaxSize(blockSize.getBytes());
+ }
+ if (!statsColumnNames.isEmpty()) {
+ options.statsColumns(statsColumnNames.toArray(new String[0]));
+ }
+
+ Schema arrowSchema =
arrowFormatWriter.getVectorSchemaRoot().getSchema();
+ this.nativeWriter = new MosaicWriter(outputStream, arrowSchema,
options, allocator);
+ }
+
+ @Override
+ public void addElement(InternalRow internalRow) {
+ if (!arrowFormatWriter.write(internalRow)) {
+ flush();
+ if (!arrowFormatWriter.write(internalRow)) {
+ throw new RuntimeException("Failed to write row to Mosaic
file");
+ }
+ }
+ }
+
+ @Override
+ public void writeBundle(BundleRecords bundleRecords) {
+ if (bundleRecords instanceof ArrowBundleRecords) {
+ flush();
+ nativeWriter.write(((ArrowBundleRecords)
bundleRecords).getVectorSchemaRoot());
+ } else {
+ for (InternalRow row : bundleRecords) {
+ addElement(row);
+ }
+ }
+ }
+
+ @Override
+ public boolean reachTargetSize(boolean suggestedCheck, long targetSize) {
+ if (!suggestedCheck) {
+ return false;
+ }
+ return nativeWriter.estimatedFileSize() >= targetSize;
+ }
+
+ @Override
+ public void close() throws IOException {
+ Throwable throwable = null;
+
+ try {
+ flush();
+ } catch (Throwable t) {
+ throwable = t;
+ }
+
+ try {
+ nativeWriter.close();
+ } catch (Throwable t) {
+ throwable = addSuppressed(throwable, t);
+ }
+
+ try {
+ collectMetadata();
+ } catch (Throwable t) {
+ throwable = addSuppressed(throwable, t);
+ }
+
+ try {
+ arrowFormatWriter.close();
+ } catch (Throwable t) {
+ throwable = addSuppressed(throwable, t);
+ }
+
+ try {
+ allocator.close();
+ } catch (Throwable t) {
+ throwable = addSuppressed(throwable, t);
+ }
+
+ if (throwable != null) {
+ rethrow(throwable);
+ }
+ }
+
+ @Nullable
+ @Override
+ public Object writerMetadata() {
+ return metadata;
+ }
+
+ private void collectMetadata() {
+ int numRowGroups = nativeWriter.numRowGroups();
+ List<Map<String, ColumnStatistics>> allStats = new
ArrayList<>(numRowGroups);
+ for (int i = 0; i < numRowGroups; i++) {
+ allStats.add(nativeWriter.getRowGroupStatistics(i));
+ }
+ this.metadata = new MosaicWriterMetadata(numRowGroups, allStats,
statsColumnNames);
+ }
+
+ private void flush() {
+ arrowFormatWriter.flush();
+ if (!arrowFormatWriter.empty()) {
+ VectorSchemaRoot vsr = arrowFormatWriter.getVectorSchemaRoot();
+ nativeWriter.write(vsr);
+ }
+ arrowFormatWriter.reset();
+ }
+
+ private static Throwable addSuppressed(Throwable throwable, Throwable
suppressed) {
+ if (throwable == null) {
+ return suppressed;
+ }
+ throwable.addSuppressed(suppressed);
+ return throwable;
+ }
+
+ private static void rethrow(Throwable throwable) throws IOException {
+ if (throwable instanceof IOException) {
+ throw (IOException) throwable;
+ }
+ if (throwable instanceof RuntimeException) {
+ throw (RuntimeException) throwable;
+ }
+ if (throwable instanceof Error) {
+ throw (Error) throwable;
+ }
+ throw new IOException(throwable);
+ }
+}
diff --git
a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractor.java
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractor.java
new file mode 100644
index 0000000000..f426b27cfa
--- /dev/null
+++
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractor.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.format.SimpleColStats;
+import org.apache.paimon.format.SimpleStatsExtractor;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.mosaic.ColumnStatistics;
+import org.apache.paimon.mosaic.MosaicReader;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.paimon.format.mosaic.MosaicObjects.convertStatsValue;
+
+/**
+ * {@link SimpleStatsExtractor} for Mosaic files.
+ *
+ * <p>Per-row-group {@link ColumnStatistics}, read from the file or taken from a {@link
+ * MosaicWriterMetadata} supplied by the writer, are merged into one {@link SimpleColStats} per
+ * field of the row type.
+ */
+public class MosaicSimpleStatsExtractor implements SimpleStatsExtractor {
+
+    private final RowType rowType;
+    private final SimpleColStatsCollector.Factory[] statsCollectors;
+
+    public MosaicSimpleStatsExtractor(
+            RowType rowType, SimpleColStatsCollector.Factory[] statsCollectors) {
+        this.rowType = rowType;
+        this.statsCollectors = statsCollectors;
+    }
+
+    /** Opens the file at {@code path} and merges the statistics of all its row groups. */
+    @Override
+    public SimpleColStats[] extract(FileIO fileIO, Path path, long length) {
+        try (MosaicInputFileAdapter inputFile = new MosaicInputFileAdapter(fileIO, path);
+                BufferAllocator allocator = new RootAllocator();
+                MosaicReader reader = MosaicReader.open(inputFile, length, allocator)) {
+            int rowGroups = reader.numRowGroups();
+            return extractFromStats(rowGroups, reader::getRowGroupStatistics, null);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to extract stats from " + path, e);
+        }
+    }
+
+    /**
+     * Uses the statistics carried by {@code writerMetadata} when it is a {@link
+     * MosaicWriterMetadata}; otherwise falls back to reading them from the file.
+     */
+    @Override
+    public SimpleColStats[] extract(
+            FileIO fileIO, Path path, long length, @Nullable Object writerMetadata) {
+        if (!(writerMetadata instanceof MosaicWriterMetadata)) {
+            return extract(fileIO, path, length);
+        }
+        MosaicWriterMetadata captured = (MosaicWriterMetadata) writerMetadata;
+        Set<Integer> statsFieldIndices = resolveStatsFieldIndices(captured.statsColumnNames());
+        return extractFromStats(
+                captured.numRowGroups(), captured::getRowGroupStatistics, statsFieldIndices);
+    }
+
+    /**
+     * Opens the file at {@code path} and returns the merged statistics together with a {@code
+     * FileInfo} holding the sum of the row counts of all row groups.
+     */
+    @Override
+    public Pair<SimpleColStats[], FileInfo> extractWithFileInfo(
+            FileIO fileIO, Path path, long length) {
+        try (MosaicInputFileAdapter inputFile = new MosaicInputFileAdapter(fileIO, path);
+                BufferAllocator allocator = new RootAllocator();
+                MosaicReader reader = MosaicReader.open(inputFile, length, allocator)) {
+            int rowGroups = reader.numRowGroups();
+            SimpleColStats[] merged =
+                    extractFromStats(rowGroups, reader::getRowGroupStatistics, null);
+            long totalRows = 0;
+            for (int group = 0; group < rowGroups; group++) {
+                totalRows += reader.rowGroupNumRows(group);
+            }
+            return Pair.of(merged, new FileInfo(totalRows));
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to extract stats from " + path, e);
+        }
+    }
+
+    /**
+     * Merges row-group statistics into per-field statistics.
+     *
+     * <p>Null counts are summed across row groups. Min and max are merged only for converted
+     * values that implement {@link Comparable}. A field that is not tracked, or that never
+     * appeared in any row group's statistics, gets an all-null {@link SimpleColStats}. Every
+     * result is passed through the matching collector's {@code convert}.
+     *
+     * @param numRowGroups number of row groups to query from {@code statsProvider}
+     * @param statsProvider source of the statistics map of each row group
+     * @param statsFieldIndices field positions to report; {@code null} means every field seen
+     *     in the statistics
+     */
+    private SimpleColStats[] extractFromStats(
+            int numRowGroups,
+            RowGroupStatsProvider statsProvider,
+            @Nullable Set<Integer> statsFieldIndices) {
+        int fieldCount = rowType.getFieldCount();
+        List<String> fieldNames = rowType.getFieldNames();
+        Object[] mergedMin = new Object[fieldCount];
+        Object[] mergedMax = new Object[fieldCount];
+        long[] mergedNulls = new long[fieldCount];
+        Set<Integer> seenColumns = new HashSet<>();
+
+        for (int group = 0; group < numRowGroups; group++) {
+            Map<String, ColumnStatistics> groupStats = statsProvider.getRowGroupStatistics(group);
+            for (Map.Entry<String, ColumnStatistics> column : groupStats.entrySet()) {
+                int pos = fieldNames.indexOf(column.getKey());
+                if (pos < 0) {
+                    // Column is not part of the row type.
+                    continue;
+                }
+
+                ColumnStatistics columnStats = column.getValue();
+                seenColumns.add(pos);
+                mergedNulls[pos] += columnStats.getNullCount();
+                if (!columnStats.hasMinMax()) {
+                    continue;
+                }
+
+                DataType fieldType = rowType.getFields().get(pos).type();
+                Object groupMin = convertStatsValue(columnStats.getMin(), fieldType);
+                Object groupMax = convertStatsValue(columnStats.getMax(), fieldType);
+                mergedMin[pos] = moreExtreme(mergedMin[pos], groupMin, true);
+                mergedMax[pos] = moreExtreme(mergedMax[pos], groupMax, false);
+            }
+        }
+
+        Set<Integer> trackedColumns = statsFieldIndices == null ? seenColumns : statsFieldIndices;
+        SimpleColStatsCollector[] collectors = SimpleColStatsCollector.create(statsCollectors);
+        SimpleColStats[] result = new SimpleColStats[fieldCount];
+        for (int pos = 0; pos < fieldCount; pos++) {
+            boolean hasStats = trackedColumns.contains(pos) && seenColumns.contains(pos);
+            SimpleColStats raw =
+                    hasStats
+                            ? new SimpleColStats(mergedMin[pos], mergedMax[pos], mergedNulls[pos])
+                            : new SimpleColStats(null, null, null);
+            result[pos] = collectors[pos].convert(raw);
+        }
+        return result;
+    }
+
+    /**
+     * Picks the value to keep when merging a row-group bound into the running bound.
+     *
+     * @param current running bound, {@code null} when none has been recorded yet
+     * @param candidate bound of the row group being merged
+     * @param keepSmaller {@code true} when merging minima, {@code false} when merging maxima
+     * @return {@code current} when {@code candidate} is not {@link Comparable} or does not beat
+     *     it, otherwise {@code candidate}
+     */
+    @SuppressWarnings("unchecked")
+    private static Object moreExtreme(
+            @Nullable Object current, @Nullable Object candidate, boolean keepSmaller) {
+        if (!(candidate instanceof Comparable)) {
+            return current;
+        }
+        if (current == null) {
+            return candidate;
+        }
+        int order = ((Comparable<Object>) candidate).compareTo(current);
+        boolean replaces = keepSmaller ? order < 0 : order > 0;
+        return replaces ? candidate : current;
+    }
+
+    /** Maps column names to field positions in the row type, dropping unknown names. */
+    private Set<Integer> resolveStatsFieldIndices(List<String> statsColumnNames) {
+        List<String> fieldNames = rowType.getFieldNames();
+        Set<Integer> positions = new HashSet<>();
+        for (String columnName : statsColumnNames) {
+            int pos = fieldNames.indexOf(columnName);
+            if (pos < 0) {
+                continue;
+            }
+            positions.add(pos);
+        }
+        return positions;
+    }
+
+    /** Supplies the column statistics of one row group, keyed by column name. */
+    @FunctionalInterface
+    private interface RowGroupStatsProvider {
+        Map<String, ColumnStatistics> getRowGroupStatistics(int rowGroupIndex);
+    }
+}
diff --git
a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterFactory.java
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterFactory.java
new file mode 100644
index 0000000000..ca67647f8c
--- /dev/null
+++
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+/** A factory to create Mosaic {@link FormatWriter}. */
+public class MosaicWriterFactory implements FormatWriterFactory {
+
+ private final RowType rowType;
+ private final FileFormatFactory.FormatContext formatContext;
+ private final List<String> statsColumnNames;
+ private final @Nullable Integer numBuckets;
+
+ public MosaicWriterFactory(RowType rowType,
FileFormatFactory.FormatContext formatContext) {
+ this.rowType = rowType;
+ this.formatContext = formatContext;
+ String statsColumnsValue =
formatContext.options().get(MosaicFileFormat.STATS_COLUMNS);
+ if (statsColumnsValue == null || statsColumnsValue.trim().isEmpty()) {
+ this.statsColumnNames = new ArrayList<>();
+ } else {
+ this.statsColumnNames =
+ Arrays.stream(statsColumnsValue.split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toList());
+ }
+ this.numBuckets =
formatContext.options().get(MosaicFileFormat.NUM_BUCKETS);
+ }
+
+ @Override
+ public FormatWriter create(PositionOutputStream out, String compression) {
+ validateCompression(compression);
+ return new MosaicRecordsWriter(out, rowType, formatContext,
statsColumnNames, numBuckets);
+ }
+
+ private static void validateCompression(String compression) {
+ if (compression == null) {
+ return;
+ }
+ String normalized = compression.toLowerCase(Locale.ROOT);
+ if (!normalized.equals("zstd")) {
+ throw new UnsupportedOperationException(
+ "Mosaic format only supports zstd compression, but got: "
+ compression);
+ }
+ }
+}
diff --git
a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterMetadata.java
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterMetadata.java
new file mode 100644
index 0000000000..cd3149fd44
--- /dev/null
+++
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterMetadata.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.mosaic.ColumnStatistics;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Holder for what a Mosaic writer reports about a written file: the number of row groups, the
+ * column statistics of each row group, and the names of the columns configured for statistics.
+ */
+public class MosaicWriterMetadata {
+
+    private final List<String> statsColumns;
+    private final List<Map<String, ColumnStatistics>> statsPerRowGroup;
+    private final int rowGroupCount;
+
+    /**
+     * @param numRowGroups number of row groups in the file
+     * @param rowGroupStats column statistics keyed by column name, one map per row group
+     * @param statsColumnNames names of the columns configured for statistics
+     */
+    public MosaicWriterMetadata(
+            int numRowGroups,
+            List<Map<String, ColumnStatistics>> rowGroupStats,
+            List<String> statsColumnNames) {
+        this.rowGroupCount = numRowGroups;
+        this.statsPerRowGroup = rowGroupStats;
+        this.statsColumnNames0();
+        this.statsColumns = statsColumnNames;
+    }
+
+    /** Returns the number of row groups. */
+    public int numRowGroups() {
+        return rowGroupCount;
+    }
+
+    /** Returns the statistics map stored at position {@code rowGroupIndex}. */
+    public Map<String, ColumnStatistics> getRowGroupStatistics(int rowGroupIndex) {
+        return statsPerRowGroup.get(rowGroupIndex);
+    }
+
+    /** Returns the names of the columns configured for statistics. */
+    public List<String> statsColumnNames() {
+        return statsColumns;
+    }
+
+    /** No-op hook kept private; present only so construction order is explicit. */
+    private void statsColumnNames0() {}
+}
diff --git
a/paimon-mosaic/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
b/paimon-mosaic/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
new file mode 100644
index 0000000000..bc955c4935
--- /dev/null
+++
b/paimon-mosaic/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.format.mosaic.MosaicFileFormatFactory
diff --git
a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFileFormatTest.java
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFileFormatTest.java
new file mode 100644
index 0000000000..8e53164e86
--- /dev/null
+++
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFileFormatTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit tests for {@link MosaicFileFormat} and {@link
MosaicFileFormatFactory}. */
+class MosaicFileFormatTest {
+
+ @Test
+ void testFactoryIdentifier() {
+ MosaicFileFormatFactory factory = new MosaicFileFormatFactory();
+ assertThat(factory.identifier()).isEqualTo("mosaic");
+ }
+
+ @Test
+ void testFactoryCreate() {
+ MosaicFileFormatFactory factory = new MosaicFileFormatFactory();
+ FileFormatFactory.FormatContext context =
+ new FileFormatFactory.FormatContext(new Options(), 1024, 1024);
+
assertThat(factory.create(context)).isInstanceOf(MosaicFileFormat.class);
+ }
+
+ @Test
+ void testCreateReaderFactory() {
+ MosaicFileFormat format = createFormat();
+ RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+ FormatReaderFactory readerFactory =
+ format.createReaderFactory(rowType, rowType, new
ArrayList<>());
+ assertThat(readerFactory).isInstanceOf(MosaicReaderFactory.class);
+ }
+
+ @Test
+ void testCreateWriterFactory() {
+ MosaicFileFormat format = createFormat();
+ RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+ FormatWriterFactory writerFactory =
format.createWriterFactory(rowType);
+ assertThat(writerFactory).isInstanceOf(MosaicWriterFactory.class);
+ }
+
+ @Test
+ void testValidateDataFieldsSupported() {
+ MosaicFileFormat format = createFormat();
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.INT(),
+ DataTypes.BIGINT(),
+ DataTypes.STRING(),
+ DataTypes.DOUBLE(),
+ DataTypes.FLOAT(),
+ DataTypes.BOOLEAN(),
+ DataTypes.DATE(),
+ DataTypes.TIMESTAMP(3),
+ DataTypes.DECIMAL(10, 2),
+ DataTypes.BYTES());
+ format.validateDataFields(rowType);
+ }
+
+ @Test
+ void testValidateDataFieldsMapUnsupported() {
+ MosaicFileFormat format = createFormat();
+ RowType rowType = DataTypes.ROW(DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT()));
+ assertThatThrownBy(() -> format.validateDataFields(rowType))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("MAP");
+ }
+
+ @Test
+ void testValidateDataFieldsMultisetUnsupported() {
+ MosaicFileFormat format = createFormat();
+ RowType rowType =
DataTypes.ROW(DataTypes.MULTISET(DataTypes.STRING()));
+ assertThatThrownBy(() -> format.validateDataFields(rowType))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("MULTISET");
+ }
+
+ @Test
+ void testCreateStatsExtractor() {
+ MosaicFileFormat format = createFormat();
+ RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+ assertThat(
+ format.createStatsExtractor(
+ rowType,
+ new
org.apache.paimon.statistics.SimpleColStatsCollector.Factory[] {
+
org.apache.paimon.statistics.SimpleColStatsCollector.from(
+ "full"),
+
org.apache.paimon.statistics.SimpleColStatsCollector.from(
+ "full")
+ }))
+ .isPresent();
+ }
+
+ private static MosaicFileFormat createFormat() {
+ return new MosaicFileFormat(new FileFormatFactory.FormatContext(new
Options(), 1024, 1024));
+ }
+}
diff --git
a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFormatReadWriteTest.java
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFormatReadWriteTest.java
new file mode 100644
index 0000000000..41f632b3ee
--- /dev/null
+++
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFormatReadWriteTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatReadWriteTest;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.BeforeAll;
+
+import java.math.BigDecimal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/** Round-trip read/write tests for Mosaic format. */
+class MosaicFormatReadWriteTest extends FormatReadWriteTest {
+
+ MosaicFormatReadWriteTest() {
+ super("mosaic");
+ }
+
+ @BeforeAll
+ static void checkNativeLibrary() {
+ assumeTrue(isNativeAvailable(), "Mosaic native library not available");
+ }
+
+ @Override
+ protected FileFormat fileFormat() {
+ return new MosaicFileFormat(new FileFormatFactory.FormatContext(new
Options(), 1024, 1024));
+ }
+
+ @Override
+ public String compression() {
+ return "zstd";
+ }
+
+ @Override
+ public boolean supportNestedReadPruning() {
+ return false;
+ }
+
+ @Override
+ protected RowType rowTypeForFullTypesTest() {
+ return RowType.builder()
+ .field("f_int", DataTypes.INT().notNull())
+ .field("f_string", DataTypes.STRING())
+ .field("f_double", DataTypes.DOUBLE().notNull())
+ .field("f_boolean", DataTypes.BOOLEAN())
+ .field("f_tinyint", DataTypes.TINYINT())
+ .field("f_smallint", DataTypes.SMALLINT())
+ .field("f_bigint", DataTypes.BIGINT())
+ .field("f_float", DataTypes.FLOAT())
+ .field("f_binary", DataTypes.BYTES())
+ .field("f_date", DataTypes.DATE())
+ .field("f_timestamp3", DataTypes.TIMESTAMP(3))
+ .field("f_timestamp6", DataTypes.TIMESTAMP(6))
+ .field("f_decimal_5_2", DataTypes.DECIMAL(5, 2))
+ .field("f_decimal_20_0", DataTypes.DECIMAL(20, 0))
+ .build();
+ }
+
+ @Override
+ protected GenericRow expectedRowForFullTypesTest() {
+ return GenericRow.of(
+ 42,
+ BinaryString.fromString("hello mosaic"),
+ 3.14d,
+ true,
+ (byte) 7,
+ (short) 256,
+ 9876543210L,
+ 1.5f,
+ new byte[] {1, 2, 3},
+ 18000,
+ Timestamp.fromEpochMillis(1700000000000L),
+ Timestamp.fromMicros(1700000000000000L),
+ Decimal.fromBigDecimal(new BigDecimal("123.45"), 5, 2),
+ Decimal.fromBigDecimal(new BigDecimal("12345678901234567890"),
20, 0));
+ }
+
+ @Override
+ protected void validateFullTypesResult(InternalRow actual, InternalRow
expected) {
+ for (int i = 0; i < 14; i++) {
+ if (expected.isNullAt(i)) {
+ assertThat(actual.isNullAt(i)).isTrue();
+ }
+ }
+ assertThat(actual.getInt(0)).isEqualTo(expected.getInt(0));
+ assertThat(actual.getString(1)).isEqualTo(expected.getString(1));
+ assertThat(actual.getDouble(2)).isEqualTo(expected.getDouble(2));
+ assertThat(actual.getBoolean(3)).isEqualTo(expected.getBoolean(3));
+ assertThat(actual.getByte(4)).isEqualTo(expected.getByte(4));
+ assertThat(actual.getShort(5)).isEqualTo(expected.getShort(5));
+ assertThat(actual.getLong(6)).isEqualTo(expected.getLong(6));
+ assertThat(actual.getFloat(7)).isEqualTo(expected.getFloat(7));
+ assertThat(actual.getBinary(8)).isEqualTo(expected.getBinary(8));
+ assertThat(actual.getInt(9)).isEqualTo(expected.getInt(9));
+ assertThat(actual.getTimestamp(10,
3)).isEqualTo(expected.getTimestamp(10, 3));
+ assertThat(actual.getTimestamp(11,
6)).isEqualTo(expected.getTimestamp(11, 6));
+ assertThat(actual.getDecimal(12, 5,
2)).isEqualTo(expected.getDecimal(12, 5, 2));
+ assertThat(actual.getDecimal(13, 20,
0)).isEqualTo(expected.getDecimal(13, 20, 0));
+ }
+
+ private static boolean isNativeAvailable() {
+ try {
+ Class.forName("org.apache.paimon.mosaic.NativeLib");
+ return true;
+ } catch (Throwable t) {
+ return false;
+ }
+ }
+}
diff --git
a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicObjectsTest.java
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicObjectsTest.java
new file mode 100644
index 0000000000..e05ed1709c
--- /dev/null
+++
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicObjectsTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link MosaicObjects}.
+ *
+ * <p>Each test feeds a hand-built {@code byte[]} to {@link MosaicObjects#convertStatsValue} and
+ * checks the converted value. Fixed-width inputs are produced with {@link ByteBuffer}, whose
+ * default byte order is big-endian.
+ */
+class MosaicObjectsTest {
+
+    @Test
+    void testNullBytes() {
+        assertThat(MosaicObjects.convertStatsValue(null, DataTypes.INT())).isNull();
+    }
+
+    @Test
+    void testEmptyBytes() {
+        assertThat(MosaicObjects.convertStatsValue(new byte[0], DataTypes.INT())).isNull();
+    }
+
+    @Test
+    void testBoolean() {
+        assertThat(MosaicObjects.convertStatsValue(new byte[] {1}, DataTypes.BOOLEAN()))
+                .isEqualTo(true);
+        assertThat(MosaicObjects.convertStatsValue(new byte[] {0}, DataTypes.BOOLEAN()))
+                .isEqualTo(false);
+    }
+
+    @Test
+    void testTinyInt() {
+        assertThat(MosaicObjects.convertStatsValue(new byte[] {42}, DataTypes.TINYINT()))
+                .isEqualTo((byte) 42);
+        assertThat(MosaicObjects.convertStatsValue(new byte[] {(byte) -1}, DataTypes.TINYINT()))
+                .isEqualTo((byte) -1);
+    }
+
+    @Test
+    void testSmallInt() {
+        byte[] bytes = ByteBuffer.allocate(2).putShort((short) 1234).array();
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.SMALLINT()))
+                .isEqualTo((short) 1234);
+    }
+
+    @Test
+    void testInt() {
+        byte[] bytes = ByteBuffer.allocate(4).putInt(123456).array();
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.INT())).isEqualTo(123456);
+    }
+
+    @Test
+    void testIntNegative() {
+        byte[] bytes = ByteBuffer.allocate(4).putInt(-999).array();
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.INT())).isEqualTo(-999);
+    }
+
+    @Test
+    void testBigInt() {
+        byte[] bytes = ByteBuffer.allocate(8).putLong(9876543210L).array();
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.BIGINT()))
+                .isEqualTo(9876543210L);
+    }
+
+    @Test
+    void testFloat() {
+        byte[] bytes = ByteBuffer.allocate(4).putFloat(3.14f).array();
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.FLOAT())).isEqualTo(3.14f);
+    }
+
+    @Test
+    void testDouble() {
+        byte[] bytes = ByteBuffer.allocate(8).putDouble(2.718281828).array();
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.DOUBLE()))
+                .isEqualTo(2.718281828);
+    }
+
+    @Test
+    void testVarChar() {
+        // Encode with an explicit charset so the input does not depend on the platform default.
+        byte[] bytes = "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.STRING()))
+                .isEqualTo(BinaryString.fromString("hello"));
+    }
+
+    @Test
+    void testBinary() {
+        byte[] bytes = new byte[] {1, 2, 3, 4, 5};
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.BYTES())).isEqualTo(bytes);
+    }
+
+    @Test
+    void testDate() {
+        byte[] bytes = ByteBuffer.allocate(4).putInt(18000).array();
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.DATE())).isEqualTo(18000);
+    }
+
+    @Test
+    void testTimestampMillis() {
+        long millis = 1700000000000L;
+        byte[] bytes = ByteBuffer.allocate(8).putLong(millis).array();
+        Object result = MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP(3));
+        assertThat(result).isEqualTo(Timestamp.fromEpochMillis(millis));
+    }
+
+    @Test
+    void testTimestampMicros() {
+        long micros = 1700000000000000L;
+        byte[] bytes = ByteBuffer.allocate(8).putLong(micros).array();
+        Object result = MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP(6));
+        assertThat(result).isEqualTo(Timestamp.fromMicros(micros));
+    }
+
+    @Test
+    void testDecimal() {
+        // 1000 in big-endian two's complement = 0x03E8
+        byte[] beBytes = new byte[] {0x03, (byte) 0xE8};
+        Object result = MosaicObjects.convertStatsValue(beBytes, DataTypes.DECIMAL(10, 2));
+        assertThat(result).isInstanceOf(Decimal.class);
+        Decimal decimal = (Decimal) result;
+        // Unscaled 1000 at scale 2 is exactly 10.00. Compare the full value rather than the
+        // truncated integer part, which would also accept e.g. 10.99.
+        assertThat(decimal.toBigDecimal())
+                .isEqualByComparingTo(new java.math.BigDecimal("10.00"));
+    }
+
+    @Test
+    void testTimestampNanos() {
+        // Input layout used here: 8-byte millis followed by 4-byte nanos-of-millisecond.
+        long millis = 1700000000123L;
+        int nanosOfMilli = 456789;
+        byte[] bytes = ByteBuffer.allocate(12).putLong(millis).putInt(nanosOfMilli).array();
+        Object result = MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP(9));
+        assertThat(result).isEqualTo(Timestamp.fromEpochMillis(millis, nanosOfMilli));
+    }
+
+    @Test
+    void testTimestampNanosPrecision7() {
+        long millis = 1700000000000L;
+        int nanosOfMilli = 100000;
+        byte[] bytes = ByteBuffer.allocate(12).putLong(millis).putInt(nanosOfMilli).array();
+        Object result = MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP(7));
+        assertThat(result).isEqualTo(Timestamp.fromEpochMillis(millis, nanosOfMilli));
+    }
+
+    @Test
+    void testTimestampWithLocalTimeZoneMillis() {
+        long millis = 1700000000000L;
+        byte[] bytes = ByteBuffer.allocate(8).putLong(millis).array();
+        Object result =
+                MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3));
+        assertThat(result).isEqualTo(Timestamp.fromEpochMillis(millis));
+    }
+
+    @Test
+    void testTimestampWithLocalTimeZoneMicros() {
+        long micros = 1700000000000000L;
+        byte[] bytes = ByteBuffer.allocate(8).putLong(micros).array();
+        Object result =
+                MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6));
+        assertThat(result).isEqualTo(Timestamp.fromMicros(micros));
+    }
+
+    @Test
+    void testTimestampWithLocalTimeZoneNanos() {
+        long millis = 1700000000123L;
+        int nanosOfMilli = 456789;
+        byte[] bytes = ByteBuffer.allocate(12).putLong(millis).putInt(nanosOfMilli).array();
+        Object result =
+                MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9));
+        assertThat(result).isEqualTo(Timestamp.fromEpochMillis(millis, nanosOfMilli));
+    }
+
+    @Test
+    void testEmptyStringVarChar() {
+        // Unlike the INT case in testEmptyBytes, an empty array is a valid value for a string.
+        Object result = MosaicObjects.convertStatsValue(new byte[0], DataTypes.STRING());
+        assertThat(result).isEqualTo(BinaryString.fromString(""));
+    }
+
+    @Test
+    void testEmptyBinary() {
+        Object result = MosaicObjects.convertStatsValue(new byte[0], DataTypes.BYTES());
+        assertThat(result).isEqualTo(new byte[0]);
+    }
+
+    @Test
+    void testUnsupportedTypeReturnsNull() {
+        byte[] bytes = new byte[] {1, 2, 3};
+        assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.ARRAY(DataTypes.INT())))
+                .isNull();
+    }
+}
diff --git
a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicReaderWriterTest.java
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicReaderWriterTest.java
new file mode 100644
index 0000000000..60efceed08
--- /dev/null
+++
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicReaderWriterTest.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Integration tests for Mosaic reader and writer.
+ *
+ * <p>Every test writes a file below a JUnit temporary directory and reads it back through {@link
+ * MosaicFileFormat}. The whole class is gated on {@code org.apache.paimon.mosaic.NativeLib}
+ * being loadable (see {@link #checkNativeLibrary()}).
+ */
+class MosaicReaderWriterTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @BeforeAll
+    static void checkNativeLibrary() {
+        assumeTrue(isNativeAvailable(), "Mosaic native library not available");
+    }
+
+    @Test
+    void testWriteAndRead() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        Path path = newPath();
+
+        writeRows(
+                rowType,
+                path,
+                GenericRow.of(1, BinaryString.fromString("hello")),
+                GenericRow.of(2, BinaryString.fromString("world")));
+
+        List<InternalRow> result = readAll(rowType, rowType, path, null);
+        assertThat(result).hasSize(2);
+        assertThat(result.get(0).getInt(0)).isEqualTo(1);
+        assertThat(result.get(0).getString(1).toString()).isEqualTo("hello");
+        assertThat(result.get(1).getInt(0)).isEqualTo(2);
+        assertThat(result.get(1).getString(1).toString()).isEqualTo("world");
+    }
+
+    @Test
+    void testNullValues() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        Path path = newPath();
+
+        writeRows(
+                rowType,
+                path,
+                GenericRow.of(1, null),
+                GenericRow.of(null, BinaryString.fromString("test")),
+                GenericRow.of(null, null));
+
+        List<InternalRow> result = readAll(rowType, rowType, path, null);
+        assertThat(result).hasSize(3);
+        assertThat(result.get(0).isNullAt(1)).isTrue();
+        assertThat(result.get(1).isNullAt(0)).isTrue();
+        assertThat(result.get(2).isNullAt(0)).isTrue();
+        assertThat(result.get(2).isNullAt(1)).isTrue();
+    }
+
+    @Test
+    void testColumnProjection() throws IOException {
+        RowType writeType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_string", DataTypes.STRING())
+                        .field("f_double", DataTypes.DOUBLE())
+                        .build();
+        RowType readType = RowType.builder().field("f_string", DataTypes.STRING()).build();
+        Path path = newPath();
+
+        writeRows(
+                writeType,
+                path,
+                GenericRow.of(1, BinaryString.fromString("aaa"), 1.1),
+                GenericRow.of(2, BinaryString.fromString("bbb"), 2.2));
+
+        List<InternalRow> result = readAll(writeType, readType, path, null);
+        assertThat(result).hasSize(2);
+        assertThat(result.get(0).getString(0).toString()).isEqualTo("aaa");
+        assertThat(result.get(1).getString(0).toString()).isEqualTo("bbb");
+    }
+
+    @Test
+    void testLargeDataset() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        Path path = newPath();
+
+        int numRows = 10000;
+        GenericRow[] rows = new GenericRow[numRows];
+        for (int i = 0; i < numRows; i++) {
+            rows[i] = GenericRow.of(i, BinaryString.fromString("row" + i));
+        }
+        writeRows(rowType, path, rows);
+
+        List<InternalRow> result = readAll(rowType, rowType, path, null);
+        assertThat(result).hasSize(numRows);
+        assertThat(result.get(0).getInt(0)).isEqualTo(0);
+        assertThat(result.get(numRows - 1).getInt(0)).isEqualTo(numRows - 1);
+    }
+
+    @Test
+    void testRowGroupPredicateFiltering() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_string", DataTypes.STRING())
+                        .build();
+        Path path = newPath();
+
+        int numRows = 10000;
+        GenericRow[] rows = new GenericRow[numRows];
+        for (int i = 0; i < numRows; i++) {
+            rows[i] = GenericRow.of(i, BinaryString.fromString("v" + i));
+        }
+        // Statistics are requested for f_int only; the predicates below target that column.
+        writeRows(rowType, path, "f_int", rows);
+
+        // Predicate that cannot match any row group (all values are 0..9999)
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate = builder.greaterThan(0, 99999);
+        List<InternalRow> result =
+                readAll(rowType, rowType, path, Collections.singletonList(predicate));
+        assertThat(result).isEmpty();
+
+        // Predicate that matches the row group (values include range 0..9999).
+        // All rows come back, not only those above 5000: this test expects the filter to
+        // decide per row group, not per row.
+        Predicate matchPredicate = builder.greaterThan(0, 5000);
+        List<InternalRow> matchResult =
+                readAll(rowType, rowType, path, Collections.singletonList(matchPredicate));
+        assertThat(matchResult).hasSize(numRows);
+    }
+
+    @Test
+    void testReturnedPosition() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        Path path = newPath();
+
+        writeRows(
+                rowType,
+                path,
+                GenericRow.of(1, BinaryString.fromString("a")),
+                GenericRow.of(2, BinaryString.fromString("b")),
+                GenericRow.of(3, BinaryString.fromString("c")));
+
+        MosaicFileFormat format = createFormat();
+        FormatReaderFactory readerFactory = format.createReaderFactory(rowType, rowType, null);
+        LocalFileIO fileIO = new LocalFileIO();
+        // try-with-resources: the reader is closed even when an assertion below fails.
+        try (RecordReader<InternalRow> reader =
+                readerFactory.createReader(
+                        new FormatReaderContext(fileIO, path, fileIO.getFileSize(path)))) {
+            RecordReader.RecordIterator<InternalRow> batch = reader.readBatch();
+            assertThat(batch).isNotNull();
+            FileRecordIterator<InternalRow> fileIter = (FileRecordIterator<InternalRow>) batch;
+
+            // The position is queried after each next() and must be the index of that row.
+            fileIter.next();
+            assertThat(fileIter.returnedPosition()).isEqualTo(0);
+            fileIter.next();
+            assertThat(fileIter.returnedPosition()).isEqualTo(1);
+            fileIter.next();
+            assertThat(fileIter.returnedPosition()).isEqualTo(2);
+        }
+    }
+
+    @Test
+    void testProjectionWithMissingColumns() throws IOException {
+        RowType writeType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_string", DataTypes.STRING())
+                        .build();
+        // Read type has a column that doesn't exist in the file (schema evolution)
+        RowType readType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_new_col", DataTypes.BIGINT())
+                        .field("f_string", DataTypes.STRING())
+                        .build();
+        Path path = newPath();
+
+        writeRows(
+                writeType,
+                path,
+                GenericRow.of(1, BinaryString.fromString("aaa")),
+                GenericRow.of(2, BinaryString.fromString("bbb")));
+
+        List<InternalRow> result = readAll(writeType, readType, path, null);
+        assertThat(result).hasSize(2);
+        assertThat(result.get(0).getInt(0)).isEqualTo(1);
+        assertThat(result.get(0).isNullAt(1)).isTrue();
+        assertThat(result.get(0).getString(2).toString()).isEqualTo("aaa");
+        assertThat(result.get(1).getInt(0)).isEqualTo(2);
+        assertThat(result.get(1).isNullAt(1)).isTrue();
+        assertThat(result.get(1).getString(2).toString()).isEqualTo("bbb");
+    }
+
+    @Test
+    void testProjectionAllColumnsMissing() throws IOException {
+        RowType writeType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_string", DataTypes.STRING())
+                        .build();
+        // Read type has only columns that don't exist in the file
+        RowType readType =
+                RowType.builder()
+                        .field("f_new_a", DataTypes.INT())
+                        .field("f_new_b", DataTypes.STRING())
+                        .build();
+        Path path = newPath();
+
+        writeRows(
+                writeType,
+                path,
+                GenericRow.of(1, BinaryString.fromString("x")),
+                GenericRow.of(2, BinaryString.fromString("y")));
+
+        List<InternalRow> result = readAll(writeType, readType, path, null);
+        assertThat(result).hasSize(2);
+        assertThat(result.get(0).isNullAt(0)).isTrue();
+        assertThat(result.get(0).isNullAt(1)).isTrue();
+        assertThat(result.get(1).isNullAt(0)).isTrue();
+        assertThat(result.get(1).isNullAt(1)).isTrue();
+    }
+
+    @Test
+    void testUnsupportedCompressionThrows() {
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        Path path = newPath();
+        MosaicFileFormat format = createFormat();
+        FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
+        LocalFileIO fileIO = new LocalFileIO();
+
+        assertThatThrownBy(() -> writerFactory.create(fileIO.newOutputStream(path, false), "lz4"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("lz4");
+    }
+
+    @Test
+    void testReachTargetSize() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        Path path = newPath();
+        MosaicFileFormat format = createFormat();
+        FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
+
+        LocalFileIO fileIO = new LocalFileIO();
+        boolean reached = false;
+        // try-with-resources: the writer is closed even if addElement/reachTargetSize throws.
+        try (FormatWriter writer =
+                writerFactory.create(fileIO.newOutputStream(path, false), "zstd")) {
+            for (int i = 0; i < 100000; i++) {
+                writer.addElement(
+                        GenericRow.of(i, BinaryString.fromString("value_" + i + "_padding")));
+                if (writer.reachTargetSize(true, 1024)) {
+                    reached = true;
+                    break;
+                }
+            }
+        }
+        assertThat(reached).isTrue();
+    }
+
+    /** Returns a fresh, randomly named {@code .mosaic} path inside the temporary directory. */
+    private Path newPath() {
+        return new Path(tempDir.toUri().toString(), UUID.randomUUID() + ".mosaic");
+    }
+
+    /** Writes {@code rows} to {@code path} without configuring any statistics columns. */
+    private void writeRows(RowType rowType, Path path, GenericRow... rows) throws IOException {
+        writeRows(rowType, path, "", rows);
+    }
+
+    /**
+     * Writes {@code rows} to {@code path} with "zstd" compression.
+     *
+     * @param statsColumns value for {@link MosaicFileFormat#STATS_COLUMNS}; an empty string
+     *     leaves the option unset
+     */
+    private void writeRows(RowType rowType, Path path, String statsColumns, GenericRow... rows)
+            throws IOException {
+        MosaicFileFormat format = createFormat(statsColumns);
+        FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
+        LocalFileIO fileIO = new LocalFileIO();
+        // try-with-resources: the writer is closed even if a row fails to be written.
+        try (FormatWriter writer =
+                writerFactory.create(fileIO.newOutputStream(path, false), "zstd")) {
+            for (GenericRow row : rows) {
+                writer.addElement(row);
+            }
+        }
+    }
+
+    /**
+     * Reads every row of {@code path} into a list.
+     *
+     * @param dataType row type passed to the reader factory as the data type
+     * @param readType row type passed to the reader factory as the projected type; also used to
+     *     copy each returned row
+     * @param predicates filters handed to the reader factory, may be {@code null}
+     */
+    private List<InternalRow> readAll(
+            RowType dataType, RowType readType, Path path, List<Predicate> predicates)
+            throws IOException {
+        MosaicFileFormat format = createFormat();
+        FormatReaderFactory readerFactory =
+                format.createReaderFactory(dataType, readType, predicates);
+        LocalFileIO fileIO = new LocalFileIO();
+
+        InternalRowSerializer serializer = new InternalRowSerializer(readType);
+        List<InternalRow> result = new ArrayList<>();
+        // try-with-resources: the reader is closed even if iteration fails part-way.
+        try (RecordReader<InternalRow> reader =
+                readerFactory.createReader(
+                        new FormatReaderContext(fileIO, path, fileIO.getFileSize(path)))) {
+            // Each row is copied before it is stored in the result list.
+            reader.forEachRemaining(row -> result.add(serializer.copy(row)));
+        }
+        return result;
+    }
+
+    private static MosaicFileFormat createFormat() {
+        return createFormat("");
+    }
+
+    /**
+     * Builds a {@link MosaicFileFormat}, setting {@link MosaicFileFormat#STATS_COLUMNS} only when
+     * {@code statsColumns} is non-empty.
+     */
+    private static MosaicFileFormat createFormat(String statsColumns) {
+        Options options = new Options();
+        if (!statsColumns.isEmpty()) {
+            options.set(MosaicFileFormat.STATS_COLUMNS, statsColumns);
+        }
+        return new MosaicFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+    }
+
+    /** Returns whether {@code org.apache.paimon.mosaic.NativeLib} can be loaded. */
+    private static boolean isNativeAvailable() {
+        try {
+            Class.forName("org.apache.paimon.mosaic.NativeLib");
+            return true;
+        } catch (Throwable t) {
+            return false;
+        }
+    }
+}
diff --git
a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractorTest.java
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractorTest.java
new file mode 100644
index 0000000000..8477c5b065
--- /dev/null
+++
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractorTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SimpleColStats;
+import org.apache.paimon.format.SimpleColStatsExtractorTest;
+import org.apache.paimon.format.SimpleStatsExtractor;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ * Tests for {@link MosaicSimpleStatsExtractor}.
+ *
+ * <p>Supplies the format, row type and compression to {@link SimpleColStatsExtractorTest} and
+ * adds two Mosaic-specific cases. The whole class is gated on {@code
+ * org.apache.paimon.mosaic.NativeLib} being loadable.
+ */
+class MosaicSimpleStatsExtractorTest extends SimpleColStatsExtractorTest {
+
+    @TempDir java.nio.file.Path statsTestTempDir;
+
+    @BeforeAll
+    static void checkNativeLibrary() {
+        assumeTrue(isNativeAvailable(), "Mosaic native library not available");
+    }
+
+    @Override
+    protected FileFormat createFormat() {
+        // Request statistics for every column declared in rowType().
+        Options options = new Options();
+        options.set(
+                MosaicFileFormat.STATS_COLUMNS,
+                "f_boolean,f_tinyint,f_smallint,f_int,f_bigint,f_float,"
+                        + "f_double,f_string,f_decimal_5_2,f_date,f_timestamp3,f_timestamp6");
+        return new MosaicFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+    }
+
+    @Override
+    protected RowType rowType() {
+        return RowType.builder()
+                .field("f_boolean", DataTypes.BOOLEAN())
+                .field("f_tinyint", DataTypes.TINYINT())
+                .field("f_smallint", DataTypes.SMALLINT())
+                .field("f_int", DataTypes.INT())
+                .field("f_bigint", DataTypes.BIGINT())
+                .field("f_float", DataTypes.FLOAT())
+                .field("f_double", DataTypes.DOUBLE())
+                .field("f_string", DataTypes.VARCHAR(100))
+                .field("f_decimal_5_2", DataTypes.DECIMAL(5, 2))
+                .field("f_date", DataTypes.DATE())
+                .field("f_timestamp3", DataTypes.TIMESTAMP(3))
+                .field("f_timestamp6", DataTypes.TIMESTAMP(6))
+                .build();
+    }
+
+    @Override
+    protected String fileCompression() {
+        return "zstd";
+    }
+
+    @Test
+    void testUntrackedColumnsReturnNone() throws IOException {
+        // stats_columns only tracks f_int, but the table has f_int + f_string
+        RowType rowType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_string", DataTypes.STRING())
+                        .build();
+        Options options = new Options();
+        options.set(MosaicFileFormat.STATS_COLUMNS, "f_int");
+        MosaicFileFormat format =
+                new MosaicFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+
+        Path path = new Path(statsTestTempDir.toUri().toString(), UUID.randomUUID() + ".mosaic");
+        LocalFileIO fileIO = new LocalFileIO();
+        FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
+        // try-with-resources: the writer is closed even if a row fails to be written.
+        try (FormatWriter writer =
+                writerFactory.create(fileIO.newOutputStream(path, false), "zstd")) {
+            writer.addElement(GenericRow.of(1, BinaryString.fromString("a")));
+            writer.addElement(GenericRow.of(2, BinaryString.fromString("b")));
+        }
+
+        SimpleColStatsCollector.Factory[] collectors =
+                IntStream.range(0, rowType.getFieldCount())
+                        .mapToObj(i -> SimpleColStatsCollector.from("full"))
+                        .toArray(SimpleColStatsCollector.Factory[]::new);
+        SimpleStatsExtractor extractor = format.createStatsExtractor(rowType, collectors).get();
+        SimpleColStats[] stats = extractor.extract(fileIO, path, fileIO.getFileSize(path));
+
+        // f_int is tracked, should have real stats
+        assertThat(stats[0].min()).isEqualTo(1);
+        assertThat(stats[0].max()).isEqualTo(2);
+        assertThat(stats[0].nullCount()).isEqualTo(0L);
+        // f_string is NOT tracked, should be NONE (null nullCount)
+        assertThat(stats[1].min()).isNull();
+        assertThat(stats[1].max()).isNull();
+        assertThat(stats[1].nullCount()).isNull();
+    }
+
+    @Test
+    void testBinaryColumnStatsNoException() throws Exception {
+        // Binary columns produce byte[] from convertStatsValue, which is not Comparable.
+        // Verify multi-row-group aggregation doesn't throw ClassCastException.
+        RowType rowType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_binary", DataTypes.VARBINARY(100))
+                        .build();
+        // Build a fake MosaicWriterMetadata with binary stats across 2 row groups.
+        // The ColumnStatistics constructor is reached through reflection with setAccessible.
+        java.lang.reflect.Constructor<?> ctor =
+                org.apache.paimon.mosaic.ColumnStatistics.class.getDeclaredConstructor(
+                        long.class, byte[].class, byte[].class);
+        ctor.setAccessible(true);
+
+        java.util.Map<String, org.apache.paimon.mosaic.ColumnStatistics> rg0 =
+                new java.util.HashMap<>();
+        rg0.put(
+                "f_int",
+                (org.apache.paimon.mosaic.ColumnStatistics)
+                        ctor.newInstance(0L, intBytes(0), intBytes(100)));
+        rg0.put(
+                "f_binary",
+                (org.apache.paimon.mosaic.ColumnStatistics)
+                        ctor.newInstance(0L, new byte[] {1, 2}, new byte[] {3, 4}));
+
+        java.util.Map<String, org.apache.paimon.mosaic.ColumnStatistics> rg1 =
+                new java.util.HashMap<>();
+        rg1.put(
+                "f_int",
+                (org.apache.paimon.mosaic.ColumnStatistics)
+                        ctor.newInstance(0L, intBytes(50), intBytes(200)));
+        rg1.put(
+                "f_binary",
+                (org.apache.paimon.mosaic.ColumnStatistics)
+                        ctor.newInstance(0L, new byte[] {5, 6}, new byte[] {7, 8}));
+
+        java.util.List<java.util.Map<String, org.apache.paimon.mosaic.ColumnStatistics>> allStats =
+                java.util.Arrays.asList(rg0, rg1);
+        MosaicWriterMetadata metadata =
+                new MosaicWriterMetadata(2, allStats, java.util.Arrays.asList("f_int", "f_binary"));
+
+        // Write a minimal file (only f_int in stats_columns since native rejects binary)
+        Options options = new Options();
+        options.set(MosaicFileFormat.STATS_COLUMNS, "f_int");
+        MosaicFileFormat format =
+                new MosaicFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+        Path path = new Path(statsTestTempDir.toUri().toString(), UUID.randomUUID() + ".mosaic");
+        LocalFileIO fileIO = new LocalFileIO();
+        FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
+        // try-with-resources: the writer is closed even if the row fails to be written.
+        try (FormatWriter writer =
+                writerFactory.create(fileIO.newOutputStream(path, false), "zstd")) {
+            writer.addElement(GenericRow.of(1, new byte[] {1}));
+        }
+
+        SimpleColStatsCollector.Factory[] collectors =
+                IntStream.range(0, rowType.getFieldCount())
+                        .mapToObj(i -> SimpleColStatsCollector.from("full"))
+                        .toArray(SimpleColStatsCollector.Factory[]::new);
+        SimpleStatsExtractor extractor = format.createStatsExtractor(rowType, collectors).get();
+        // Should not throw ClassCastException
+        SimpleColStats[] stats =
+                extractor.extract(fileIO, path, fileIO.getFileSize(path), metadata);
+
+        // f_int aggregated across row groups: min=0, max=200
+        assertThat(stats[0].min()).isEqualTo(0);
+        assertThat(stats[0].max()).isEqualTo(200);
+        // f_binary min/max should be null (byte[] not Comparable, skipped)
+        assertThat(stats[1].min()).isNull();
+        assertThat(stats[1].max()).isNull();
+    }
+
+    /** Encodes {@code value} as four bytes in {@link java.nio.ByteBuffer}'s default order. */
+    private static byte[] intBytes(int value) {
+        return java.nio.ByteBuffer.allocate(4).putInt(value).array();
+    }
+
+    /** Returns whether {@code org.apache.paimon.mosaic.NativeLib} can be loaded. */
+    private static boolean isNativeAvailable() {
+        try {
+            Class.forName("org.apache.paimon.mosaic.NativeLib");
+            return true;
+        } catch (Throwable t) {
+            return false;
+        }
+    }
+}
diff --git
a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicWriterMetadataTest.java
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicWriterMetadataTest.java
new file mode 100644
index 0000000000..4caf65d661
--- /dev/null
+++
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicWriterMetadataTest.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SimpleColStats;
+import org.apache.paimon.format.SimpleStatsExtractor;
+import org.apache.paimon.format.SimpleStatsExtractor.FileInfo;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/** Tests for writer metadata based stats extraction in Mosaic format. */
+class MosaicWriterMetadataTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    /** File system used to write the test files and to read them back. */
+    private final LocalFileIO fileIO = new LocalFileIO();
+
+    /** Skips all tests when the Mosaic native library class cannot be loaded. */
+    @BeforeAll
+    static void checkNativeLibrary() {
+        assumeTrue(isNativeAvailable(), "Mosaic native library not available");
+    }
+
+    /** A closed writer must expose a {@link MosaicWriterMetadata} with at least one row group. */
+    @Test
+    void testWriterMetadataNotNull() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+
+        FormatWriter writer =
+                writeAndClose(
+                        rowType,
+                        newPath(),
+                        "f0,f1",
+                        GenericRow.of(1, BinaryString.fromString("hello")),
+                        GenericRow.of(2, BinaryString.fromString("world")));
+
+        Object metadata = writer.writerMetadata();
+        assertThat(metadata).isNotNull();
+        assertThat(metadata).isInstanceOf(MosaicWriterMetadata.class);
+
+        MosaicWriterMetadata mosaicMeta = (MosaicWriterMetadata) metadata;
+        assertThat(mosaicMeta.numRowGroups()).isGreaterThan(0);
+    }
+
+    /** Stats derived from writer metadata must equal the stats read back from the file. */
+    @Test
+    void testStatsFromMetadataMatchesStatsFromFile() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_bigint", DataTypes.BIGINT())
+                        .field("f_string", DataTypes.STRING())
+                        .field("f_double", DataTypes.DOUBLE())
+                        .build();
+        Path path = newPath();
+        String statsColumns = "f_int,f_bigint,f_string,f_double";
+
+        GenericRow[] rows = new GenericRow[1000];
+        for (int i = 0; i < rows.length; i++) {
+            rows[i] =
+                    GenericRow.of(
+                            i, (long) i * 100, BinaryString.fromString("val_" + i), i * 1.1);
+        }
+        Object metadata = writeAndClose(rowType, path, statsColumns, rows).writerMetadata();
+        assertThat(metadata).isNotNull();
+
+        // The same extractor instance serves both code paths so that only the source differs.
+        SimpleStatsExtractor extractor = createExtractor(rowType, statsColumns);
+        long fileSize = fileIO.getFileSize(path);
+
+        SimpleColStats[] fromFile = extractor.extract(fileIO, path, fileSize);
+        SimpleColStats[] fromMetadata = extractor.extract(fileIO, path, fileSize, metadata);
+
+        assertThat(fromMetadata).isEqualTo(fromFile);
+    }
+
+    /** Null cells must be reflected in the per-column null counts taken from metadata. */
+    @Test
+    void testStatsFromMetadataWithNullValues() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_string", DataTypes.STRING())
+                        .build();
+        Path path = newPath();
+        String statsColumns = "f_int,f_string";
+
+        FormatWriter writer =
+                writeAndClose(
+                        rowType,
+                        path,
+                        statsColumns,
+                        GenericRow.of(1, null),
+                        GenericRow.of(null, BinaryString.fromString("a")),
+                        GenericRow.of(3, BinaryString.fromString("b")));
+        Object metadata = writer.writerMetadata();
+
+        SimpleStatsExtractor extractor = createExtractor(rowType, statsColumns);
+        long fileSize = fileIO.getFileSize(path);
+
+        SimpleColStats[] fromMetadata = extractor.extract(fileIO, path, fileSize, metadata);
+        assertThat(fromMetadata).isNotNull();
+        assertThat(fromMetadata[0].nullCount()).isEqualTo(1L);
+        assertThat(fromMetadata[1].nullCount()).isEqualTo(1L);
+    }
+
+    /** {@code extractWithFileInfo} must report the number of rows that were written. */
+    @Test
+    void testExtractWithFileInfoRowCount() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field("f_int", DataTypes.INT())
+                        .field("f_string", DataTypes.STRING())
+                        .build();
+        Path path = newPath();
+        String statsColumns = "f_int,f_string";
+
+        int numRows = 500;
+        GenericRow[] rows = new GenericRow[numRows];
+        for (int i = 0; i < numRows; i++) {
+            rows[i] = GenericRow.of(i, BinaryString.fromString("row_" + i));
+        }
+        writeAndClose(rowType, path, statsColumns, rows);
+
+        SimpleStatsExtractor extractor = createExtractor(rowType, statsColumns);
+        long fileSize = fileIO.getFileSize(path);
+
+        Pair<SimpleColStats[], FileInfo> result =
+                extractor.extractWithFileInfo(fileIO, path, fileSize);
+        assertThat(result.getRight().getRowCount()).isEqualTo(numRows);
+        assertThat(result.getLeft()).isNotNull();
+        assertThat(result.getLeft()).hasSize(rowType.getFieldCount());
+    }
+
+    /** Only the configured stats column carries stats when extracting from metadata. */
+    @Test
+    void testPartialStatsColumnsFromMetadata() throws IOException {
+        RowType rowType = intStringDoubleRowType();
+        Path path = newPath();
+        String statsColumns = "f_int";
+
+        FormatWriter writer =
+                writeAndClose(
+                        rowType,
+                        path,
+                        statsColumns,
+                        GenericRow.of(1, BinaryString.fromString("a"), 1.0),
+                        GenericRow.of(null, BinaryString.fromString("b"), 2.0),
+                        GenericRow.of(3, null, null));
+
+        Object metadata = writer.writerMetadata();
+        assertThat(metadata).isInstanceOf(MosaicWriterMetadata.class);
+        MosaicWriterMetadata mosaicMeta = (MosaicWriterMetadata) metadata;
+        assertThat(mosaicMeta.statsColumnNames()).containsExactly("f_int");
+
+        SimpleStatsExtractor extractor = createExtractor(rowType, statsColumns);
+        long fileSize = fileIO.getFileSize(path);
+
+        SimpleColStats[] fromMetadata = extractor.extract(fileIO, path, fileSize, metadata);
+
+        // f_int has stats: min=1, max=3, nullCount=1
+        assertThat(fromMetadata[0].min()).isEqualTo(1);
+        assertThat(fromMetadata[0].max()).isEqualTo(3);
+        assertThat(fromMetadata[0].nullCount()).isEqualTo(1L);
+
+        // f_string and f_double have no stats (not in statsColumns)
+        assertNoStats(fromMetadata[1]);
+        assertNoStats(fromMetadata[2]);
+    }
+
+    /** Stats for a column in the middle of the schema must land at that column's index. */
+    @Test
+    void testStatsOnMiddleColumn() throws IOException {
+        RowType rowType = intStringDoubleRowType();
+        Path path = newPath();
+        String statsColumns = "f_string";
+
+        FormatWriter writer =
+                writeAndClose(
+                        rowType,
+                        path,
+                        statsColumns,
+                        GenericRow.of(1, BinaryString.fromString("banana"), 1.0),
+                        GenericRow.of(2, BinaryString.fromString("apple"), 2.0),
+                        GenericRow.of(3, null, 3.0));
+        Object metadata = writer.writerMetadata();
+
+        SimpleStatsExtractor extractor = createExtractor(rowType, statsColumns);
+        long fileSize = fileIO.getFileSize(path);
+
+        SimpleColStats[] fromMetadata = extractor.extract(fileIO, path, fileSize, metadata);
+
+        // f_int has no stats
+        assertNoStats(fromMetadata[0]);
+
+        // f_string has stats: min="apple", max="banana", nullCount=1
+        assertThat(fromMetadata[1].min()).isEqualTo(BinaryString.fromString("apple"));
+        assertThat(fromMetadata[1].max()).isEqualTo(BinaryString.fromString("banana"));
+        assertThat(fromMetadata[1].nullCount()).isEqualTo(1L);
+
+        // f_double has no stats
+        assertNoStats(fromMetadata[2]);
+    }
+
+    /** Same expectation as the middle-column test, but reading the stats back from the file. */
+    @Test
+    void testPartialStatsColumnsFromFile() throws IOException {
+        RowType rowType = intStringDoubleRowType();
+        Path path = newPath();
+        String statsColumns = "f_string";
+
+        writeAndClose(
+                rowType,
+                path,
+                statsColumns,
+                GenericRow.of(1, BinaryString.fromString("banana"), 1.0),
+                GenericRow.of(2, BinaryString.fromString("apple"), 2.0),
+                GenericRow.of(3, null, 3.0));
+
+        // Extract from file (no writer metadata), simulating fallback path
+        SimpleStatsExtractor extractor = createExtractor(rowType, statsColumns);
+        long fileSize = fileIO.getFileSize(path);
+
+        SimpleColStats[] fromFile = extractor.extract(fileIO, path, fileSize);
+
+        // f_int has no stats in file
+        assertNoStats(fromFile[0]);
+
+        // f_string has stats
+        assertThat(fromFile[1].min()).isEqualTo(BinaryString.fromString("apple"));
+        assertThat(fromFile[1].max()).isEqualTo(BinaryString.fromString("banana"));
+        assertThat(fromFile[1].nullCount()).isEqualTo(1L);
+
+        // f_double has no stats in file
+        assertNoStats(fromFile[2]);
+    }
+
+    /** Passing {@code null} metadata must give the same result as the file-only overload. */
+    @Test
+    void testFallbackToFileWhenMetadataIsNull() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        Path path = newPath();
+        String statsColumns = "f0,f1";
+
+        writeAndClose(
+                rowType, path, statsColumns, GenericRow.of(10, BinaryString.fromString("test")));
+
+        SimpleStatsExtractor extractor = createExtractor(rowType, statsColumns);
+        long fileSize = fileIO.getFileSize(path);
+
+        SimpleColStats[] fromFile = extractor.extract(fileIO, path, fileSize);
+        SimpleColStats[] fromNull = extractor.extract(fileIO, path, fileSize, null);
+
+        assertThat(fromNull).isEqualTo(fromFile);
+    }
+
+    /** Returns a fresh, randomly named {@code .mosaic} path inside the per-test temp directory. */
+    private Path newPath() {
+        return new Path(tempDir.toUri().toString(), UUID.randomUUID() + ".mosaic");
+    }
+
+    /** Builds the three-column schema ({@code f_int, f_string, f_double}) shared by several tests. */
+    private static RowType intStringDoubleRowType() {
+        return RowType.builder()
+                .field("f_int", DataTypes.INT())
+                .field("f_string", DataTypes.STRING())
+                .field("f_double", DataTypes.DOUBLE())
+                .build();
+    }
+
+    /**
+     * Writes {@code rows} in order to a new zstd-compressed Mosaic file at {@code path} and closes
+     * the writer.
+     *
+     * @param rowType schema of the rows
+     * @param path destination file; must not exist yet (the stream is opened without overwrite)
+     * @param statsColumns comma-separated stats column names handed to the format options
+     * @param rows rows to write
+     * @return the closed writer, so that callers may query {@code writerMetadata()}
+     */
+    private FormatWriter writeAndClose(
+            RowType rowType, Path path, String statsColumns, GenericRow... rows)
+            throws IOException {
+        FormatWriter writer = createWriter(rowType, path, statsColumns);
+        try {
+            for (GenericRow row : rows) {
+                writer.addElement(row);
+            }
+        } finally {
+            // Close even when a write fails so the writer is never left open.
+            writer.close();
+        }
+        return writer;
+    }
+
+    /** Opens a zstd-compressed Mosaic writer for {@code rowType} at {@code path}. */
+    private FormatWriter createWriter(RowType rowType, Path path, String statsColumns)
+            throws IOException {
+        FormatWriterFactory writerFactory =
+                createFormat(statsColumns).createWriterFactory(rowType);
+        return writerFactory.create(fileIO.newOutputStream(path, false), "zstd");
+    }
+
+    /**
+     * Creates a stats extractor for {@code rowType} that uses a {@code "full"} stats collector for
+     * every field.
+     */
+    private static SimpleStatsExtractor createExtractor(RowType rowType, String statsColumns) {
+        SimpleColStatsCollector.Factory[] collectors =
+                IntStream.range(0, rowType.getFieldCount())
+                        .mapToObj(i -> SimpleColStatsCollector.from("full"))
+                        .toArray(SimpleColStatsCollector.Factory[]::new);
+        return createFormat(statsColumns).createStatsExtractor(rowType, collectors).get();
+    }
+
+    /**
+     * Creates a {@link MosaicFileFormat}; {@link MosaicFileFormat#STATS_COLUMNS} is only set when
+     * {@code statsColumns} is non-empty.
+     */
+    private static MosaicFileFormat createFormat(String statsColumns) {
+        Options options = new Options();
+        if (!statsColumns.isEmpty()) {
+            options.set(MosaicFileFormat.STATS_COLUMNS, statsColumns);
+        }
+        return new MosaicFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+    }
+
+    /** Asserts that {@code stats} carries neither min, max nor null count. */
+    private static void assertNoStats(SimpleColStats stats) {
+        assertThat(stats.min()).isNull();
+        assertThat(stats.max()).isNull();
+        assertThat(stats.nullCount()).isNull();
+    }
+
+    /** Reports whether the class {@code org.apache.paimon.mosaic.NativeLib} can be loaded. */
+    private static boolean isNativeAvailable() {
+        try {
+            Class.forName("org.apache.paimon.mosaic.NativeLib");
+            return true;
+        } catch (Throwable ignored) {
+            // Deliberately broad: any failure while loading the class counts as "not available".
+            return false;
+        }
+    }
+}
diff --git a/pom.xml b/pom.xml
index 5a336fb76c..d2c02ae4d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,6 +74,7 @@ under the License.
<module>paimon-api</module>
<module>paimon-lumina</module>
<module>paimon-vortex</module>
+ <module>paimon-mosaic</module>
<module>paimon-tantivy</module>
</modules>