This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a31504aaf [python] Integrate paimon-mosaic format into PyPaimon 
(#8098)
9a31504aaf is described below

commit 9a31504aaf8e4ee293aee80c65a403bf586a10ca
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jun 3 16:45:51 2026 +0800

    [python] Integrate paimon-mosaic format into PyPaimon (#8098)
---
 .github/workflows/paimon-python-checks.yml         |   1 +
 docs/docs/concepts/spec/fileformat.md              |  58 +++-
 docs/docs/pypaimon/index.md                        |  15 +
 .../pypaimon/common/options/core_options.py        |   1 +
 .../pypaimon/filesystem/caching_file_io.py         |   3 +
 .../pypaimon/filesystem/hdfs_native_file_io.py     |   9 +
 paimon-python/pypaimon/filesystem/local_file_io.py |  10 +
 .../pypaimon/filesystem/pyarrow_file_io.py         |   9 +
 .../pypaimon/read/reader/format_mosaic_reader.py   | 140 ++++++++++
 paimon-python/pypaimon/read/split_read.py          |  10 +-
 .../tests/test_format_mosaic_reader_writer.py      | 304 +++++++++++++++++++++
 .../pypaimon/tests/test_format_mosaic_table.py     | 193 +++++++++++++
 .../pypaimon/write/writer/data_vector_writer.py    |   2 +
 paimon-python/pypaimon/write/writer/data_writer.py |   2 +
 .../write/writer/dedicated_format_writer.py        |   2 +
 paimon-python/setup.py                             |   3 +
 16 files changed, 760 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/paimon-python-checks.yml 
b/.github/workflows/paimon-python-checks.yml
index 6a88767590..5b918f5829 100755
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -138,6 +138,7 @@ jobs:
             if python -c "import sys; sys.exit(0 if sys.version_info >= (3, 
11) else 1)"; then
               python -m pip install vortex-data==0.70.0
             fi
+            python -m pip install 'paimon-mosaic>=0.1.0'
           fi
           df -h
 
diff --git a/docs/docs/concepts/spec/fileformat.md 
b/docs/docs/concepts/spec/fileformat.md
index c458efe448..39092054a0 100644
--- a/docs/docs/concepts/spec/fileformat.md
+++ b/docs/docs/concepts/spec/fileformat.md
@@ -24,9 +24,11 @@ under the License.
 
 # File Format
 
-Currently, supports Parquet, Avro, ORC, CSV, JSON, Lance, and Row file formats.
+Currently, Paimon supports Parquet, Avro, ORC, CSV, JSON, Lance, Vortex, Mosaic, and 
Row file formats.
 - Recommended column format is Parquet, which has a high compression rate and 
fast column projection queries.
 - Recommended row based format is Avro, which has good performance on reading 
and writing full row (all columns).
+- Recommended format for wide tables is 
[Mosaic](https://paimon.apache.org/docs/mosaic/), a columnar-bucket hybrid 
format with column bucketing for parallel I/O.
+- Recommended columnar format for point lookups is 
[Vortex](https://github.com/spiraldb/vortex), which uses adaptive encoding for 
excellent point-query performance and efficient vector data compression.
 - Recommended format for row-number based O(1) lookups is Row, which stores 
data in row-oriented blocks with ZSTD compression and supports fast random 
access by row number.
 - Recommended testing format is CSV, which has better readability but the 
worst read-write performance.
 - Recommended format for ML workloads is Lance, which is optimized for vector 
search and machine learning use cases.
@@ -755,6 +757,60 @@ Limitations:
 1. Lance file format does not support `MAP` type.
 2. Lance file format does not support `TIMESTAMP_LOCAL_ZONE` type.
 
+## VORTEX
+
+[Vortex](https://github.com/spiraldb/vortex) is a columnar file format that 
uses adaptive, data-dependent encodings to achieve high compression ratios 
while maintaining fast scan performance. It supports native predicate pushdown 
and efficient column projection.
+
+Key features:
+- **Adaptive Encoding**: Automatically selects the best encoding per column 
based on data distribution
+- **Native Predicate Pushdown**: Supports filter expressions pushed down to 
the scan layer
+- **Column Projection**: Only reads requested columns from disk
+
+Limitations:
+1. Vortex does not support `MAP` or `MULTISET` types.
+
+## MOSAIC
+
+[Mosaic](https://paimon.apache.org/docs/mosaic/) is a columnar-bucket hybrid 
format optimized for wide tables. It groups columns into buckets and compresses 
each bucket independently with ZSTD, enabling efficient column projection that 
only reads the buckets containing requested columns.
+
+Key features:
+- **Column Bucketing**: Columns are grouped into configurable buckets for 
parallel I/O, significantly reducing read amplification on wide tables
+- **Row Group Statistics**: Per-row-group min/max/null_count statistics enable 
row group skipping during scan
+- **ZSTD Compression**: All data is compressed with ZSTD (configurable level)
+- **Arrow-native**: Uses Apache Arrow as the in-memory representation for 
zero-copy integration
+
+Format Options:
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 42%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>mosaic.num-buckets</h5></td>
+      <td style="word-wrap: break-word;">auto</td>
+      <td>Integer</td>
+      <td>Number of column buckets for parallel I/O. When set to 0 or not 
specified, the format auto-determines the bucket count.</td>
+    </tr>
+    <tr>
+      <td><h5>mosaic.stats-columns</h5></td>
+      <td style="word-wrap: break-word;">(empty)</td>
+      <td>String</td>
+      <td>Comma-separated column names to collect min/max statistics for 
filter pushdown. Empty means no statistics are collected.</td>
+    </tr>
+    </tbody>
+</table>
+
+Limitations:
+1. Mosaic does not support complex types: `ARRAY`, `MAP`, `MULTISET`, `ROW`, `VARIANT`, 
`BLOB`, `VECTOR`.
+
+For more details, see the [Mosaic 
documentation](https://paimon.apache.org/docs/mosaic/).
+
 ## ROW
 
 The Row format is a row-oriented storage format designed for O(1) random 
access by row number. Data is organized in blocks with ZSTD Level 1 
compression. Each block contains complete rows serialized in a compact binary 
format with an offset array for direct row positioning.
diff --git a/docs/docs/pypaimon/index.md b/docs/docs/pypaimon/index.md
index 18989aaf82..e35f8b8bf7 100644
--- a/docs/docs/pypaimon/index.md
+++ b/docs/docs/pypaimon/index.md
@@ -50,3 +50,18 @@ pip3 install dist/*.tar.gz
 ```
 
 The command will install the package and core dependencies to your local 
Python environment.
+
+## Optional Dependencies
+
+PyPaimon supports multiple file formats via optional extras:
+
+```shell
+# Mosaic format (columnar-bucket hybrid, optimized for wide tables)
+pip install "pypaimon[mosaic]"
+
+# Lance format (optimized for ML / vector search)
+pip install "pypaimon[lance]"
+
+# Vortex format (requires Python >= 3.11)
+pip install "pypaimon[vortex]"
+```
diff --git a/paimon-python/pypaimon/common/options/core_options.py 
b/paimon-python/pypaimon/common/options/core_options.py
index 5a51878294..9ca8796f96 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -80,6 +80,7 @@ class CoreOptions:
     FILE_FORMAT_LANCE: str = "lance"
     FILE_FORMAT_VORTEX: str = "vortex"
     FILE_FORMAT_ROW: str = "row"
+    FILE_FORMAT_MOSAIC: str = "mosaic"
 
     # Basic options
     AUTO_CREATE: ConfigOption[bool] = (
diff --git a/paimon-python/pypaimon/filesystem/caching_file_io.py 
b/paimon-python/pypaimon/filesystem/caching_file_io.py
index 2384ed0ca8..8aa64e227f 100644
--- a/paimon-python/pypaimon/filesystem/caching_file_io.py
+++ b/paimon-python/pypaimon/filesystem/caching_file_io.py
@@ -446,6 +446,9 @@ class CachingFileIO(FileIO):
     def write_blob(self, *args, **kwargs):
         return self._delegate.write_blob(*args, **kwargs)
 
+    def write_mosaic(self, *args, **kwargs):
+        return self._delegate.write_mosaic(*args, **kwargs)
+
     def write_vortex(self, *args, **kwargs):
         return self._delegate.write_vortex(*args, **kwargs)
 
diff --git a/paimon-python/pypaimon/filesystem/hdfs_native_file_io.py 
b/paimon-python/pypaimon/filesystem/hdfs_native_file_io.py
index ba6fc0f4b6..aa68e3b747 100644
--- a/paimon-python/pypaimon/filesystem/hdfs_native_file_io.py
+++ b/paimon-python/pypaimon/filesystem/hdfs_native_file_io.py
@@ -667,6 +667,15 @@ class HdfsNativeFileIO(FileIO):
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write Lance file {path}: {e}") from 
e
 
+    def write_mosaic(self, path: str, data: pyarrow.Table, **kwargs):
+        try:
+            import mosaic
+            with self.new_output_stream(path) as output_stream:
+                mosaic.write_table(data, output_stream)
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write Mosaic file {path}: {e}") 
from e
+
     def write_vortex(self, path: str, data: pyarrow.Table, **kwargs):
         try:
             import vortex
diff --git a/paimon-python/pypaimon/filesystem/local_file_io.py 
b/paimon-python/pypaimon/filesystem/local_file_io.py
index 8385790885..d3f5f81f4f 100644
--- a/paimon-python/pypaimon/filesystem/local_file_io.py
+++ b/paimon-python/pypaimon/filesystem/local_file_io.py
@@ -393,6 +393,16 @@ class LocalFileIO(FileIO):
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write Lance file {path}: {e}") from 
e
 
+    def write_mosaic(self, path: str, data: pyarrow.Table, **kwargs):
+        try:
+            import mosaic
+            os.makedirs(os.path.dirname(path), exist_ok=True)
+            with open(path, 'wb') as f:
+                mosaic.write_table(data, f)
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write Mosaic file {path}: {e}") 
from e
+
     def write_vortex(self, path: str, data: pyarrow.Table, **kwargs):
         try:
             import vortex
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py 
b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index 79df9f79cb..12d3e91b5c 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -642,6 +642,15 @@ class PyArrowFileIO(FileIO):
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write Lance file {path}: {e}") from 
e
 
+    def write_mosaic(self, path: str, data: pyarrow.Table, **kwargs):
+        try:
+            import mosaic
+            with self.new_output_stream(path) as output_stream:
+                mosaic.write_table(data, output_stream)
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write Mosaic file {path}: {e}") 
from e
+
     def write_vortex(self, path: str, data: pyarrow.Table, **kwargs):
         try:
             import vortex
diff --git a/paimon-python/pypaimon/read/reader/format_mosaic_reader.py 
b/paimon-python/pypaimon/read/reader/format_mosaic_reader.py
new file mode 100644
index 0000000000..4ab7d3b698
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/format_mosaic_reader.py
@@ -0,0 +1,140 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, List, Optional
+
+import pyarrow as pa
+import pyarrow.dataset as ds
+from pyarrow import RecordBatch
+
+from pypaimon.common.file_io import FileIO, supports_pread, pread
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.schema.data_types import DataField, PyarrowFieldParser
+from pypaimon.table.special_fields import SpecialFields
+
+
+class FormatMosaicReader(RecordBatchReader):
+
+    def __init__(self, file_io: FileIO, file_path: str, read_fields: 
List[DataField],
+                 push_down_predicate: Any, batch_size: int = 1024):
+        from mosaic import MosaicReader
+
+        self._read_field_names = [f.name for f in read_fields]
+        self._batch_size = batch_size
+
+        stream = file_io.new_input_stream(file_path)
+        file_length = file_io.get_file_size(file_path)
+
+        if supports_pread(stream):
+            self._stream = stream
+
+            def read_at(offset, length):
+                return pread(stream, length, offset)
+        else:
+            self._stream = None
+            file_data = stream.read()
+            stream.close()
+            file_length = len(file_data)
+
+            def read_at(offset, length):
+                return file_data[offset:offset + length]
+
+        self._reader = MosaicReader.from_input_file(read_at, file_length)
+
+        file_schema_names = set(f.name for f in self._reader.schema)
+        self.existing_fields = [f.name for f in read_fields if f.name in 
file_schema_names]
+        self.missing_fields = [f.name for f in read_fields if f.name not in 
file_schema_names]
+
+        if self.existing_fields:
+            self._reader.project(self.existing_fields)
+
+        if self.missing_fields:
+            output_schema = PyarrowFieldParser.from_paimon_schema(read_fields)
+            self._missing_out_fields = []
+            for name in self.missing_fields:
+                idx = output_schema.get_field_index(name)
+                col_type = output_schema.field(idx).type if idx >= 0 else 
pa.null()
+                nullable = not SpecialFields.is_system_field(name)
+                self._missing_out_fields.append(pa.field(name, col_type, 
nullable=nullable))
+
+        self._current_rg = 0
+        self._num_row_groups = self._reader.num_row_groups
+        self._current_batches = None
+
+        if push_down_predicate is not None:
+            self._predicate = push_down_predicate
+        else:
+            self._predicate = None
+
+    def _next_row_group_batches(self):
+        while self._current_rg < self._num_row_groups:
+            batch = self._reader.read_row_group(self._current_rg)
+            self._current_rg += 1
+
+            if batch.num_rows == 0:
+                continue
+
+            batch = self._fill_missing_fields(batch)
+
+            if self._predicate is not None:
+                dataset = ds.InMemoryDataset(pa.Table.from_batches([batch]))
+                scanner = dataset.scanner(filter=self._predicate, 
batch_size=self._batch_size)
+                return scanner.to_reader()
+            else:
+                return iter(pa.Table.from_batches([batch]).to_batches(
+                    max_chunksize=self._batch_size))
+        return None
+
+    def _fill_missing_fields(self, batch: RecordBatch) -> RecordBatch:
+        if not self.missing_fields:
+            return batch
+
+        all_columns = []
+        out_fields = []
+        for field_name in self._read_field_names:
+            if field_name in self.existing_fields:
+                col_idx = self.existing_fields.index(field_name)
+                all_columns.append(batch.column(col_idx))
+                out_fields.append(batch.schema.field(col_idx))
+            else:
+                miss_idx = self.missing_fields.index(field_name)
+                out_field = self._missing_out_fields[miss_idx]
+                all_columns.append(pa.nulls(batch.num_rows, 
type=out_field.type))
+                out_fields.append(out_field)
+        return pa.RecordBatch.from_arrays(all_columns, 
schema=pa.schema(out_fields))
+
+    def read_arrow_batch(self) -> Optional[RecordBatch]:
+        while True:
+            if self._current_batches is not None:
+                try:
+                    if hasattr(self._current_batches, 'read_next_batch'):
+                        return self._current_batches.read_next_batch()
+                    else:
+                        return next(self._current_batches)
+                except StopIteration:
+                    self._current_batches = None
+
+            self._current_batches = self._next_row_group_batches()
+            if self._current_batches is None:
+                return None
+
+    def close(self):
+        if self._stream is not None:
+            self._stream.close()
+            self._stream = None
+        self._reader = None
+        self._current_batches = None
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index ddb349e0ca..643c40c86a 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -47,6 +47,7 @@ from pypaimon.read.reader.format_blob_reader import 
FormatBlobReader
 from pypaimon.read.reader.format_lance_reader import FormatLanceReader
 from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader
 from pypaimon.read.reader.format_row_reader import FormatRowReader
+from pypaimon.read.reader.format_mosaic_reader import FormatMosaicReader
 from pypaimon.read.reader.format_vortex_reader import FormatVortexReader
 from pypaimon.read.reader.iface.record_batch_reader import (RecordBatchReader,
                                                             RowPositionReader, 
EmptyRecordBatchReader)
@@ -269,6 +270,13 @@ class SplitRead(ABC):
                                                row_indices=row_indices,
                                                shard_range=shard_range,
                                                
predicate_fields=predicate_fields)
+        elif file_format == CoreOptions.FILE_FORMAT_MOSAIC:
+            if has_nested:
+                raise NotImplementedError(
+                    "Nested-field projection is not supported on Mosaic files")
+            ordered_read_fields = [name_to_field[n] for n in read_file_fields 
if n in name_to_field]
+            format_reader = FormatMosaicReader(self.table.file_io, file_path, 
ordered_read_fields,
+                                               read_arrow_predicate, 
batch_size=batch_size)
         elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == 
CoreOptions.FILE_FORMAT_ORC:
             ordered_read_fields = [name_to_field[n] for n in read_file_fields 
if n in name_to_field]
             ordered_nested_paths = (
@@ -303,7 +311,7 @@ class SplitRead(ABC):
         elif file_format in ('json', 'csv'):
             raise NotImplementedError(
                 f"Reading '{file_format}' format is not yet supported in 
Python SDK. "
-                f"Supported formats: parquet, orc, avro, lance, blob, row.")
+                f"Supported formats: parquet, orc, avro, lance, vortex, 
mosaic, blob, row.")
         else:
             raise ValueError(f"Unexpected file format: {file_format}")
 
diff --git a/paimon-python/pypaimon/tests/test_format_mosaic_reader_writer.py 
b/paimon-python/pypaimon/tests/test_format_mosaic_reader_writer.py
new file mode 100644
index 0000000000..154d56cd3c
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_format_mosaic_reader_writer.py
@@ -0,0 +1,304 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import tempfile
+
+import pyarrow as pa
+import pytest
+
+import mosaic
+from pypaimon.read.reader.format_mosaic_reader import FormatMosaicReader
+from pypaimon.schema.data_types import AtomicType, DataField
+
+
+class SimpleFileIO:
+    """Minimal FileIO for testing."""
+
+    def get_file_size(self, path):
+        return os.path.getsize(path)
+
+    def new_input_stream(self, path):
+        return open(path, 'rb')
+
+
+def _write_mosaic_file(path, data: pa.Table):
+    with open(path, 'wb') as f:
+        mosaic.write_table(data, f)
+
+
+def _read_mosaic_file(path, read_fields, push_down_predicate=None):
+    file_io = SimpleFileIO()
+    reader = FormatMosaicReader(file_io, path, read_fields,
+                                push_down_predicate, batch_size=1024)
+    batches = []
+    while True:
+        batch = reader.read_arrow_batch()
+        if batch is None:
+            break
+        batches.append(batch)
+    reader.close()
+    if not batches:
+        return pa.table({f.name: pa.array([], type=pa.int32()) for f in 
read_fields})
+    return pa.Table.from_batches(batches)
+
+
+class TestFormatMosaicReaderWriter:
+
+    def test_basic_int_string(self):
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "name", AtomicType("STRING")),
+        ]
+        data = pa.table({
+            "id": pa.array([1, 2, 3], type=pa.int32()),
+            "name": pa.array(["alice", "bob", "charlie"], type=pa.string()),
+        })
+
+        with tempfile.NamedTemporaryFile(suffix=".mosaic", delete=False) as 
tmp:
+            path = tmp.name
+
+        try:
+            _write_mosaic_file(path, data)
+            result = _read_mosaic_file(path, fields)
+            assert result.column("id").to_pylist() == [1, 2, 3]
+            assert result.column("name").to_pylist() == ["alice", "bob", 
"charlie"]
+        finally:
+            os.unlink(path)
+
+    def test_all_primitive_types(self):
+        fields = [
+            DataField(0, "bool_col", AtomicType("BOOLEAN")),
+            DataField(1, "tinyint_col", AtomicType("TINYINT")),
+            DataField(2, "smallint_col", AtomicType("SMALLINT")),
+            DataField(3, "int_col", AtomicType("INT")),
+            DataField(4, "bigint_col", AtomicType("BIGINT")),
+            DataField(5, "float_col", AtomicType("FLOAT")),
+            DataField(6, "double_col", AtomicType("DOUBLE")),
+            DataField(7, "string_col", AtomicType("STRING")),
+            DataField(8, "binary_col", AtomicType("BYTES")),
+        ]
+        data = pa.table({
+            "bool_col": pa.array([True, False], type=pa.bool_()),
+            "tinyint_col": pa.array([1, -1], type=pa.int8()),
+            "smallint_col": pa.array([100, -100], type=pa.int16()),
+            "int_col": pa.array([1000, -1000], type=pa.int32()),
+            "bigint_col": pa.array([100000, -100000], type=pa.int64()),
+            "float_col": pa.array([1.5, -2.5], type=pa.float32()),
+            "double_col": pa.array([3.14, -2.71], type=pa.float64()),
+            "string_col": pa.array(["hello", "world"], type=pa.string()),
+            "binary_col": pa.array([b"\x01\x02", b"\x03\x04"], 
type=pa.binary()),
+        })
+
+        with tempfile.NamedTemporaryFile(suffix=".mosaic", delete=False) as 
tmp:
+            path = tmp.name
+
+        try:
+            _write_mosaic_file(path, data)
+            result = _read_mosaic_file(path, fields)
+            assert result.column("bool_col").to_pylist() == [True, False]
+            assert result.column("tinyint_col").to_pylist() == [1, -1]
+            assert result.column("smallint_col").to_pylist() == [100, -100]
+            assert result.column("int_col").to_pylist() == [1000, -1000]
+            assert result.column("bigint_col").to_pylist() == [100000, -100000]
+            assert result.column("float_col").to_pylist()[0] == 
pytest.approx(1.5)
+            assert result.column("double_col").to_pylist() == 
[pytest.approx(3.14), pytest.approx(-2.71)]
+            assert result.column("string_col").to_pylist() == ["hello", 
"world"]
+            assert result.column("binary_col").to_pylist() == [b"\x01\x02", 
b"\x03\x04"]
+        finally:
+            os.unlink(path)
+
+    def test_nulls(self):
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "name", AtomicType("STRING")),
+        ]
+        data = pa.table({
+            "id": pa.array([1, None, 3], type=pa.int32()),
+            "name": pa.array([None, "bob", None], type=pa.string()),
+        })
+
+        with tempfile.NamedTemporaryFile(suffix=".mosaic", delete=False) as 
tmp:
+            path = tmp.name
+
+        try:
+            _write_mosaic_file(path, data)
+            result = _read_mosaic_file(path, fields)
+            assert result.column("id").to_pylist() == [1, None, 3]
+            assert result.column("name").to_pylist() == [None, "bob", None]
+        finally:
+            os.unlink(path)
+
+    def test_decimal(self):
+        from decimal import Decimal
+
+        fields = [
+            DataField(0, "d1", AtomicType("DECIMAL(10, 2)")),
+        ]
+        data = pa.table({
+            "d1": pa.array([Decimal("123.45"), Decimal("-67.89")], 
type=pa.decimal128(10, 2)),
+        })
+
+        with tempfile.NamedTemporaryFile(suffix=".mosaic", delete=False) as 
tmp:
+            path = tmp.name
+
+        try:
+            _write_mosaic_file(path, data)
+            result = _read_mosaic_file(path, fields)
+            assert result.column("d1").to_pylist() == [Decimal("123.45"), 
Decimal("-67.89")]
+        finally:
+            os.unlink(path)
+
+    def test_timestamp(self):
+        fields = [
+            DataField(0, "ts_millis", AtomicType("TIMESTAMP(3)")),
+        ]
+        data = pa.table({
+            "ts_millis": pa.array([1000, 2000], type=pa.timestamp('ms')),
+        })
+
+        with tempfile.NamedTemporaryFile(suffix=".mosaic", delete=False) as 
tmp:
+            path = tmp.name
+
+        try:
+            _write_mosaic_file(path, data)
+            result = _read_mosaic_file(path, fields)
+            assert result.num_rows == 2
+        finally:
+            os.unlink(path)
+
+    def test_column_projection(self):
+        data = pa.table({
+            "id": pa.array([1, 2, 3], type=pa.int32()),
+            "name": pa.array(["a", "b", "c"], type=pa.string()),
+            "value": pa.array([1.1, 2.2, 3.3], type=pa.float64()),
+        })
+
+        with tempfile.NamedTemporaryFile(suffix=".mosaic", delete=False) as 
tmp:
+            path = tmp.name
+
+        try:
+            _write_mosaic_file(path, data)
+            projected_fields = [
+                DataField(0, "id", AtomicType("INT")),
+                DataField(2, "value", AtomicType("DOUBLE")),
+            ]
+            result = _read_mosaic_file(path, projected_fields)
+            assert result.num_columns == 2
+            assert result.column("id").to_pylist() == [1, 2, 3]
+            assert result.column("value").to_pylist() == [
+                pytest.approx(1.1), pytest.approx(2.2), pytest.approx(3.3)]
+        finally:
+            os.unlink(path)
+
+    def test_schema_evolution_missing_field(self):
+        """Reading a file that doesn't have a column added later (schema 
evolution)."""
+        data = pa.table({
+            "id": pa.array([1, 2], type=pa.int32()),
+            "name": pa.array(["a", "b"], type=pa.string()),
+        })
+
+        with tempfile.NamedTemporaryFile(suffix=".mosaic", delete=False) as 
tmp:
+            path = tmp.name
+
+        try:
+            _write_mosaic_file(path, data)
+            fields_read = [
+                DataField(0, "id", AtomicType("INT")),
+                DataField(1, "name", AtomicType("STRING")),
+                DataField(2, "score", AtomicType("DOUBLE")),
+            ]
+            result = _read_mosaic_file(path, fields_read)
+            assert result.column("id").to_pylist() == [1, 2]
+            assert result.column("name").to_pylist() == ["a", "b"]
+            assert result.column("score").to_pylist() == [None, None]
+        finally:
+            os.unlink(path)
+
+    def test_predicate_pushdown(self):
+        import pyarrow.compute as pc
+
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "name", AtomicType("STRING")),
+        ]
+        data = pa.table({
+            "id": pa.array(list(range(100)), type=pa.int32()),
+            "name": pa.array([f"user_{i}" for i in range(100)], 
type=pa.string()),
+        })
+
+        with tempfile.NamedTemporaryFile(suffix=".mosaic", delete=False) as 
tmp:
+            path = tmp.name
+
+        try:
+            _write_mosaic_file(path, data)
+            predicate = pc.field("id") > 95
+            result = _read_mosaic_file(path, fields, 
push_down_predicate=predicate)
+            assert result.num_rows == 4
+            assert all(v > 95 for v in result.column("id").to_pylist())
+        finally:
+            os.unlink(path)
+
+    def test_large_dataset(self):
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "data", AtomicType("STRING")),
+        ]
+        num_rows = 10000
+        data = pa.table({
+            "id": pa.array(list(range(num_rows)), type=pa.int32()),
+            "data": pa.array([f"value_{i}" for i in range(num_rows)], 
type=pa.string()),
+        })
+
+        with tempfile.NamedTemporaryFile(suffix=".mosaic", delete=False) as 
tmp:
+            path = tmp.name
+
+        try:
+            _write_mosaic_file(path, data)
+            result = _read_mosaic_file(path, fields)
+            assert result.num_rows == num_rows
+            assert result.column("id").to_pylist() == list(range(num_rows))
+        finally:
+            os.unlink(path)
+
+    def test_write_mosaic_local_file_io(self):
+        """Test write_mosaic via LocalFileIO."""
+        from pypaimon.filesystem.local_file_io import LocalFileIO
+
+        data = pa.table({
+            "id": pa.array([1, 2, 3], type=pa.int32()),
+            "name": pa.array(["a", "b", "c"], type=pa.string()),
+        })
+
+        with tempfile.NamedTemporaryFile(suffix=".mosaic", delete=False) as 
tmp:
+            path = tmp.name
+
+        try:
+            file_io = LocalFileIO({})
+            file_io.write_mosaic(path, data)
+
+            assert os.path.getsize(path) > 0
+
+            fields = [
+                DataField(0, "id", AtomicType("INT")),
+                DataField(1, "name", AtomicType("STRING")),
+            ]
+            result = _read_mosaic_file(path, fields)
+            assert result.column("id").to_pylist() == [1, 2, 3]
+            assert result.column("name").to_pylist() == ["a", "b", "c"]
+        finally:
+            os.unlink(path)
diff --git a/paimon-python/pypaimon/tests/test_format_mosaic_table.py 
b/paimon-python/pypaimon/tests/test_format_mosaic_table.py
new file mode 100644
index 0000000000..82a769cdf5
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_format_mosaic_table.py
@@ -0,0 +1,193 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Integration tests for the Mosaic file format across table types."""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class MosaicFormatAppendOnlyTest(unittest.TestCase):
+    """Test Mosaic format with append-only (non-primary-key) tables."""
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+
+        cls.pa_schema = pa.schema([
+            ('user_id', pa.int32()),
+            ('item_id', pa.int64()),
+            ('behavior', pa.string()),
+            ('dt', pa.string()),
+        ])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _write_and_read(self, table, data):
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        return read_builder.new_read().to_arrow(splits)
+
+    def test_append_only_no_partition(self):
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema, options={'file.format': 'mosaic'})
+        self.catalog.create_table('default.ao_mosaic_no_part', schema, False)
+        table = self.catalog.get_table('default.ao_mosaic_no_part')
+
+        data = pa.Table.from_pydict({
+            'user_id': [1, 2, 3],
+            'item_id': [1001, 1002, 1003],
+            'behavior': ['buy', 'click', 'view'],
+            'dt': ['2024-01-01', '2024-01-01', '2024-01-02'],
+        }, schema=self.pa_schema)
+
+        result = self._write_and_read(table, data)
+        self.assertEqual(result.num_rows, 3)
+        self.assertEqual(sorted(result.column('user_id').to_pylist()), [1, 2, 3])
+
+    def test_append_only_with_partition(self):
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema, partition_keys=['dt'],
+            options={'file.format': 'mosaic'})
+        self.catalog.create_table('default.ao_mosaic_partitioned', schema, False)
+        table = self.catalog.get_table('default.ao_mosaic_partitioned')
+
+        data = pa.Table.from_pydict({
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['buy', 'click', 'view', 'buy'],
+            'dt': ['p1', 'p1', 'p2', 'p2'],
+        }, schema=self.pa_schema)
+
+        result = self._write_and_read(table, data)
+        self.assertEqual(result.num_rows, 4)
+
+    def test_append_only_multiple_commits(self):
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema, options={'file.format': 'mosaic'})
+        self.catalog.create_table('default.ao_mosaic_multi', schema, False)
+        table = self.catalog.get_table('default.ao_mosaic_multi')
+
+        write_builder = table.new_batch_write_builder()
+
+        for i in range(3):
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'user_id': [i * 10 + 1, i * 10 + 2],
+                'item_id': [int(i * 100 + 1), int(i * 100 + 2)],
+                'behavior': ['buy', 'click'],
+                'dt': ['2024-01-01', '2024-01-01'],
+            }, schema=self.pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        table = self.catalog.get_table('default.ao_mosaic_multi')
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        result = read_builder.new_read().to_arrow(splits)
+        self.assertEqual(result.num_rows, 6)
+
+
+class MosaicFormatPrimaryKeyTest(unittest.TestCase):
+    """Test Mosaic format with primary-key tables."""
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', True)
+
+        cls.pa_schema = pa.schema([
+            ('pk', pa.int32()),
+            ('value', pa.string()),
+            ('amount', pa.float64()),
+        ])
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def test_primary_key_deduplicate(self):
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema, primary_keys=['pk'],
+            options={'file.format': 'mosaic'})
+        self.catalog.create_table('default.pk_mosaic_dedup', schema, False)
+        table = self.catalog.get_table('default.pk_mosaic_dedup')
+
+        write_builder = table.new_batch_write_builder()
+
+        # First write
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = pa.Table.from_pydict({
+            'pk': [1, 2, 3],
+            'value': ['a', 'b', 'c'],
+            'amount': [1.0, 2.0, 3.0],
+        }, schema=self.pa_schema)
+        table_write.write_arrow(data1)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # Second write (update pk=2)
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data2 = pa.Table.from_pydict({
+            'pk': [2],
+            'value': ['updated'],
+            'amount': [22.0],
+        }, schema=self.pa_schema)
+        table_write.write_arrow(data2)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        table = self.catalog.get_table('default.pk_mosaic_dedup')
+        read_builder = table.new_read_builder()
+        splits = read_builder.new_scan().plan().splits()
+        result = read_builder.new_read().to_arrow(splits)
+
+        self.assertEqual(result.num_rows, 3)
+        result_sorted = result.sort_by('pk')
+        self.assertEqual(result_sorted.column('value').to_pylist(), ['a', 'updated', 'c'])
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/write/writer/data_vector_writer.py b/paimon-python/pypaimon/write/writer/data_vector_writer.py
index 9de5e2a27a..d06d425936 100644
--- a/paimon-python/pypaimon/write/writer/data_vector_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_vector_writer.py
@@ -212,6 +212,8 @@ class DataVectorWriter(DataWriter):
             self.file_io.write_lance(file_path, data)
         elif self.file_format == CoreOptions.FILE_FORMAT_VORTEX:
             self.file_io.write_vortex(file_path, data)
+        elif self.file_format == CoreOptions.FILE_FORMAT_MOSAIC:
+            self.file_io.write_mosaic(file_path, data)
         elif self.file_format == CoreOptions.FILE_FORMAT_ROW:
             self.file_io.write_row(file_path, data, zstd_level=self.zstd_level)
         else:
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 70c7594b08..313caa7f6d 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -198,6 +198,8 @@ class DataWriter(ABC):
             self.file_io.write_lance(file_path, data)
         elif self.file_format == CoreOptions.FILE_FORMAT_VORTEX:
             self.file_io.write_vortex(file_path, data)
+        elif self.file_format == CoreOptions.FILE_FORMAT_MOSAIC:
+            self.file_io.write_mosaic(file_path, data)
         elif self.file_format == CoreOptions.FILE_FORMAT_ROW:
             self.file_io.write_row(file_path, data, zstd_level=self.zstd_level)
         else:
diff --git a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
index 6e7ea57781..a682da864b 100644
--- a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
+++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
@@ -382,6 +382,8 @@ class DedicatedFormatWriter(DataWriter):
             self.file_io.write_lance(file_path, data)
         elif self.file_format == CoreOptions.FILE_FORMAT_VORTEX:
             self.file_io.write_vortex(file_path, data)
+        elif self.file_format == CoreOptions.FILE_FORMAT_MOSAIC:
+            self.file_io.write_mosaic(file_path, data)
         elif self.file_format == CoreOptions.FILE_FORMAT_ROW:
             self.file_io.write_row(file_path, data, zstd_level=self.zstd_level)
         else:
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index 7355e25bf9..e4974bdc9b 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -174,6 +174,9 @@ setup(
         'vortex': [
             'vortex-data==0.70.0; python_version>="3.11"',
         ],
+        'mosaic': [
+            'paimon-mosaic>=0.1.0',
+        ],
         'lumina': [
             'lumina-data>=0.1.0'
         ],


Reply via email to