This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fd60a8a073 [python] Add nested-field projection on primary-key merge-read path (#7801)
fd60a8a073 is described below

commit fd60a8a0737bc8f82cfb4bdf406aa3ee7d6609f0
Author: chaoyang <[email protected]>
AuthorDate: Sun May 10 21:44:14 2026 +0800

    [python] Add nested-field projection on primary-key merge-read path (#7801)
---
 .../read/reader/outer_projection_record_reader.py  | 137 ++++++++++++++
 paimon-python/pypaimon/read/split_read.py          |  55 +++++-
 paimon-python/pypaimon/read/table_read.py          |  32 +++-
 .../pypaimon/tests/reader_primary_key_test.py      |  39 ++++
 .../pypaimon/tests/test_nested_projection_e2e.py   |  73 ++++++--
 .../tests/test_outer_projection_record_reader.py   | 202 +++++++++++++++++++++
 6 files changed, 509 insertions(+), 29 deletions(-)

diff --git a/paimon-python/pypaimon/read/reader/outer_projection_record_reader.py b/paimon-python/pypaimon/read/reader/outer_projection_record_reader.py
new file mode 100644
index 0000000000..9f185d5292
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/outer_projection_record_reader.py
@@ -0,0 +1,137 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Outer-projection wrapper for nested-field reads.
+
+Sits above a reader whose rows still carry full ROW sub-structures, and
+emits flat rows whose slots are the values reached by walking each
+nested name path. Used on the primary-key merge-read path: the inner
+reader hands the merge function complete ROW columns (so deduplicate /
+partial-update / aggregation see the original sub-structure), and this
+wrapper extracts the user-visible flat columns afterwards.
+"""
+
+from typing import Any, List, Optional
+
+from pypaimon.read.reader.iface.record_iterator import RecordIterator
+from pypaimon.read.reader.iface.record_reader import RecordReader
+from pypaimon.table.row.internal_row import InternalRow
+from pypaimon.table.row.offset_row import OffsetRow
+
+
+class OuterProjectionRecordReader(RecordReader[InternalRow]):
+    """Wraps an InternalRow reader and projects nested name paths into flat rows."""
+
+    def __init__(
+        self,
+        inner: RecordReader[InternalRow],
+        inner_top_names: List[str],
+        name_paths: List[List[str]],
+    ):
+        if not name_paths:
+            raise ValueError("name_paths must be non-empty")
+        for path in name_paths:
+            if not path:
+                raise ValueError("each name path must contain at least one name")
+        name_to_top_idx = {name: i for i, name in enumerate(inner_top_names)}
+        self._specs: List[_PathSpec] = []
+        for path in name_paths:
+            top_name = path[0]
+            if top_name not in name_to_top_idx:
+                raise ValueError(
+                    "path top-level field %r not found in inner row schema %r"
+                    % (top_name, inner_top_names))
+            self._specs.append(_PathSpec(name_to_top_idx[top_name], list(path[1:])))
+        self._inner = inner
+        self._flat_arity = len(name_paths)
+
+    def read_batch(self) -> Optional[RecordIterator[InternalRow]]:
+        inner_batch = self._inner.read_batch()
+        if inner_batch is None:
+            return None
+        return _OuterProjectionIterator(inner_batch, self._specs, self._flat_arity)
+
+    def close(self) -> None:
+        self._inner.close()
+
+
+class _OuterProjectionIterator(RecordIterator[InternalRow]):
+    """Per-batch iterator that materialises one flat OffsetRow per inner row."""
+
+    def __init__(
+        self,
+        inner: RecordIterator[InternalRow],
+        specs: List["_PathSpec"],
+        flat_arity: int,
+    ):
+        self._inner = inner
+        self._specs = specs
+        self._flat_arity = flat_arity
+        self._reused_row = OffsetRow(None, 0, flat_arity)
+
+    def next(self) -> Optional[InternalRow]:
+        inner_row = self._inner.next()
+        if inner_row is None:
+            return None
+        flat = tuple(_extract(inner_row, spec) for spec in self._specs)
+        self._reused_row.replace(flat)
+        # Inherit the inner row's RowKind so downstream consumers (e.g. the
+        # to_arrow path) keep the same +I/-D/-U/+U classification.
+        self._reused_row.set_row_kind_byte(inner_row.get_row_kind().value)
+        return self._reused_row
+
+
+class _PathSpec:
+    """Pre-resolved name path: top-level slot index plus sub-field names."""
+
+    __slots__ = ("top_idx", "sub_names")
+
+    def __init__(self, top_idx: int, sub_names: List[str]):
+        self.top_idx = top_idx
+        self.sub_names = sub_names
+
+
+def _extract(row: InternalRow, spec: _PathSpec) -> Any:
+    cur = row.get_field(spec.top_idx)
+    for name in spec.sub_names:
+        if cur is None:
+            return None
+        cur = _step_into(cur, name)
+    return cur
+
+
+def _step_into(value: Any, name: str) -> Any:
+    """Take one step into a ROW sub-structure by sub-field name.
+
+    Upstream materialises nested ROW values as plain Python dicts (e.g.
+    polars row-by-row iteration produces a dict for each struct slot),
+    so dict access is the only supported form here. Anything else is
+    rejected loudly to surface schema/wiring mismatches early.
+    """
+    if isinstance(value, dict):
+        return value.get(name)
+    if isinstance(value, InternalRow):
+        # Defensive: if the upstream reader handed us a wrapped sub-row,
+        # we cannot index it by name without its schema, so fail fast
+        # rather than guessing the slot.
+        raise TypeError(
+            "Cannot step into InternalRow by name %r without sub-schema; "
+            "expected a dict from the polars row materialisation" % (name,))
+    raise TypeError(
+        "Cannot index nested ROW step %r into value of type %s"
+        % (name, type(value).__name__))
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index e5903958d1..c5f4082b9d 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -189,15 +189,29 @@ class SplitRead(ABC):
         nested_path_by_name = self._nested_path_by_name()
         has_nested = nested_path_by_name is not None
 
+        # Cover both the merge-internal aliases (``_KEY_id``) and the
+        # bare user-facing PK name (``id``) the file actually stores.
+        name_to_field: Dict[str, DataField] = {f.name: f for f in self.read_fields}
+        _, _trimmed_lookup_fields = self._get_trimmed_fields(
+            self._get_read_data_fields(), self._get_all_data_fields()
+        )
+        for f in _trimmed_lookup_fields:
+            name_to_field.setdefault(f.name, f)
+
         format_reader: RecordBatchReader
         if file_format == CoreOptions.FILE_FORMAT_AVRO:
             avro_nested_paths = (
                 [nested_path_by_name[name] for name in read_file_fields]
                 if has_nested else None
             )
-            format_reader = FormatAvroReader(self.table.file_io, file_path, read_file_fields,
-                                             self.read_fields, read_arrow_predicate, batch_size=batch_size,
-                                             nested_name_paths=avro_nested_paths)
+            # Pass the alias-safe union so FormatAvroReader can resolve
+            # the bare PK name (e.g. ``id``) requested by read_file_fields,
+            # even when value projection drops it from self.read_fields.
+            format_reader = FormatAvroReader(
+                self.table.file_io, file_path, read_file_fields,
+                list(name_to_field.values()),
+                read_arrow_predicate, batch_size=batch_size,
+                nested_name_paths=avro_nested_paths)
         elif file_format == CoreOptions.FILE_FORMAT_BLOB:
             if has_nested:
                 raise NotImplementedError(
@@ -210,7 +224,6 @@ class SplitRead(ABC):
             if has_nested:
                 raise NotImplementedError(
                     "Nested-field projection is not supported on Lance files")
-            name_to_field = {f.name: f for f in self.read_fields}
             ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field]
             format_reader = FormatLanceReader(self.table.file_io, file_path, ordered_read_fields,
                                               read_arrow_predicate, batch_size=batch_size,
@@ -219,7 +232,6 @@ class SplitRead(ABC):
             if has_nested:
                 raise NotImplementedError(
                     "Nested-field projection is not supported on Vortex files")
-            name_to_field = {f.name: f for f in self.read_fields}
             ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field]
             predicate_fields = _get_all_fields(self.push_down_predicate) if self.push_down_predicate else set()
             format_reader = FormatVortexReader(self.table.file_io, file_path, ordered_read_fields,
@@ -227,7 +239,6 @@ class SplitRead(ABC):
                                                row_indices=row_indices,
                                                predicate_fields=predicate_fields)
         elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
-            name_to_field = {f.name: f for f in self.read_fields}
             ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field]
             ordered_nested_paths = (
                 [nested_path_by_name[f.name] for f in ordered_read_fields]
@@ -560,6 +571,27 @@ class RawFileSplitRead(SplitRead):
 
 
 class MergeFileSplitRead(SplitRead):
+    def __init__(
+            self,
+            table,
+            predicate: Optional[Predicate],
+            read_type: List[DataField],
+            split: Split,
+            row_tracking_enabled: bool,
+            outer_extract_name_paths: Optional[List[List[str]]] = None):
+        # Merge functions need full ROW sub-structures, so nested paths
+        # are not pushed down here; sub-path extraction happens above
+        # the merge via OuterProjectionRecordReader.
+        super().__init__(
+            table=table,
+            predicate=predicate,
+            read_type=read_type,
+            split=split,
+            row_tracking_enabled=row_tracking_enabled,
+            nested_name_paths=None,
+        )
+        self.outer_extract_name_paths = outer_extract_name_paths
+
     def kv_reader_supplier(self, file: DataFileMeta, dv_factory: Optional[Callable] = None) -> RecordReader:
         file_batch_reader = self.file_reader_supplier(file, True, self._get_final_read_data_fields(), False)
         dv = dv_factory() if dv_factory else None
@@ -591,9 +623,16 @@ class MergeFileSplitRead(SplitRead):
         concat_reader = ConcatRecordReader(section_readers)
         kv_unwrap_reader = KeyValueUnwrapRecordReader(DropDeleteRecordReader(concat_reader))
         if self.predicate_for_reader:
-            return FilterRecordReader(kv_unwrap_reader, self.predicate_for_reader)
+            reader = FilterRecordReader(kv_unwrap_reader, self.predicate_for_reader)
         else:
-            return kv_unwrap_reader
+            reader = kv_unwrap_reader
+        if self.outer_extract_name_paths:
+            from pypaimon.read.reader.outer_projection_record_reader import \
+                OuterProjectionRecordReader
+            inner_top_names = [f.name for f in self.read_fields[-self.value_arity:]]
+            reader = OuterProjectionRecordReader(
+                reader, inner_top_names, self.outer_extract_name_paths)
+        return reader
 
     def _get_all_data_fields(self):
         return self._create_key_value_fields(self.table.fields)
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 488c964bcd..ce874532b0 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -270,17 +270,21 @@ class TableRead:
 
     def _create_split_read(self, split: Split) -> SplitRead:
         if self.table.is_primary_key_table and not split.raw_convertible:
+            inner_read_type = self.read_type
+            outer_extract_name_paths: Optional[List[List[str]]] = None
             if self.nested_name_paths and any(
                     len(p) > 1 for p in self.nested_name_paths):
-                raise NotImplementedError(
-                    "Nested-field projection on primary-key tables that "
-                    "require a merge read is not yet supported")
+                # Inner: full ROW for the merge function. Outer: extract
+                # the requested sub-paths back to the user's flat schema.
+                inner_read_type = self._widen_to_top_level_for_merge()
+                outer_extract_name_paths = self.nested_name_paths
             return MergeFileSplitRead(
                 table=self.table,
                 predicate=self.predicate,
-                read_type=self.read_type,
+                read_type=inner_read_type,
                 split=split,
-                row_tracking_enabled=False
+                row_tracking_enabled=False,
+                outer_extract_name_paths=outer_extract_name_paths,
             )
         elif self.table.options.data_evolution_enabled():
             if self.nested_name_paths and any(
@@ -306,6 +310,24 @@ class TableRead:
                 nested_name_paths=self.nested_name_paths,
             )
 
+    def _widen_to_top_level_for_merge(self) -> List[DataField]:
+        """Unique top-level fields from ``self.nested_name_paths``, in path order."""
+        table_fields_by_name = {f.name: f for f in self.table.fields}
+        seen = set()
+        widened: List[DataField] = []
+        for path in self.nested_name_paths or []:
+            top_name = path[0]
+            if top_name in seen:
+                continue
+            seen.add(top_name)
+            field = table_fields_by_name.get(top_name)
+            if field is None:
+                raise ValueError(
+                    "Nested projection top-level field %r not found in "
+                    "table schema" % (top_name,))
+            widened.append(field)
+        return widened
+
     @staticmethod
     def convert_rows_to_arrow_batch(row_tuples: List[tuple], schema: pyarrow.Schema) -> pyarrow.RecordBatch:
         columns_data = zip(*row_tuples)
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index 36ec678c8c..1a443e375a 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -270,6 +270,45 @@ class PkReaderTest(unittest.TestCase):
         expected = self.expected.select(['dt', 'user_id', 'behavior'])
         self.assertEqual(actual, expected)
 
+    def _assert_value_only_projection_works(self, file_format: str, table_suffix: str):
+        # Two commits force the split through the merge path. The merge
+        # reader still needs the PK column to assemble its key, even
+        # though the user-visible projection drops it — regress the
+        # case where narrowing to value-only fields broke the file
+        # column lookup.
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            partition_keys=['dt'],
+            primary_keys=['user_id', 'dt'],
+            options={'bucket': '2', 'file.format': file_format})
+        self.catalog.create_table(
+            'default.test_pk_projection_no_pk_' + table_suffix, schema, False)
+        table = self.catalog.get_table(
+            'default.test_pk_projection_no_pk_' + table_suffix)
+        self._write_test_table(table)
+
+        read_builder = table.new_read_builder().with_projection(['behavior'])
+        actual = self._read_test_table(read_builder)
+        expected = self.expected.select(['behavior'])
+        # Projection drops PKs so we can only compare bag semantics.
+        self.assertEqual(
+            sorted([r['behavior'] for r in actual.to_pylist()],
+                   key=lambda v: '' if v is None else v),
+            sorted([r['behavior'] for r in expected.to_pylist()],
+                   key=lambda v: '' if v is None else v))
+
+    def test_pk_reader_with_projection_excluding_pk(self):
+        self._assert_value_only_projection_works('parquet', 'parquet')
+
+    def test_pk_reader_with_projection_excluding_pk_orc(self):
+        self._assert_value_only_projection_works('orc', 'orc')
+
+    def test_pk_reader_with_projection_excluding_pk_avro(self):
+        # Avro path resolves DataField names through ``full_fields_map``
+        # built from ``self.read_fields``; the alias-safe lookup must also
+        # cover the bare PK name (``user_id``) the file actually stores.
+        self._assert_value_only_projection_works('avro', 'avro')
+
     def test_pk_reader_with_limit(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema,
                                             partition_keys=['dt'],
diff --git a/paimon-python/pypaimon/tests/test_nested_projection_e2e.py b/paimon-python/pypaimon/tests/test_nested_projection_e2e.py
index 87ab3935f4..457f1b588e 100644
--- a/paimon-python/pypaimon/tests/test_nested_projection_e2e.py
+++ b/paimon-python/pypaimon/tests/test_nested_projection_e2e.py
@@ -175,35 +175,76 @@ class AppendOnlyNestedParquetTest(_AppendOnlyNestedBase):
              {'val': 'y', 'id': 2},
              {'val': 'z', 'id': 3}])
 
-    def test_pk_table_merge_split_with_nested_projection_raises(self):
-        # Phase 2b lands the append-only path only; PK + nested needs an
-        # outer-projection wrapper that ships in a follow-up commit. Until
-        # then, the call must refuse loudly rather than silently corrupt
-        # the merge function input. Two commits on the same PK force the
-        # split out of the raw-convertible fast path into the merge
-        # reader, which is where the guard lives.
-        identifier = 'default.pk_nested_unsupported'
+
+class PrimaryKeyNestedTest(_AppendOnlyNestedBase):
+    """PK tables go through the merge reader once a split is no longer
+    raw-convertible (multiple overlapping commits on the same key). The
+    merge function still needs full ROW sub-structures, so the read
+    splits inner = full-ROW from outer = flat sub-paths via an
+    OuterProjectionRecordReader."""
+
+    def _create_pk_table(self, name: str, file_format: str = 'parquet'):
+        identifier = 'default.{}'.format(name)
         schema = Schema.from_pyarrow_schema(
             self.pa_schema,
             primary_keys=['id'],
-            options={'bucket': '1', 'file.format': 'parquet'},
+            options={'bucket': '1', 'file.format': file_format},
         )
         self.catalog.create_table(identifier, schema, False)
         table = self.catalog.get_table(identifier)
-        for batch in (self.rows, self.rows):  # two overlapping commits
+        # Two overlapping commits force the split off the raw-convertible
+        # fast path into the merge reader.
+        for batch in (self.rows, self.rows):
             wb = table.new_batch_write_builder()
             w = wb.new_write()
             w.write_arrow(pa.Table.from_pylist(batch, schema=self.pa_schema))
             wb.new_commit().commit(w.prepare_commit())
             w.close()
+        return table
 
-        rb = table.new_read_builder().with_projection(['mv.latest_version'])
+    def _read_arrow(self, table, projection):
+        rb = table.new_read_builder().with_projection(projection)
         splits = rb.new_scan().plan().splits()
-        # ``to_arrow`` materialises the split read; the merge path is what
-        # raises, so do it eagerly here rather than waiting for the first
-        # batch.
-        with self.assertRaises(NotImplementedError):
-            rb.new_read().to_arrow(splits)
+        return rb.new_read().to_arrow(splits)
+
+    def test_extracts_single_nested_leaf(self):
+        table = self._create_pk_table('pk_nested_single')
+        arrow = self._read_arrow(table, ['mv.latest_version'])
+        self.assertEqual(arrow.column_names, ['mv_latest_version'])
+        versions = sorted(arrow.column('mv_latest_version').to_pylist())
+        self.assertEqual(versions, [100, 200, 300])
+
+    def test_multiple_sub_paths_under_same_struct(self):
+        table = self._create_pk_table('pk_nested_double')
+        arrow = self._read_arrow(
+            table, ['mv.latest_version', 'mv.latest_value'])
+        self.assertEqual(arrow.column_names, ['mv_latest_version', 'mv_latest_value'])
+        pairs = sorted(zip(
+            arrow.column('mv_latest_version').to_pylist(),
+            arrow.column('mv_latest_value').to_pylist()))
+        self.assertEqual(pairs, [(100, 'a'), (200, 'b'), (300, 'c')])
+
+    def test_mixed_nested_and_top_level_preserves_order(self):
+        table = self._create_pk_table('pk_nested_mixed')
+        arrow = self._read_arrow(
+            table, ['id', 'mv.latest_version', 'val'])
+        self.assertEqual(
+            arrow.column_names, ['id', 'mv_latest_version', 'val'])
+        rows = sorted(zip(
+            arrow.column('id').to_pylist(),
+            arrow.column('mv_latest_version').to_pylist(),
+            arrow.column('val').to_pylist()))
+        self.assertEqual(rows, [(1, 100, 'x'), (2, 200, 'y'), (3, 300, 'z')])
+
+    def test_avro_extracts_single_nested_leaf(self):
+        # Avro PK reads resolve DataFields through ``full_fields_map`` which
+        # historically only covered merge-internal aliases; without the
+        # alias-safe fix, this projection would raise ``KeyError: 'id'``.
+        table = self._create_pk_table('pk_avro_nested_single', file_format='avro')
+        arrow = self._read_arrow(table, ['mv.latest_version'])
+        self.assertEqual(arrow.column_names, ['mv_latest_version'])
+        versions = sorted(arrow.column('mv_latest_version').to_pylist())
+        self.assertEqual(versions, [100, 200, 300])
 
 
 if __name__ == '__main__':
diff --git a/paimon-python/pypaimon/tests/test_outer_projection_record_reader.py b/paimon-python/pypaimon/tests/test_outer_projection_record_reader.py
new file mode 100644
index 0000000000..846e90e1b4
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_outer_projection_record_reader.py
@@ -0,0 +1,202 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import unittest
+from typing import List, Optional
+
+from pypaimon.read.reader.iface.record_iterator import RecordIterator
+from pypaimon.read.reader.iface.record_reader import RecordReader
+from pypaimon.read.reader.outer_projection_record_reader import (
+    OuterProjectionRecordReader, _extract, _PathSpec, _step_into)
+from pypaimon.table.row.internal_row import InternalRow
+from pypaimon.table.row.offset_row import OffsetRow
+
+
+class _StaticIterator(RecordIterator[InternalRow]):
+    """Iterator over a pre-built list of OffsetRows; yields None when drained."""
+
+    def __init__(self, rows: List[OffsetRow]):
+        self._rows = list(rows)
+        self._idx = 0
+
+    def next(self) -> Optional[InternalRow]:
+        if self._idx >= len(self._rows):
+            return None
+        row = self._rows[self._idx]
+        self._idx += 1
+        return row
+
+
+class _StaticReader(RecordReader[InternalRow]):
+    """Reader that emits a single batch then EOF, with close-tracking."""
+
+    def __init__(self, rows: List[OffsetRow]):
+        self._rows = rows
+        self._delivered = False
+        self.closed = False
+
+    def read_batch(self) -> Optional[RecordIterator[InternalRow]]:
+        if self._delivered:
+            return None
+        self._delivered = True
+        return _StaticIterator(self._rows)
+
+    def close(self) -> None:
+        self.closed = True
+
+
+def _row(*values) -> OffsetRow:
+    return OffsetRow(tuple(values), 0, len(values))
+
+
+class StepIntoTest(unittest.TestCase):
+
+    def test_dict_lookup(self):
+        self.assertEqual(_step_into({'a': 1, 'b': 2}, 'a'), 1)
+        self.assertIsNone(_step_into({'a': 1}, 'missing'))
+
+    def test_internal_row_rejected(self):
+        # Defensive: we never expect a nested InternalRow in the polars path.
+        row = OffsetRow((10, 20), 0, 2)
+        with self.assertRaises(TypeError):
+            _step_into(row, 'whatever')
+
+    def test_unsupported_type_raises(self):
+        with self.assertRaises(TypeError):
+            _step_into(42, 'name')
+
+
+class ExtractTest(unittest.TestCase):
+
+    def test_empty_sub_names_returns_top_level_field(self):
+        # Empty sub_names → return the top-level slot itself (used when
+        # mixing top-level and nested paths in the same projection).
+        row = _row(1, {'v': 100, 's': 'a'})
+        spec = _PathSpec(top_idx=0, sub_names=[])
+        self.assertEqual(_extract(row, spec), 1)
+
+    def test_nested_walk_returns_leaf(self):
+        row = _row(1, {'v': 100, 's': 'a'})
+        spec = _PathSpec(top_idx=1, sub_names=['v'])
+        self.assertEqual(_extract(row, spec), 100)
+
+    def test_none_at_top_short_circuits(self):
+        row = _row(1, None)
+        spec = _PathSpec(top_idx=1, sub_names=['v'])
+        self.assertIsNone(_extract(row, spec))
+
+    def test_missing_sub_name_returns_none(self):
+        row = _row(1, {'v': 100})
+        spec = _PathSpec(top_idx=1, sub_names=['s'])
+        self.assertIsNone(_extract(row, spec))
+
+
+class OuterProjectionRecordReaderTest(unittest.TestCase):
+
+    def _build_reader(self, rows, top_names, name_paths):
+        return OuterProjectionRecordReader(
+            _StaticReader(rows), top_names, name_paths)
+
+    def test_extracts_nested_leaf(self):
+        rows = [
+            _row(1, {'v': 100, 's': 'a'}, 'x'),
+            _row(2, {'v': 200, 's': 'b'}, 'y'),
+        ]
+        reader = self._build_reader(
+            rows,
+            top_names=['id', 'mv', 'val'],
+            name_paths=[['mv', 'v']])
+        batch = reader.read_batch()
+        out = []
+        while True:
+            r = batch.next()
+            if r is None:
+                break
+            out.append(tuple(r.get_field(i) for i in range(len(r))))
+        self.assertEqual(out, [(100,), (200,)])
+
+    def test_mixed_top_level_and_nested_preserves_order(self):
+        rows = [_row(1, {'v': 100, 's': 'a'}, 'x')]
+        reader = self._build_reader(
+            rows,
+            top_names=['id', 'mv', 'val'],
+            name_paths=[['val'], ['mv', 'v'], ['id']])
+        batch = reader.read_batch()
+        r = batch.next()
+        self.assertEqual(
+            tuple(r.get_field(i) for i in range(len(r))),
+            ('x', 100, 1))
+
+    def test_nullable_struct_returns_none(self):
+        # The wrapper reuses a single OffsetRow per batch, so consumers must
+        # materialise each row before advancing — same contract as
+        # InternalRowWrapperIterator.
+        rows = [_row(1, None, 'x'), _row(2, {'v': 200, 's': 'b'}, 'y')]
+        reader = self._build_reader(
+            rows,
+            top_names=['id', 'mv', 'val'],
+            name_paths=[['mv', 'v']])
+        batch = reader.read_batch()
+        first_value = batch.next().get_field(0)
+        second_value = batch.next().get_field(0)
+        self.assertIsNone(first_value)
+        self.assertEqual(second_value, 200)
+
+    def test_inherits_row_kind(self):
+        rows = [_row(1, {'v': 100, 's': 'a'})]
+        rows[0].set_row_kind_byte(2)  # -U
+        reader = self._build_reader(
+            rows,
+            top_names=['id', 'mv'],
+            name_paths=[['mv', 'v']])
+        batch = reader.read_batch()
+        r = batch.next()
+        self.assertEqual(r.row_kind_byte, 2)
+
+    def test_eof_returns_none(self):
+        reader = self._build_reader(
+            [], top_names=['id'], name_paths=[['id']])
+        # First batch is empty (delivered) but the iterator immediately yields None.
+        first_batch = reader.read_batch()
+        self.assertIsNone(first_batch.next())
+        # Subsequent read_batch returns None once the inner reader is drained.
+        self.assertIsNone(reader.read_batch())
+
+    def test_close_propagates(self):
+        inner = _StaticReader([_row(1, {'v': 100})])
+        reader = OuterProjectionRecordReader(
+            inner, ['id', 'mv'], [['mv', 'v']])
+        reader.close()
+        self.assertTrue(inner.closed)
+
+    def test_unknown_top_name_raises_at_construction(self):
+        with self.assertRaises(ValueError):
+            OuterProjectionRecordReader(
+                _StaticReader([]), ['id', 'mv'], [['nope', 'v']])
+
+    def test_empty_name_paths_rejected(self):
+        with self.assertRaises(ValueError):
+            OuterProjectionRecordReader(_StaticReader([]), ['id'], [])
+
+    def test_empty_individual_path_rejected(self):
+        with self.assertRaises(ValueError):
+            OuterProjectionRecordReader(_StaticReader([]), ['id'], [[]])
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to