This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6fda857c01 [python] Add nested-field projection support (append-only
paths) (#7796)
6fda857c01 is described below
commit 6fda857c01a07ae981ae003d42302b47ceb4125a
Author: chaoyang <[email protected]>
AuthorDate: Sat May 9 20:16:04 2026 +0800
[python] Add nested-field projection support (append-only paths) (#7796)
`ReadBuilder.with_projection` only accepts top-level column names today;
dotted forms like `'struct.subfield'` are silently dropped. Reading just
one leaf out of a struct ends up materialising the whole parent and
discarding the rest client-side.
This PR ports nested projection on **append-only** read paths.
---
paimon-python/pypaimon/read/read_builder.py | 98 +++++++-
.../pypaimon/read/reader/format_avro_reader.py | 41 +++-
.../pypaimon/read/reader/format_pyarrow_reader.py | 96 +++++++-
paimon-python/pypaimon/read/split_read.py | 108 ++++++++-
paimon-python/pypaimon/read/table_read.py | 32 ++-
.../pypaimon/tests/test_nested_projection_e2e.py | 210 ++++++++++++++++
.../pypaimon/tests/test_projection_utility.py | 205 ++++++++++++++++
.../tests/test_read_builder_nested_projection.py | 124 ++++++++++
paimon-python/pypaimon/utils/projection.py | 267 +++++++++++++++++++++
9 files changed, 1148 insertions(+), 33 deletions(-)
diff --git a/paimon-python/pypaimon/read/read_builder.py
b/paimon-python/pypaimon/read/read_builder.py
index d45a526a69..ce95115431 100644
--- a/paimon-python/pypaimon/read/read_builder.py
+++ b/paimon-python/pypaimon/read/read_builder.py
@@ -24,6 +24,7 @@ from pypaimon.read.table_read import TableRead
from pypaimon.read.table_scan import TableScan
from pypaimon.schema.data_types import DataField
from pypaimon.table.special_fields import SpecialFields
+from pypaimon.utils.projection import Projection, _is_row_type
class ReadBuilder:
@@ -34,7 +35,14 @@ class ReadBuilder:
self.table: FileStoreTable = table
self._predicate: Optional[Predicate] = None
+ # ``_projection`` is the user-facing list of names from
+ # :meth:`with_projection`. ``_nested_paths`` is the canonical
+ # integer-path form (length-1 for top level, longer for nested
+ # ROW children) and is set only when at least one projected name
+ # is dotted; otherwise it stays ``None`` and consumers fall back
+ # to the plain name list in ``_projection``.
self._projection: Optional[List[str]] = None
+ self._nested_paths: Optional[List[List[int]]] = None
self._limit: Optional[int] = None
def with_filter(self, predicate: Predicate) -> 'ReadBuilder':
@@ -42,7 +50,19 @@ class ReadBuilder:
return self
def with_projection(self, projection: List[str]) -> 'ReadBuilder':
+ """Project to the given column names.
+
+ Names containing a dot (e.g. ``"struct.subfield"``) walk into ROW
+ children and are translated into a nested projection. Top-level-
+ only callers see the same observable behaviour as before — the
+ dotted form is opt-in. Unknown names are silently skipped to
+ preserve the pre-existing contract.
+ """
self._projection = projection
+ if projection and any('.' in name for name in projection):
+ self._nested_paths = self._resolve_dotted_paths(projection)
+ else:
+ self._nested_paths = None
return self
def with_limit(self, limit: int) -> 'ReadBuilder':
@@ -60,19 +80,83 @@ class ReadBuilder:
return TableRead(
table=self.table,
predicate=self._predicate,
- read_type=self.read_type()
+ read_type=self.read_type(),
+ nested_name_paths=self._nested_name_paths(),
)
+ def _nested_name_paths(self) -> Optional[List[List[str]]]:
+ """Resolve the current nested-projection state into a parallel list
+ of name paths against the underlying table schema. Returns ``None``
+ if the user only requested top-level projection (or no projection).
+ """
+ if not self._nested_paths:
+ return None
+ table_fields = self.table.fields
+ if self.table.options.row_tracking_enabled():
+ table_fields =
SpecialFields.row_type_with_row_tracking(table_fields)
+ return Projection.of(self._nested_paths).to_name_paths(table_fields)
+
def new_predicate_builder(self) -> PredicateBuilder:
return PredicateBuilder(self.read_type())
def read_type(self) -> List[DataField]:
table_fields = self.table.fields
- if not self._projection:
+ if not self._projection and not self._nested_paths:
return table_fields
- else:
- if self.table.options.row_tracking_enabled():
- table_fields =
SpecialFields.row_type_with_row_tracking(table_fields)
- field_map = {field.name: field for field in table_fields}
- return [field_map[name] for name in self._projection if name in
field_map]
+
+ if self.table.options.row_tracking_enabled():
+ table_fields =
SpecialFields.row_type_with_row_tracking(table_fields)
+
+ if self._nested_paths:
+ return Projection.of(self._nested_paths).project(table_fields)
+
+ field_map = {field.name: field for field in table_fields}
+ return [field_map[name] for name in self._projection if name in
field_map]
+
+ # ------------------------------------------------------------------
+ # Helpers
+ # ------------------------------------------------------------------
+
+ def _resolve_dotted_paths(self, names: List[str]) -> List[List[int]]:
+ """Translate dotted-name projection entries into integer paths
+ against the current table schema. Names without dots produce
+ length-1 paths.
+ """
+ table_fields = self.table.fields
+ if self.table.options.row_tracking_enabled():
+ table_fields =
SpecialFields.row_type_with_row_tracking(table_fields)
+ top_index = {f.name: i for i, f in enumerate(table_fields)}
+
+ paths: List[List[int]] = []
+ for name in names:
+ if '.' not in name:
+ if name not in top_index:
+ # Silently skip unknown names — preserves the
+ # pre-existing contract from the plain top-level path.
+ continue
+ paths.append([top_index[name]])
+ continue
+ parts = name.split('.')
+ top = parts[0]
+ if top not in top_index:
+ continue
+ path = [top_index[top]]
+ current_field = table_fields[path[0]]
+ ok = True
+ for part in parts[1:]:
+ if not _is_row_type(current_field.type):
+ ok = False
+ break
+ child_fields = current_field.type.fields
+ child_idx = next(
+ (i for i, f in enumerate(child_fields) if f.name == part),
+ -1)
+ if child_idx < 0:
+ ok = False
+ break
+ path.append(child_idx)
+ current_field = child_fields[child_idx]
+ if ok:
+ paths.append(path)
+ return paths
diff --git a/paimon-python/pypaimon/read/reader/format_avro_reader.py
b/paimon-python/pypaimon/read/reader/format_avro_reader.py
index 5dd2738944..1fe41468e8 100644
--- a/paimon-python/pypaimon/read/reader/format_avro_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_avro_reader.py
@@ -35,7 +35,8 @@ class FormatAvroReader(RecordBatchReader):
"""
def __init__(self, file_io: FileIO, file_path: str, read_fields:
List[str], full_fields: List[DataField],
- push_down_predicate: Any, batch_size: int = 1024):
+ push_down_predicate: Any, batch_size: int = 1024,
+ nested_name_paths: Optional[List[List[str]]] = None):
file_path_for_io = file_io.to_filesystem_path(file_path)
self._file = file_io.filesystem.open_input_file(file_path_for_io)
self._avro_reader = fastavro.reader(self._file)
@@ -47,13 +48,30 @@ class FormatAvroReader(RecordBatchReader):
projected_data_fields = [full_fields_map[name] for name in read_fields]
self._schema =
PyarrowFieldParser.from_paimon_schema(projected_data_fields)
+ # ``nested_name_paths`` is parallel to ``read_fields``. If every path
+ # has length 1 the existing ``record.get`` fast path is kept; once
+ # any path is longer, all columns walk the record dict by path. The
+ # path's first segment must be a real top-level Avro field —
+ # ``_get_fields_and_predicate`` upstream guarantees this.
+ if nested_name_paths is not None and len(nested_name_paths) !=
len(read_fields):
+ raise ValueError(
+ "nested_name_paths length {} does not match read_fields length
{}".format(
+ len(nested_name_paths), len(read_fields)))
+ self._nested_name_paths = nested_name_paths
+ self._has_nested = bool(
+ nested_name_paths and any(len(p) > 1 for p in nested_name_paths))
+
def read_arrow_batch(self) -> Optional[RecordBatch]:
pydict_data = {name: [] for name in self._fields}
records_in_batch = 0
for record in self._avro_reader:
- for col_name in self._fields:
- pydict_data[col_name].append(record.get(col_name))
+ if self._has_nested:
+ for col_name, path in zip(self._fields,
self._nested_name_paths):
+ pydict_data[col_name].append(_walk_avro_record(record,
path))
+ else:
+ for col_name in self._fields:
+ pydict_data[col_name].append(record.get(col_name))
records_in_batch += 1
if records_in_batch >= self._batch_size:
break
@@ -76,3 +94,20 @@ class FormatAvroReader(RecordBatchReader):
if self._file:
self._file.close()
self._file = None
+
+
+def _walk_avro_record(record, path: List[str]):
+ """Walk a list of field names through an avro record dict, returning
+ the leaf value or ``None`` if any segment is missing or hits a
+ non-dict value. ``record`` is the top-level fastavro dict; nested
+ record fields surface as nested dicts.
+ """
+ current = record
+ for name in path:
+ if current is None:
+ return None
+ if isinstance(current, dict):
+ current = current.get(name)
+ continue
+ return None
+ return current
diff --git a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
index ddfb368c88..0c12d27e4f 100644
--- a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
@@ -48,33 +48,81 @@ class FormatPyArrowReader(RecordBatchReader):
def __init__(self, file_io: FileIO, file_format: str, file_path: str,
read_fields: List[DataField],
push_down_predicate: Any, batch_size: int = 1024,
- options: CoreOptions = None):
+ options: CoreOptions = None,
+ nested_name_paths: Optional[List[List[str]]] = None):
file_path_for_pyarrow = file_io.to_filesystem_path(file_path)
self.dataset = ds.dataset(file_path_for_pyarrow, format=file_format,
filesystem=file_io.filesystem)
self._file_format = file_format
self.read_fields = read_fields
self._read_field_names = [f.name for f in read_fields]
- # Identify which fields exist in the file and which are missing
+ # ``nested_name_paths`` is parallel to ``read_fields``; when
+ # any path has length > 1 the scanner is invoked with a
+ # ``{flat_name: ds.field(*path)}`` column dict.
+ if nested_name_paths is not None and len(nested_name_paths) !=
len(read_fields):
+ raise ValueError(
+ "nested_name_paths length {} does not match read_fields length
{}".format(
+ len(nested_name_paths), len(read_fields)))
+ self._nested_name_paths = nested_name_paths
+ has_nested_path = bool(
+ nested_name_paths and any(len(p) > 1 for p in nested_name_paths))
+
+ # Identify which fields exist in the file and which are missing.
+ # For nested projection, "exists" is determined by walking the
+ # whole path against the file schema; sub-field schema evolution
+ # (a leaf renamed or removed) shows up as ``missing`` and is
+ # served as a NULL column, mirroring the top-level handling.
file_schema = self.dataset.schema
- file_schema_names = set(file_schema.names)
- self.existing_fields = [f.name for f in read_fields if f.name in
file_schema_names]
- self.missing_fields = [f.name for f in read_fields if f.name not in
file_schema_names]
-
- # column name → VariantSchema for shredded columns that need assembly
+ if has_nested_path:
+ self.existing_fields = []
+ self.missing_fields = []
+ for f, path in zip(read_fields, nested_name_paths):
+ if _path_exists_in_arrow_schema(file_schema, path):
+ self.existing_fields.append(f.name)
+ else:
+ self.missing_fields.append(f.name)
+ else:
+ file_schema_names = set(file_schema.names)
+ self.existing_fields = [f.name for f in read_fields if f.name in
file_schema_names]
+ self.missing_fields = [f.name for f in read_fields if f.name not
in file_schema_names]
+
+ # column name → VariantSchema for shredded columns that need assembly.
+ # In nested mode we still want to reassemble shredded VARIANTs
+ # that were projected at the top level — only the columns actually
+ # reached via a length>1 path are skipped (those are sub-fields of
+ # some other struct, not VARIANTs themselves).
self._shredded_schemas: Dict[str, VariantSchema] = {}
if options is None or options.variant_shredding_enabled():
+ top_level_names = set(file_schema.names)
for name in self.existing_fields:
+ if name not in top_level_names:
+ continue
field_type = file_schema.field(name).type
if is_shredded_variant(field_type):
self._shredded_schemas[name] =
build_variant_schema(field_type)
- # Only pass existing fields to PyArrow scanner to avoid errors
- self.reader = self.dataset.scanner(
- columns=self.existing_fields,
- filter=push_down_predicate,
- batch_size=batch_size
- ).to_reader()
+ if has_nested_path:
+ # Dict-form columns let PyArrow read leaf fields out of nested
+ # structs without materialising the parent. The dict keys
+ # become the output column names — they're already flattened
+ # to ``a_b`` form by the upstream projection utility.
+ existing_set = set(self.existing_fields)
+ columns_dict = {}
+ for f, path in zip(read_fields, nested_name_paths):
+ if f.name in existing_set:
+ columns_dict[f.name] = ds.field(*path)
+ self.reader = self.dataset.scanner(
+ columns=columns_dict,
+ filter=push_down_predicate,
+ batch_size=batch_size
+ ).to_reader()
+ else:
+ # Only pass existing fields to PyArrow scanner to avoid errors
+ self.reader = self.dataset.scanner(
+ columns=self.existing_fields,
+ filter=push_down_predicate,
+ batch_size=batch_size
+ ).to_reader()
self._output_schema = (
PyarrowFieldParser.from_paimon_schema(read_fields) if read_fields
else None
@@ -169,3 +217,25 @@ class FormatPyArrowReader(RecordBatchReader):
def close(self):
if self.reader is not None:
self.reader = None
+
+
+def _path_exists_in_arrow_schema(schema: pa.Schema, path: List[str]) -> bool:
+ """Walk ``path`` (a list of field names) through a PyArrow schema and
+ return whether every step exists. The first step is a top-level field
+ name; subsequent steps are struct child names. Missing leaves at any
+ depth (e.g. a renamed sub-field) yield ``False`` so the caller can
+ fall back to a NULL column instead of raising during scan setup.
+ """
+ if not path:
+ return False
+ if path[0] not in schema.names:
+ return False
+ current_type = schema.field(path[0]).type
+ for name in path[1:]:
+ if not pa.types.is_struct(current_type):
+ return False
+ idx = current_type.get_field_index(name)
+ if idx < 0:
+ return False
+ current_type = current_type.field(idx).type
+ return True
diff --git a/paimon-python/pypaimon/read/split_read.py
b/paimon-python/pypaimon/read/split_read.py
index c88f49f3b0..b68d2f9e36 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -19,7 +19,7 @@
import os
from abc import ABC, abstractmethod
from functools import partial
-from typing import Callable, List, Optional, Tuple
+from typing import Callable, Dict, List, Optional, Tuple
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
@@ -89,7 +89,8 @@ class SplitRead(ABC):
predicate: Optional[Predicate],
read_type: List[DataField],
split: Split,
- row_tracking_enabled: bool):
+ row_tracking_enabled: bool,
+ nested_name_paths: Optional[List[List[str]]] = None):
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
@@ -98,6 +99,9 @@ class SplitRead(ABC):
self.split = split
self.row_tracking_enabled = row_tracking_enabled
self.value_arity = len(read_type)
+ # Parallel to ``read_type``; each entry is the original-schema
+ # name path. ``None`` when no projection or top-level only.
+ self.nested_name_paths = nested_name_paths
self.trimmed_primary_key = self.table.trimmed_primary_keys
self.read_fields = read_type
@@ -127,6 +131,21 @@ class SplitRead(ABC):
else:
self.predicate_for_reader = None
+ def _nested_path_by_name(self) -> Optional[Dict[str, List[str]]]:
+ """``{flat_name: original_path}`` for the rows of ``read_type``
+ that go through a nested path (``len > 1``). ``None`` when no
+ such path exists, so callers stay on the top-level fast path.
+ """
+ if not self.nested_name_paths:
+ return None
+ if not any(len(p) > 1 for p in self.nested_name_paths):
+ return None
+ out: Dict[str, List[str]] = {}
+ for f, path in zip(self.read_fields[:self.value_arity],
+ self.nested_name_paths):
+ out[f.name] = path
+ return out
+
def _push_down_predicate(self) -> Optional[Predicate]:
if self.predicate is None:
return None
@@ -168,22 +187,46 @@ class SplitRead(ABC):
end = r.to - file.first_row_id
row_indices.extend(range(start, end + 1))
+ # Map the parallel ``self.nested_name_paths`` (aligned with
+ # ``self.read_fields``) into the same order the format reader
+ # will see, indexed by output column name. Set to ``None`` when
+ # no path has length > 1 so the reader stays on its top-level
+ # fast path.
+ nested_path_by_name = self._nested_path_by_name()
+ has_nested = nested_path_by_name is not None
+
format_reader: RecordBatchReader
if file_format == CoreOptions.FILE_FORMAT_AVRO:
+ avro_nested_paths = (
+ [nested_path_by_name[name] for name in read_file_fields]
+ if has_nested else None
+ )
format_reader = FormatAvroReader(self.table.file_io, file_path,
read_file_fields,
- self.read_fields,
read_arrow_predicate, batch_size=batch_size)
+ self.read_fields,
read_arrow_predicate, batch_size=batch_size,
+
nested_name_paths=avro_nested_paths)
elif file_format == CoreOptions.FILE_FORMAT_BLOB:
+ if has_nested:
+ raise NotImplementedError(
+ "Nested-field projection is not supported on BLOB files")
blob_as_descriptor =
CoreOptions.blob_as_descriptor(self.table.options)
format_reader = FormatBlobReader(self.table.file_io, file_path,
read_file_fields,
self.read_fields,
read_arrow_predicate, blob_as_descriptor,
batch_size=batch_size)
elif file_format == CoreOptions.FILE_FORMAT_LANCE:
+ if has_nested:
+ # Lance has no nested column pruning today, so refuse here;
+ # callers can project the parent struct and extract sub-fields.
+ raise NotImplementedError(
+ "Nested-field projection is not supported on Lance files")
name_to_field = {f.name: f for f in self.read_fields}
ordered_read_fields = [name_to_field[n] for n in read_file_fields
if n in name_to_field]
format_reader = FormatLanceReader(self.table.file_io, file_path,
ordered_read_fields,
read_arrow_predicate,
batch_size=batch_size,
row_range=row_range,
row_indices=row_indices)
elif file_format == CoreOptions.FILE_FORMAT_VORTEX:
+ if has_nested:
+ raise NotImplementedError(
+ "Nested-field projection is not supported on Vortex files")
name_to_field = {f.name: f for f in self.read_fields}
ordered_read_fields = [name_to_field[n] for n in read_file_fields
if n in name_to_field]
predicate_fields = _get_all_fields(self.push_down_predicate) if
self.push_down_predicate else set()
@@ -194,10 +237,15 @@ class SplitRead(ABC):
elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format ==
CoreOptions.FILE_FORMAT_ORC:
name_to_field = {f.name: f for f in self.read_fields}
ordered_read_fields = [name_to_field[n] for n in read_file_fields
if n in name_to_field]
+ ordered_nested_paths = (
+ [nested_path_by_name[f.name] for f in ordered_read_fields]
+ if has_nested else None
+ )
format_reader = FormatPyArrowReader(
self.table.file_io, file_format, file_path,
ordered_read_fields, read_arrow_predicate,
batch_size=batch_size,
- options=self.table.options)
+ options=self.table.options,
+ nested_name_paths=ordered_nested_paths)
elif file_format in ('json', 'csv'):
raise NotImplementedError(
f"Reading '{file_format}' format is not yet supported in
Python SDK. "
@@ -251,6 +299,10 @@ class SplitRead(ABC):
return reader
def _get_fields_and_predicate(self, schema_id: int, read_fields):
+ # In nested mode the flat name (``mv_latest_version``) never
+ # appears in the file schema; reachability is decided by the
+ # path's top-level entry instead.
+ nested_path_by_name = self._nested_path_by_name()
key = (schema_id, tuple(read_fields))
if key not in self.schema_id_2_fields:
schema = self.table.schema_manager.get_schema(schema_id)
@@ -262,7 +314,20 @@ class SplitRead(ABC):
if self.table.is_primary_key_table:
schema_field_names.add('_SEQUENCE_NUMBER')
schema_field_names.add('_VALUE_KIND')
- read_file_fields = [read_field for read_field in read_fields if
read_field in schema_field_names]
+
+ def _is_reachable(name: str) -> bool:
+ if name in schema_field_names:
+ return True
+ if nested_path_by_name is not None:
+ path = nested_path_by_name.get(name)
+ if path:
+ return path[0] in schema_field_names
+ return False
+
+ read_file_fields = [
+ read_field for read_field in read_fields
+ if _is_reachable(read_field)
+ ]
read_predicate =
trim_predicate_by_fields(self.push_down_predicate, read_file_fields)
read_arrow_predicate = read_predicate.to_arrow() if read_predicate
else None
self.schema_id_2_fields[key] = (read_file_fields,
read_arrow_predicate)
@@ -300,6 +365,10 @@ class SplitRead(ABC):
return all_data_fields
def create_index_mapping(self):
+ if self._nested_path_by_name() is not None:
+ # Format reader already emits flat columns aligned with
+ # ``read_fields``; the id-based remap can't see leaf IDs.
+ return None
base_index_mapping = self._create_base_index_mapping(self.read_fields,
self._get_read_data_fields())
trimmed_key_mapping, _ =
self._get_trimmed_fields(self._get_read_data_fields(),
self._get_all_data_fields())
if base_index_mapping is None:
@@ -341,6 +410,10 @@ class SplitRead(ABC):
return None
def _get_final_read_data_fields(self) -> List[str]:
+ if self._nested_path_by_name() is not None:
+ # Trimmed-fields filters by ID and drops nested leaves;
+ # hand the format reader the user-facing flat names directly.
+ return self._remove_partition_fields(list(self.read_fields))
_, trimmed_fields = self._get_trimmed_fields(
self._get_read_data_fields(), self._get_all_data_fields()
)
@@ -395,6 +468,23 @@ class SplitRead(ABC):
return PartitionInfo(partition_mapping, self.split.partition)
def _construct_partition_mapping(self) -> List[int]:
+ if self._nested_path_by_name() is not None:
+ # Nested fields carry leaf IDs that don't match top-level
+ # data-field IDs, so the trimmed-fields machinery can't see
+ # them. Build the mapping directly from ``self.read_fields``:
+ # entries whose flat name equals a partition key get a
+ # negative index, the rest get a sequential read index.
+ partition_names = self.table.partition_keys
+ mapping = [0] * (len(self.read_fields) + 1)
+ p_count = 0
+ for i, field in enumerate(self.read_fields):
+ if field.name in partition_names:
+ partition_index = partition_names.index(field.name)
+ mapping[i] = -(partition_index + 1)
+ p_count += 1
+ else:
+ mapping[i] = (i - p_count) + 1
+ return mapping
_, trimmed_fields = self._get_trimmed_fields(
self._get_read_data_fields(), self._get_all_data_fields()
)
@@ -537,13 +627,17 @@ class DataEvolutionSplitRead(SplitRead):
predicate: Optional[Predicate],
read_type: List[DataField],
split: Split,
- row_tracking_enabled: bool):
+ row_tracking_enabled: bool,
+ nested_name_paths: Optional[List[List[str]]] = None):
self.row_ranges = None
actual_split = split
if isinstance(split, IndexedSplit):
self.row_ranges = split.row_ranges()
actual_split = split.data_split()
- super().__init__(table, predicate, read_type, actual_split,
row_tracking_enabled)
+ super().__init__(
+ table, predicate, read_type, actual_split, row_tracking_enabled,
+ nested_name_paths=nested_name_paths,
+ )
def _push_down_predicate(self) -> Optional[Predicate]:
# Data evolution: files may have different schemas, so we don't push
predicate
diff --git a/paimon-python/pypaimon/read/table_read.py
b/paimon-python/pypaimon/read/table_read.py
index 40cc337aaa..4f53e49126 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -40,7 +40,8 @@ class TableRead:
table,
predicate: Optional[Predicate],
read_type: List[DataField],
- include_row_kind: bool = False
+ include_row_kind: bool = False,
+ nested_name_paths: Optional[List[List[str]]] = None,
):
from pypaimon.table.file_store_table import FileStoreTable
@@ -48,6 +49,10 @@ class TableRead:
self.predicate = predicate
self.read_type = read_type
self.include_row_kind = include_row_kind
+ # Parallel to ``read_type``; each entry is the original-schema
+ # name path for the field. ``None`` when no projection (or only
+ # top-level projection) is set.
+ self.nested_name_paths = nested_name_paths
def to_iterator(self, splits: List[Split]) -> Iterator:
def _record_generator():
@@ -268,6 +273,15 @@ class TableRead:
def _create_split_read(self, split: Split) -> SplitRead:
if self.table.is_primary_key_table and not split.raw_convertible:
+ if self.nested_name_paths and any(
+ len(p) > 1 for p in self.nested_name_paths):
+ # The merge function needs full parent structs; outer
+ # projection that walks the path to recover leaves is a
+ # separate change. Project the parent struct and extract
+ # client-side until then.
+ raise NotImplementedError(
+ "Nested-field projection on primary-key tables that "
+ "require a merge read is not yet supported")
return MergeFileSplitRead(
table=self.table,
predicate=self.predicate,
@@ -276,12 +290,23 @@ class TableRead:
row_tracking_enabled=False
)
elif self.table.options.data_evolution_enabled():
+ if self.nested_name_paths and any(
+ len(p) > 1 for p in self.nested_name_paths):
+ # Multi-file union for data-evolution tables matches files
+ # by top-level field ID; a nested ``read_field`` carries
+ # its leaf ID, which never matches and would silently
+ # produce all-NULL columns. Refuse loudly until the
+ # union path is taught to walk paths.
+ raise NotImplementedError(
+ "Nested-field projection on data-evolution tables is "
+ "not yet supported")
return DataEvolutionSplitRead(
table=self.table,
predicate=self.predicate,
read_type=self.read_type,
split=split,
- row_tracking_enabled=True
+ row_tracking_enabled=True,
+ nested_name_paths=self.nested_name_paths,
)
else:
return RawFileSplitRead(
@@ -289,7 +314,8 @@ class TableRead:
predicate=self.predicate,
read_type=self.read_type,
split=split,
- row_tracking_enabled=self.table.options.row_tracking_enabled()
+ row_tracking_enabled=self.table.options.row_tracking_enabled(),
+ nested_name_paths=self.nested_name_paths,
)
@staticmethod
diff --git a/paimon-python/pypaimon/tests/test_nested_projection_e2e.py
b/paimon-python/pypaimon/tests/test_nested_projection_e2e.py
new file mode 100644
index 0000000000..87ab3935f4
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_nested_projection_e2e.py
@@ -0,0 +1,210 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class _AppendOnlyNestedBase(unittest.TestCase):
+ """Append-only table whose ``mv`` column is a nested struct, used to
+ exercise file-level Parquet/ORC pushdown of nested projection."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+ cls.catalog.create_database('default', False)
+
+ cls.pa_schema = pa.schema([
+ ('id', pa.int64()),
+ ('mv', pa.struct([
+ ('latest_version', pa.int64()),
+ ('latest_value', pa.string()),
+ ])),
+ ('val', pa.string()),
+ ])
+ cls.rows = [
+ {'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'},
'val': 'x'},
+ {'id': 2, 'mv': {'latest_version': 200, 'latest_value': 'b'},
'val': 'y'},
+ {'id': 3, 'mv': {'latest_version': 300, 'latest_value': 'c'},
'val': 'z'},
+ ]
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+ def _create_table(self, name: str, file_format: str = 'parquet'):
+ identifier = 'default.{}'.format(name)
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ options={'bucket': '-1', 'file.format': file_format},
+ )
+ self.catalog.create_table(identifier, schema, False)
+ table = self.catalog.get_table(identifier)
+ wb = table.new_batch_write_builder()
+ w = wb.new_write()
+ w.write_arrow(pa.Table.from_pylist(self.rows, schema=self.pa_schema))
+ wb.new_commit().commit(w.prepare_commit())
+ w.close()
+ return table
+
+
+class AppendOnlyNestedParquetTest(_AppendOnlyNestedBase):
+ """Parquet path uses PyArrow's dict-form scanner with ``ds.field(*path)``
+ to push the nested column read into the engine."""
+
+ def test_dotted_name_returns_just_the_leaf(self):
+ table = self._create_table('ao_dotted_leaf')
+ rb = table.new_read_builder().with_projection(['mv.latest_version'])
+ got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist()
+ self.assertEqual(
+ got,
+ [{'mv_latest_version': 100},
+ {'mv_latest_version': 200},
+ {'mv_latest_version': 300}])
+
+ def test_mixed_nested_and_top_level_preserves_order(self):
+ table = self._create_table('ao_mixed_order')
+ rb = table.new_read_builder().with_projection(
+ ['mv.latest_version', 'val', 'mv.latest_value'])
+ got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist()
+ self.assertEqual(
+ got,
+ [{'mv_latest_version': 100, 'val': 'x', 'mv_latest_value': 'a'},
+ {'mv_latest_version': 200, 'val': 'y', 'mv_latest_value': 'b'},
+ {'mv_latest_version': 300, 'val': 'z', 'mv_latest_value': 'c'}])
+
+ def test_top_level_only_projection_unchanged(self):
+ """A projection without dots must keep the existing top-level
+ path — file-level pushdown still asks for plain column names,
+ no dict-form scanner."""
+ table = self._create_table('ao_top_level_unchanged')
+ rb = table.new_read_builder().with_projection(['val', 'id'])
+ got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist()
+ self.assertEqual(
+ got,
+ [{'val': 'x', 'id': 1},
+ {'val': 'y', 'id': 2},
+ {'val': 'z', 'id': 3}])
+
+ def test_partitioned_table_with_nested_projection(self):
+ """Partition-aware reads have a separate path-mapping helper from
+ the non-partitioned fast path; guards against a regression that dropped
+ non-nested top-level columns alongside the projected leaf."""
+ identifier = 'default.ao_partitioned'
+ pa_schema = pa.schema([
+ ('part', pa.string()),
+ ('id', pa.int64()),
+ ('mv', pa.struct([
+ ('latest_version', pa.int64()),
+ ('latest_value', pa.string()),
+ ])),
+ ('val', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ partition_keys=['part'],
+ options={'bucket': '-1', 'file.format': 'parquet'},
+ )
+ self.catalog.create_table(identifier, schema, False)
+ table = self.catalog.get_table(identifier)
+ wb = table.new_batch_write_builder()
+ w = wb.new_write()
+ w.write_arrow(pa.Table.from_pylist([
+ {'part': 'A', 'id': 1, 'mv': {'latest_version': 100,
'latest_value': 'a'}, 'val': 'x'},
+ {'part': 'B', 'id': 2, 'mv': {'latest_version': 200,
'latest_value': 'b'}, 'val': 'y'},
+ ], schema=pa_schema))
+ wb.new_commit().commit(w.prepare_commit())
+ w.close()
+
+ # Mixed projection: nested leaf, a non-partition top-level column,
+ # and the partition column itself.
+ rb = table.new_read_builder().with_projection(
+ ['part', 'mv.latest_version', 'val'])
+ got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist()
+ got_sorted = sorted(got, key=lambda r: r['part'])
+ self.assertEqual(
+ got_sorted,
+ [{'part': 'A', 'mv_latest_version': 100, 'val': 'x'},
+ {'part': 'B', 'mv_latest_version': 200, 'val': 'y'}])
+
+ def test_avro_nested_projection_python_fallback(self):
+ """Avro has no native nested column pruning; the reader walks
+ each fastavro record dict by path and assembles the column
+ client-side."""
+ table = self._create_table('ao_avro_nested', file_format='avro')
+ rb = table.new_read_builder().with_projection(['mv.latest_version',
'val'])
+ got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist()
+ self.assertEqual(
+ got,
+ [{'mv_latest_version': 100, 'val': 'x'},
+ {'mv_latest_version': 200, 'val': 'y'},
+ {'mv_latest_version': 300, 'val': 'z'}])
+
+ def test_avro_top_level_projection_unchanged(self):
+ """Top-level-only projection on Avro stays on the existing
+ ``record.get(name)`` fast path."""
+ table = self._create_table('ao_avro_top', file_format='avro')
+ rb = table.new_read_builder().with_projection(['val', 'id'])
+ got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist()
+ self.assertEqual(
+ got,
+ [{'val': 'x', 'id': 1},
+ {'val': 'y', 'id': 2},
+ {'val': 'z', 'id': 3}])
+
+ def test_pk_table_merge_split_with_nested_projection_raises(self):
+ # Phase 2b lands the append-only path only; PK + nested needs an
+ # outer-projection wrapper that ships in a follow-up commit. Until
+ # then, the call must refuse loudly rather than silently corrupt
+ # the merge function input. Two commits on the same PK force the
+ # split out of the raw-convertible fast path into the merge
+ # reader, which is where the guard lives.
+ identifier = 'default.pk_nested_unsupported'
+ schema = Schema.from_pyarrow_schema(
+ self.pa_schema,
+ primary_keys=['id'],
+ options={'bucket': '1', 'file.format': 'parquet'},
+ )
+ self.catalog.create_table(identifier, schema, False)
+ table = self.catalog.get_table(identifier)
+ for batch in (self.rows, self.rows): # two overlapping commits
+ wb = table.new_batch_write_builder()
+ w = wb.new_write()
+ w.write_arrow(pa.Table.from_pylist(batch, schema=self.pa_schema))
+ wb.new_commit().commit(w.prepare_commit())
+ w.close()
+
+ rb = table.new_read_builder().with_projection(['mv.latest_version'])
+ splits = rb.new_scan().plan().splits()
+ # ``to_arrow`` materialises the split read; the merge path is what
+ # raises, so do it eagerly here rather than waiting for the first
+ # batch.
+ with self.assertRaises(NotImplementedError):
+ rb.new_read().to_arrow(splits)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_projection_utility.py
b/paimon-python/pypaimon/tests/test_projection_utility.py
new file mode 100644
index 0000000000..4d1c43eb16
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_projection_utility.py
@@ -0,0 +1,205 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import unittest
+
+from pypaimon.schema.data_types import AtomicType, DataField, RowType
+from pypaimon.utils.projection import (NestedProjection, Projection,
+ TopLevelProjection)
+
+
+def _atomic(idx: int, name: str, type_name: str = 'BIGINT') -> DataField:
+    """Build a ``DataField`` with id ``idx`` typed ``AtomicType(type_name)``."""
+    atomic_type = AtomicType(type_name)
+    return DataField(idx, name, atomic_type)
+
+
+def _struct(idx: int, name: str, sub_fields) -> DataField:
+    """Build a ``DataField`` with id ``idx`` whose type is a ``RowType``.
+
+    ``sub_fields`` is copied into a list and becomes the row's children.
+    """
+    # NOTE(review): the leading ``False`` is presumably the nullability
+    # flag of RowType -- confirm against its constructor.
+    row_type = RowType(False, list(sub_fields))
+    return DataField(idx, name, row_type)
+
+
+def _three_top_fields():
+    """Return the three-field schema shared by the tests below.
+
+    Layout::
+
+        [pk: BIGINT,
+         mv: ROW<latest_version BIGINT, latest_value STRING>,
+         val: STRING]
+    """
+    mv_children = [
+        _atomic(10, 'latest_version'),
+        _atomic(11, 'latest_value', 'STRING'),
+    ]
+    pk_field = _atomic(1, 'pk')
+    mv_field = _struct(2, 'mv', mv_children)
+    val_field = _atomic(3, 'val', 'STRING')
+    return [pk_field, mv_field, val_field]
+
+
+class TopLevelProjectionTest(unittest.TestCase):
+    """Projections built from plain top-level indexes."""
+
+    def test_factory_produces_top_level(self):
+        projection = Projection.of([2, 0])
+        self.assertIsInstance(projection, TopLevelProjection)
+        self.assertFalse(projection.is_nested())
+
+    def test_indexes_round_trip(self):
+        projection = Projection.of([2, 0, 1])
+        self.assertEqual(projection.to_top_level_indexes(), [2, 0, 1])
+        # In nested form every top-level index becomes a one-element path.
+        self.assertEqual(projection.to_nested_indexes(), [[2], [0], [1]])
+
+    def test_project_picks_fields_in_order(self):
+        projected = Projection.of([2, 0]).project(_three_top_fields())
+        self.assertEqual([f.name for f in projected], ['val', 'pk'])
+        self.assertEqual([f.id for f in projected], [3, 1])
+
+    def test_to_name_paths(self):
+        name_paths = Projection.of([2, 0]).to_name_paths(_three_top_fields())
+        self.assertEqual(name_paths, [['val'], ['pk']])
+
+    def test_range_factory(self):
+        span = Projection.range(1, 4)
+        self.assertEqual(span.to_top_level_indexes(), [1, 2, 3])
+
+    def test_range_zero_or_negative_returns_empty(self):
+        # An empty span (start == end) and an inverted span (start > end)
+        # both select nothing.
+        zero_width = Projection.range(2, 2)
+        self.assertFalse(zero_width.is_nested())
+        self.assertEqual(zero_width.to_top_level_indexes(), [])
+        inverted = Projection.range(5, 1)
+        self.assertEqual(inverted.to_top_level_indexes(), [])
+
+
+class NestedProjectionTest(unittest.TestCase):
+    """Projections built from index paths that may walk into ROW fields."""
+
+    def test_factory_produces_nested(self):
+        projection = Projection.of([[1, 0], [1, 1]])
+        self.assertIsInstance(projection, NestedProjection)
+        self.assertTrue(projection.is_nested())
+
+    def test_singleton_paths_reported_not_nested(self):
+        # Every path has length 1, so ``is_nested`` is False even though
+        # the factory still hands back a NestedProjection.
+        projection = Projection.of([[2], [0]])
+        self.assertIsInstance(projection, NestedProjection)
+        self.assertFalse(projection.is_nested())
+
+    def test_top_level_indexes_dedup_in_path_order(self):
+        top_indexes = Projection.of(
+            [[1, 0], [1, 1], [0]]).to_top_level_indexes()
+        self.assertEqual(top_indexes, [1, 0])
+
+    def test_nested_indexes_round_trip(self):
+        projection = Projection.of([[1, 0], [1, 1]])
+        self.assertEqual(projection.to_nested_indexes(), [[1, 0], [1, 1]])
+
+    def test_to_name_paths_walks_into_struct(self):
+        projection = Projection.of([[1, 0], [1, 1], [0]])
+        name_paths = projection.to_name_paths(_three_top_fields())
+        expected = [
+            ['mv', 'latest_version'],
+            ['mv', 'latest_value'],
+            ['pk'],
+        ]
+        self.assertEqual(name_paths, expected)
+
+    def test_project_flattens_with_underscore_join(self):
+        projection = Projection.of([[1, 0], [1, 1], [0]])
+        flattened = projection.project(_three_top_fields())
+        self.assertEqual(
+            [f.name for f in flattened],
+            ['mv_latest_version', 'mv_latest_value', 'pk'])
+
+    def test_project_preserves_leaf_field_id(self):
+        # Flattened fields must carry the leaf's ID (10, 11), not the
+        # parent struct's ID (2), because schema-evolution remapping is
+        # keyed on field ID.
+        flattened = Projection.of([[1, 0], [1, 1]]).project(_three_top_fields())
+        self.assertEqual([f.id for f in flattened], [10, 11])
+
+    def test_collision_dedup_via_dollar_suffix(self):
+        # Same leaf name under different parents: 'a_x' and 'b_x' do not
+        # collide, so no suffix is added.
+        distinct_parents = [
+            _struct(1, 'a', [_atomic(20, 'x')]),
+            _struct(2, 'b', [_atomic(21, 'x')]),
+        ]
+        no_clash = Projection.of([[0, 0], [1, 0]]).project(distinct_parents)
+        self.assertEqual([f.name for f in no_clash], ['a_x', 'b_x'])
+
+        # A struct leaf flattening to 'a_x' next to a plain top-level
+        # field already called 'a_x': the later one receives a ``_$N``
+        # suffix.
+        clashing = [
+            _struct(1, 'a', [_atomic(30, 'x')]),
+            _atomic(2, 'a_x'),
+        ]
+        with_clash = Projection.of([[0, 0], [1]]).project(clashing)
+        self.assertEqual(with_clash[0].name, 'a_x')
+        self.assertTrue(with_clash[1].name.startswith('a_x_$'))
+
+    def test_project_rejects_non_row_step(self):
+        # Index 0 is the atomic 'pk' field; descending into it must fail.
+        bad_projection = Projection.of([[0, 0]])
+        with self.assertRaises(ValueError):
+            bad_projection.project(_three_top_fields())
+
+    def test_constructor_rejects_empty_paths_list(self):
+        with self.assertRaises(ValueError):
+            NestedProjection([])
+
+    def test_constructor_rejects_zero_length_path(self):
+        with self.assertRaises(ValueError):
+            NestedProjection([[]])
+
+    def test_dup_count_is_monotonic_across_distinct_collisions(self):
+        # The ``_$N`` counter is shared by all collisions within one
+        # ``project`` call; it does not restart at 0 per base name.
+        fields = [
+            _atomic(1, 'a_x'),
+            _struct(2, 'a', [_atomic(20, 'x')]),
+            _atomic(3, 'a_y'),
+            _struct(4, 'a', [_atomic(30, 'y')]),
+            # Present in the schema but not selected by the paths below.
+            _struct(5, 'a', [_atomic(21, 'x'), _atomic(31, 'y')]),
+        ]
+        # [0]    -> 'a_x' kept as is
+        # [1, 0] -> 'a_x' collides -> 'a_x_$0'
+        # [2]    -> 'a_y' kept as is
+        # [3, 0] -> 'a_y' collides -> 'a_y_$1'
+        flattened = Projection.of([[0], [1, 0], [2], [3, 0]]).project(fields)
+        self.assertEqual(
+            [f.name for f in flattened],
+            ['a_x', 'a_x_$0', 'a_y', 'a_y_$1'])
+
+    def test_of_rejects_mixed_int_and_path(self):
+        # Mixing bare indexes with paths is rejected at the ``of`` call,
+        # whichever kind comes first.
+        for mixed_input in ([1, [2, 3]], [[1, 2], 3]):
+            with self.assertRaises(TypeError):
+                Projection.of(mixed_input)
+
+
+class EmptyProjectionTest(unittest.TestCase):
+    """The empty projection selects nothing through every accessor."""
+
+    def test_of_empty(self):
+        from_empty_list = Projection.of([])
+        self.assertEqual(from_empty_list.to_top_level_indexes(), [])
+        self.assertEqual(Projection.of([]).to_nested_indexes(), [])
+
+    def test_empty_factory(self):
+        nothing = Projection.empty()
+        schema_fields = _three_top_fields()
+        self.assertEqual(nothing.project(schema_fields), [])
+        self.assertFalse(nothing.is_nested())
+        self.assertEqual(nothing.to_name_paths(_three_top_fields()), [])
+
+
+# Entry point: run this module's unittest cases when executed as a script.
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_read_builder_nested_projection.py b/paimon-python/pypaimon/tests/test_read_builder_nested_projection.py
new file mode 100644
index 0000000000..a781f149fa
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_read_builder_nested_projection.py
@@ -0,0 +1,124 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class _ReadBuilderTestBase(unittest.TestCase):
+    """Shared fixture for the ``ReadBuilder`` projection tests.
+
+    Creates a primary-key table ``default.rb_nested`` in a temporary
+    warehouse with columns ``pk`` (non-null int64), ``mv`` (a struct of
+    ``latest_version`` / ``latest_value``) and ``val`` (string), so both
+    top-level and nested projections can go through the real API.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', False)
+
+        mv_type = pa.struct([
+            pa.field('latest_version', pa.int64()),
+            pa.field('latest_value', pa.string()),
+        ])
+        cls.pa_schema = pa.schema([
+            pa.field('pk', pa.int64(), nullable=False),
+            pa.field('mv', mv_type),
+            pa.field('val', pa.string()),
+        ])
+        table_options = {'bucket': '1', 'file.format': 'parquet'}
+        paimon_schema = Schema.from_pyarrow_schema(
+            cls.pa_schema, primary_keys=['pk'], options=table_options)
+        cls.catalog.create_table('default.rb_nested', paimon_schema, False)
+        cls.table = cls.catalog.get_table('default.rb_nested')
+
+    @classmethod
+    def tearDownClass(cls):
+        # Best-effort removal of the temporary warehouse directory.
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+
+class ReadBuilderProjectionStateTest(_ReadBuilderTestBase):
+    """What ``with_projection`` stores and what ``read_type`` then reports."""
+
+    def _projected_names(self, read_builder):
+        """Names of the fields returned by ``read_builder.read_type()``."""
+        return [field.name for field in read_builder.read_type()]
+
+    def test_no_projection_returns_full_schema(self):
+        names = self._projected_names(self.table.new_read_builder())
+        self.assertEqual(names, ['pk', 'mv', 'val'])
+        # With no explicit projection the table fields come back as they
+        # are: no row-tracking system columns are injected.
+        for system_column in ('_ROW_ID', '_SEQUENCE_NUMBER'):
+            self.assertNotIn(system_column, names)
+
+    def test_top_level_projection_unchanged(self):
+        read_builder = self.table.new_read_builder().with_projection(
+            ['val', 'pk'])
+        self.assertEqual(self._projected_names(read_builder), ['val', 'pk'])
+        # Plain names only: no nested paths are derived.
+        self.assertIsNone(read_builder._nested_paths)
+
+    def test_dotted_name_resolves_to_nested_path(self):
+        read_builder = self.table.new_read_builder().with_projection(
+            ['mv.latest_version', 'pk'])
+        # A dotted name makes the builder populate ``_nested_paths``.
+        self.assertIsNotNone(read_builder._nested_paths)
+        self.assertEqual(read_builder._nested_paths, [[1, 0], [0]])
+        # The nested leaf is exposed under its underscore-joined name.
+        self.assertEqual(
+            self._projected_names(read_builder), ['mv_latest_version', 'pk'])
+
+    def test_dotted_name_unknown_top_silently_skipped(self):
+        read_builder = self.table.new_read_builder().with_projection(
+            ['nope.x', 'val'])
+        # 'nope.x' resolves to nothing, but its dot still switches the
+        # builder to nested paths; only 'val' (index 2) remains.
+        self.assertEqual(read_builder._nested_paths, [[2]])
+        self.assertEqual(self._projected_names(read_builder), ['val'])
+
+    def test_dotted_name_unknown_subfield_silently_skipped(self):
+        read_builder = self.table.new_read_builder().with_projection(
+            ['mv.no_such_subfield', 'pk'])
+        # The unresolvable path is dropped; the plain name is kept.
+        self.assertEqual(read_builder._nested_paths, [[0]])
+        self.assertEqual(self._projected_names(read_builder), ['pk'])
+
+
+class ReadBuilderProjectionFieldIdTest(_ReadBuilderTestBase):
+    """Field IDs reported by ``read_type`` for nested projections."""
+
+    def test_nested_leaves_inherit_leaf_field_id(self):
+        read_builder = self.table.new_read_builder().with_projection(
+            ['mv.latest_version', 'mv.latest_value'])
+        projected_ids = [field.id for field in read_builder.read_type()]
+
+        # Take the expected IDs from the table schema itself rather than
+        # hard-coding them.
+        mv_field = next(f for f in self.table.fields if f.name == 'mv')
+        ids_by_child_name = {
+            child.name: child.id for child in mv_field.type.fields}
+        expected_ids = [
+            ids_by_child_name['latest_version'],
+            ids_by_child_name['latest_value'],
+        ]
+        self.assertEqual(projected_ids, expected_ids)
+
+
+# Entry point: run this module's unittest cases when executed as a script.
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/utils/projection.py b/paimon-python/pypaimon/utils/projection.py
new file mode 100644
index 0000000000..c87da7ac2c
--- /dev/null
+++ b/paimon-python/pypaimon/utils/projection.py
@@ -0,0 +1,267 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Column projection utilities.
+
+A projection maps a source row type to a flat list of ``DataField``: the
+columns the user wants to read. Two flavours:
+
+* :class:`TopLevelProjection` selects fields by their top-level index.
+* :class:`NestedProjection` accepts paths that walk into ROW children, e.g.
+ ``[[1, 0], [1, 2]]`` means "the 0th and 2nd children of the field at top
+ level index 1". The result is flattened into top-level fields whose
+ names are the underscore-joined original path (``a_b`` for ``a.b``,
+ with a ``_$N`` suffix on collisions) and whose IDs are inherited from
+ the leaf so schema-evolution remapping by field ID still works.
+"""
+
+from abc import ABC, abstractmethod
+from typing import List, Optional, Sequence
+
+from pypaimon.schema.data_types import DataField, RowType
+
+
+class Projection(ABC):
+    """Common interface of all column projections, plus their factories."""
+
+    @abstractmethod
+    def project(self, row_type) -> List[DataField]:
+        """Return the flat list of fields selected from ``row_type``."""
+
+    @abstractmethod
+    def is_nested(self) -> bool:
+        """Return True when at least one path descends below the top level."""
+
+    @abstractmethod
+    def to_top_level_indexes(self) -> List[int]:
+        """Return the top-level positions this projection touches.
+
+        A nested projection reports each top-level index once, in the
+        order its paths first mention it. Intended for fallback code that
+        can only push a projection down at the top level.
+        """
+
+    @abstractmethod
+    def to_nested_indexes(self) -> List[List[int]]:
+        """Return one index path per output field."""
+
+    @abstractmethod
+    def to_name_paths(self, row_type) -> List[List[str]]:
+        """Resolve the index paths to field-name paths within ``row_type``.
+
+        Example: if the top-level field at index 1 is a struct ``mv_col``
+        whose first child is ``LATEST_VERSION``, the path ``[1, 0]``
+        becomes ``["mv_col", "LATEST_VERSION"]``. Format readers use the
+        result to hand a nested projection to the underlying engine
+        (e.g. PyArrow's ``ds.field(*name_path)``).
+        """
+
+    # ------------------------------------------------------------------
+    # Factories
+    # ------------------------------------------------------------------
+
+    @staticmethod
+    def empty() -> "Projection":
+        """Return a projection that selects no columns."""
+        return _EmptyProjection()
+
+    @staticmethod
+    def of(indexes_or_paths) -> "Projection":
+        """Create a projection from top-level indexes or from index paths.
+
+        An empty (falsy) input yields the empty projection. Otherwise the
+        entries must all be of the same kind: either none of them is a
+        list/tuple (top-level indexes) or every one is (nested paths).
+
+        Raises:
+            TypeError: if the input mixes the two kinds, so the mistake
+                surfaces at the ``of`` call rather than later inside
+                ``project``.
+        """
+        if not indexes_or_paths:
+            return _EmptyProjection()
+
+        head_is_path = isinstance(indexes_or_paths[0], (list, tuple))
+        shape_differs = any(
+            isinstance(entry, (list, tuple)) != head_is_path
+            for entry in indexes_or_paths[1:])
+        if shape_differs:
+            raise TypeError(
+                "Projection.of expects either all top-level indexes "
+                "or all nested paths; got a mix")
+
+        if not head_is_path:
+            return TopLevelProjection(list(indexes_or_paths))
+        return NestedProjection([list(path) for path in indexes_or_paths])
+
+    @staticmethod
+    def range(start_inclusive: int, end_exclusive: int) -> "Projection":
+        """Create a top-level projection over ``[start_inclusive, end_exclusive)``.
+
+        An empty or inverted interval yields the empty projection.
+        """
+        # Inside this function ``range`` is the builtin: class-level
+        # names are not visible from method bodies.
+        return (
+            _EmptyProjection()
+            if end_exclusive <= start_inclusive
+            else TopLevelProjection(
+                list(range(start_inclusive, end_exclusive))))
+
+
+class _EmptyProjection(Projection):
+    """Projection that selects nothing; every accessor returns an empty result."""
+
+    def project(self, row_type) -> List[DataField]:
+        """Return no fields, whatever ``row_type`` is."""
+        return []
+
+    def is_nested(self) -> bool:
+        """Return False: there are no paths at all."""
+        return False
+
+    def to_top_level_indexes(self) -> List[int]:
+        """Return an empty index list."""
+        return []
+
+    def to_nested_indexes(self) -> List[List[int]]:
+        """Return an empty path list."""
+        return []
+
+    def to_name_paths(self, row_type) -> List[List[str]]:
+        """Return an empty name-path list; ``row_type`` is not inspected."""
+        return []
+
+
+class TopLevelProjection(Projection):
+    """Projection that picks whole top-level fields by position."""
+
+    def __init__(self, indexes: Sequence[int]):
+        # Private copy, so later changes to the caller's sequence do not
+        # affect this projection.
+        self.indexes = list(indexes)
+
+    def project(self, row_type) -> List[DataField]:
+        """Return the fields at ``self.indexes``, in that order."""
+        source_fields = _row_fields(row_type)
+        return [source_fields[position] for position in self.indexes]
+
+    def is_nested(self) -> bool:
+        """Return False: this projection never descends into a field."""
+        return False
+
+    def to_top_level_indexes(self) -> List[int]:
+        """Return a copy of the selected positions."""
+        return list(self.indexes)
+
+    def to_nested_indexes(self) -> List[List[int]]:
+        """Return each selected position wrapped as a one-element path."""
+        return [[position] for position in self.indexes]
+
+    def to_name_paths(self, row_type) -> List[List[str]]:
+        """Return a one-element name path for each selected position."""
+        source_fields = _row_fields(row_type)
+        return [[source_fields[position].name] for position in self.indexes]
+
+
+class NestedProjection(Projection):
+    """Projection whose paths may descend into ROW-typed fields.
+
+    A path starts at a top-level index and each further index selects a
+    child of the ROW reached so far. A one-element path behaves like a
+    plain top-level selection.
+    """
+
+    def __init__(self, paths: Sequence[Sequence[int]]):
+        """Store a copy of ``paths``.
+
+        Raises:
+            ValueError: if ``paths`` is empty or contains an empty path.
+        """
+        if not paths:
+            raise ValueError("NestedProjection requires at least one path")
+        self.paths = [list(path) for path in paths]
+        if any(len(path) == 0 for path in self.paths):
+            raise ValueError(
+                "Each projection path must have at least one index")
+        self._has_nested = any(len(path) > 1 for path in self.paths)
+
+    def is_nested(self) -> bool:
+        """Return True when some path has more than one index."""
+        return self._has_nested
+
+    def to_top_level_indexes(self) -> List[int]:
+        """Return the first index of every path, de-duplicated in path order."""
+        # ``dict`` keeps insertion order, which gives an ordered de-dup.
+        return list(dict.fromkeys(path[0] for path in self.paths))
+
+    def to_nested_indexes(self) -> List[List[int]]:
+        """Return a copy of every stored path."""
+        return [list(path) for path in self.paths]
+
+    def to_name_paths(self, row_type) -> List[List[str]]:
+        """Return, for each path, the field names met while walking it.
+
+        Raises:
+            ValueError: if a path tries to descend into a non-ROW field.
+        """
+        top_fields = _row_fields(row_type)
+        return [self._walk(top_fields, path)[1] for path in self.paths]
+
+    def project(self, row_type) -> List[DataField]:
+        """Flatten every path into one top-level ``DataField``.
+
+        The output name is the walked names joined by ``_``. When that
+        name is already taken, ``_$N`` is appended, where ``N`` comes from
+        a counter shared by the whole call. The counter only guarantees
+        uniqueness: ``_$0`` is not necessarily the first collision of a
+        given base name.
+
+        Raises:
+            ValueError: if a path tries to descend into a non-ROW field.
+        """
+        top_fields = _row_fields(row_type)
+        flattened: List[DataField] = []
+        taken_names = set()
+        suffix_counter = 0
+        for path in self.paths:
+            leaf, name_parts = self._walk(top_fields, path)
+            base_name = "_".join(name_parts)
+            unique_name = base_name
+            while unique_name in taken_names:
+                unique_name = "%s_$%d" % (base_name, suffix_counter)
+                suffix_counter += 1
+            taken_names.add(unique_name)
+            # The leaf's own ID is kept so that schema-evolution
+            # remapping by field ID still works after the rename.
+            flattened.append(DataField(
+                id=leaf.id,
+                name=unique_name,
+                type=leaf.type,
+                description=getattr(leaf, 'description', None),
+                default_value=getattr(leaf, 'default_value', None),
+            ))
+        return flattened
+
+    @staticmethod
+    def _walk(top_fields, path):
+        """Follow ``path`` from ``top_fields``; return ``(leaf, names)``.
+
+        ``names`` lists the name of every field visited, leaf included.
+
+        Raises:
+            ValueError: if a step must descend into a non-ROW field.
+        """
+        current = top_fields[path[0]]
+        names = [current.name]
+        for child_index in path[1:]:
+            parent_type = current.type
+            if not _is_row_type(parent_type):
+                raise ValueError(
+                    "Nested projection step expected a ROW type but got %s "
+                    "for field '%s'" % (parent_type, current.name))
+            current = _row_fields(parent_type)[child_index]
+            names.append(current.name)
+        return current, names
+
+
+def _row_fields(row_type) -> List[DataField]:
+    """Return the fields of a row-like value.
+
+    A ``list`` is returned unchanged (the same object, not a copy). Any
+    other value must expose a non-``None`` ``fields`` attribute, which is
+    copied into a new list.
+
+    Raises:
+        ValueError: if ``row_type`` is not a list and has no ``fields``.
+    """
+    if isinstance(row_type, list):
+        return row_type
+    declared = getattr(row_type, 'fields', None)
+    if declared is not None:
+        return list(declared)
+    raise ValueError(
+        "Projection target must be a RowType or have a .fields attribute, "
+        "got %s" % type(row_type).__name__)
+
+
+def _is_row_type(data_type) -> bool:
+    """Return True when ``data_type`` can be walked into like a ROW.
+
+    That is the case for ``RowType`` instances and, duck-typed, for any
+    object with a non-``None`` ``fields`` attribute (lightweight test
+    stubs need not subclass ``RowType``).
+    """
+    return (
+        isinstance(data_type, RowType)
+        or getattr(data_type, 'fields', None) is not None)