This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fd60a8a073 [python] Add nested-field projection on primary-key merge-read path (#7801)
fd60a8a073 is described below
commit fd60a8a0737bc8f82cfb4bdf406aa3ee7d6609f0
Author: chaoyang <[email protected]>
AuthorDate: Sun May 10 21:44:14 2026 +0800
[python] Add nested-field projection on primary-key merge-read path (#7801)
---
.../read/reader/outer_projection_record_reader.py | 137 ++++++++++++++
paimon-python/pypaimon/read/split_read.py | 55 +++++-
paimon-python/pypaimon/read/table_read.py | 32 +++-
.../pypaimon/tests/reader_primary_key_test.py | 39 ++++
.../pypaimon/tests/test_nested_projection_e2e.py | 73 ++++++--
.../tests/test_outer_projection_record_reader.py | 202 +++++++++++++++++++++
6 files changed, 509 insertions(+), 29 deletions(-)
diff --git a/paimon-python/pypaimon/read/reader/outer_projection_record_reader.py b/paimon-python/pypaimon/read/reader/outer_projection_record_reader.py
new file mode 100644
index 0000000000..9f185d5292
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/outer_projection_record_reader.py
@@ -0,0 +1,137 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Outer-projection wrapper for nested-field reads.
+
+Sits above a reader whose rows still carry full ROW sub-structures, and
+emits flat rows whose slots are the values reached by walking each
+nested name path. Used on the primary-key merge-read path: the inner
+reader hands the merge function complete ROW columns (so deduplicate /
+partial-update / aggregation see the original sub-structure), and this
+wrapper extracts the user-visible flat columns afterwards.
+"""
+
+from typing import Any, List, Optional
+
+from pypaimon.read.reader.iface.record_iterator import RecordIterator
+from pypaimon.read.reader.iface.record_reader import RecordReader
+from pypaimon.table.row.internal_row import InternalRow
+from pypaimon.table.row.offset_row import OffsetRow
+
+
+class OuterProjectionRecordReader(RecordReader[InternalRow]):
+    """Wraps an InternalRow reader and projects nested name paths into flat rows.
+
+    Each entry of ``name_paths`` becomes one slot of the emitted flat row,
+    in the same order. ``path[0]`` selects a top-level field of the inner
+    row by name. Any remaining names walk, one step at a time, into that
+    field's nested ROW value. Paths are resolved to slot indices once, at
+    construction time.
+    """
+
+    def __init__(
+        self,
+        inner: RecordReader[InternalRow],
+        inner_top_names: List[str],
+        name_paths: List[List[str]],
+    ):
+        """
+        :param inner: reader whose rows still carry full ROW sub-structures.
+        :param inner_top_names: top-level field names of ``inner``'s rows,
+            in slot order; used to map each ``path[0]`` to a slot index.
+        :param name_paths: one name path per output column, in output order.
+        :raises ValueError: if ``name_paths`` is empty, any single path is
+            empty, or a path's first name is not in ``inner_top_names``.
+        """
+        if not name_paths:
+            raise ValueError("name_paths must be non-empty")
+        for path in name_paths:
+            if not path:
+                raise ValueError("each name path must contain at least one name")
+        # Resolve every top-level name to a slot index up front, so per-row
+        # extraction only needs positional access for the first step.
+        name_to_top_idx = {name: i for i, name in enumerate(inner_top_names)}
+        self._specs: List[_PathSpec] = []
+        for path in name_paths:
+            top_name = path[0]
+            if top_name not in name_to_top_idx:
+                raise ValueError(
+                    "path top-level field %r not found in inner row schema %r"
+                    % (top_name, inner_top_names))
+            self._specs.append(_PathSpec(name_to_top_idx[top_name], list(path[1:])))
+        self._inner = inner
+        # Width of every emitted flat row: one slot per name path.
+        self._flat_arity = len(name_paths)
+
+    def read_batch(self) -> Optional[RecordIterator[InternalRow]]:
+        """Return the next projected batch, or None once ``inner`` is exhausted."""
+        inner_batch = self._inner.read_batch()
+        if inner_batch is None:
+            return None
+        return _OuterProjectionIterator(inner_batch, self._specs, self._flat_arity)
+
+    def close(self) -> None:
+        """Close the wrapped reader."""
+        self._inner.close()
+
+
+class _OuterProjectionIterator(RecordIterator[InternalRow]):
+    """Per-batch iterator that materialises one flat OffsetRow per inner row.
+
+    A single OffsetRow instance is returned from every ``next`` call of a
+    batch. Its contents are replaced on the following call, so callers
+    must copy any values they need before advancing.
+    """
+
+    def __init__(
+        self,
+        inner: RecordIterator[InternalRow],
+        specs: List["_PathSpec"],
+        flat_arity: int,
+    ):
+        """
+        :param inner: iterator over one batch of full-ROW inner rows.
+        :param specs: pre-resolved paths, one per output slot.
+        :param flat_arity: number of output slots (``len(specs)``).
+        """
+        self._inner = inner
+        self._specs = specs
+        self._flat_arity = flat_arity
+        # Shared output row; each next() call hands it a fresh tuple via replace().
+        self._reused_row = OffsetRow(None, 0, flat_arity)
+
+    def next(self) -> Optional[InternalRow]:
+        """Return the next projected row, or None when the batch is drained."""
+        inner_row = self._inner.next()
+        if inner_row is None:
+            return None
+        flat = tuple(_extract(inner_row, spec) for spec in self._specs)
+        self._reused_row.replace(flat)
+        # Inherit the inner row's RowKind so downstream consumers (e.g. the
+        # to_arrow path) keep the same +I/-D/-U/+U classification.
+        self._reused_row.set_row_kind_byte(inner_row.get_row_kind().value)
+        return self._reused_row
+
+
+class _PathSpec:
+    """Pre-resolved name path: top-level slot index plus sub-field names."""
+
+    # One instance is built per projected column; __slots__ avoids a
+    # per-instance __dict__.
+    __slots__ = ("top_idx", "sub_names")
+
+    def __init__(self, top_idx: int, sub_names: List[str]):
+        # Slot index of the path's top-level field in the inner row.
+        self.top_idx = top_idx
+        # Sub-field names to walk, in order, inside that field; an empty
+        # list means the top-level value itself is the output.
+        self.sub_names = sub_names
+
+
+def _extract(row: InternalRow, spec: _PathSpec) -> Any:
+    """Return the value reached by following ``spec`` inside ``row``.
+
+    Reads the top-level slot ``spec.top_idx``, then steps through each
+    name in ``spec.sub_names``. If an intermediate value is None (e.g. a
+    null struct), the walk stops and None is returned instead of raising.
+    """
+    cur = row.get_field(spec.top_idx)
+    for name in spec.sub_names:
+        if cur is None:
+            return None
+        cur = _step_into(cur, name)
+    return cur
+
+
+def _step_into(value: Any, name: str) -> Any:
+    """Take one step into a ROW sub-structure by sub-field name.
+
+    Upstream materialises nested ROW values as plain Python dicts (e.g.
+    polars row-by-row iteration produces a dict for each struct slot),
+    so dict access is the only supported form here. Anything else is
+    rejected loudly to surface schema/wiring mismatches early.
+
+    :returns: ``value[name]``, or None when the dict has no such key.
+    :raises TypeError: if ``value`` is not a dict (InternalRow included).
+    """
+    if isinstance(value, dict):
+        # A missing key reads as None, the same as a null sub-field.
+        return value.get(name)
+    if isinstance(value, InternalRow):
+        # Defensive: if the upstream reader handed us a wrapped sub-row,
+        # we cannot index it by name without its schema, so fail fast
+        # rather than guessing the slot.
+        raise TypeError(
+            "Cannot step into InternalRow by name %r without sub-schema; "
+            "expected a dict from the polars row materialisation" % (name,))
+    raise TypeError(
+        "Cannot index nested ROW step %r into value of type %s"
+        % (name, type(value).__name__))
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index e5903958d1..c5f4082b9d 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -189,15 +189,29 @@ class SplitRead(ABC):
nested_path_by_name = self._nested_path_by_name()
has_nested = nested_path_by_name is not None
+ # Cover both the merge-internal aliases (``_KEY_id``) and the
+ # bare user-facing PK name (``id``) the file actually stores.
+ name_to_field: Dict[str, DataField] = {f.name: f for f in
self.read_fields}
+ _, _trimmed_lookup_fields = self._get_trimmed_fields(
+ self._get_read_data_fields(), self._get_all_data_fields()
+ )
+ for f in _trimmed_lookup_fields:
+ name_to_field.setdefault(f.name, f)
+
format_reader: RecordBatchReader
if file_format == CoreOptions.FILE_FORMAT_AVRO:
avro_nested_paths = (
[nested_path_by_name[name] for name in read_file_fields]
if has_nested else None
)
- format_reader = FormatAvroReader(self.table.file_io, file_path,
read_file_fields,
- self.read_fields,
read_arrow_predicate, batch_size=batch_size,
-
nested_name_paths=avro_nested_paths)
+ # Pass the alias-safe union so FormatAvroReader can resolve
+ # the bare PK name (e.g. ``id``) requested by read_file_fields,
+ # even when value projection drops it from self.read_fields.
+ format_reader = FormatAvroReader(
+ self.table.file_io, file_path, read_file_fields,
+ list(name_to_field.values()),
+ read_arrow_predicate, batch_size=batch_size,
+ nested_name_paths=avro_nested_paths)
elif file_format == CoreOptions.FILE_FORMAT_BLOB:
if has_nested:
raise NotImplementedError(
@@ -210,7 +224,6 @@ class SplitRead(ABC):
if has_nested:
raise NotImplementedError(
"Nested-field projection is not supported on Lance files")
- name_to_field = {f.name: f for f in self.read_fields}
ordered_read_fields = [name_to_field[n] for n in read_file_fields
if n in name_to_field]
format_reader = FormatLanceReader(self.table.file_io, file_path,
ordered_read_fields,
read_arrow_predicate,
batch_size=batch_size,
@@ -219,7 +232,6 @@ class SplitRead(ABC):
if has_nested:
raise NotImplementedError(
"Nested-field projection is not supported on Vortex files")
- name_to_field = {f.name: f for f in self.read_fields}
ordered_read_fields = [name_to_field[n] for n in read_file_fields
if n in name_to_field]
predicate_fields = _get_all_fields(self.push_down_predicate) if
self.push_down_predicate else set()
format_reader = FormatVortexReader(self.table.file_io, file_path,
ordered_read_fields,
@@ -227,7 +239,6 @@ class SplitRead(ABC):
row_indices=row_indices,
predicate_fields=predicate_fields)
elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format ==
CoreOptions.FILE_FORMAT_ORC:
- name_to_field = {f.name: f for f in self.read_fields}
ordered_read_fields = [name_to_field[n] for n in read_file_fields
if n in name_to_field]
ordered_nested_paths = (
[nested_path_by_name[f.name] for f in ordered_read_fields]
@@ -560,6 +571,27 @@ class RawFileSplitRead(SplitRead):
class MergeFileSplitRead(SplitRead):
+    def __init__(
+        self,
+        table,
+        predicate: Optional[Predicate],
+        read_type: List[DataField],
+        split: Split,
+        row_tracking_enabled: bool,
+        outer_extract_name_paths: Optional[List[List[str]]] = None):
+        """
+        :param outer_extract_name_paths: when set, the merged rows are
+            wrapped in an OuterProjectionRecordReader that extracts these
+            name paths. Each path's first name must then be one of the
+            value fields in ``read_type``, otherwise that wrapper raises
+            ValueError.
+        """
+        # Merge functions need full ROW sub-structures, so nested paths
+        # are not pushed down here; sub-path extraction happens above
+        # the merge via OuterProjectionRecordReader.
+        super().__init__(
+            table=table,
+            predicate=predicate,
+            read_type=read_type,
+            split=split,
+            row_tracking_enabled=row_tracking_enabled,
+            nested_name_paths=None,
+        )
+        self.outer_extract_name_paths = outer_extract_name_paths
+
def kv_reader_supplier(self, file: DataFileMeta, dv_factory:
Optional[Callable] = None) -> RecordReader:
file_batch_reader = self.file_reader_supplier(file, True,
self._get_final_read_data_fields(), False)
dv = dv_factory() if dv_factory else None
@@ -591,9 +623,16 @@ class MergeFileSplitRead(SplitRead):
concat_reader = ConcatRecordReader(section_readers)
kv_unwrap_reader =
KeyValueUnwrapRecordReader(DropDeleteRecordReader(concat_reader))
if self.predicate_for_reader:
- return FilterRecordReader(kv_unwrap_reader,
self.predicate_for_reader)
+ reader = FilterRecordReader(kv_unwrap_reader,
self.predicate_for_reader)
else:
- return kv_unwrap_reader
+ reader = kv_unwrap_reader
+ if self.outer_extract_name_paths:
+ from pypaimon.read.reader.outer_projection_record_reader import \
+ OuterProjectionRecordReader
+ inner_top_names = [f.name for f in
self.read_fields[-self.value_arity:]]
+ reader = OuterProjectionRecordReader(
+ reader, inner_top_names, self.outer_extract_name_paths)
+ return reader
def _get_all_data_fields(self):
return self._create_key_value_fields(self.table.fields)
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 488c964bcd..ce874532b0 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -270,17 +270,21 @@ class TableRead:
def _create_split_read(self, split: Split) -> SplitRead:
if self.table.is_primary_key_table and not split.raw_convertible:
+ inner_read_type = self.read_type
+ outer_extract_name_paths: Optional[List[List[str]]] = None
if self.nested_name_paths and any(
len(p) > 1 for p in self.nested_name_paths):
- raise NotImplementedError(
- "Nested-field projection on primary-key tables that "
- "require a merge read is not yet supported")
+ # Inner: full ROW for the merge function. Outer: extract
+ # the requested sub-paths back to the user's flat schema.
+ inner_read_type = self._widen_to_top_level_for_merge()
+ outer_extract_name_paths = self.nested_name_paths
return MergeFileSplitRead(
table=self.table,
predicate=self.predicate,
- read_type=self.read_type,
+ read_type=inner_read_type,
split=split,
- row_tracking_enabled=False
+ row_tracking_enabled=False,
+ outer_extract_name_paths=outer_extract_name_paths,
)
elif self.table.options.data_evolution_enabled():
if self.nested_name_paths and any(
@@ -306,6 +310,24 @@ class TableRead:
nested_name_paths=self.nested_name_paths,
)
+    def _widen_to_top_level_for_merge(self) -> List[DataField]:
+        """Unique top-level fields from ``self.nested_name_paths``, in path order.
+
+        The primary-key merge read uses these whole top-level fields as its
+        inner read type, so merge functions see complete ROW values. The
+        requested sub-paths are extracted from the merged rows afterwards.
+
+        :returns: the table field named by each path's first element. The
+            first occurrence wins and duplicates are dropped.
+        :raises ValueError: if a path's first element is not a table field.
+        """
+        table_fields_by_name = {f.name: f for f in self.table.fields}
+        seen = set()
+        widened: List[DataField] = []
+        for path in self.nested_name_paths or []:
+            top_name = path[0]
+            # Several sub-paths may share one parent struct; read it once.
+            if top_name in seen:
+                continue
+            seen.add(top_name)
+            field = table_fields_by_name.get(top_name)
+            if field is None:
+                raise ValueError(
+                    "Nested projection top-level field %r not found in "
+                    "table schema" % (top_name,))
+            widened.append(field)
+        return widened
+
@staticmethod
def convert_rows_to_arrow_batch(row_tuples: List[tuple], schema:
pyarrow.Schema) -> pyarrow.RecordBatch:
columns_data = zip(*row_tuples)
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index 36ec678c8c..1a443e375a 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -270,6 +270,45 @@ class PkReaderTest(unittest.TestCase):
expected = self.expected.select(['dt', 'user_id', 'behavior'])
self.assertEqual(actual, expected)
+    def _assert_value_only_projection_works(self, file_format: str, table_suffix: str):
+        """Check that a PK-table read projected to ``behavior`` alone works.
+
+        :param file_format: value of the table's ``file.format`` option.
+        :param table_suffix: appended to the table name so each format
+            gets its own table.
+        """
+        # Two commits force the split through the merge path. The merge
+        # reader still needs the PK column to assemble its key, even
+        # though the user-visible projection drops it. This is a regression
+        # test for the case where narrowing to value-only fields broke the
+        # file column lookup.
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            partition_keys=['dt'],
+            primary_keys=['user_id', 'dt'],
+            options={'bucket': '2', 'file.format': file_format})
+        self.catalog.create_table(
+            'default.test_pk_projection_no_pk_' + table_suffix, schema, False)
+        table = self.catalog.get_table(
+            'default.test_pk_projection_no_pk_' + table_suffix)
+        self._write_test_table(table)
+
+        read_builder = table.new_read_builder().with_projection(['behavior'])
+        actual = self._read_test_table(read_builder)
+        expected = self.expected.select(['behavior'])
+        # Projection drops PKs so we can only compare bag semantics.
+        # The sort key maps None to '' so None and str values can be
+        # ordered together.
+        self.assertEqual(
+            sorted([r['behavior'] for r in actual.to_pylist()],
+                   key=lambda v: '' if v is None else v),
+            sorted([r['behavior'] for r in expected.to_pylist()],
+                   key=lambda v: '' if v is None else v))
+
+    def test_pk_reader_with_projection_excluding_pk(self):
+        # Parquet variant of the value-only projection check.
+        self._assert_value_only_projection_works('parquet', 'parquet')
+
+    def test_pk_reader_with_projection_excluding_pk_orc(self):
+        # ORC variant of the value-only projection check.
+        self._assert_value_only_projection_works('orc', 'orc')
+
+    def test_pk_reader_with_projection_excluding_pk_avro(self):
+        # Avro path resolves DataField names through ``full_fields_map``
+        # built from ``self.read_fields``; the alias-safe lookup must also
+        # cover the bare PK name (``user_id``) the file actually stores.
+        self._assert_value_only_projection_works('avro', 'avro')
+
def test_pk_reader_with_limit(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
diff --git a/paimon-python/pypaimon/tests/test_nested_projection_e2e.py b/paimon-python/pypaimon/tests/test_nested_projection_e2e.py
index 87ab3935f4..457f1b588e 100644
--- a/paimon-python/pypaimon/tests/test_nested_projection_e2e.py
+++ b/paimon-python/pypaimon/tests/test_nested_projection_e2e.py
@@ -175,35 +175,76 @@ class AppendOnlyNestedParquetTest(_AppendOnlyNestedBase):
{'val': 'y', 'id': 2},
{'val': 'z', 'id': 3}])
- def test_pk_table_merge_split_with_nested_projection_raises(self):
- # Phase 2b lands the append-only path only; PK + nested needs an
- # outer-projection wrapper that ships in a follow-up commit. Until
- # then, the call must refuse loudly rather than silently corrupt
- # the merge function input. Two commits on the same PK force the
- # split out of the raw-convertible fast path into the merge
- # reader, which is where the guard lives.
- identifier = 'default.pk_nested_unsupported'
+
+class PrimaryKeyNestedTest(_AppendOnlyNestedBase):
+    """PK tables go through the merge reader once a split is no longer
+    raw-convertible (multiple overlapping commits on the same key). The
+    merge function still needs full ROW sub-structures, so the read
+    splits inner = full-ROW from outer = flat sub-paths via an
+    OuterProjectionRecordReader."""
+
+    def _create_pk_table(self, name: str, file_format: str = 'parquet'):
+        """Create ``default.<name>`` keyed on ``id`` and commit ``self.rows`` twice."""
+        identifier = 'default.{}'.format(name)
         schema = Schema.from_pyarrow_schema(
             self.pa_schema,
             primary_keys=['id'],
-            options={'bucket': '1', 'file.format': 'parquet'},
+            options={'bucket': '1', 'file.format': file_format},
         )
         self.catalog.create_table(identifier, schema, False)
         table = self.catalog.get_table(identifier)
-        for batch in (self.rows, self.rows):  # two overlapping commits
+        # Two overlapping commits force the split off the raw-convertible
+        # fast path into the merge reader.
+        for batch in (self.rows, self.rows):
             wb = table.new_batch_write_builder()
             w = wb.new_write()
             w.write_arrow(pa.Table.from_pylist(batch, schema=self.pa_schema))
             wb.new_commit().commit(w.prepare_commit())
             w.close()
+        return table
-        rb = table.new_read_builder().with_projection(['mv.latest_version'])
+    def _read_arrow(self, table, projection):
+        """Plan every split for ``projection`` and read them into one Arrow table."""
+        rb = table.new_read_builder().with_projection(projection)
         splits = rb.new_scan().plan().splits()
-        # ``to_arrow`` materialises the split read; the merge path is what
-        # raises, so do it eagerly here rather than waiting for the first
-        # batch.
-        with self.assertRaises(NotImplementedError):
-            rb.new_read().to_arrow(splits)
+        return rb.new_read().to_arrow(splits)
+
+    def test_extracts_single_nested_leaf(self):
+        # One nested leaf comes back as a single flattened column.
+        table = self._create_pk_table('pk_nested_single')
+        arrow = self._read_arrow(table, ['mv.latest_version'])
+        self.assertEqual(arrow.column_names, ['mv_latest_version'])
+        versions = sorted(arrow.column('mv_latest_version').to_pylist())
+        self.assertEqual(versions, [100, 200, 300])
+
+    def test_multiple_sub_paths_under_same_struct(self):
+        # Two leaves of the same struct share one widened top-level read.
+        table = self._create_pk_table('pk_nested_double')
+        arrow = self._read_arrow(
+            table, ['mv.latest_version', 'mv.latest_value'])
+        self.assertEqual(arrow.column_names, ['mv_latest_version', 'mv_latest_value'])
+        pairs = sorted(zip(
+            arrow.column('mv_latest_version').to_pylist(),
+            arrow.column('mv_latest_value').to_pylist()))
+        self.assertEqual(pairs, [(100, 'a'), (200, 'b'), (300, 'c')])
+
+    def test_mixed_nested_and_top_level_preserves_order(self):
+        # Output columns follow the projection order, not the schema order.
+        table = self._create_pk_table('pk_nested_mixed')
+        arrow = self._read_arrow(
+            table, ['id', 'mv.latest_version', 'val'])
+        self.assertEqual(
+            arrow.column_names, ['id', 'mv_latest_version', 'val'])
+        rows = sorted(zip(
+            arrow.column('id').to_pylist(),
+            arrow.column('mv_latest_version').to_pylist(),
+            arrow.column('val').to_pylist()))
+        self.assertEqual(rows, [(1, 100, 'x'), (2, 200, 'y'), (3, 300, 'z')])
+
+    def test_avro_extracts_single_nested_leaf(self):
+        # Avro PK reads resolve DataFields through ``full_fields_map`` which
+        # historically only covered merge-internal aliases; without the
+        # alias-safe fix, this projection would raise ``KeyError: 'id'``.
+        table = self._create_pk_table('pk_avro_nested_single', file_format='avro')
+        arrow = self._read_arrow(table, ['mv.latest_version'])
+        self.assertEqual(arrow.column_names, ['mv_latest_version'])
+        versions = sorted(arrow.column('mv_latest_version').to_pylist())
+        self.assertEqual(versions, [100, 200, 300])
if __name__ == '__main__':
diff --git a/paimon-python/pypaimon/tests/test_outer_projection_record_reader.py b/paimon-python/pypaimon/tests/test_outer_projection_record_reader.py
b/paimon-python/pypaimon/tests/test_outer_projection_record_reader.py
new file mode 100644
index 0000000000..846e90e1b4
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_outer_projection_record_reader.py
@@ -0,0 +1,202 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import unittest
+from typing import List, Optional
+
+from pypaimon.read.reader.iface.record_iterator import RecordIterator
+from pypaimon.read.reader.iface.record_reader import RecordReader
+from pypaimon.read.reader.outer_projection_record_reader import (
+ OuterProjectionRecordReader, _extract, _PathSpec, _step_into)
+from pypaimon.table.row.internal_row import InternalRow
+from pypaimon.table.row.offset_row import OffsetRow
+
+
+class _StaticIterator(RecordIterator[InternalRow]):
+    """Iterator over a pre-built list of OffsetRows; yields None when drained."""
+
+    def __init__(self, rows: List[OffsetRow]):
+        # Private copy so later changes to the caller's list do not leak in.
+        self._rows = list(rows)
+        # Index of the next row to hand out.
+        self._idx = 0
+
+    def next(self) -> Optional[InternalRow]:
+        """Return the next row, or None once every row has been returned."""
+        if self._idx >= len(self._rows):
+            return None
+        row = self._rows[self._idx]
+        self._idx += 1
+        return row
+
+
+class _StaticReader(RecordReader[InternalRow]):
+    """Reader that emits a single batch then EOF, with close-tracking."""
+
+    def __init__(self, rows: List[OffsetRow]):
+        self._rows = rows
+        # True once the single batch has been handed out.
+        self._delivered = False
+        # Set by close(); tests read it to verify close propagation.
+        self.closed = False
+
+    def read_batch(self) -> Optional[RecordIterator[InternalRow]]:
+        """Return an iterator over all rows on the first call, then None."""
+        if self._delivered:
+            return None
+        self._delivered = True
+        return _StaticIterator(self._rows)
+
+    def close(self) -> None:
+        """Record that the reader was closed."""
+        self.closed = True
+
+
+def _row(*values) -> OffsetRow:
+    """Build an OffsetRow whose fields are the positional ``values``."""
+    return OffsetRow(tuple(values), 0, len(values))
+
+
+class StepIntoTest(unittest.TestCase):
+    """Unit tests for the single-step lookup helper ``_step_into``."""
+
+    def test_dict_lookup(self):
+        self.assertEqual(_step_into({'a': 1, 'b': 2}, 'a'), 1)
+        # A missing key reads as None rather than raising.
+        self.assertIsNone(_step_into({'a': 1}, 'missing'))
+
+    def test_internal_row_rejected(self):
+        # Defensive: we never expect a nested InternalRow in the polars path.
+        row = OffsetRow((10, 20), 0, 2)
+        with self.assertRaises(TypeError):
+            _step_into(row, 'whatever')
+
+    def test_unsupported_type_raises(self):
+        # Non-dict, non-row values are rejected with TypeError.
+        with self.assertRaises(TypeError):
+            _step_into(42, 'name')
+
+
+class ExtractTest(unittest.TestCase):
+    """Unit tests for ``_extract`` walking a pre-resolved ``_PathSpec``."""
+
+    def test_empty_sub_names_returns_top_level_field(self):
+        # Empty sub_names → return the top-level slot itself (used when
+        # mixing top-level and nested paths in the same projection).
+        row = _row(1, {'v': 100, 's': 'a'})
+        spec = _PathSpec(top_idx=0, sub_names=[])
+        self.assertEqual(_extract(row, spec), 1)
+
+    def test_nested_walk_returns_leaf(self):
+        row = _row(1, {'v': 100, 's': 'a'})
+        spec = _PathSpec(top_idx=1, sub_names=['v'])
+        self.assertEqual(_extract(row, spec), 100)
+
+    def test_none_at_top_short_circuits(self):
+        # A null struct yields None instead of a TypeError from _step_into.
+        row = _row(1, None)
+        spec = _PathSpec(top_idx=1, sub_names=['v'])
+        self.assertIsNone(_extract(row, spec))
+
+    def test_missing_sub_name_returns_none(self):
+        row = _row(1, {'v': 100})
+        spec = _PathSpec(top_idx=1, sub_names=['s'])
+        self.assertIsNone(_extract(row, spec))
+
+
+class OuterProjectionRecordReaderTest(unittest.TestCase):
+    """Tests of OuterProjectionRecordReader over in-memory single-batch readers."""
+
+    def _build_reader(self, rows, top_names, name_paths):
+        """Wrap ``rows`` in a single-batch _StaticReader and project it."""
+        return OuterProjectionRecordReader(
+            _StaticReader(rows), top_names, name_paths)
+
+    def test_extracts_nested_leaf(self):
+        rows = [
+            _row(1, {'v': 100, 's': 'a'}, 'x'),
+            _row(2, {'v': 200, 's': 'b'}, 'y'),
+        ]
+        reader = self._build_reader(
+            rows,
+            top_names=['id', 'mv', 'val'],
+            name_paths=[['mv', 'v']])
+        batch = reader.read_batch()
+        out = []
+        while True:
+            r = batch.next()
+            if r is None:
+                break
+            # Copy fields out immediately: the same row object is reused.
+            out.append(tuple(r.get_field(i) for i in range(len(r))))
+        self.assertEqual(out, [(100,), (200,)])
+
+    def test_mixed_top_level_and_nested_preserves_order(self):
+        # Output slots follow name_paths order, not the inner schema order.
+        rows = [_row(1, {'v': 100, 's': 'a'}, 'x')]
+        reader = self._build_reader(
+            rows,
+            top_names=['id', 'mv', 'val'],
+            name_paths=[['val'], ['mv', 'v'], ['id']])
+        batch = reader.read_batch()
+        r = batch.next()
+        self.assertEqual(
+            tuple(r.get_field(i) for i in range(len(r))),
+            ('x', 100, 1))
+
+    def test_nullable_struct_returns_none(self):
+        # The wrapper reuses a single OffsetRow per batch, so consumers must
+        # materialise each row before advancing — same contract as
+        # InternalRowWrapperIterator.
+        rows = [_row(1, None, 'x'), _row(2, {'v': 200, 's': 'b'}, 'y')]
+        reader = self._build_reader(
+            rows,
+            top_names=['id', 'mv', 'val'],
+            name_paths=[['mv', 'v']])
+        batch = reader.read_batch()
+        first_value = batch.next().get_field(0)
+        second_value = batch.next().get_field(0)
+        self.assertIsNone(first_value)
+        self.assertEqual(second_value, 200)
+
+    def test_inherits_row_kind(self):
+        rows = [_row(1, {'v': 100, 's': 'a'})]
+        rows[0].set_row_kind_byte(2)  # -U
+        reader = self._build_reader(
+            rows,
+            top_names=['id', 'mv'],
+            name_paths=[['mv', 'v']])
+        batch = reader.read_batch()
+        r = batch.next()
+        self.assertEqual(r.row_kind_byte, 2)
+
+    def test_eof_returns_none(self):
+        reader = self._build_reader(
+            [], top_names=['id'], name_paths=[['id']])
+        # First batch is empty (delivered) but the iterator immediately yields None.
+        first_batch = reader.read_batch()
+        self.assertIsNone(first_batch.next())
+        # Subsequent read_batch returns None once the inner reader is drained.
+        self.assertIsNone(reader.read_batch())
+
+    def test_close_propagates(self):
+        inner = _StaticReader([_row(1, {'v': 100})])
+        reader = OuterProjectionRecordReader(
+            inner, ['id', 'mv'], [['mv', 'v']])
+        reader.close()
+        self.assertTrue(inner.closed)
+
+    def test_unknown_top_name_raises_at_construction(self):
+        # Validation happens in __init__, before any batch is read.
+        with self.assertRaises(ValueError):
+            OuterProjectionRecordReader(
+                _StaticReader([]), ['id', 'mv'], [['nope', 'v']])
+
+    def test_empty_name_paths_rejected(self):
+        with self.assertRaises(ValueError):
+            OuterProjectionRecordReader(_StaticReader([]), ['id'], [])
+
+    def test_empty_individual_path_rejected(self):
+        with self.assertRaises(ValueError):
+            OuterProjectionRecordReader(_StaticReader([]), ['id'], [[]])
+
+
+if __name__ == '__main__':
+    # Allow running this test module directly, outside a test runner.
+    unittest.main()