This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git
The following commit(s) were added to refs/heads/main by this push:
new 1794ef2 fix: preserve Python projection schema for empty reads (#28)
1794ef2 is described below
commit 1794ef25915708f4828162fd07210cb36bb9cfb1
Author: QuakeWang <[email protected]>
AuthorDate: Thu May 21 15:24:11 2026 +0800
fix: preserve Python projection schema for empty reads (#28)
---
python/mosaic/mosaic.py | 20 +++++++++++++++----
python/tests/test_mosaic.py | 48 +++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 64 insertions(+), 4 deletions(-)
diff --git a/python/mosaic/mosaic.py b/python/mosaic/mosaic.py
index 91c62ac..b08096d 100644
--- a/python/mosaic/mosaic.py
+++ b/python/mosaic/mosaic.py
@@ -284,6 +284,7 @@ class MosaicReader:
if rc != 0:
_check_error("export_schema failed")
self._schema = pa.Schema._import_from_c(schema_ptr)
+ self._projected_schema = None
@staticmethod
def from_input_file(read_at_fn, file_length):
@@ -332,11 +333,17 @@ class MosaicReader:
def project(self, columns):
"""Set projection on the reader. Subsequent reads only return the
named columns."""
- c_strs = [c.encode("utf-8") for c in columns]
- arr = (ctypes.c_char_p * len(columns))(*c_strs)
- rc = lib.mosaic_reader_set_projection(self._handle, arr, len(columns))
+ column_names = list(columns)
+ c_strs = [c.encode("utf-8") for c in column_names]
+ arr = (ctypes.c_char_p * len(column_names))(*c_strs)
+ rc = lib.mosaic_reader_set_projection(self._handle, arr, len(column_names))
if rc != 0:
_check_error("set_projection failed")
+ projected_field_names = list(dict.fromkeys(column_names))
+ self._projected_schema = pa.schema(
+ [self._schema.field(name) for name in projected_field_names],
+ metadata=self._schema.metadata,
+ )
def read_row_group(self, rg_index):
rg_handle = lib.mosaic_reader_open_row_group(self._handle, rg_index)
@@ -368,7 +375,12 @@ class MosaicReader:
batches.append(self.read_row_group(rg))
if batches:
return pa.Table.from_batches(batches, schema=batches[0].schema)
- return pa.Table.from_batches([], schema=self._schema)
+ schema = (
+ self._projected_schema
+ if self._projected_schema is not None
+ else self._schema
+ )
+ return pa.Table.from_batches([], schema=schema)
def row_group_num_rows(self, rg_index):
out = ctypes.c_uint32(0)
diff --git a/python/tests/test_mosaic.py b/python/tests/test_mosaic.py
index a333ea4..00e68b7 100644
--- a/python/tests/test_mosaic.py
+++ b/python/tests/test_mosaic.py
@@ -679,6 +679,54 @@ class TestConvenience:
assert result.num_rows == 30
assert result.schema.names == []
+ def test_read_table_projection_with_zero_row_groups(self):
+ pa_schema = pa.schema(
+ [
+ pa.field("id", pa.int32()),
+ pa.field("name", pa.utf8()),
+ ]
+ )
+
+ buf = io.BytesIO()
+ with MosaicWriter(buf, pa_schema):
+ pass
+
+ data = buf.getvalue()
+
+ empty_projection = read_table(
+ lambda offset, length: data[offset : offset + length],
+ len(data),
+ columns=[],
+ )
+ assert empty_projection.num_rows == 0
+ assert empty_projection.num_columns == 0
+ assert empty_projection.schema.names == []
+
+ name_projection = read_table(
+ lambda offset, length: data[offset : offset + length],
+ len(data),
+ columns=["name"],
+ )
+ assert name_projection.num_rows == 0
+ assert name_projection.num_columns == 1
+ assert name_projection.schema.names == ["name"]
+
+ duplicate_projection = read_table(
+ lambda offset, length: data[offset : offset + length],
+ len(data),
+ columns=["name", "name"],
+ )
+ assert duplicate_projection.num_rows == 0
+ assert duplicate_projection.num_columns == 1
+ assert duplicate_projection.schema.names == ["name"]
+
+ with _reader_from_bytes(data) as reader:
+ assert reader.num_row_groups == 0
+ reader.project(["name"])
+ assert reader.schema.names == ["id", "name"]
+ result = reader.read_all()
+ assert result.schema.names == ["name"]
+
def test_read_all(self):
pa_schema = pa.schema(
[pa.field("x", pa.int32()), pa.field("y", pa.utf8())]