This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git


The following commit(s) were added to refs/heads/main by this push:
     new 0df16ac  Add Python bindings and CI test job (#8)
0df16ac is described below

commit 0df16acb520104cb36b7e882b09248b3473d02a8
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue May 19 16:24:14 2026 +0800

    Add Python bindings and CI test job (#8)
---
 .github/workflows/ci.yml    |  34 +++
 python/mosaic/__init__.py   |  34 +++
 python/mosaic/_ffi.py       | 216 +++++++++++++++++
 python/mosaic/mosaic.py     | 372 ++++++++++++++++++++++++++++
 python/pyproject.toml       |  37 +++
 python/setup.py             |  66 +++++
 python/tests/__init__.py    |  16 ++
 python/tests/test_mosaic.py | 578 ++++++++++++++++++++++++++++++++++++++++++++
 8 files changed, 1353 insertions(+)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 53ee1d3..70be68d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -131,3 +131,37 @@ jobs:
       - name: Run Java tests
         working-directory: java
         run: mvn test "-DargLine=-Djava.library.path=${{ github.workspace }}/target/release"
+
+  python-test:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v6
+
+      - name: Rust Cache
+        uses: actions/cache@v5
+        with:
+          path: |
+            ~/.cargo/registry
+            ~/.cargo/git
+            target
+          key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
+          restore-keys: |
+            ${{ runner.os }}-cargo-
+
+      - name: Set up Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: '3.12'
+
+      - name: Build FFI library
+        run: cargo build --release -p mosaic-ffi
+
+      - name: Install Python dependencies
+        working-directory: python
+        run: pip install pyarrow pytest
+
+      - name: Run Python tests
+        working-directory: python
+        run: pytest -v
+        env:
+          MOSAIC_LIB_PATH: ${{ github.workspace }}/target/release
diff --git a/python/mosaic/__init__.py b/python/mosaic/__init__.py
new file mode 100644
index 0000000..f9eb3b8
--- /dev/null
+++ b/python/mosaic/__init__.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from .mosaic import (
+    ColumnStatistics,
+    MosaicReader,
+    MosaicWriter,
+    WriterOptions,
+    read_table,
+    write_table,
+)
+
+__all__ = [
+    "ColumnStatistics",
+    "WriterOptions",
+    "MosaicWriter",
+    "MosaicReader",
+    "read_table",
+    "write_table",
+]
diff --git a/python/mosaic/_ffi.py b/python/mosaic/_ffi.py
new file mode 100644
index 0000000..a8a6551
--- /dev/null
+++ b/python/mosaic/_ffi.py
@@ -0,0 +1,216 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import ctypes
+import ctypes.util
+import os
+import platform
+import sys
+from ctypes import (
+    CFUNCTYPE,
+    POINTER,
+    Structure,
+    c_char_p,
+    c_double,
+    c_float,
+    c_int,
+    c_int8,
+    c_int16,
+    c_int32,
+    c_int64,
+    c_size_t,
+    c_uint8,
+    c_uint32,
+    c_uint64,
+    c_void_p,
+)
+
+
+def _load_library():
+    system = platform.system()
+    if system == "Darwin":
+        lib_name = "libmosaic_ffi.dylib"
+    elif system == "Windows":
+        lib_name = "mosaic_ffi.dll"
+    else:
+        lib_name = "libmosaic_ffi.so"
+
+    search_paths = []
+    pkg_dir = os.path.dirname(os.path.abspath(__file__))
+    search_paths.append(pkg_dir)
+    search_paths.append(os.path.join(pkg_dir, "..", ".."))
+
+    env_path = os.environ.get("MOSAIC_LIB_PATH")
+    if env_path:
+        search_paths.append(env_path)
+
+    for rel in [
+        os.path.join("..", "target", "release"),
+        os.path.join("..", "target", "debug"),
+        os.path.join("..", "..", "target", "release"),
+        os.path.join("..", "..", "target", "debug"),
+    ]:
+        search_paths.append(os.path.join(pkg_dir, rel))
+
+    for d in search_paths:
+        candidate = os.path.join(d, lib_name)
+        if os.path.isfile(candidate):
+            return ctypes.CDLL(candidate)
+
+    try:
+        return ctypes.CDLL(lib_name)
+    except OSError:
+        raise OSError(
+            f"Cannot find {lib_name}. Build the native library first with "
+            f"'cargo build --release -p mosaic-ffi', or set MOSAIC_LIB_PATH "
+            f"to the directory containing {lib_name}."
+        )
+
+
+lib = _load_library()
+
+# ======================== Callback types ========================
+
+WRITE_FN = CFUNCTYPE(c_int32, c_void_p, POINTER(c_uint8), c_size_t)
+FLUSH_FN = CFUNCTYPE(c_int32, c_void_p)
+GET_POS_FN = CFUNCTYPE(c_int64, c_void_p)
+
+READ_AT_FN = CFUNCTYPE(c_int32, c_void_p, c_uint64, POINTER(c_uint8), c_size_t)
+LENGTH_FN = CFUNCTYPE(c_uint64, c_void_p)
+
+
+class MosaicOutputFile(Structure):
+    _fields_ = [
+        ("ctx", c_void_p),
+        ("write_fn", WRITE_FN),
+        ("flush_fn", FLUSH_FN),
+        ("get_pos_fn", GET_POS_FN),
+    ]
+
+
+class MosaicInputFile(Structure):
+    _fields_ = [
+        ("ctx", c_void_p),
+        ("read_at_fn", READ_AT_FN),
+        ("length_fn", LENGTH_FN),
+    ]
+
+
+class MosaicWriterOptions(Structure):
+    _fields_ = [
+        ("compression", c_uint8),
+        ("zstd_level", c_int),
+        ("num_buckets", c_uint32),
+        ("row_group_max_size", c_uint64),
+        ("max_dict_total_bytes", c_uint32),
+        ("max_dict_entries", c_uint32),
+        ("stats_columns", POINTER(c_uint32)),
+        ("num_stats_columns", c_uint32),
+        ("page_size_threshold", c_uint32),
+    ]
+
+
+# ======================== Writer Options ========================
+
+lib.mosaic_writer_options_default.argtypes = []
+lib.mosaic_writer_options_default.restype = MosaicWriterOptions
+
+# ======================== Writer ========================
+
+lib.mosaic_writer_open.argtypes = [MosaicOutputFile, c_void_p, MosaicWriterOptions]
+lib.mosaic_writer_open.restype = c_void_p
+
+lib.mosaic_writer_close.argtypes = [c_void_p]
+lib.mosaic_writer_close.restype = c_int
+
+lib.mosaic_writer_free.argtypes = [c_void_p]
+lib.mosaic_writer_free.restype = None
+
+lib.mosaic_writer_estimated_file_size.argtypes = [c_void_p, POINTER(c_int64)]
+lib.mosaic_writer_estimated_file_size.restype = c_int
+
+lib.mosaic_writer_write_batch.argtypes = [c_void_p, c_void_p, c_void_p]
+lib.mosaic_writer_write_batch.restype = c_int
+
+# ======================== Reader ========================
+
+lib.mosaic_reader_open.argtypes = [MosaicInputFile]
+lib.mosaic_reader_open.restype = c_void_p
+
+lib.mosaic_reader_free.argtypes = [c_void_p]
+lib.mosaic_reader_free.restype = None
+
+lib.mosaic_reader_export_schema.argtypes = [c_void_p, c_void_p]
+lib.mosaic_reader_export_schema.restype = c_int
+
+lib.mosaic_reader_num_row_groups.argtypes = [c_void_p, POINTER(c_uint32)]
+lib.mosaic_reader_num_row_groups.restype = c_int
+
+# ======================== Row Group Reader ========================
+
+lib.mosaic_reader_open_row_group.argtypes = [c_void_p, c_uint32]
+lib.mosaic_reader_open_row_group.restype = c_void_p
+
+lib.mosaic_reader_open_row_group_projected.argtypes = [
+    c_void_p, c_uint32, POINTER(c_uint32), c_uint32,
+]
+lib.mosaic_reader_open_row_group_projected.restype = c_void_p
+
+lib.mosaic_row_group_reader_free.argtypes = [c_void_p]
+lib.mosaic_row_group_reader_free.restype = None
+
+lib.mosaic_row_group_reader_num_rows.argtypes = [c_void_p, POINTER(c_uint32)]
+lib.mosaic_row_group_reader_num_rows.restype = c_int
+
+# ======================== Record Batch (Arrow C Data Interface) ========================
+
+lib.mosaic_row_group_reader_read_columns.argtypes = [c_void_p]
+lib.mosaic_row_group_reader_read_columns.restype = c_void_p
+
+lib.mosaic_record_batch_num_rows.argtypes = [c_void_p, POINTER(c_uint32)]
+lib.mosaic_record_batch_num_rows.restype = c_int
+
+lib.mosaic_record_batch_num_columns.argtypes = [c_void_p, POINTER(c_uint32)]
+lib.mosaic_record_batch_num_columns.restype = c_int
+
+lib.mosaic_record_batch_export.argtypes = [c_void_p, c_void_p, c_void_p]
+lib.mosaic_record_batch_export.restype = c_int
+
+lib.mosaic_record_batch_free.argtypes = [c_void_p]
+lib.mosaic_record_batch_free.restype = None
+
+# ======================== Row Group Stats ========================
+
+lib.mosaic_reader_row_group_num_stats.argtypes = [c_void_p, c_uint32, POINTER(c_uint32)]
+lib.mosaic_reader_row_group_num_stats.restype = c_int
+
+lib.mosaic_reader_row_group_stat_column_index.argtypes = [c_void_p, c_uint32, c_uint32, POINTER(c_uint32)]
+lib.mosaic_reader_row_group_stat_column_index.restype = c_int
+
+lib.mosaic_reader_row_group_stat_null_count.argtypes = [c_void_p, c_uint32, c_uint32, POINTER(c_uint64)]
+lib.mosaic_reader_row_group_stat_null_count.restype = c_int
+
+lib.mosaic_reader_row_group_stat_min.argtypes = [c_void_p, c_uint32, c_uint32, POINTER(c_size_t)]
+lib.mosaic_reader_row_group_stat_min.restype = POINTER(c_uint8)
+
+lib.mosaic_reader_row_group_stat_max.argtypes = [c_void_p, c_uint32, c_uint32, POINTER(c_size_t)]
+lib.mosaic_reader_row_group_stat_max.restype = POINTER(c_uint8)
+
+# ======================== Error ========================
+
+lib.mosaic_last_error.argtypes = []
+lib.mosaic_last_error.restype = c_char_p
diff --git a/python/mosaic/mosaic.py b/python/mosaic/mosaic.py
new file mode 100644
index 0000000..857d12d
--- /dev/null
+++ b/python/mosaic/mosaic.py
@@ -0,0 +1,372 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import ctypes
+
+import pyarrow as pa
+
+from . import _ffi
+from ._ffi import lib
+
+
+class _ArrowSchema(ctypes.Structure):
+    _fields_ = [
+        ("format", ctypes.c_char_p),
+        ("name", ctypes.c_char_p),
+        ("metadata", ctypes.c_char_p),
+        ("flags", ctypes.c_int64),
+        ("n_children", ctypes.c_int64),
+        ("children", ctypes.c_void_p),
+        ("dictionary", ctypes.c_void_p),
+        ("release", ctypes.c_void_p),
+        ("private_data", ctypes.c_void_p),
+    ]
+
+
+class _ArrowArray(ctypes.Structure):
+    _fields_ = [
+        ("length", ctypes.c_int64),
+        ("null_count", ctypes.c_int64),
+        ("offset", ctypes.c_int64),
+        ("n_buffers", ctypes.c_int64),
+        ("n_children", ctypes.c_int64),
+        ("buffers", ctypes.c_void_p),
+        ("children", ctypes.c_void_p),
+        ("dictionary", ctypes.c_void_p),
+        ("release", ctypes.c_void_p),
+        ("private_data", ctypes.c_void_p),
+    ]
+
+
+def _check_error(msg="operation failed"):
+    err = lib.mosaic_last_error()
+    if err:
+        raise RuntimeError(err.decode("utf-8", errors="replace"))
+    raise RuntimeError(msg)
+
+
+class WriterOptions:
+    COMPRESSION_NONE = 0
+    COMPRESSION_ZSTD = 1
+
+    def __init__(
+        self,
+        compression=1,
+        zstd_level=1,
+        num_buckets=0,
+        row_group_max_size=256 * 1024 * 1024,
+        max_dict_total_bytes=32 * 1024,
+        max_dict_entries=255,
+        stats_columns=None,
+        page_size_threshold=32 * 1024,
+    ):
+        self.compression = compression
+        self.zstd_level = zstd_level
+        self.num_buckets = num_buckets
+        self.row_group_max_size = row_group_max_size
+        self.max_dict_total_bytes = max_dict_total_bytes
+        self.max_dict_entries = max_dict_entries
+        self.stats_columns = stats_columns or []
+        self.page_size_threshold = page_size_threshold
+
+    def _to_ffi(self):
+        opts = _ffi.MosaicWriterOptions()
+        opts.compression = self.compression
+        opts.zstd_level = self.zstd_level
+        opts.num_buckets = self.num_buckets
+        opts.row_group_max_size = self.row_group_max_size
+        opts.max_dict_total_bytes = self.max_dict_total_bytes
+        opts.max_dict_entries = self.max_dict_entries
+        refs = []
+        if self.stats_columns:
+            arr = (ctypes.c_uint32 * len(self.stats_columns))(*self.stats_columns)
+            refs.append(arr)
+            opts.stats_columns = arr
+            opts.num_stats_columns = len(self.stats_columns)
+        else:
+            opts.stats_columns = None
+            opts.num_stats_columns = 0
+        opts.page_size_threshold = self.page_size_threshold
+        return opts, refs
+
+
+class MosaicWriter:
+    def __init__(self, stream, schema, options=None):
+        if not isinstance(schema, pa.Schema):
+            raise TypeError(f"expected pyarrow.Schema, got {type(schema)}")
+
+        self._stream = stream
+        self._closed = False
+
+        self._write_callback = _ffi.WRITE_FN(self._on_write)
+        self._flush_callback = _ffi.FLUSH_FN(self._on_flush)
+        self._get_pos_callback = _ffi.GET_POS_FN(self._on_get_pos)
+        self._pos = 0
+
+        c_stream = _ffi.MosaicOutputFile()
+        c_stream.ctx = None
+        c_stream.write_fn = self._write_callback
+        c_stream.flush_fn = self._flush_callback
+        c_stream.get_pos_fn = self._get_pos_callback
+
+        c_opts, opts_refs = options._to_ffi() if options else WriterOptions()._to_ffi()
+
+        c_schema = _ArrowSchema()
+        schema_ptr = ctypes.addressof(c_schema)
+        schema._export_to_c(schema_ptr)
+
+        self._handle = lib.mosaic_writer_open(c_stream, ctypes.c_void_p(schema_ptr), c_opts)
+        del opts_refs
+        if not self._handle:
+            _check_error("failed to open writer")
+
+    def _on_write(self, ctx, data, length):
+        try:
+            buf = (ctypes.c_char * length).from_address(ctypes.cast(data, ctypes.c_void_p).value)
+            self._stream.write(buf)
+            self._pos += length
+            return 0
+        except Exception:
+            return -1
+
+    def _on_flush(self, ctx):
+        try:
+            self._stream.flush()
+            return 0
+        except Exception:
+            return -1
+
+    def _on_get_pos(self, ctx):
+        return self._pos
+
+    def write(self, data):
+        if isinstance(data, pa.Table):
+            for record_batch in data.to_batches():
+                self._write_single_batch(record_batch)
+        elif isinstance(data, pa.RecordBatch):
+            self._write_single_batch(data)
+        else:
+            raise TypeError(f"expected pyarrow.RecordBatch or pyarrow.Table, got {type(data)}")
+
+    def _write_single_batch(self, batch):
+        c_schema = _ArrowSchema()
+        c_array = _ArrowArray()
+        schema_ptr = ctypes.addressof(c_schema)
+        array_ptr = ctypes.addressof(c_array)
+        batch._export_to_c(array_ptr, schema_ptr)
+        rc = lib.mosaic_writer_write_batch(
+            self._handle,
+            ctypes.c_void_p(array_ptr),
+            ctypes.c_void_p(schema_ptr),
+        )
+        if rc != 0:
+            _check_error("write_batch failed")
+
+    def estimated_file_size(self):
+        out = ctypes.c_int64(0)
+        rc = lib.mosaic_writer_estimated_file_size(self._handle, ctypes.byref(out))
+        if rc != 0:
+            _check_error("estimated_file_size failed")
+        return out.value
+
+    def close(self):
+        if not self._closed and self._handle:
+            self._closed = True
+            rc = lib.mosaic_writer_close(self._handle)
+            lib.mosaic_writer_free(self._handle)
+            self._handle = None
+            if rc != 0:
+                _check_error("close failed")
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
+
+    def __del__(self):
+        self.close()
+
+
+class ColumnStatistics:
+    def __init__(self, column_index, null_count, min, max):
+        self.column_index = column_index
+        self.null_count = null_count
+        self.min = min
+        self.max = max
+
+    @property
+    def has_min_max(self):
+        return self.min is not None
+
+
+class MosaicReader:
+    def __init__(self, handle, refs=None):
+        self._handle = handle
+        self._refs = refs
+        c_schema = _ArrowSchema()
+        schema_ptr = ctypes.addressof(c_schema)
+        rc = lib.mosaic_reader_export_schema(handle, ctypes.c_void_p(schema_ptr))
+        if rc != 0:
+            _check_error("export_schema failed")
+        self._schema = pa.Schema._import_from_c(schema_ptr)
+
+    @staticmethod
+    def from_input_file(read_at_fn, file_length):
+        """Create a MosaicReader from a callable and file length.
+
+        ``read_at_fn(offset, length) -> bytes`` must be thread-safe: the
+        reader may call it concurrently from multiple threads to perform
+        parallel IO.
+        """
+        @_ffi.READ_AT_FN
+        def c_read_at(ctx, offset, buf, length):
+            try:
+                data = read_at_fn(offset, length)
+                if len(data) != length:
+                    return -1
+                ctypes.memmove(buf, data, length)
+                return 0
+            except Exception:
+                return -1
+
+        @_ffi.LENGTH_FN
+        def c_length(ctx):
+            return file_length
+
+        input_file = _ffi.MosaicInputFile()
+        input_file.ctx = None
+        input_file.read_at_fn = c_read_at
+        input_file.length_fn = c_length
+
+        handle = lib.mosaic_reader_open(input_file)
+        if not handle:
+            _check_error("failed to open reader")
+        return MosaicReader(handle, refs=(c_read_at, c_length, input_file))
+
+    @property
+    def schema(self):
+        return self._schema
+
+    @property
+    def num_row_groups(self):
+        out = ctypes.c_uint32(0)
+        rc = lib.mosaic_reader_num_row_groups(self._handle, ctypes.byref(out))
+        if rc != 0:
+            _check_error("num_row_groups failed")
+        return out.value
+
+    def read_row_group(self, rg_index, columns=None):
+        if columns is not None:
+            arr = (ctypes.c_uint32 * len(columns))(*columns)
+            rg_handle = lib.mosaic_reader_open_row_group_projected(
+                self._handle, rg_index, arr, len(columns),
+            )
+        else:
+            rg_handle = lib.mosaic_reader_open_row_group(self._handle, rg_index)
+        if not rg_handle:
+            _check_error(f"failed to open row group {rg_index}")
+        rb_handle = lib.mosaic_row_group_reader_read_columns(rg_handle)
+        lib.mosaic_row_group_reader_free(rg_handle)
+        if not rb_handle:
+            _check_error("read_columns failed")
+        try:
+            c_schema = _ArrowSchema()
+            c_array = _ArrowArray()
+            schema_ptr = ctypes.addressof(c_schema)
+            array_ptr = ctypes.addressof(c_array)
+            rc = lib.mosaic_record_batch_export(
+                rb_handle,
+                ctypes.c_void_p(array_ptr),
+                ctypes.c_void_p(schema_ptr),
+            )
+            if rc != 0:
+                _check_error("record_batch_export failed")
+            return pa.RecordBatch._import_from_c(array_ptr, schema_ptr)
+        finally:
+            lib.mosaic_record_batch_free(rb_handle)
+
+    def read_all(self, columns=None):
+        batches = []
+        for rg in range(self.num_row_groups):
+            batches.append(self.read_row_group(rg, columns=columns))
+        if batches:
+            return pa.Table.from_batches(batches, schema=batches[0].schema)
+        schema = self._schema
+        if columns is not None:
+            schema = pa.schema([self._schema.field(i) for i in columns])
+        return pa.Table.from_batches([], schema=schema)
+
+    def get_row_group_statistics(self, rg_index):
+        n_out = ctypes.c_uint32(0)
+        rc = lib.mosaic_reader_row_group_num_stats(self._handle, rg_index, ctypes.byref(n_out))
+        if rc != 0:
+            _check_error("row_group_num_stats failed")
+        n = n_out.value
+        result = []
+        for i in range(n):
+            col_idx_out = ctypes.c_uint32(0)
+            rc = lib.mosaic_reader_row_group_stat_column_index(
+                self._handle, rg_index, i, ctypes.byref(col_idx_out)
+            )
+            if rc != 0:
+                _check_error("stat_column_index failed")
+            col_idx = col_idx_out.value
+            null_count_out = ctypes.c_uint64(0)
+            rc = lib.mosaic_reader_row_group_stat_null_count(
+                self._handle, rg_index, i, ctypes.byref(null_count_out)
+            )
+            if rc != 0:
+                _check_error("stat_null_count failed")
+            null_count = null_count_out.value
+            out_len = ctypes.c_size_t(0)
+            ptr = lib.mosaic_reader_row_group_stat_min(self._handle, rg_index, i, ctypes.byref(out_len))
+            min_val = ctypes.string_at(ptr, out_len.value) if ptr and out_len.value > 0 else None
+            out_len = ctypes.c_size_t(0)
+            ptr = lib.mosaic_reader_row_group_stat_max(self._handle, rg_index, i, ctypes.byref(out_len))
+            max_val = ctypes.string_at(ptr, out_len.value) if ptr and out_len.value > 0 else None
+            result.append(ColumnStatistics(col_idx, null_count, min_val, max_val))
+        return result
+
+    def close(self):
+        if self._handle:
+            lib.mosaic_reader_free(self._handle)
+            self._handle = None
+        if self._refs and isinstance(self._refs, tuple) and hasattr(self._refs[0], "close"):
+            self._refs[0].close()
+            self._refs = None
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
+
+    def __del__(self):
+        self.close()
+
+
+def write_table(table, stream, options=None):
+    if not isinstance(table, pa.Table):
+        raise TypeError(f"expected pyarrow.Table, got {type(table)}")
+    with MosaicWriter(stream, table.schema, options) as writer:
+        writer.write(table)
+
+
+def read_table(read_at_fn, file_length, columns=None):
+    with MosaicReader.from_input_file(read_at_fn, file_length) as reader:
+        return reader.read_all(columns=columns)
diff --git a/python/pyproject.toml b/python/pyproject.toml
new file mode 100644
index 0000000..893b319
--- /dev/null
+++ b/python/pyproject.toml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[build-system]
+requires = ["setuptools>=64"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "mosaic-format"
+version = "0.1.0"
+description = "Python bindings for the Mosaic columnar-bucket hybrid file format"
+license = {text = "Apache-2.0"}
+requires-python = ">=3.9"
+dependencies = ["pyarrow"]
+
+[project.optional-dependencies]
+test = ["pytest"]
+
+[tool.setuptools.packages.find]
+include = ["mosaic*"]
+
+[tool.setuptools.package-data]
+mosaic = ["libmosaic_ffi.dylib", "libmosaic_ffi.so", "mosaic_ffi.dll"]
diff --git a/python/setup.py b/python/setup.py
new file mode 100644
index 0000000..9c8724f
--- /dev/null
+++ b/python/setup.py
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Build helper: copies the pre-built native library into the package directory."""
+
+import os
+import platform
+import shutil
+
+from setuptools import setup
+from setuptools.command.build_py import build_py
+
+
+def _lib_name():
+    system = platform.system()
+    if system == "Darwin":
+        return "libmosaic_ffi.dylib"
+    elif system == "Windows":
+        return "mosaic_ffi.dll"
+    return "libmosaic_ffi.so"
+
+
+def _find_native_lib():
+    here = os.path.dirname(os.path.abspath(__file__))
+    lib = _lib_name()
+
+    env_path = os.environ.get("MOSAIC_LIB_PATH")
+    if env_path:
+        candidate = os.path.join(env_path, lib)
+        if os.path.isfile(candidate):
+            return candidate
+
+    for profile in ["release", "debug"]:
+        candidate = os.path.join(here, "..", "target", profile, lib)
+        if os.path.isfile(candidate):
+            return candidate
+
+    return None
+
+
+class BuildPyWithNativeLib(build_py):
+    def run(self):
+        src = _find_native_lib()
+        if src:
+            dst = os.path.join(
+                os.path.dirname(os.path.abspath(__file__)), "mosaic", _lib_name()
+            )
+            shutil.copy2(src, dst)
+        super().run()
+
+
+setup(cmdclass={"build_py": BuildPyWithNativeLib})
diff --git a/python/tests/__init__.py b/python/tests/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/python/tests/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/python/tests/test_mosaic.py b/python/tests/test_mosaic.py
new file mode 100644
index 0000000..4f188ff
--- /dev/null
+++ b/python/tests/test_mosaic.py
@@ -0,0 +1,578 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+import struct
+
+import pyarrow as pa
+import pytest
+
+from mosaic import (
+    ColumnStatistics,
+    MosaicReader,
+    MosaicWriter,
+    WriterOptions,
+    read_table,
+    write_table,
+)
+
+
+def _write_to_bytes(pa_schema, data, options=None):
+    """Write ``data`` through a MosaicWriter into memory; return the bytes.
+
+    The buffer contents are read only after the writer's ``with`` block has
+    exited.
+    """
+    sink = io.BytesIO()
+    with MosaicWriter(sink, pa_schema, options) as out:
+        out.write(data)
+    return sink.getvalue()
+
+
+def _reader_from_bytes(data):
+    """Open a MosaicReader over an in-memory bytes object.
+
+    The reader receives a callback that slices ``data`` by offset and length,
+    together with the total length of ``data``.
+    """
+
+    def read_range(offset, length):
+        return data[offset : offset + length]
+
+    return MosaicReader.from_input_file(read_range, len(data))
+
+
+class TestRoundtrip:
+    """Write data with MosaicWriter and read it back with MosaicReader."""
+
+    def test_basic_roundtrip(self):
+        """50 rows of int32/utf8/float64 survive a write/read cycle."""
+        pa_schema = pa.schema(
+            [
+                pa.field("id", pa.int32(), nullable=False),
+                pa.field("name", pa.utf8()),
+                pa.field("score", pa.float64()),
+            ]
+        )
+
+        batch = pa.record_batch(
+            [
+                pa.array(list(range(50)), type=pa.int32()),
+                pa.array([f"user_{i}" for i in range(50)]),
+                pa.array([i * 1.5 for i in range(50)]),
+            ],
+            names=["id", "name", "score"],
+        )
+
+        data = _write_to_bytes(pa_schema, batch)
+        # The output is expected to end with the 4-byte marker b"MOSA".
+        assert len(data) > 32
+        assert data[-4:] == b"MOSA"
+
+        with _reader_from_bytes(data) as reader:
+            assert reader.num_row_groups >= 1
+
+            total_rows = 0
+            for rg in range(reader.num_row_groups):
+                rb = reader.read_row_group(rg)
+                total_rows += rb.num_rows
+
+                ids = rb.column("id").to_pylist()
+                names = rb.column("name").to_pylist()
+                scores = rb.column("score").to_pylist()
+
+                # Row order is not assumed: name and score are checked
+                # against the id read from the same row.
+                for j in range(rb.num_rows):
+                    idx = ids[j]
+                    assert names[j] == f"user_{idx}"
+                    assert abs(scores[j] - idx * 1.5) < 1e-9
+
+            assert total_rows == 50
+
+    def test_null_values(self):
+        """Nulls in the utf8 and float64 columns are read back as None."""
+        pa_schema = pa.schema(
+            [
+                pa.field("id", pa.int32()),
+                pa.field("name", pa.utf8()),
+                pa.field("value", pa.float64()),
+            ]
+        )
+
+        batch = pa.record_batch(
+            [
+                pa.array([1, 2, 3, 4], type=pa.int32()),
+                pa.array(["hello", None, "world", None]),
+                pa.array([1.0, 2.0, None, None]),
+            ],
+            names=["id", "name", "value"],
+        )
+
+        data = _write_to_bytes(pa_schema, batch)
+
+        with _reader_from_bytes(data) as reader:
+            rb = reader.read_row_group(0)
+            assert rb.num_rows == 4
+
+            names = rb.column("name")
+            values = rb.column("value")
+
+            assert names[0].as_py() == "hello"
+            assert names[1].as_py() is None
+            assert names[2].as_py() == "world"
+            assert names[3].as_py() is None
+
+            assert values[0].as_py() == 1.0
+            assert values[1].as_py() == 2.0
+            assert values[2].as_py() is None
+            assert values[3].as_py() is None
+
+    def test_compression_none(self):
+        """20 rows written with COMPRESSION_NONE are read back intact."""
+        pa_schema = pa.schema(
+            [pa.field("x", pa.int32()), pa.field("y", pa.utf8())]
+        )
+        batch = pa.record_batch(
+            [
+                pa.array(list(range(20)), type=pa.int32()),
+                pa.array([f"v_{i}" for i in range(20)]),
+            ],
+            names=["x", "y"],
+        )
+        opts = WriterOptions(compression=WriterOptions.COMPRESSION_NONE)
+        data = _write_to_bytes(pa_schema, batch, opts)
+
+        with _reader_from_bytes(data) as reader:
+            rb = reader.read_row_group(0)
+            assert rb.num_rows == 20
+            assert rb.column("x").to_pylist() == list(range(20))
+
+    def test_compression_zstd(self):
+        """100 rows written with COMPRESSION_ZSTD (level 3) are read back."""
+        pa_schema = pa.schema(
+            [pa.field("x", pa.int32()), pa.field("y", pa.utf8())]
+        )
+        batch = pa.record_batch(
+            [
+                pa.array(list(range(100)), type=pa.int32()),
+                pa.array([f"v_{i}" for i in range(100)]),
+            ],
+            names=["x", "y"],
+        )
+        opts = WriterOptions(compression=WriterOptions.COMPRESSION_ZSTD, zstd_level=3)
+        data = _write_to_bytes(pa_schema, batch, opts)
+
+        with _reader_from_bytes(data) as reader:
+            rb = reader.read_row_group(0)
+            assert rb.num_rows == 100
+            assert rb.column("x").to_pylist() == list(range(100))
+
+    def test_all_types(self):
+        """Two rows covering twelve column types are written and read back.
+
+        NOTE(review): f_decimal, f_date and f_timestamp are written but no
+        assertion checks their values after reading.
+        """
+        pa_schema = pa.schema(
+            [
+                pa.field("f_bool", pa.bool_()),
+                pa.field("f_int8", pa.int8()),
+                pa.field("f_int16", pa.int16()),
+                pa.field("f_int32", pa.int32()),
+                pa.field("f_int64", pa.int64()),
+                pa.field("f_float32", pa.float32()),
+                pa.field("f_float64", pa.float64()),
+                pa.field("f_utf8", pa.utf8()),
+                pa.field("f_binary", pa.binary()),
+                pa.field("f_decimal", pa.decimal128(10, 2)),
+                pa.field("f_date", pa.date32()),
+                pa.field("f_timestamp", pa.timestamp("ms")),
+            ]
+        )
+
+        batch = pa.record_batch(
+            [
+                pa.array([True, False]),
+                pa.array([42, -1], type=pa.int8()),
+                pa.array([1234, -5678], type=pa.int16()),
+                pa.array([100000, -200000], type=pa.int32()),
+                pa.array([9999999999, -9999999999], type=pa.int64()),
+                pa.array([3.14, -2.71], type=pa.float32()),
+                pa.array([2.718281828, -3.141592653]),
+                pa.array(["hello", "world"]),
+                pa.array([b"\x01\x02\x03", b"\xff\x00"], type=pa.binary()),
+                pa.array([1234567, -9876543], type=pa.decimal128(10, 2)),
+                pa.array([19000, 0], type=pa.date32()),
+                pa.array([1700000000000, 0], type=pa.timestamp("ms")),
+            ],
+            names=[
+                "f_bool", "f_int8", "f_int16", "f_int32", "f_int64",
+                "f_float32", "f_float64", "f_utf8", "f_binary",
+                "f_decimal", "f_date", "f_timestamp",
+            ],
+        )
+
+        data = _write_to_bytes(pa_schema, batch)
+
+        with _reader_from_bytes(data) as reader:
+            rb = reader.read_row_group(0)
+            assert rb.num_rows == 2
+
+            assert rb.column("f_bool").to_pylist() == [True, False]
+            assert rb.column("f_int8").to_pylist() == [42, -1]
+            assert rb.column("f_int16").to_pylist() == [1234, -5678]
+            assert rb.column("f_int32").to_pylist() == [100000, -200000]
+            assert rb.column("f_int64").to_pylist() == [9999999999, -9999999999]
+            assert rb.column("f_utf8").to_pylist() == ["hello", "world"]
+            assert rb.column("f_binary").to_pylist() == [b"\x01\x02\x03", b"\xff\x00"]
+
+            # Floats are compared with a tolerance rather than exactly.
+            f32 = rb.column("f_float32").to_pylist()
+            assert abs(f32[0] - 3.14) < 1e-5
+            assert abs(f32[1] - (-2.71)) < 1e-5
+
+            f64 = rb.column("f_float64").to_pylist()
+            assert abs(f64[0] - 2.718281828) < 1e-9
+            assert abs(f64[1] - (-3.141592653)) < 1e-9
+
+    def test_multiple_row_groups(self):
+        """Ten 50-row batches produce several row groups holding 500 rows.
+
+        Ids are expected to run sequentially across row groups in read order.
+        """
+        pa_schema = pa.schema(
+            [pa.field("id", pa.int32()), pa.field("data", pa.int64())]
+        )
+
+        # NOTE(review): row_group_max_size=200 is presumably a small size
+        # limit chosen to force more than one row group -- unit not visible
+        # here, confirm against WriterOptions.
+        opts = WriterOptions(
+            compression=WriterOptions.COMPRESSION_NONE,
+            num_buckets=1,
+            row_group_max_size=200,
+        )
+        buf = io.BytesIO()
+        with MosaicWriter(buf, pa_schema, opts) as writer:
+            for start in range(0, 500, 50):
+                batch = pa.record_batch(
+                    [
+                        pa.array(list(range(start, start + 50)), type=pa.int32()),
+                        pa.array(
+                            [i * 3 for i in range(start, start + 50)],
+                            type=pa.int64(),
+                        ),
+                    ],
+                    names=["id", "data"],
+                )
+                writer.write(batch)
+        data = buf.getvalue()
+
+        with _reader_from_bytes(data) as reader:
+            assert reader.num_row_groups > 1
+
+            # ``offset`` is the number of rows seen in earlier row groups.
+            offset = 0
+            for rg in range(reader.num_row_groups):
+                rb = reader.read_row_group(rg)
+                ids = rb.column("id").to_pylist()
+                datas = rb.column("data").to_pylist()
+                for j in range(rb.num_rows):
+                    assert ids[j] == offset + j
+                    assert datas[j] == (offset + j) * 3
+                offset += rb.num_rows
+
+            assert offset == 500
+
+    def test_multiple_writes(self):
+        """Three 10-row writes come back from read_all as 30 ordered rows."""
+        pa_schema = pa.schema(
+            [pa.field("x", pa.int32()), pa.field("y", pa.utf8())]
+        )
+
+        buf = io.BytesIO()
+        with MosaicWriter(buf, pa_schema) as writer:
+            for start in [0, 10, 20]:
+                batch = pa.record_batch(
+                    [
+                        pa.array(list(range(start, start + 10)), type=pa.int32()),
+                        pa.array([f"r_{i}" for i in range(start, start + 10)]),
+                    ],
+                    names=["x", "y"],
+                )
+                writer.write(batch)
+
+        data = buf.getvalue()
+        with _reader_from_bytes(data) as reader:
+            table = reader.read_all()
+            assert table.num_rows == 30
+            xs = table.column("x").to_pylist()
+            assert xs == list(range(30))
+
+    def test_single_row(self):
+        """A one-row batch is read back with its value intact."""
+        pa_schema = pa.schema([pa.field("v", pa.int32())])
+        batch = pa.record_batch(
+            [pa.array([42], type=pa.int32())], names=["v"]
+        )
+        data = _write_to_bytes(pa_schema, batch)
+
+        with _reader_from_bytes(data) as reader:
+            rb = reader.read_row_group(0)
+            assert rb.num_rows == 1
+            assert rb.column("v")[0].as_py() == 42
+
+    def test_zero_rows(self):
+        """An empty batch yields an empty table that keeps the schema.
+
+        Field names, types and nullability flags are all checked.
+        """
+        pa_schema = pa.schema(
+            [
+                pa.field("v", pa.int32(), nullable=False),
+                pa.field("s", pa.utf8(), nullable=True),
+            ]
+        )
+        batch = pa.record_batch(
+            [pa.array([], type=pa.int32()), pa.array([], type=pa.utf8())],
+            names=["v", "s"],
+        )
+        data = _write_to_bytes(pa_schema, batch)
+
+        with _reader_from_bytes(data) as reader:
+            table = reader.read_all()
+            assert table.num_rows == 0
+            assert table.schema.names == ["v", "s"]
+            assert table.schema.field("v").type == pa.int32()
+            assert table.schema.field("s").type == pa.utf8()
+            assert not table.schema.field("v").nullable
+            assert table.schema.field("s").nullable
+
+
+class TestProjection:
+    """Column-projection reads through ``read_row_group(..., columns=...)``."""
+
+    def test_projection_subset(self):
+        """Two of four columns are requested from every row group."""
+        fields = [
+            pa.field("a", pa.int32()),
+            pa.field("b", pa.utf8()),
+            pa.field("c", pa.float64()),
+            pa.field("d", pa.utf8()),
+        ]
+        pa_schema = pa.schema(fields)
+
+        arrays = [
+            pa.array(list(range(20)), type=pa.int32()),
+            pa.array([f"val_{i}" for i in range(20)]),
+            pa.array([float(i) for i in range(20)]),
+            pa.array([f"extra_{i}" for i in range(20)]),
+        ]
+        batch = pa.record_batch(arrays, names=["a", "b", "c", "d"])
+
+        data = _write_to_bytes(pa_schema, batch, WriterOptions(num_buckets=2))
+
+        with _reader_from_bytes(data) as reader:
+            idx_a = reader.schema.get_field_index("a")
+            idx_b = reader.schema.get_field_index("b")
+
+            seen_rows = 0
+            for group in range(reader.num_row_groups):
+                projected = reader.read_row_group(group, columns=[idx_a, idx_b])
+                assert projected.num_columns == 2
+                seen_rows += projected.num_rows
+
+            # Every written row must show up across the projected groups.
+            assert seen_rows == 20
+
+    def test_projection_single_column(self):
+        """Requesting only column "b" yields one column with all 10 rows."""
+        fields = [
+            pa.field("a", pa.int32()),
+            pa.field("b", pa.utf8()),
+            pa.field("c", pa.float64()),
+        ]
+        pa_schema = pa.schema(fields)
+
+        arrays = [
+            pa.array(list(range(10)), type=pa.int32()),
+            pa.array([f"v_{i}" for i in range(10)]),
+            pa.array([float(i) for i in range(10)]),
+        ]
+        batch = pa.record_batch(arrays, names=["a", "b", "c"])
+
+        data = _write_to_bytes(pa_schema, batch)
+
+        with _reader_from_bytes(data) as reader:
+            idx_b = reader.schema.get_field_index("b")
+            projected = reader.read_row_group(0, columns=[idx_b])
+            assert projected.num_columns == 1
+            assert projected.num_rows == 10
+
+
+class TestSchema:
+    """Schema preservation across a write/read cycle."""
+
+    def test_schema_roundtrip(self):
+        """Field count and types match; nullability of id and name is kept."""
+        declared = [
+            pa.field("id", pa.int32(), nullable=False),
+            pa.field("name", pa.utf8(), nullable=True),
+            pa.field("score", pa.float64(), nullable=True),
+        ]
+        pa_schema = pa.schema(declared)
+
+        one_row = pa.record_batch(
+            [pa.array([1], type=pa.int32()), pa.array(["x"]), pa.array([1.0])],
+            names=["id", "name", "score"],
+        )
+
+        data = _write_to_bytes(pa_schema, one_row)
+
+        with _reader_from_bytes(data) as reader:
+            read_schema = reader.schema
+            assert len(read_schema) == 3
+
+            expected_types = [
+                ("id", pa.int32()),
+                ("name", pa.utf8()),
+                ("score", pa.float64()),
+            ]
+            for field_name, field_type in expected_types:
+                assert read_schema.field(field_name).type == field_type
+
+            assert not read_schema.field("id").nullable
+            assert read_schema.field("name").nullable
+
+
+class TestStatistics:
+    """Per-row-group column statistics requested via ``stats_columns``."""
+
+    @staticmethod
+    def _stat_for(stats, column_index):
+        """Return the first entry of ``stats`` with the given column index."""
+        return next(s for s in stats if s.column_index == column_index)
+
+    def test_stats_basic(self):
+        """Statistics exist for columns 0 and 2 and bound the int32 ids."""
+        pa_schema = pa.schema(
+            [
+                pa.field("id", pa.int32()),
+                pa.field("name", pa.utf8()),
+                pa.field("score", pa.float64()),
+            ]
+        )
+
+        id_values = pa.array([i * 10 for i in range(10)], type=pa.int32())
+        name_values = pa.array([f"item_{i}" for i in range(10)])
+        score_values = pa.array([i * 1.1 for i in range(10)])
+        batch = pa.record_batch(
+            [id_values, name_values, score_values],
+            names=["id", "name", "score"],
+        )
+
+        data = _write_to_bytes(
+            pa_schema, batch, WriterOptions(stats_columns=[0, 2])
+        )
+
+        with _reader_from_bytes(data) as reader:
+            for group in range(reader.num_row_groups):
+                group_stats = reader.get_row_group_statistics(group)
+                assert len(group_stats) > 0
+                for entry in group_stats:
+                    assert isinstance(entry, ColumnStatistics)
+                    assert entry.column_index in (0, 2)
+                    assert entry.null_count == 0
+                    assert entry.has_min_max
+                    assert entry.min is not None
+                    assert entry.max is not None
+                    assert len(entry.min) > 0
+                    assert len(entry.max) > 0
+
+                id_entry = self._stat_for(group_stats, 0)
+                # Bounds are decoded as big-endian signed 32-bit integers.
+                (lowest,) = struct.unpack(">i", id_entry.min)
+                (highest,) = struct.unpack(">i", id_entry.max)
+                assert lowest == 0
+                assert highest == 90
+
+    def test_stats_with_nulls(self):
+        """Null counts are reported and min/max cover only non-null values."""
+        pa_schema = pa.schema(
+            [pa.field("a", pa.int32()), pa.field("b", pa.int64())]
+        )
+
+        a_values = pa.array([10, None, 5, 20], type=pa.int32())
+        b_values = pa.array([None, None, 100, 50], type=pa.int64())
+        batch = pa.record_batch([a_values, b_values], names=["a", "b"])
+
+        data = _write_to_bytes(
+            pa_schema, batch, WriterOptions(stats_columns=[0, 1], num_buckets=1)
+        )
+
+        # (column index, struct format, null count, minimum, maximum)
+        expectations = [
+            (0, ">i", 1, 5, 20),
+            (1, ">q", 2, 50, 100),
+        ]
+
+        with _reader_from_bytes(data) as reader:
+            group_stats = reader.get_row_group_statistics(0)
+            assert len(group_stats) == 2
+
+            for column_index, fmt, nulls, lowest, highest in expectations:
+                entry = self._stat_for(group_stats, column_index)
+                assert entry.null_count == nulls
+                assert entry.has_min_max
+                (decoded_min,) = struct.unpack(fmt, entry.min)
+                (decoded_max,) = struct.unpack(fmt, entry.max)
+                assert decoded_min == lowest
+                assert decoded_max == highest
+
+    def test_stats_all_null(self):
+        """An all-null column reports its null count and no min/max."""
+        pa_schema = pa.schema([pa.field("x", pa.int32())])
+
+        all_null = pa.array([None, None, None], type=pa.int32())
+        batch = pa.record_batch([all_null], names=["x"])
+
+        data = _write_to_bytes(
+            pa_schema, batch, WriterOptions(stats_columns=[0], num_buckets=1)
+        )
+
+        with _reader_from_bytes(data) as reader:
+            group_stats = reader.get_row_group_statistics(0)
+            assert len(group_stats) == 1
+
+            only = group_stats[0]
+            assert only.null_count == 3
+            assert not only.has_min_max
+            assert only.min is None
+            assert only.max is None
+
+
+class TestConvenience:
+    """Table-level helpers: ``write_table``, ``read_table`` and ``read_all``."""
+
+    def test_write_table_read_table(self):
+        """A 30-row table written with write_table is restored by read_table."""
+        table = pa.table(
+            {
+                "id": pa.array(list(range(30)), type=pa.int32()),
+                "name": pa.array([f"user_{i}" for i in range(30)]),
+            }
+        )
+
+        buf = io.BytesIO()
+        write_table(table, buf)
+
+        data = buf.getvalue()
+        # read_table is given a range-read callback plus the total length.
+        result = read_table(
+            lambda offset, length: data[offset : offset + length], len(data)
+        )
+
+        assert result.num_rows == 30
+        assert result.column("id").to_pylist() == list(range(30))
+        assert result.column("name").to_pylist() == [f"user_{i}" for i in range(30)]
+
+    def test_read_all(self):
+        """read_all returns a pa.Table holding all 25 written rows."""
+        pa_schema = pa.schema(
+            [pa.field("x", pa.int32()), pa.field("y", pa.utf8())]
+        )
+
+        batch = pa.record_batch(
+            [
+                pa.array(list(range(25)), type=pa.int32()),
+                pa.array([f"row_{i}" for i in range(25)]),
+            ],
+            names=["x", "y"],
+        )
+
+        data = _write_to_bytes(pa_schema, batch)
+
+        with _reader_from_bytes(data) as reader:
+            table = reader.read_all()
+            assert isinstance(table, pa.Table)
+            assert table.num_rows == 25
+            assert table.column("x").to_pylist() == list(range(25))
+
+
+class TestWriter:
+    """Writer-side behaviour exposed on MosaicWriter."""
+
+    def test_estimated_file_size(self):
+        """The size estimate is positive once a batch has been written."""
+        pa_schema = pa.schema(
+            [pa.field("x", pa.int32()), pa.field("y", pa.utf8())]
+        )
+
+        x_values = pa.array(list(range(100)), type=pa.int32())
+        y_values = pa.array([f"value_{i}" for i in range(100)])
+        batch = pa.record_batch([x_values, y_values], names=["x", "y"])
+
+        sink = io.BytesIO()
+        with MosaicWriter(sink, pa_schema) as writer:
+            writer.write(batch)
+            # Queried while the writer is still open.
+            assert writer.estimated_file_size() > 0

Reply via email to