This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git
The following commit(s) were added to refs/heads/main by this push:
new 0df16ac Add Python bindings and CI test job (#8)
0df16ac is described below
commit 0df16acb520104cb36b7e882b09248b3473d02a8
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue May 19 16:24:14 2026 +0800
Add Python bindings and CI test job (#8)
---
.github/workflows/ci.yml | 34 +++
python/mosaic/__init__.py | 34 +++
python/mosaic/_ffi.py | 216 +++++++++++++++++
python/mosaic/mosaic.py | 372 ++++++++++++++++++++++++++++
python/pyproject.toml | 37 +++
python/setup.py | 66 +++++
python/tests/__init__.py | 16 ++
python/tests/test_mosaic.py | 578 ++++++++++++++++++++++++++++++++++++++++++++
8 files changed, 1353 insertions(+)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 53ee1d3..70be68d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -131,3 +131,37 @@ jobs:
- name: Run Java tests
working-directory: java
run: mvn test "-DargLine=-Djava.library.path=${{ github.workspace
}}/target/release"
+
+ # CI job: build the Rust FFI library, then run the Python binding tests
+ # against it.
+ python-test:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v6
+
+ # Cache the cargo registry, cargo git checkouts and the build output,
+ # keyed on the hash of every Cargo.lock in the repository.
+ - name: Rust Cache
+ uses: actions/cache@v5
+ with:
+ path: |
+ ~/.cargo/registry
+ ~/.cargo/git
+ target
+ key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
+ restore-keys: |
+ ${{ runner.os }}-cargo-
+
+ - name: Set up Python
+ uses: actions/setup-python@v5
+ with:
+ python-version: '3.12'
+
+ - name: Build FFI library
+ run: cargo build --release -p mosaic-ffi
+
+ # Only pyarrow and pytest are installed; the mosaic package itself is not
+ # pip-installed, so the tests import it from the python/ working directory.
+ - name: Install Python dependencies
+ working-directory: python
+ run: pip install pyarrow pytest
+
+ # MOSAIC_LIB_PATH is read by mosaic/_ffi.py to locate the shared library
+ # produced by the release build above.
+ - name: Run Python tests
+ working-directory: python
+ run: pytest -v
+ env:
+ MOSAIC_LIB_PATH: ${{ github.workspace }}/target/release
diff --git a/python/mosaic/__init__.py b/python/mosaic/__init__.py
new file mode 100644
index 0000000..f9eb3b8
--- /dev/null
+++ b/python/mosaic/__init__.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from .mosaic import (
+ ColumnStatistics,
+ MosaicReader,
+ MosaicWriter,
+ WriterOptions,
+ read_table,
+ write_table,
+)
+
+# Public API of the package; every name listed here is re-exported from
+# the .mosaic module by the import above.
+__all__ = [
+ "ColumnStatistics",
+ "WriterOptions",
+ "MosaicWriter",
+ "MosaicReader",
+ "read_table",
+ "write_table",
+]
diff --git a/python/mosaic/_ffi.py b/python/mosaic/_ffi.py
new file mode 100644
index 0000000..a8a6551
--- /dev/null
+++ b/python/mosaic/_ffi.py
@@ -0,0 +1,216 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import ctypes
+import ctypes.util
+import os
+import platform
+import sys
+from ctypes import (
+ CFUNCTYPE,
+ POINTER,
+ Structure,
+ c_char_p,
+ c_double,
+ c_float,
+ c_int,
+ c_int8,
+ c_int16,
+ c_int32,
+ c_int64,
+ c_size_t,
+ c_uint8,
+ c_uint32,
+ c_uint64,
+ c_void_p,
+)
+
+
+def _load_library():
+    """Locate and load the native ``mosaic_ffi`` shared library.
+
+    Directories are probed in this order; the first one that contains the
+    platform-specific library file is loaded:
+
+    1. the directory named by the ``MOSAIC_LIB_PATH`` environment variable,
+    2. the package directory, then the directory two levels above it,
+    3. ``target/release`` and ``target/debug`` one and two levels above the
+       package directory.
+
+    If none of them contains the file, the bare library name is handed to
+    ``ctypes.CDLL`` so the platform loader's own search path gets a chance.
+
+    Returns:
+        The loaded ``ctypes.CDLL`` object.
+
+    Raises:
+        OSError: if the library cannot be found, or a located file cannot
+            be loaded.
+    """
+    lib_name = {
+        "Darwin": "libmosaic_ffi.dylib",
+        "Windows": "mosaic_ffi.dll",
+    }.get(platform.system(), "libmosaic_ffi.so")
+
+    pkg_dir = os.path.dirname(os.path.abspath(__file__))
+
+    search_paths = []
+    # The explicit override goes first so that a stale copy bundled next to
+    # the package cannot shadow it; setup.py's _find_native_lib uses the same
+    # precedence.
+    env_path = os.environ.get("MOSAIC_LIB_PATH")
+    if env_path:
+        search_paths.append(env_path)
+    search_paths.append(pkg_dir)
+    search_paths.append(os.path.join(pkg_dir, "..", ".."))
+    for rel in (
+        os.path.join("..", "target", "release"),
+        os.path.join("..", "target", "debug"),
+        os.path.join("..", "..", "target", "release"),
+        os.path.join("..", "..", "target", "debug"),
+    ):
+        search_paths.append(os.path.join(pkg_dir, rel))
+
+    for directory in search_paths:
+        candidate = os.path.join(directory, lib_name)
+        if os.path.isfile(candidate):
+            return ctypes.CDLL(candidate)
+
+    try:
+        return ctypes.CDLL(lib_name)
+    except OSError as err:
+        # Chain the loader's own error so its diagnostics are not lost.
+        raise OSError(
+            f"Cannot find {lib_name}. Build the native library first with "
+            f"'cargo build --release -p mosaic-ffi', or set MOSAIC_LIB_PATH "
+            f"to the directory containing {lib_name}."
+        ) from err
+
+
+# Loaded once at import time; importing this module raises OSError when the
+# native library cannot be found.
+lib = _load_library()
+
+# ======================== Callback types ========================
+
+# Output-stream callbacks: write(ctx, data, len) -> int32, flush(ctx) -> int32
+# and get_pos(ctx) -> int64. The Python implementations in mosaic.py return 0
+# on success and -1 on failure for write/flush.
+WRITE_FN = CFUNCTYPE(c_int32, c_void_p, POINTER(c_uint8), c_size_t)
+FLUSH_FN = CFUNCTYPE(c_int32, c_void_p)
+GET_POS_FN = CFUNCTYPE(c_int64, c_void_p)
+
+# Input-file callbacks: read_at(ctx, offset, buf, len) -> int32 and
+# length(ctx) -> uint64. mosaic.py's read_at returns 0 on success, -1 on
+# failure.
+READ_AT_FN = CFUNCTYPE(c_int32, c_void_p, c_uint64, POINTER(c_uint8), c_size_t)
+LENGTH_FN = CFUNCTYPE(c_uint64, c_void_p)
+
+
+# By-value argument of lib.mosaic_writer_open: an opaque context pointer plus
+# the three output-stream callbacks.
+# NOTE(review): field order and types must mirror the native struct, which is
+# not visible in this diff - confirm against the mosaic-ffi header.
+class MosaicOutputFile(Structure):
+ _fields_ = [
+ ("ctx", c_void_p),
+ ("write_fn", WRITE_FN),
+ ("flush_fn", FLUSH_FN),
+ ("get_pos_fn", GET_POS_FN),
+ ]
+
+
+# By-value argument of lib.mosaic_reader_open: an opaque context pointer plus
+# the positional-read and length callbacks.
+# NOTE(review): layout must mirror the native struct - confirm against the
+# mosaic-ffi header.
+class MosaicInputFile(Structure):
+ _fields_ = [
+ ("ctx", c_void_p),
+ ("read_at_fn", READ_AT_FN),
+ ("length_fn", LENGTH_FN),
+ ]
+
+
+# Writer configuration passed by value to lib.mosaic_writer_open and returned
+# by lib.mosaic_writer_options_default. stats_columns points at an array of
+# num_stats_columns uint32 values; mosaic.py sets it to NULL with a count of 0
+# when no statistics columns are requested.
+# NOTE(review): layout must mirror the native struct - confirm against the
+# mosaic-ffi header.
+class MosaicWriterOptions(Structure):
+ _fields_ = [
+ ("compression", c_uint8),
+ ("zstd_level", c_int),
+ ("num_buckets", c_uint32),
+ ("row_group_max_size", c_uint64),
+ ("max_dict_total_bytes", c_uint32),
+ ("max_dict_entries", c_uint32),
+ ("stats_columns", POINTER(c_uint32)),
+ ("num_stats_columns", c_uint32),
+ ("page_size_threshold", c_uint32),
+ ]
+
+
+# ======================== Writer Options ========================
+
+# Returns the options struct by value. mosaic.py does not call this; its
+# WriterOptions class carries its own Python-side defaults.
+lib.mosaic_writer_options_default.argtypes = []
+lib.mosaic_writer_options_default.restype = MosaicWriterOptions
+
+# ======================== Writer ========================
+
+# Arguments: output-file struct (by value), pointer to an exported Arrow
+# schema, options struct (by value). With restype c_void_p ctypes yields an
+# int, or None for NULL, which is why callers test the handle for truthiness.
+lib.mosaic_writer_open.argtypes = [MosaicOutputFile, c_void_p,
MosaicWriterOptions]
+lib.mosaic_writer_open.restype = c_void_p
+
+# The c_int results below are status codes; mosaic.py treats any non-zero
+# value as failure and then consults mosaic_last_error.
+lib.mosaic_writer_close.argtypes = [c_void_p]
+lib.mosaic_writer_close.restype = c_int
+
+lib.mosaic_writer_free.argtypes = [c_void_p]
+lib.mosaic_writer_free.restype = None
+
+lib.mosaic_writer_estimated_file_size.argtypes = [c_void_p, POINTER(c_int64)]
+lib.mosaic_writer_estimated_file_size.restype = c_int
+
+# Arguments as passed by mosaic.py: writer handle, pointer to the exported
+# Arrow array, pointer to the exported Arrow schema.
+lib.mosaic_writer_write_batch.argtypes = [c_void_p, c_void_p, c_void_p]
+lib.mosaic_writer_write_batch.restype = c_int
+
+# ======================== Reader ========================
+
+# Takes the input-file struct by value and returns an opaque reader handle
+# (None on NULL under ctypes' c_void_p conversion).
+lib.mosaic_reader_open.argtypes = [MosaicInputFile]
+lib.mosaic_reader_open.restype = c_void_p
+
+lib.mosaic_reader_free.argtypes = [c_void_p]
+lib.mosaic_reader_free.restype = None
+
+# Second argument is a pointer to caller-allocated memory for an Arrow C
+# schema struct; mosaic.py imports it with pyarrow afterwards.
+lib.mosaic_reader_export_schema.argtypes = [c_void_p, c_void_p]
+lib.mosaic_reader_export_schema.restype = c_int
+
+lib.mosaic_reader_num_row_groups.argtypes = [c_void_p, POINTER(c_uint32)]
+lib.mosaic_reader_num_row_groups.restype = c_int
+
+# ======================== Row Group Reader ========================
+
+# Arguments: reader handle, row-group index. Returns a row-group handle.
+lib.mosaic_reader_open_row_group.argtypes = [c_void_p, c_uint32]
+lib.mosaic_reader_open_row_group.restype = c_void_p
+
+# Arguments: reader handle, row-group index, pointer to an array of uint32
+# column indices, number of entries in that array.
+lib.mosaic_reader_open_row_group_projected.argtypes = [
+ c_void_p, c_uint32, POINTER(c_uint32), c_uint32,
+]
+lib.mosaic_reader_open_row_group_projected.restype = c_void_p
+
+lib.mosaic_row_group_reader_free.argtypes = [c_void_p]
+lib.mosaic_row_group_reader_free.restype = None
+
+lib.mosaic_row_group_reader_num_rows.argtypes = [c_void_p, POINTER(c_uint32)]
+lib.mosaic_row_group_reader_num_rows.restype = c_int
+
+# ======================== Record Batch (Arrow C Data Interface) ========================
+
+# Takes a row-group handle and returns a record-batch handle that must be
+# released with mosaic_record_batch_free.
+lib.mosaic_row_group_reader_read_columns.argtypes = [c_void_p]
+lib.mosaic_row_group_reader_read_columns.restype = c_void_p
+
+lib.mosaic_record_batch_num_rows.argtypes = [c_void_p, POINTER(c_uint32)]
+lib.mosaic_record_batch_num_rows.restype = c_int
+
+lib.mosaic_record_batch_num_columns.argtypes = [c_void_p, POINTER(c_uint32)]
+lib.mosaic_record_batch_num_columns.restype = c_int
+
+# Arguments as passed by mosaic.py: record-batch handle, pointer to memory
+# for the Arrow array struct, pointer to memory for the Arrow schema struct.
+lib.mosaic_record_batch_export.argtypes = [c_void_p, c_void_p, c_void_p]
+lib.mosaic_record_batch_export.restype = c_int
+
+lib.mosaic_record_batch_free.argtypes = [c_void_p]
+lib.mosaic_record_batch_free.restype = None
+
+# ======================== Row Group Stats ========================
+
+# All functions below take (reader handle, row-group index, ...). The ones
+# with a third c_uint32 take the position of the statistics entry within the
+# row group, in the range reported by mosaic_reader_row_group_num_stats.
+lib.mosaic_reader_row_group_num_stats.argtypes = [c_void_p, c_uint32,
POINTER(c_uint32)]
+lib.mosaic_reader_row_group_num_stats.restype = c_int
+
+lib.mosaic_reader_row_group_stat_column_index.argtypes = [c_void_p, c_uint32,
c_uint32, POINTER(c_uint32)]
+lib.mosaic_reader_row_group_stat_column_index.restype = c_int
+
+lib.mosaic_reader_row_group_stat_null_count.argtypes = [c_void_p, c_uint32,
c_uint32, POINTER(c_uint64)]
+lib.mosaic_reader_row_group_stat_null_count.restype = c_int
+
+# min/max return a pointer to raw bytes and write the byte count through the
+# size_t out-parameter; mosaic.py copies the bytes out immediately with
+# ctypes.string_at and maps a NULL pointer or zero length to None.
+# NOTE(review): ownership/lifetime of the returned buffer is not visible
+# here - presumably owned by the reader; confirm against the FFI crate.
+lib.mosaic_reader_row_group_stat_min.argtypes = [c_void_p, c_uint32, c_uint32,
POINTER(c_size_t)]
+lib.mosaic_reader_row_group_stat_min.restype = POINTER(c_uint8)
+
+lib.mosaic_reader_row_group_stat_max.argtypes = [c_void_p, c_uint32, c_uint32,
POINTER(c_size_t)]
+lib.mosaic_reader_row_group_stat_max.restype = POINTER(c_uint8)
+
+# ======================== Error ========================
+
+# With restype c_char_p ctypes converts the result to a Python bytes object,
+# or None for NULL; mosaic.py's _check_error decodes it as UTF-8.
+lib.mosaic_last_error.argtypes = []
+lib.mosaic_last_error.restype = c_char_p
diff --git a/python/mosaic/mosaic.py b/python/mosaic/mosaic.py
new file mode 100644
index 0000000..857d12d
--- /dev/null
+++ b/python/mosaic/mosaic.py
@@ -0,0 +1,372 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import ctypes
+
+import pyarrow as pa
+
+from . import _ffi
+from ._ffi import lib
+
+
+# ctypes mirror of the Arrow C Data Interface ``ArrowSchema`` struct. This
+# module never reads the fields; instances only provide correctly sized
+# memory whose address is handed to pyarrow's _export_to_c/_import_from_c and
+# to the native library. Pointer-typed members are declared as c_void_p.
+class _ArrowSchema(ctypes.Structure):
+ _fields_ = [
+ ("format", ctypes.c_char_p),
+ ("name", ctypes.c_char_p),
+ ("metadata", ctypes.c_char_p),
+ ("flags", ctypes.c_int64),
+ ("n_children", ctypes.c_int64),
+ ("children", ctypes.c_void_p),
+ ("dictionary", ctypes.c_void_p),
+ ("release", ctypes.c_void_p),
+ ("private_data", ctypes.c_void_p),
+ ]
+
+
+# ctypes mirror of the Arrow C Data Interface ``ArrowArray`` struct. As with
+# _ArrowSchema, instances only reserve memory whose address is passed to
+# pyarrow and to the native library; the fields are not accessed here.
+class _ArrowArray(ctypes.Structure):
+ _fields_ = [
+ ("length", ctypes.c_int64),
+ ("null_count", ctypes.c_int64),
+ ("offset", ctypes.c_int64),
+ ("n_buffers", ctypes.c_int64),
+ ("n_children", ctypes.c_int64),
+ ("buffers", ctypes.c_void_p),
+ ("children", ctypes.c_void_p),
+ ("dictionary", ctypes.c_void_p),
+ ("release", ctypes.c_void_p),
+ ("private_data", ctypes.c_void_p),
+ ]
+
+
+def _check_error(msg="operation failed"):
+    """Raise ``RuntimeError`` describing the most recent native failure.
+
+    The text comes from ``lib.mosaic_last_error()`` when it returns a
+    non-empty value (decoded as UTF-8 with replacement of invalid bytes);
+    otherwise ``msg`` is used. This function always raises.
+    """
+    native_message = lib.mosaic_last_error()
+    if not native_message:
+        raise RuntimeError(msg)
+    raise RuntimeError(native_message.decode("utf-8", errors="replace"))
+
+
+class WriterOptions:
+    """Python-side writer configuration, converted to the native options struct.
+
+    Every attribute is copied verbatim into the matching field of
+    ``_ffi.MosaicWriterOptions`` by :meth:`_to_ffi`.
+
+    Args:
+        compression: ``COMPRESSION_NONE`` or ``COMPRESSION_ZSTD`` (default).
+        zstd_level: value for the ``zstd_level`` field.
+        num_buckets: value for the ``num_buckets`` field.
+        row_group_max_size: value for the ``row_group_max_size`` field
+            (default 256 MiB).
+        max_dict_total_bytes: value for the ``max_dict_total_bytes`` field
+            (default 32 KiB).
+        max_dict_entries: value for the ``max_dict_entries`` field.
+        stats_columns: iterable of column indices to pass as
+            ``stats_columns``; ``None`` or an empty iterable means none.
+        page_size_threshold: value for the ``page_size_threshold`` field
+            (default 32 KiB).
+    """
+
+    COMPRESSION_NONE = 0
+    COMPRESSION_ZSTD = 1
+
+    def __init__(
+        self,
+        compression=COMPRESSION_ZSTD,
+        zstd_level=1,
+        num_buckets=0,
+        row_group_max_size=256 * 1024 * 1024,
+        max_dict_total_bytes=32 * 1024,
+        max_dict_entries=255,
+        stats_columns=None,
+        page_size_threshold=32 * 1024,
+    ):
+        self.compression = compression
+        self.zstd_level = zstd_level
+        self.num_buckets = num_buckets
+        self.row_group_max_size = row_group_max_size
+        self.max_dict_total_bytes = max_dict_total_bytes
+        self.max_dict_entries = max_dict_entries
+        # Copy into a fresh list: any iterable is accepted, and later changes
+        # to the caller's sequence cannot alter these options.
+        self.stats_columns = [] if stats_columns is None else list(stats_columns)
+        self.page_size_threshold = page_size_threshold
+
+    def _to_ffi(self):
+        """Build the native options struct.
+
+        Returns:
+            ``(opts, refs)`` where ``opts`` is an ``_ffi.MosaicWriterOptions``
+            and ``refs`` is a list of ctypes objects that ``opts`` points
+            into; the caller must keep ``refs`` alive until the native call
+            that consumes ``opts`` has returned.
+        """
+        opts = _ffi.MosaicWriterOptions()
+        opts.compression = self.compression
+        opts.zstd_level = self.zstd_level
+        opts.num_buckets = self.num_buckets
+        opts.row_group_max_size = self.row_group_max_size
+        opts.max_dict_total_bytes = self.max_dict_total_bytes
+        opts.max_dict_entries = self.max_dict_entries
+        opts.page_size_threshold = self.page_size_threshold
+
+        # The attribute may have been reassigned after construction, so
+        # normalise it again before taking its length.
+        columns = list(self.stats_columns or ())
+        refs = []
+        if columns:
+            arr = (ctypes.c_uint32 * len(columns))(*columns)
+            refs.append(arr)
+            opts.stats_columns = arr
+            opts.num_stats_columns = len(columns)
+        else:
+            opts.stats_columns = None
+            opts.num_stats_columns = 0
+        return opts, refs
+
+
+class MosaicWriter:
+    """Write Arrow data to a Mosaic file through a Python output stream.
+
+    Args:
+        stream: object providing ``write(buffer)`` and ``flush()``. The
+            position reported to the native writer is the running count of
+            bytes this writer has passed to ``stream.write``, not
+            ``stream.tell()``.
+        schema: ``pyarrow.Schema`` of the data to be written.
+        options: optional :class:`WriterOptions`; defaults are used when
+            omitted.
+
+    Raises:
+        TypeError: if ``schema`` is not a ``pyarrow.Schema``.
+        RuntimeError: if the native writer cannot be opened.
+
+    The writer is a context manager; leaving the ``with`` block calls
+    :meth:`close`.
+    """
+
+    # Class-level defaults keep close()/__del__ safe on an instance whose
+    # __init__ raised before these attributes were assigned.
+    _handle = None
+    _closed = False
+    _stream_error = None
+
+    def __init__(self, stream, schema, options=None):
+        if not isinstance(schema, pa.Schema):
+            raise TypeError(f"expected pyarrow.Schema, got {type(schema)}")
+
+        self._stream = stream
+        self._pos = 0
+
+        # The ctypes callback objects are stored on the instance so they stay
+        # referenced for as long as the native writer may call them.
+        self._write_callback = _ffi.WRITE_FN(self._on_write)
+        self._flush_callback = _ffi.FLUSH_FN(self._on_flush)
+        self._get_pos_callback = _ffi.GET_POS_FN(self._on_get_pos)
+
+        c_stream = _ffi.MosaicOutputFile()
+        c_stream.ctx = None
+        c_stream.write_fn = self._write_callback
+        c_stream.flush_fn = self._flush_callback
+        c_stream.get_pos_fn = self._get_pos_callback
+
+        c_opts, opts_refs = (options if options else WriterOptions())._to_ffi()
+
+        c_schema = _ArrowSchema()
+        schema_ptr = ctypes.addressof(c_schema)
+        schema._export_to_c(schema_ptr)
+
+        self._handle = lib.mosaic_writer_open(
+            c_stream, ctypes.c_void_p(schema_ptr), c_opts
+        )
+        # opts_refs only had to outlive the call above.
+        del opts_refs
+        if not self._handle:
+            self._raise_native_error("failed to open writer")
+
+    def _require_open(self):
+        """Raise ``RuntimeError`` unless a live native handle is held."""
+        if not self._handle:
+            raise RuntimeError("writer is closed")
+
+    def _raise_native_error(self, msg):
+        """Raise the native error, chained to any captured stream exception."""
+        cause, self._stream_error = self._stream_error, None
+        try:
+            _check_error(msg)
+        except RuntimeError as err:
+            if cause is None:
+                raise
+            raise err from cause
+
+    def _on_write(self, ctx, data, length):
+        """WRITE_FN callback: forward ``length`` bytes at ``data`` to the stream.
+
+        Returns 0 on success and -1 on failure; the exception is remembered
+        so it can be attached to the error raised to the Python caller.
+        """
+        if length == 0:
+            # Nothing to copy; also avoids dereferencing a possibly NULL
+            # pointer for an empty write.
+            return 0
+        try:
+            address = ctypes.cast(data, ctypes.c_void_p).value
+            buf = (ctypes.c_char * length).from_address(address)
+            self._stream.write(buf)
+            self._pos += length
+            return 0
+        except Exception as exc:
+            self._stream_error = exc
+            return -1
+
+    def _on_flush(self, ctx):
+        """FLUSH_FN callback: flush the stream; 0 on success, -1 on failure."""
+        try:
+            self._stream.flush()
+            return 0
+        except Exception as exc:
+            self._stream_error = exc
+            return -1
+
+    def _on_get_pos(self, ctx):
+        """GET_POS_FN callback: number of bytes written so far."""
+        return self._pos
+
+    def write(self, data):
+        """Write a ``pyarrow.RecordBatch`` or every batch of a ``pyarrow.Table``.
+
+        Raises:
+            RuntimeError: if the writer is closed or the native write fails.
+            TypeError: if ``data`` is neither a RecordBatch nor a Table.
+        """
+        self._require_open()
+        if isinstance(data, pa.Table):
+            batches = data.to_batches()
+        elif isinstance(data, pa.RecordBatch):
+            batches = [data]
+        else:
+            raise TypeError(
+                f"expected pyarrow.RecordBatch or pyarrow.Table, got {type(data)}"
+            )
+        for record_batch in batches:
+            self._write_single_batch(record_batch)
+
+    def _write_single_batch(self, batch):
+        """Export one record batch via the Arrow C interface and write it."""
+        self._require_open()
+        c_schema = _ArrowSchema()
+        c_array = _ArrowArray()
+        schema_ptr = ctypes.addressof(c_schema)
+        array_ptr = ctypes.addressof(c_array)
+        batch._export_to_c(array_ptr, schema_ptr)
+        rc = lib.mosaic_writer_write_batch(
+            self._handle,
+            ctypes.c_void_p(array_ptr),
+            ctypes.c_void_p(schema_ptr),
+        )
+        if rc != 0:
+            self._raise_native_error("write_batch failed")
+
+    def estimated_file_size(self):
+        """Return the native writer's current file-size estimate.
+
+        Raises:
+            RuntimeError: if the writer is closed or the native call fails.
+        """
+        self._require_open()
+        out = ctypes.c_int64(0)
+        rc = lib.mosaic_writer_estimated_file_size(self._handle, ctypes.byref(out))
+        if rc != 0:
+            self._raise_native_error("estimated_file_size failed")
+        return out.value
+
+    def close(self):
+        """Finish the file and release the native writer; safe to call twice.
+
+        Raises:
+            RuntimeError: if the native close reports failure. The native
+                handle is freed in either case.
+        """
+        if self._closed or not self._handle:
+            return
+        self._closed = True
+        handle, self._handle = self._handle, None
+        try:
+            rc = lib.mosaic_writer_close(handle)
+        finally:
+            lib.mosaic_writer_free(handle)
+        if rc != 0:
+            self._raise_native_error("close failed")
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
+
+    def __del__(self):
+        self.close()
+
+
+class ColumnStatistics:
+    """Statistics recorded for one column of a row group.
+
+    Attributes are stored exactly as given: ``column_index``, ``null_count``,
+    and the ``min``/``max`` values (``None`` when absent).
+    """
+
+    def __init__(self, column_index, null_count, min, max):
+        self.column_index, self.null_count = column_index, null_count
+        self.min, self.max = min, max
+
+    @property
+    def has_min_max(self):
+        """True when a minimum value is present (only ``min`` is inspected)."""
+        if self.min is None:
+            return False
+        return True
+
+
+class MosaicReader:
+    """Read a Mosaic file through the native library as pyarrow objects.
+
+    Instances are normally created with :meth:`from_input_file`. The reader
+    is a context manager; leaving the ``with`` block calls :meth:`close`.
+
+    Args:
+        handle: native reader handle returned by ``lib.mosaic_reader_open``.
+        refs: objects (such as the ctypes callbacks) that must stay alive
+            for as long as the native reader exists.
+
+    Raises:
+        RuntimeError: if the schema cannot be exported from the native reader.
+    """
+
+    # Class-level defaults keep close()/__del__ safe on an instance whose
+    # __init__ never ran or raised early.
+    _handle = None
+    _refs = None
+
+    def __init__(self, handle, refs=None):
+        self._handle = handle
+        self._refs = refs
+        c_schema = _ArrowSchema()
+        schema_ptr = ctypes.addressof(c_schema)
+        rc = lib.mosaic_reader_export_schema(handle, ctypes.c_void_p(schema_ptr))
+        if rc != 0:
+            _check_error("export_schema failed")
+        self._schema = pa.Schema._import_from_c(schema_ptr)
+
+    @staticmethod
+    def from_input_file(read_at_fn, file_length):
+        """Create a MosaicReader from a callable and file length.
+
+        ``read_at_fn(offset, length)`` must return exactly ``length`` bytes
+        as a bytes-like object (``bytes``, ``bytearray``, ``memoryview``,
+        ...). It must be thread-safe: the reader may call it concurrently
+        from multiple threads to perform parallel IO.
+
+        Raises:
+            RuntimeError: if the native reader cannot be opened.
+        """
+        @_ffi.READ_AT_FN
+        def c_read_at(ctx, offset, buf, length):
+            try:
+                data = read_at_fn(offset, length)
+                if not isinstance(data, bytes):
+                    # ctypes.memmove does not accept bytearray/memoryview as
+                    # a source; going through memoryview also rejects
+                    # non-buffer objects such as int instead of silently
+                    # producing zero bytes.
+                    data = bytes(memoryview(data))
+                if len(data) != length:
+                    return -1
+                ctypes.memmove(buf, data, length)
+                return 0
+            except Exception:
+                return -1
+
+        @_ffi.LENGTH_FN
+        def c_length(ctx):
+            return file_length
+
+        input_file = _ffi.MosaicInputFile()
+        input_file.ctx = None
+        input_file.read_at_fn = c_read_at
+        input_file.length_fn = c_length
+
+        handle = lib.mosaic_reader_open(input_file)
+        if not handle:
+            _check_error("failed to open reader")
+        # The callbacks are handed to the reader so they outlive the handle.
+        return MosaicReader(handle, refs=(c_read_at, c_length, input_file))
+
+    def _require_open(self):
+        """Raise ``RuntimeError`` unless a live native handle is held."""
+        if not self._handle:
+            raise RuntimeError("reader is closed")
+
+    @property
+    def schema(self):
+        """The file's ``pyarrow.Schema``, imported when the reader was created."""
+        return self._schema
+
+    @property
+    def num_row_groups(self):
+        """Number of row groups reported by the native reader."""
+        self._require_open()
+        out = ctypes.c_uint32(0)
+        rc = lib.mosaic_reader_num_row_groups(self._handle, ctypes.byref(out))
+        if rc != 0:
+            _check_error("num_row_groups failed")
+        return out.value
+
+    def read_row_group(self, rg_index, columns=None):
+        """Read one row group as a ``pyarrow.RecordBatch``.
+
+        Args:
+            rg_index: index of the row group.
+            columns: optional iterable of column indices to project; all
+                columns are read when ``None``.
+
+        Raises:
+            RuntimeError: if the reader is closed or a native call fails.
+        """
+        self._require_open()
+        if columns is not None:
+            columns = list(columns)
+            arr = (ctypes.c_uint32 * len(columns))(*columns)
+            rg_handle = lib.mosaic_reader_open_row_group_projected(
+                self._handle, rg_index, arr, len(columns),
+            )
+        else:
+            rg_handle = lib.mosaic_reader_open_row_group(self._handle, rg_index)
+        if not rg_handle:
+            _check_error(f"failed to open row group {rg_index}")
+        rb_handle = lib.mosaic_row_group_reader_read_columns(rg_handle)
+        lib.mosaic_row_group_reader_free(rg_handle)
+        if not rb_handle:
+            _check_error("read_columns failed")
+        try:
+            c_schema = _ArrowSchema()
+            c_array = _ArrowArray()
+            schema_ptr = ctypes.addressof(c_schema)
+            array_ptr = ctypes.addressof(c_array)
+            rc = lib.mosaic_record_batch_export(
+                rb_handle,
+                ctypes.c_void_p(array_ptr),
+                ctypes.c_void_p(schema_ptr),
+            )
+            if rc != 0:
+                _check_error("record_batch_export failed")
+            return pa.RecordBatch._import_from_c(array_ptr, schema_ptr)
+        finally:
+            lib.mosaic_record_batch_free(rb_handle)
+
+    def read_all(self, columns=None):
+        """Read every row group and return them as one ``pyarrow.Table``.
+
+        When the file has no row groups an empty table is returned, using the
+        file schema restricted to ``columns`` if a projection was given.
+        """
+        self._require_open()
+        if columns is not None:
+            # Materialise once so a one-shot iterable serves every row group.
+            columns = list(columns)
+        batches = [
+            self.read_row_group(rg, columns=columns)
+            for rg in range(self.num_row_groups)
+        ]
+        if batches:
+            return pa.Table.from_batches(batches, schema=batches[0].schema)
+        schema = self._schema
+        if columns is not None:
+            schema = pa.schema([self._schema.field(i) for i in columns])
+        return pa.Table.from_batches([], schema=schema)
+
+    def _stat_bytes(self, getter, rg_index, stat_index):
+        """Copy a min/max statistic out of native memory; None when absent."""
+        out_len = ctypes.c_size_t(0)
+        ptr = getter(self._handle, rg_index, stat_index, ctypes.byref(out_len))
+        if ptr and out_len.value > 0:
+            return ctypes.string_at(ptr, out_len.value)
+        return None
+
+    def get_row_group_statistics(self, rg_index):
+        """Return a list of :class:`ColumnStatistics` for one row group.
+
+        ``min``/``max`` are raw ``bytes`` as stored by the native library, or
+        ``None`` when the library reports no value.
+
+        Raises:
+            RuntimeError: if the reader is closed or a native call fails.
+        """
+        self._require_open()
+        n_out = ctypes.c_uint32(0)
+        rc = lib.mosaic_reader_row_group_num_stats(
+            self._handle, rg_index, ctypes.byref(n_out)
+        )
+        if rc != 0:
+            _check_error("row_group_num_stats failed")
+        result = []
+        for i in range(n_out.value):
+            col_idx_out = ctypes.c_uint32(0)
+            rc = lib.mosaic_reader_row_group_stat_column_index(
+                self._handle, rg_index, i, ctypes.byref(col_idx_out)
+            )
+            if rc != 0:
+                _check_error("stat_column_index failed")
+            null_count_out = ctypes.c_uint64(0)
+            rc = lib.mosaic_reader_row_group_stat_null_count(
+                self._handle, rg_index, i, ctypes.byref(null_count_out)
+            )
+            if rc != 0:
+                _check_error("stat_null_count failed")
+            min_val = self._stat_bytes(
+                lib.mosaic_reader_row_group_stat_min, rg_index, i
+            )
+            max_val = self._stat_bytes(
+                lib.mosaic_reader_row_group_stat_max, rg_index, i
+            )
+            result.append(
+                ColumnStatistics(
+                    col_idx_out.value, null_count_out.value, min_val, max_val
+                )
+            )
+        return result
+
+    def close(self):
+        """Free the native reader and drop the kept-alive references.
+
+        Safe to call more than once.
+        """
+        if self._handle:
+            lib.mosaic_reader_free(self._handle)
+            self._handle = None
+        refs = self._refs
+        if refs and isinstance(refs, tuple) and hasattr(refs[0], "close"):
+            refs[0].close()
+        # Released only after the native reader is gone, since it may call
+        # the callbacks until then.
+        self._refs = None
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
+
+    def __del__(self):
+        self.close()
+
+
+def write_table(table, stream, options=None):
+    """Write a whole ``pyarrow.Table`` to ``stream`` as a Mosaic file.
+
+    A :class:`MosaicWriter` is opened with the table's schema and closed
+    again before returning, also when writing fails.
+
+    Raises:
+        TypeError: if ``table`` is not a ``pyarrow.Table``.
+    """
+    if not isinstance(table, pa.Table):
+        raise TypeError(f"expected pyarrow.Table, got {type(table)}")
+    writer = MosaicWriter(stream, table.schema, options)
+    try:
+        writer.write(table)
+    finally:
+        writer.close()
+
+
+def read_table(read_at_fn, file_length, columns=None):
+    """Read a Mosaic file into a ``pyarrow.Table``.
+
+    ``read_at_fn`` and ``file_length`` are passed to
+    :meth:`MosaicReader.from_input_file`; ``columns`` is passed to
+    :meth:`MosaicReader.read_all`. The reader is closed before returning.
+    """
+    reader = MosaicReader.from_input_file(read_at_fn, file_length)
+    try:
+        return reader.read_all(columns=columns)
+    finally:
+        reader.close()
diff --git a/python/pyproject.toml b/python/pyproject.toml
new file mode 100644
index 0000000..893b319
--- /dev/null
+++ b/python/pyproject.toml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[build-system]
+requires = ["setuptools>=64"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "mosaic-format"
+version = "0.1.0"
+description = "Python bindings for the Mosaic columnar-bucket hybrid file format"
+license = {text = "Apache-2.0"}
+requires-python = ">=3.9"
+dependencies = ["pyarrow"]
+
+[project.optional-dependencies]
+test = ["pytest"]
+
+[tool.setuptools.packages.find]
+include = ["mosaic*"]
+
+[tool.setuptools.package-data]
+mosaic = ["libmosaic_ffi.dylib", "libmosaic_ffi.so", "mosaic_ffi.dll"]
diff --git a/python/setup.py b/python/setup.py
new file mode 100644
index 0000000..9c8724f
--- /dev/null
+++ b/python/setup.py
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Build helper: copies the pre-built native library into the package
directory."""
+
+import os
+import platform
+import shutil
+
+from setuptools import setup
+from setuptools.command.build_py import build_py
+
+
+def _lib_name():
+    """Return the native library's file name for the current platform."""
+    per_platform = {
+        "Darwin": "libmosaic_ffi.dylib",
+        "Windows": "mosaic_ffi.dll",
+    }
+    # Anything that is neither macOS nor Windows gets the ELF-style name.
+    return per_platform.get(platform.system(), "libmosaic_ffi.so")
+
+
+def _find_native_lib():
+    """Return the path of a pre-built native library, or ``None``.
+
+    The directory named by ``MOSAIC_LIB_PATH`` is checked first, then
+    ``../target/release`` and ``../target/debug`` relative to this file.
+    """
+    lib = _lib_name()
+    here = os.path.dirname(os.path.abspath(__file__))
+
+    directories = []
+    override = os.environ.get("MOSAIC_LIB_PATH")
+    if override:
+        directories.append(override)
+    directories.append(os.path.join(here, "..", "target", "release"))
+    directories.append(os.path.join(here, "..", "target", "debug"))
+
+    candidates = (os.path.join(directory, lib) for directory in directories)
+    return next((path for path in candidates if os.path.isfile(path)), None)
+
+
+class BuildPyWithNativeLib(build_py):
+    """``build_py`` variant that bundles the native library with the package.
+
+    Before the regular build step runs, a library located by
+    ``_find_native_lib`` is copied into the ``mosaic`` source directory next
+    to this file. When no library is found the build continues without it.
+    """
+
+    def run(self):
+        native_lib = _find_native_lib()
+        if native_lib:
+            package_dir = os.path.join(
+                os.path.dirname(os.path.abspath(__file__)), "mosaic"
+            )
+            shutil.copy2(native_lib, os.path.join(package_dir, _lib_name()))
+        super().run()
+
+
+# Project metadata comes from pyproject.toml; only the build_py command is
+# overridden here.
+setup(cmdclass={"build_py": BuildPyWithNativeLib})
diff --git a/python/tests/__init__.py b/python/tests/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/python/tests/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/python/tests/test_mosaic.py b/python/tests/test_mosaic.py
new file mode 100644
index 0000000..4f188ff
--- /dev/null
+++ b/python/tests/test_mosaic.py
@@ -0,0 +1,578 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+import struct
+
+import pyarrow as pa
+import pytest
+
+from mosaic import (
+ ColumnStatistics,
+ MosaicReader,
+ MosaicWriter,
+ WriterOptions,
+ read_table,
+ write_table,
+)
+
+
def _write_to_bytes(pa_schema, data, options=None):
    """Write ``data`` through a ``MosaicWriter`` and return the bytes produced.

    Args:
        pa_schema: Arrow schema handed to the writer.
        data: Object passed to ``MosaicWriter.write``.
        options: Optional ``WriterOptions`` forwarded to the writer.

    Returns:
        The full contents of the in-memory sink, read after the writer's
        context manager has exited.
    """
    sink = io.BytesIO()
    with MosaicWriter(sink, pa_schema, options) as mosaic_writer:
        mosaic_writer.write(data)
    encoded = sink.getvalue()
    return encoded
+
+
def _reader_from_bytes(data):
    """Open a ``MosaicReader`` over the in-memory byte string ``data``."""

    def read_range(offset, length):
        # Serve positional reads straight from the byte string.
        return data[offset : offset + length]

    return MosaicReader.from_input_file(read_range, len(data))
+
+
class TestRoundtrip:
    """Write data to memory, read it back, and compare with the input."""

    def test_basic_roundtrip(self):
        """Fifty int/string/float rows come back unchanged."""
        fields = [
            pa.field("id", pa.int32(), nullable=False),
            pa.field("name", pa.utf8()),
            pa.field("score", pa.float64()),
        ]
        pa_schema = pa.schema(fields)

        source_ids = list(range(50))
        batch = pa.record_batch(
            [
                pa.array(source_ids, type=pa.int32()),
                pa.array([f"user_{i}" for i in source_ids]),
                pa.array([i * 1.5 for i in source_ids]),
            ],
            names=["id", "name", "score"],
        )

        data = _write_to_bytes(pa_schema, batch)
        assert len(data) > 32
        # The encoded output must end with these four bytes.
        assert data[-4:] == b"MOSA"

        with _reader_from_bytes(data) as reader:
            group_count = reader.num_row_groups
            assert group_count >= 1

            total_rows = 0
            for group_index in range(group_count):
                rb = reader.read_row_group(group_index)
                total_rows += rb.num_rows

                rows = zip(
                    rb.column("id").to_pylist(),
                    rb.column("name").to_pylist(),
                    rb.column("score").to_pylist(),
                )
                # Name and score are both derived from the id of each row.
                for row_id, name, score in rows:
                    assert name == f"user_{row_id}"
                    assert abs(score - row_id * 1.5) < 1e-9

            assert total_rows == 50

    def test_null_values(self):
        """Nulls in the string and float columns are read back as None."""
        pa_schema = pa.schema(
            [
                pa.field("id", pa.int32()),
                pa.field("name", pa.utf8()),
                pa.field("value", pa.float64()),
            ]
        )

        expected_names = ["hello", None, "world", None]
        expected_values = [1.0, 2.0, None, None]
        batch = pa.record_batch(
            [
                pa.array([1, 2, 3, 4], type=pa.int32()),
                pa.array(expected_names),
                pa.array(expected_values),
            ],
            names=["id", "name", "value"],
        )

        data = _write_to_bytes(pa_schema, batch)

        with _reader_from_bytes(data) as reader:
            rb = reader.read_row_group(0)
            assert rb.num_rows == 4

            name_column = rb.column("name")
            value_column = rb.column("value")

            read_names = [name_column[row].as_py() for row in range(4)]
            read_values = [value_column[row].as_py() for row in range(4)]

            assert read_names == expected_names
            assert read_values == expected_values

    def test_compression_none(self):
        """Twenty rows round-trip with compression disabled."""
        row_ids = list(range(20))
        pa_schema = pa.schema(
            [pa.field("x", pa.int32()), pa.field("y", pa.utf8())]
        )
        batch = pa.record_batch(
            [
                pa.array(row_ids, type=pa.int32()),
                pa.array([f"v_{i}" for i in row_ids]),
            ],
            names=["x", "y"],
        )
        uncompressed = WriterOptions(
            compression=WriterOptions.COMPRESSION_NONE
        )
        data = _write_to_bytes(pa_schema, batch, uncompressed)

        with _reader_from_bytes(data) as reader:
            rb = reader.read_row_group(0)
            assert rb.num_rows == 20
            assert rb.column("x").to_pylist() == list(range(20))

    def test_compression_zstd(self):
        """One hundred rows round-trip with zstd compression at level 3."""
        row_ids = list(range(100))
        pa_schema = pa.schema(
            [pa.field("x", pa.int32()), pa.field("y", pa.utf8())]
        )
        batch = pa.record_batch(
            [
                pa.array(row_ids, type=pa.int32()),
                pa.array([f"v_{i}" for i in row_ids]),
            ],
            names=["x", "y"],
        )
        zstd_options = WriterOptions(
            compression=WriterOptions.COMPRESSION_ZSTD,
            zstd_level=3,
        )
        data = _write_to_bytes(pa_schema, batch, zstd_options)

        with _reader_from_bytes(data) as reader:
            rb = reader.read_row_group(0)
            assert rb.num_rows == 100
            assert rb.column("x").to_pylist() == list(range(100))

    def test_all_types(self):
        """A two-row batch covering twelve Arrow types is written and read.

        Only the bool, integer, float, utf8 and binary columns are compared
        after reading; the decimal, date and timestamp columns are written
        but not asserted on.
        """
        # (column name, declared field type, column data)
        columns = [
            ("f_bool", pa.bool_(), pa.array([True, False])),
            ("f_int8", pa.int8(), pa.array([42, -1], type=pa.int8())),
            ("f_int16", pa.int16(), pa.array([1234, -5678], type=pa.int16())),
            (
                "f_int32",
                pa.int32(),
                pa.array([100000, -200000], type=pa.int32()),
            ),
            (
                "f_int64",
                pa.int64(),
                pa.array([9999999999, -9999999999], type=pa.int64()),
            ),
            (
                "f_float32",
                pa.float32(),
                pa.array([3.14, -2.71], type=pa.float32()),
            ),
            (
                "f_float64",
                pa.float64(),
                pa.array([2.718281828, -3.141592653]),
            ),
            ("f_utf8", pa.utf8(), pa.array(["hello", "world"])),
            (
                "f_binary",
                pa.binary(),
                pa.array([b"\x01\x02\x03", b"\xff\x00"], type=pa.binary()),
            ),
            (
                "f_decimal",
                pa.decimal128(10, 2),
                pa.array([1234567, -9876543], type=pa.decimal128(10, 2)),
            ),
            ("f_date", pa.date32(), pa.array([19000, 0], type=pa.date32())),
            (
                "f_timestamp",
                pa.timestamp("ms"),
                pa.array([1700000000000, 0], type=pa.timestamp("ms")),
            ),
        ]

        pa_schema = pa.schema(
            [pa.field(name, field_type) for name, field_type, _ in columns]
        )
        batch = pa.record_batch(
            [values for _, _, values in columns],
            names=[name for name, _, _ in columns],
        )

        data = _write_to_bytes(pa_schema, batch)

        with _reader_from_bytes(data) as reader:
            rb = reader.read_row_group(0)
            assert rb.num_rows == 2

            exact_expectations = {
                "f_bool": [True, False],
                "f_int8": [42, -1],
                "f_int16": [1234, -5678],
                "f_int32": [100000, -200000],
                "f_int64": [9999999999, -9999999999],
                "f_utf8": ["hello", "world"],
                "f_binary": [b"\x01\x02\x03", b"\xff\x00"],
            }
            for column_name, expected in exact_expectations.items():
                assert rb.column(column_name).to_pylist() == expected

            # Floating-point columns are compared with a tolerance.
            first_f32, second_f32 = rb.column("f_float32").to_pylist()
            assert abs(first_f32 - 3.14) < 1e-5
            assert abs(second_f32 - (-2.71)) < 1e-5

            first_f64, second_f64 = rb.column("f_float64").to_pylist()
            assert abs(first_f64 - 2.718281828) < 1e-9
            assert abs(second_f64 - (-3.141592653)) < 1e-9

    def test_multiple_row_groups(self):
        """Ten 50-row batches yield more than one row group, read in order."""
        pa_schema = pa.schema(
            [pa.field("id", pa.int32()), pa.field("data", pa.int64())]
        )

        opts = WriterOptions(
            compression=WriterOptions.COMPRESSION_NONE,
            num_buckets=1,
            row_group_max_size=200,
        )
        sink = io.BytesIO()
        with MosaicWriter(sink, pa_schema, opts) as writer:
            for start in range(0, 500, 50):
                chunk = range(start, start + 50)
                writer.write(
                    pa.record_batch(
                        [
                            pa.array(list(chunk), type=pa.int32()),
                            pa.array([i * 3 for i in chunk], type=pa.int64()),
                        ],
                        names=["id", "data"],
                    )
                )
        data = sink.getvalue()

        with _reader_from_bytes(data) as reader:
            group_count = reader.num_row_groups
            assert group_count > 1

            # Running count of rows seen so far; ids are expected to
            # continue from it in every row group.
            offset = 0
            for group_index in range(group_count):
                rb = reader.read_row_group(group_index)
                expected_ids = list(range(offset, offset + rb.num_rows))
                assert rb.column("id").to_pylist() == expected_ids
                assert rb.column("data").to_pylist() == [
                    i * 3 for i in expected_ids
                ]
                offset += rb.num_rows

            assert offset == 500

    def test_multiple_writes(self):
        """Three consecutive writes are returned together by ``read_all``."""
        pa_schema = pa.schema(
            [pa.field("x", pa.int32()), pa.field("y", pa.utf8())]
        )

        sink = io.BytesIO()
        with MosaicWriter(sink, pa_schema) as writer:
            for start in (0, 10, 20):
                chunk = range(start, start + 10)
                writer.write(
                    pa.record_batch(
                        [
                            pa.array(list(chunk), type=pa.int32()),
                            pa.array([f"r_{i}" for i in chunk]),
                        ],
                        names=["x", "y"],
                    )
                )

        data = sink.getvalue()
        with _reader_from_bytes(data) as reader:
            table = reader.read_all()
            assert table.num_rows == 30
            assert table.column("x").to_pylist() == list(range(30))

    def test_single_row(self):
        """A one-row, one-column batch round-trips."""
        pa_schema = pa.schema([pa.field("v", pa.int32())])
        only_column = pa.array([42], type=pa.int32())
        batch = pa.record_batch([only_column], names=["v"])
        data = _write_to_bytes(pa_schema, batch)

        with _reader_from_bytes(data) as reader:
            rb = reader.read_row_group(0)
            assert rb.num_rows == 1
            assert rb.column("v")[0].as_py() == 42

    def test_zero_rows(self):
        """An empty batch still yields a table with the declared schema."""
        pa_schema = pa.schema(
            [
                pa.field("v", pa.int32(), nullable=False),
                pa.field("s", pa.utf8(), nullable=True),
            ]
        )
        empty_columns = [
            pa.array([], type=pa.int32()),
            pa.array([], type=pa.utf8()),
        ]
        batch = pa.record_batch(empty_columns, names=["v", "s"])
        data = _write_to_bytes(pa_schema, batch)

        with _reader_from_bytes(data) as reader:
            table = reader.read_all()
            assert table.num_rows == 0

            read_schema = table.schema
            assert read_schema.names == ["v", "s"]

            v_field = read_schema.field("v")
            s_field = read_schema.field("s")
            assert v_field.type == pa.int32()
            assert s_field.type == pa.utf8()
            assert not v_field.nullable
            assert s_field.nullable
+
+
class TestProjection:
    """Reading a subset of columns from a row group."""

    def test_projection_subset(self):
        """Selecting two of four columns returns two-column batches."""
        pa_schema = pa.schema(
            [
                pa.field("a", pa.int32()),
                pa.field("b", pa.utf8()),
                pa.field("c", pa.float64()),
                pa.field("d", pa.utf8()),
            ]
        )

        row_ids = list(range(20))
        batch = pa.record_batch(
            [
                pa.array(row_ids, type=pa.int32()),
                pa.array([f"val_{i}" for i in row_ids]),
                pa.array([float(i) for i in row_ids]),
                pa.array([f"extra_{i}" for i in row_ids]),
            ],
            names=["a", "b", "c", "d"],
        )

        data = _write_to_bytes(pa_schema, batch, WriterOptions(num_buckets=2))

        with _reader_from_bytes(data) as reader:
            # Projection takes column positions, looked up from the schema.
            a_col = reader.schema.get_field_index("a")
            b_col = reader.schema.get_field_index("b")
            wanted = [a_col, b_col]

            total_rows = 0
            for group_index in range(reader.num_row_groups):
                rb = reader.read_row_group(group_index, columns=wanted)
                assert rb.num_columns == 2
                total_rows += rb.num_rows

            assert total_rows == 20

    def test_projection_single_column(self):
        """Selecting one of three columns returns a one-column batch."""
        pa_schema = pa.schema(
            [
                pa.field("a", pa.int32()),
                pa.field("b", pa.utf8()),
                pa.field("c", pa.float64()),
            ]
        )

        row_ids = list(range(10))
        batch = pa.record_batch(
            [
                pa.array(row_ids, type=pa.int32()),
                pa.array([f"v_{i}" for i in row_ids]),
                pa.array([float(i) for i in row_ids]),
            ],
            names=["a", "b", "c"],
        )

        data = _write_to_bytes(pa_schema, batch)

        with _reader_from_bytes(data) as reader:
            b_col = reader.schema.get_field_index("b")
            projected = reader.read_row_group(0, columns=[b_col])
            assert projected.num_columns == 1
            assert projected.num_rows == 10
+
+
class TestSchema:
    """Schema information exposed by the reader."""

    def test_schema_roundtrip(self):
        """Field types and nullability match the schema given to the writer."""
        declared_fields = [
            pa.field("id", pa.int32(), nullable=False),
            pa.field("name", pa.utf8(), nullable=True),
            pa.field("score", pa.float64(), nullable=True),
        ]
        pa_schema = pa.schema(declared_fields)

        one_row = pa.record_batch(
            [
                pa.array([1], type=pa.int32()),
                pa.array(["x"]),
                pa.array([1.0]),
            ],
            names=["id", "name", "score"],
        )

        data = _write_to_bytes(pa_schema, one_row)

        with _reader_from_bytes(data) as reader:
            read_schema = reader.schema
            assert len(read_schema) == 3

            id_field = read_schema.field("id")
            name_field = read_schema.field("name")
            score_field = read_schema.field("score")

            assert id_field.type == pa.int32()
            assert name_field.type == pa.utf8()
            assert score_field.type == pa.float64()
            assert not id_field.nullable
            assert name_field.nullable
+
+
class TestStatistics:
    """Per-row-group column statistics."""

    def test_stats_basic(self):
        """Statistics are reported for the two requested columns only."""
        pa_schema = pa.schema(
            [
                pa.field("id", pa.int32()),
                pa.field("name", pa.utf8()),
                pa.field("score", pa.float64()),
            ]
        )

        row_ids = list(range(10))
        batch = pa.record_batch(
            [
                pa.array([i * 10 for i in row_ids], type=pa.int32()),
                pa.array([f"item_{i}" for i in row_ids]),
                pa.array([i * 1.1 for i in row_ids]),
            ],
            names=["id", "name", "score"],
        )

        data = _write_to_bytes(
            pa_schema, batch, WriterOptions(stats_columns=[0, 2])
        )

        with _reader_from_bytes(data) as reader:
            for group_index in range(reader.num_row_groups):
                stats = reader.get_row_group_statistics(group_index)
                assert len(stats) > 0

                for entry in stats:
                    assert isinstance(entry, ColumnStatistics)
                    assert entry.column_index in (0, 2)
                    assert entry.null_count == 0
                    assert entry.has_min_max
                    assert entry.min is not None
                    assert entry.max is not None
                    assert len(entry.min) > 0
                    assert len(entry.max) > 0

                # The test decodes min/max as big-endian 32-bit integers.
                id_stat = next(s for s in stats if s.column_index == 0)
                (min_id,) = struct.unpack(">i", id_stat.min)
                (max_id,) = struct.unpack(">i", id_stat.max)
                assert min_id == 0
                assert max_id == 90

    def test_stats_with_nulls(self):
        """Null counts are reported and min/max match the non-null values."""
        pa_schema = pa.schema(
            [pa.field("a", pa.int32()), pa.field("b", pa.int64())]
        )

        batch = pa.record_batch(
            [
                pa.array([10, None, 5, 20], type=pa.int32()),
                pa.array([None, None, 100, 50], type=pa.int64()),
            ],
            names=["a", "b"],
        )

        data = _write_to_bytes(
            pa_schema,
            batch,
            WriterOptions(stats_columns=[0, 1], num_buckets=1),
        )

        with _reader_from_bytes(data) as reader:
            stats = reader.get_row_group_statistics(0)
            assert len(stats) == 2

            a_stat = next(s for s in stats if s.column_index == 0)
            assert a_stat.null_count == 1
            assert a_stat.has_min_max
            (min_a,) = struct.unpack(">i", a_stat.min)
            (max_a,) = struct.unpack(">i", a_stat.max)
            assert min_a == 5
            assert max_a == 20

            # The int64 column is decoded with the 8-byte format.
            b_stat = next(s for s in stats if s.column_index == 1)
            assert b_stat.null_count == 2
            assert b_stat.has_min_max
            (min_b,) = struct.unpack(">q", b_stat.min)
            (max_b,) = struct.unpack(">q", b_stat.max)
            assert min_b == 50
            assert max_b == 100

    def test_stats_all_null(self):
        """A column holding only nulls reports no min/max."""
        pa_schema = pa.schema([pa.field("x", pa.int32())])

        all_null = pa.array([None, None, None], type=pa.int32())
        batch = pa.record_batch([all_null], names=["x"])

        data = _write_to_bytes(
            pa_schema, batch, WriterOptions(stats_columns=[0], num_buckets=1)
        )

        with _reader_from_bytes(data) as reader:
            stats = reader.get_row_group_statistics(0)
            assert len(stats) == 1

            only_stat = stats[0]
            assert only_stat.null_count == 3
            assert not only_stat.has_min_max
            assert only_stat.min is None
            assert only_stat.max is None
+
+
class TestConvenience:
    """Module-level ``write_table``/``read_table`` and ``read_all``."""

    def test_write_table_read_table(self):
        """A table written by ``write_table`` is returned by ``read_table``."""
        row_ids = list(range(30))
        table = pa.table(
            {
                "id": pa.array(row_ids, type=pa.int32()),
                "name": pa.array([f"user_{i}" for i in row_ids]),
            }
        )

        sink = io.BytesIO()
        write_table(table, sink)
        data = sink.getvalue()

        def read_range(offset, length):
            # Serve positional reads from the bytes just written.
            return data[offset : offset + length]

        result = read_table(read_range, len(data))

        assert result.num_rows == 30
        assert result.column("id").to_pylist() == list(range(30))
        expected_names = [f"user_{i}" for i in range(30)]
        assert result.column("name").to_pylist() == expected_names

    def test_read_all(self):
        """``read_all`` returns a ``pa.Table`` holding every written row."""
        pa_schema = pa.schema(
            [pa.field("x", pa.int32()), pa.field("y", pa.utf8())]
        )

        row_ids = list(range(25))
        batch = pa.record_batch(
            [
                pa.array(row_ids, type=pa.int32()),
                pa.array([f"row_{i}" for i in row_ids]),
            ],
            names=["x", "y"],
        )

        data = _write_to_bytes(pa_schema, batch)

        with _reader_from_bytes(data) as reader:
            everything = reader.read_all()
            assert isinstance(everything, pa.Table)
            assert everything.num_rows == 25
            assert everything.column("x").to_pylist() == list(range(25))
+
+
class TestWriter:
    """Writer-side behaviour."""

    def test_estimated_file_size(self):
        """The size estimate is positive once a batch has been written."""
        pa_schema = pa.schema(
            [pa.field("x", pa.int32()), pa.field("y", pa.utf8())]
        )

        row_ids = list(range(100))
        batch = pa.record_batch(
            [
                pa.array(row_ids, type=pa.int32()),
                pa.array([f"value_{i}" for i in row_ids]),
            ],
            names=["x", "y"],
        )

        sink = io.BytesIO()
        with MosaicWriter(sink, pa_schema) as writer:
            writer.write(batch)
            # Queried while the writer is still open.
            estimate = writer.estimated_file_size()
            assert estimate > 0