This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fda857c01 [python] Add nested-field projection support (append-only 
paths) (#7796)
6fda857c01 is described below

commit 6fda857c01a07ae981ae003d42302b47ceb4125a
Author: chaoyang <[email protected]>
AuthorDate: Sat May 9 20:16:04 2026 +0800

    [python] Add nested-field projection support (append-only paths) (#7796)
    
    `ReadBuilder.with_projection` only accepts top-level column names today;
    dotted forms like `'struct.subfield'` are silently dropped. Reading just
    one leaf out of a struct ends up materialising the whole parent and
    discarding the rest client-side.
    
    This PR ports nested projection to the **append-only** read paths.
---
 paimon-python/pypaimon/read/read_builder.py        |  98 +++++++-
 .../pypaimon/read/reader/format_avro_reader.py     |  41 +++-
 .../pypaimon/read/reader/format_pyarrow_reader.py  |  96 +++++++-
 paimon-python/pypaimon/read/split_read.py          | 108 ++++++++-
 paimon-python/pypaimon/read/table_read.py          |  32 ++-
 .../pypaimon/tests/test_nested_projection_e2e.py   | 210 ++++++++++++++++
 .../pypaimon/tests/test_projection_utility.py      | 205 ++++++++++++++++
 .../tests/test_read_builder_nested_projection.py   | 124 ++++++++++
 paimon-python/pypaimon/utils/projection.py         | 267 +++++++++++++++++++++
 9 files changed, 1148 insertions(+), 33 deletions(-)

diff --git a/paimon-python/pypaimon/read/read_builder.py 
b/paimon-python/pypaimon/read/read_builder.py
index d45a526a69..ce95115431 100644
--- a/paimon-python/pypaimon/read/read_builder.py
+++ b/paimon-python/pypaimon/read/read_builder.py
@@ -24,6 +24,7 @@ from pypaimon.read.table_read import TableRead
 from pypaimon.read.table_scan import TableScan
 from pypaimon.schema.data_types import DataField
 from pypaimon.table.special_fields import SpecialFields
+from pypaimon.utils.projection import Projection, _is_row_type
 
 
 class ReadBuilder:
@@ -34,7 +35,14 @@ class ReadBuilder:
 
         self.table: FileStoreTable = table
         self._predicate: Optional[Predicate] = None
+        # ``_projection`` is the user-facing list of names from
+        # :meth:`with_projection`. ``_nested_paths`` is the canonical
+        # integer-path form (length-1 for top level, longer for nested
+        # ROW children) used by every consumer in this module. It is
+        # set only when a projected name contains a dot; otherwise it
+        # is ``None`` and ``_projection`` alone drives the read type.
         self._projection: Optional[List[str]] = None
+        self._nested_paths: Optional[List[List[int]]] = None
         self._limit: Optional[int] = None
 
     def with_filter(self, predicate: Predicate) -> 'ReadBuilder':
@@ -42,7 +50,19 @@ class ReadBuilder:
         return self
 
     def with_projection(self, projection: List[str]) -> 'ReadBuilder':
+        """Project to the given column names.
+
+        Names containing a dot (e.g. ``"struct.subfield"``) walk into ROW
+        children and are translated into a nested projection. Top-level-
+        only callers see the same observable behaviour as before — the
+        dotted form is opt-in. Unknown names are silently skipped to
+        preserve the pre-existing contract.
+        """
         self._projection = projection
+        if projection and any('.' in name for name in projection):
+            self._nested_paths = self._resolve_dotted_paths(projection)
+        else:
+            self._nested_paths = None
         return self
 
     def with_limit(self, limit: int) -> 'ReadBuilder':
@@ -60,19 +80,83 @@ class ReadBuilder:
         return TableRead(
             table=self.table,
             predicate=self._predicate,
-            read_type=self.read_type()
+            read_type=self.read_type(),
+            nested_name_paths=self._nested_name_paths(),
         )
 
+    def _nested_name_paths(self) -> Optional[List[List[str]]]:
+        """Resolve the current nested-projection state into a parallel list
+        of name paths against the underlying table schema. Returns ``None``
+        if the user only requested top-level projection (or no projection).
+        """
+        if not self._nested_paths:
+            return None
+        table_fields = self.table.fields
+        if self.table.options.row_tracking_enabled():
+            table_fields = 
SpecialFields.row_type_with_row_tracking(table_fields)
+        return Projection.of(self._nested_paths).to_name_paths(table_fields)
+
     def new_predicate_builder(self) -> PredicateBuilder:
         return PredicateBuilder(self.read_type())
 
     def read_type(self) -> List[DataField]:
         table_fields = self.table.fields
 
-        if not self._projection:
+        if not self._projection and not self._nested_paths:
             return table_fields
-        else:
-            if self.table.options.row_tracking_enabled():
-                table_fields = 
SpecialFields.row_type_with_row_tracking(table_fields)
-            field_map = {field.name: field for field in table_fields}
-            return [field_map[name] for name in self._projection if name in 
field_map]
+
+        if self.table.options.row_tracking_enabled():
+            table_fields = 
SpecialFields.row_type_with_row_tracking(table_fields)
+
+        if self._nested_paths:
+            return Projection.of(self._nested_paths).project(table_fields)
+
+        field_map = {field.name: field for field in table_fields}
+        return [field_map[name] for name in self._projection if name in 
field_map]
+
+    # ------------------------------------------------------------------
+    # Helpers
+    # ------------------------------------------------------------------
+
+    def _resolve_dotted_paths(self, names: List[str]) -> List[List[int]]:
+        """Translate dotted-name projection entries into integer paths
+        against the current table schema. Names without dots produce
+        length-1 paths.
+        """
+        table_fields = self.table.fields
+        if self.table.options.row_tracking_enabled():
+            table_fields = 
SpecialFields.row_type_with_row_tracking(table_fields)
+        top_index = {f.name: i for i, f in enumerate(table_fields)}
+
+        paths: List[List[int]] = []
+        for name in names:
+            if '.' not in name:
+                if name not in top_index:
+                    # Silently skip unknown names — preserves the
+                    # pre-existing contract from the plain top-level path.
+                    continue
+                paths.append([top_index[name]])
+                continue
+            parts = name.split('.')
+            top = parts[0]
+            if top not in top_index:
+                continue
+            path = [top_index[top]]
+            current_field = table_fields[path[0]]
+            ok = True
+            for part in parts[1:]:
+                if not _is_row_type(current_field.type):
+                    ok = False
+                    break
+                child_fields = current_field.type.fields
+                child_idx = next(
+                    (i for i, f in enumerate(child_fields) if f.name == part),
+                    -1)
+                if child_idx < 0:
+                    ok = False
+                    break
+                path.append(child_idx)
+                current_field = child_fields[child_idx]
+            if ok:
+                paths.append(path)
+        return paths
diff --git a/paimon-python/pypaimon/read/reader/format_avro_reader.py 
b/paimon-python/pypaimon/read/reader/format_avro_reader.py
index 5dd2738944..1fe41468e8 100644
--- a/paimon-python/pypaimon/read/reader/format_avro_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_avro_reader.py
@@ -35,7 +35,8 @@ class FormatAvroReader(RecordBatchReader):
     """
 
     def __init__(self, file_io: FileIO, file_path: str, read_fields: 
List[str], full_fields: List[DataField],
-                 push_down_predicate: Any, batch_size: int = 1024):
+                 push_down_predicate: Any, batch_size: int = 1024,
+                 nested_name_paths: Optional[List[List[str]]] = None):
         file_path_for_io = file_io.to_filesystem_path(file_path)
         self._file = file_io.filesystem.open_input_file(file_path_for_io)
         self._avro_reader = fastavro.reader(self._file)
@@ -47,13 +48,30 @@ class FormatAvroReader(RecordBatchReader):
         projected_data_fields = [full_fields_map[name] for name in read_fields]
         self._schema = 
PyarrowFieldParser.from_paimon_schema(projected_data_fields)
 
+        # ``nested_name_paths`` is parallel to ``read_fields``. Top-level
+        # entries are length-1 paths and use the existing ``record.get``
+        # fast path; longer paths walk the record dict step-by-step. The
+        # path's first segment must be a real top-level Avro field —
+        # ``_get_fields_and_predicate`` upstream guarantees this.
+        if nested_name_paths is not None and len(nested_name_paths) != 
len(read_fields):
+            raise ValueError(
+                "nested_name_paths length {} does not match read_fields length 
{}".format(
+                    len(nested_name_paths), len(read_fields)))
+        self._nested_name_paths = nested_name_paths
+        self._has_nested = bool(
+            nested_name_paths and any(len(p) > 1 for p in nested_name_paths))
+
     def read_arrow_batch(self) -> Optional[RecordBatch]:
         pydict_data = {name: [] for name in self._fields}
         records_in_batch = 0
 
         for record in self._avro_reader:
-            for col_name in self._fields:
-                pydict_data[col_name].append(record.get(col_name))
+            if self._has_nested:
+                for col_name, path in zip(self._fields, 
self._nested_name_paths):
+                    pydict_data[col_name].append(_walk_avro_record(record, 
path))
+            else:
+                for col_name in self._fields:
+                    pydict_data[col_name].append(record.get(col_name))
             records_in_batch += 1
             if records_in_batch >= self._batch_size:
                 break
@@ -76,3 +94,20 @@ class FormatAvroReader(RecordBatchReader):
         if self._file:
             self._file.close()
             self._file = None
+
+
+def _walk_avro_record(record, path: List[str]):
+    """Walk a list of field names through an avro record dict, returning
+    the leaf value or ``None`` if any segment is missing or hits a
+    non-dict value. ``record`` is the top-level fastavro dict; nested
+    record fields surface as nested dicts.
+    """
+    current = record
+    for name in path:
+        if current is None:
+            return None
+        if isinstance(current, dict):
+            current = current.get(name)
+            continue
+        return None
+    return current
diff --git a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py 
b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
index ddfb368c88..0c12d27e4f 100644
--- a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
@@ -48,33 +48,81 @@ class FormatPyArrowReader(RecordBatchReader):
     def __init__(self, file_io: FileIO, file_format: str, file_path: str,
                  read_fields: List[DataField],
                  push_down_predicate: Any, batch_size: int = 1024,
-                 options: CoreOptions = None):
+                 options: CoreOptions = None,
+                 nested_name_paths: Optional[List[List[str]]] = None):
         file_path_for_pyarrow = file_io.to_filesystem_path(file_path)
         self.dataset = ds.dataset(file_path_for_pyarrow, format=file_format, 
filesystem=file_io.filesystem)
         self._file_format = file_format
         self.read_fields = read_fields
         self._read_field_names = [f.name for f in read_fields]
 
-        # Identify which fields exist in the file and which are missing
+        # ``nested_name_paths`` is parallel to ``read_fields``; when
+        # any path has length > 1 the scanner is invoked with a
+        # ``{flat_name: ds.field(*path)}`` column dict.
+        if nested_name_paths is not None and len(nested_name_paths) != 
len(read_fields):
+            raise ValueError(
+                "nested_name_paths length {} does not match read_fields length 
{}".format(
+                    len(nested_name_paths), len(read_fields)))
+        self._nested_name_paths = nested_name_paths
+        has_nested_path = bool(
+            nested_name_paths and any(len(p) > 1 for p in nested_name_paths))
+
+        # Identify which fields exist in the file and which are missing.
+        # For nested projection, "exists" is determined by walking the
+        # whole path against the file schema; sub-field schema evolution
+        # (a leaf renamed or removed) shows up as ``missing`` and is
+        # served as a NULL column, mirroring the top-level handling.
         file_schema = self.dataset.schema
-        file_schema_names = set(file_schema.names)
-        self.existing_fields = [f.name for f in read_fields if f.name in 
file_schema_names]
-        self.missing_fields = [f.name for f in read_fields if f.name not in 
file_schema_names]
-
-        # column name → VariantSchema for shredded columns that need assembly
+        if has_nested_path:
+            self.existing_fields = []
+            self.missing_fields = []
+            for f, path in zip(read_fields, nested_name_paths):
+                if _path_exists_in_arrow_schema(file_schema, path):
+                    self.existing_fields.append(f.name)
+                else:
+                    self.missing_fields.append(f.name)
+        else:
+            file_schema_names = set(file_schema.names)
+            self.existing_fields = [f.name for f in read_fields if f.name in 
file_schema_names]
+            self.missing_fields = [f.name for f in read_fields if f.name not 
in file_schema_names]
+
+        # column name → VariantSchema for shredded columns that need assembly.
+        # In nested mode we still want to reassemble shredded VARIANTs
+        # that were projected at the top level — only the columns actually
+        # reached via a length>1 path are skipped (those are sub-fields of
+        # some other struct, not VARIANTs themselves).
         self._shredded_schemas: Dict[str, VariantSchema] = {}
         if options is None or options.variant_shredding_enabled():
+            top_level_names = set(file_schema.names)
             for name in self.existing_fields:
+                if name not in top_level_names:
+                    continue
                 field_type = file_schema.field(name).type
                 if is_shredded_variant(field_type):
                     self._shredded_schemas[name] = 
build_variant_schema(field_type)
 
-        # Only pass existing fields to PyArrow scanner to avoid errors
-        self.reader = self.dataset.scanner(
-            columns=self.existing_fields,
-            filter=push_down_predicate,
-            batch_size=batch_size
-        ).to_reader()
+        if has_nested_path:
+            # Dict-form columns let PyArrow read leaf fields out of nested
+            # structs without materialising the parent. The dict keys
+            # become the output column names — they're already flattened
+            # to ``a_b`` form by the upstream projection utility.
+            existing_set = set(self.existing_fields)
+            columns_dict = {}
+            for f, path in zip(read_fields, nested_name_paths):
+                if f.name in existing_set:
+                    columns_dict[f.name] = ds.field(*path)
+            self.reader = self.dataset.scanner(
+                columns=columns_dict,
+                filter=push_down_predicate,
+                batch_size=batch_size
+            ).to_reader()
+        else:
+            # Only pass existing fields to PyArrow scanner to avoid errors
+            self.reader = self.dataset.scanner(
+                columns=self.existing_fields,
+                filter=push_down_predicate,
+                batch_size=batch_size
+            ).to_reader()
 
         self._output_schema = (
             PyarrowFieldParser.from_paimon_schema(read_fields) if read_fields 
else None
@@ -169,3 +217,25 @@ class FormatPyArrowReader(RecordBatchReader):
     def close(self):
         if self.reader is not None:
             self.reader = None
+
+
+def _path_exists_in_arrow_schema(schema: pa.Schema, path: List[str]) -> bool:
+    """Walk ``path`` (a list of field names) through a PyArrow schema and
+    return whether every step exists. The first step is a top-level field
+    name; subsequent steps are struct child names. Missing leaves at any
+    depth (e.g. a renamed sub-field) yield ``False`` so the caller can
+    fall back to a NULL column instead of raising during scan setup.
+    """
+    if not path:
+        return False
+    if path[0] not in schema.names:
+        return False
+    current_type = schema.field(path[0]).type
+    for name in path[1:]:
+        if not pa.types.is_struct(current_type):
+            return False
+        idx = current_type.get_field_index(name)
+        if idx < 0:
+            return False
+        current_type = current_type.field(idx).type
+    return True
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index c88f49f3b0..b68d2f9e36 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -19,7 +19,7 @@
 import os
 from abc import ABC, abstractmethod
 from functools import partial
-from typing import Callable, List, Optional, Tuple
+from typing import Callable, Dict, List, Optional, Tuple
 
 from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
@@ -89,7 +89,8 @@ class SplitRead(ABC):
             predicate: Optional[Predicate],
             read_type: List[DataField],
             split: Split,
-            row_tracking_enabled: bool):
+            row_tracking_enabled: bool,
+            nested_name_paths: Optional[List[List[str]]] = None):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
@@ -98,6 +99,9 @@ class SplitRead(ABC):
         self.split = split
         self.row_tracking_enabled = row_tracking_enabled
         self.value_arity = len(read_type)
+        # Parallel to ``read_type``; each entry is the original-schema
+        # name path. ``None`` when no projection or top-level only.
+        self.nested_name_paths = nested_name_paths
 
         self.trimmed_primary_key = self.table.trimmed_primary_keys
         self.read_fields = read_type
@@ -127,6 +131,21 @@ class SplitRead(ABC):
         else:
             self.predicate_for_reader = None
 
+    def _nested_path_by_name(self) -> Optional[Dict[str, List[str]]]:
+        """``{flat_name: original_path}`` for every entry of ``read_type``,
+        built only when at least one path is nested (``len > 1``).
+        ``None`` otherwise, so callers stay on the top-level fast path.
+        """
+        if not self.nested_name_paths:
+            return None
+        if not any(len(p) > 1 for p in self.nested_name_paths):
+            return None
+        out: Dict[str, List[str]] = {}
+        for f, path in zip(self.read_fields[:self.value_arity],
+                           self.nested_name_paths):
+            out[f.name] = path
+        return out
+
     def _push_down_predicate(self) -> Optional[Predicate]:
         if self.predicate is None:
             return None
@@ -168,22 +187,46 @@ class SplitRead(ABC):
                     end = r.to - file.first_row_id
                     row_indices.extend(range(start, end + 1))
 
+        # Map the parallel ``self.nested_name_paths`` (aligned with
+        # ``self.read_fields``) into the same order the format reader
+        # will see, indexed by output column name. Set to ``None`` when
+        # no path has length > 1 so the reader stays on its top-level
+        # fast path.
+        nested_path_by_name = self._nested_path_by_name()
+        has_nested = nested_path_by_name is not None
+
         format_reader: RecordBatchReader
         if file_format == CoreOptions.FILE_FORMAT_AVRO:
+            avro_nested_paths = (
+                [nested_path_by_name[name] for name in read_file_fields]
+                if has_nested else None
+            )
             format_reader = FormatAvroReader(self.table.file_io, file_path, 
read_file_fields,
-                                             self.read_fields, 
read_arrow_predicate, batch_size=batch_size)
+                                             self.read_fields, 
read_arrow_predicate, batch_size=batch_size,
+                                             
nested_name_paths=avro_nested_paths)
         elif file_format == CoreOptions.FILE_FORMAT_BLOB:
+            if has_nested:
+                raise NotImplementedError(
+                    "Nested-field projection is not supported on BLOB files")
             blob_as_descriptor = 
CoreOptions.blob_as_descriptor(self.table.options)
             format_reader = FormatBlobReader(self.table.file_io, file_path, 
read_file_fields,
                                              self.read_fields, 
read_arrow_predicate, blob_as_descriptor,
                                              batch_size=batch_size)
         elif file_format == CoreOptions.FILE_FORMAT_LANCE:
+            if has_nested:
+                # Lance has no nested column pruning today; callers must
+                # project the parent struct and extract sub-fields themselves.
+                raise NotImplementedError(
+                    "Nested-field projection is not supported on Lance files")
             name_to_field = {f.name: f for f in self.read_fields}
             ordered_read_fields = [name_to_field[n] for n in read_file_fields 
if n in name_to_field]
             format_reader = FormatLanceReader(self.table.file_io, file_path, 
ordered_read_fields,
                                               read_arrow_predicate, 
batch_size=batch_size,
                                               row_range=row_range, 
row_indices=row_indices)
         elif file_format == CoreOptions.FILE_FORMAT_VORTEX:
+            if has_nested:
+                raise NotImplementedError(
+                    "Nested-field projection is not supported on Vortex files")
             name_to_field = {f.name: f for f in self.read_fields}
             ordered_read_fields = [name_to_field[n] for n in read_file_fields 
if n in name_to_field]
             predicate_fields = _get_all_fields(self.push_down_predicate) if 
self.push_down_predicate else set()
@@ -194,10 +237,15 @@ class SplitRead(ABC):
         elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == 
CoreOptions.FILE_FORMAT_ORC:
             name_to_field = {f.name: f for f in self.read_fields}
             ordered_read_fields = [name_to_field[n] for n in read_file_fields 
if n in name_to_field]
+            ordered_nested_paths = (
+                [nested_path_by_name[f.name] for f in ordered_read_fields]
+                if has_nested else None
+            )
             format_reader = FormatPyArrowReader(
                 self.table.file_io, file_format, file_path,
                 ordered_read_fields, read_arrow_predicate, 
batch_size=batch_size,
-                options=self.table.options)
+                options=self.table.options,
+                nested_name_paths=ordered_nested_paths)
         elif file_format in ('json', 'csv'):
             raise NotImplementedError(
                 f"Reading '{file_format}' format is not yet supported in 
Python SDK. "
@@ -251,6 +299,10 @@ class SplitRead(ABC):
         return reader
 
     def _get_fields_and_predicate(self, schema_id: int, read_fields):
+        # In nested mode the flat name (``mv_latest_version``) never
+        # appears in the file schema; reachability is decided by the
+        # path's top-level entry instead.
+        nested_path_by_name = self._nested_path_by_name()
         key = (schema_id, tuple(read_fields))
         if key not in self.schema_id_2_fields:
             schema = self.table.schema_manager.get_schema(schema_id)
@@ -262,7 +314,20 @@ class SplitRead(ABC):
             if self.table.is_primary_key_table:
                 schema_field_names.add('_SEQUENCE_NUMBER')
                 schema_field_names.add('_VALUE_KIND')
-            read_file_fields = [read_field for read_field in read_fields if 
read_field in schema_field_names]
+
+            def _is_reachable(name: str) -> bool:
+                if name in schema_field_names:
+                    return True
+                if nested_path_by_name is not None:
+                    path = nested_path_by_name.get(name)
+                    if path:
+                        return path[0] in schema_field_names
+                return False
+
+            read_file_fields = [
+                read_field for read_field in read_fields
+                if _is_reachable(read_field)
+            ]
             read_predicate = 
trim_predicate_by_fields(self.push_down_predicate, read_file_fields)
             read_arrow_predicate = read_predicate.to_arrow() if read_predicate 
else None
             self.schema_id_2_fields[key] = (read_file_fields, 
read_arrow_predicate)
@@ -300,6 +365,10 @@ class SplitRead(ABC):
         return all_data_fields
 
     def create_index_mapping(self):
+        if self._nested_path_by_name() is not None:
+            # Format reader already emits flat columns aligned with
+            # ``read_fields``; the id-based remap can't see leaf IDs.
+            return None
         base_index_mapping = self._create_base_index_mapping(self.read_fields, 
self._get_read_data_fields())
         trimmed_key_mapping, _ = 
self._get_trimmed_fields(self._get_read_data_fields(), 
self._get_all_data_fields())
         if base_index_mapping is None:
@@ -341,6 +410,10 @@ class SplitRead(ABC):
         return None
 
     def _get_final_read_data_fields(self) -> List[str]:
+        if self._nested_path_by_name() is not None:
+            # Trimmed-fields filters by ID and drops nested leaves;
+            # hand the format reader the user-facing flat names directly.
+            return self._remove_partition_fields(list(self.read_fields))
         _, trimmed_fields = self._get_trimmed_fields(
             self._get_read_data_fields(), self._get_all_data_fields()
         )
@@ -395,6 +468,23 @@ class SplitRead(ABC):
         return PartitionInfo(partition_mapping, self.split.partition)
 
     def _construct_partition_mapping(self) -> List[int]:
+        if self._nested_path_by_name() is not None:
+            # Nested fields carry leaf IDs that don't match top-level
+            # data-field IDs, so the trimmed-fields machinery can't see
+            # them. Build the mapping directly from ``self.read_fields``:
+            # entries whose flat name equals a partition key get a
+            # negative index, the rest get a sequential read index.
+            partition_names = self.table.partition_keys
+            mapping = [0] * (len(self.read_fields) + 1)
+            p_count = 0
+            for i, field in enumerate(self.read_fields):
+                if field.name in partition_names:
+                    partition_index = partition_names.index(field.name)
+                    mapping[i] = -(partition_index + 1)
+                    p_count += 1
+                else:
+                    mapping[i] = (i - p_count) + 1
+            return mapping
         _, trimmed_fields = self._get_trimmed_fields(
             self._get_read_data_fields(), self._get_all_data_fields()
         )
@@ -537,13 +627,17 @@ class DataEvolutionSplitRead(SplitRead):
             predicate: Optional[Predicate],
             read_type: List[DataField],
             split: Split,
-            row_tracking_enabled: bool):
+            row_tracking_enabled: bool,
+            nested_name_paths: Optional[List[List[str]]] = None):
         self.row_ranges = None
         actual_split = split
         if isinstance(split, IndexedSplit):
             self.row_ranges = split.row_ranges()
             actual_split = split.data_split()
-        super().__init__(table, predicate, read_type, actual_split, 
row_tracking_enabled)
+        super().__init__(
+            table, predicate, read_type, actual_split, row_tracking_enabled,
+            nested_name_paths=nested_name_paths,
+        )
 
     def _push_down_predicate(self) -> Optional[Predicate]:
         # Data evolution: files may have different schemas, so we don't push 
predicate
diff --git a/paimon-python/pypaimon/read/table_read.py 
b/paimon-python/pypaimon/read/table_read.py
index 40cc337aaa..4f53e49126 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -40,7 +40,8 @@ class TableRead:
         table,
         predicate: Optional[Predicate],
         read_type: List[DataField],
-        include_row_kind: bool = False
+        include_row_kind: bool = False,
+        nested_name_paths: Optional[List[List[str]]] = None,
     ):
         from pypaimon.table.file_store_table import FileStoreTable
 
@@ -48,6 +49,10 @@ class TableRead:
         self.predicate = predicate
         self.read_type = read_type
         self.include_row_kind = include_row_kind
+        # Parallel to ``read_type``; each entry is the original-schema
+        # name path for the field. ``None`` when no projection (or only
+        # top-level projection) is set.
+        self.nested_name_paths = nested_name_paths
 
     def to_iterator(self, splits: List[Split]) -> Iterator:
         def _record_generator():
@@ -268,6 +273,15 @@ class TableRead:
 
     def _create_split_read(self, split: Split) -> SplitRead:
         if self.table.is_primary_key_table and not split.raw_convertible:
+            if self.nested_name_paths and any(
+                    len(p) > 1 for p in self.nested_name_paths):
+                # The merge function needs full parent structs; outer
+                # projection that walks the path to recover leaves is a
+                # separate change. Project the parent struct and extract
+                # client-side until then.
+                raise NotImplementedError(
+                    "Nested-field projection on primary-key tables that "
+                    "require a merge read is not yet supported")
             return MergeFileSplitRead(
                 table=self.table,
                 predicate=self.predicate,
@@ -276,12 +290,23 @@ class TableRead:
                 row_tracking_enabled=False
             )
         elif self.table.options.data_evolution_enabled():
+            if self.nested_name_paths and any(
+                    len(p) > 1 for p in self.nested_name_paths):
+                # Multi-file union for data-evolution tables matches files
+                # by top-level field ID; a nested ``read_field`` carries
+                # its leaf ID, which never matches and would silently
+                # produce all-NULL columns. Refuse loudly until the
+                # union path is taught to walk paths.
+                raise NotImplementedError(
+                    "Nested-field projection on data-evolution tables is "
+                    "not yet supported")
             return DataEvolutionSplitRead(
                 table=self.table,
                 predicate=self.predicate,
                 read_type=self.read_type,
                 split=split,
-                row_tracking_enabled=True
+                row_tracking_enabled=True,
+                nested_name_paths=self.nested_name_paths,
             )
         else:
             return RawFileSplitRead(
@@ -289,7 +314,8 @@ class TableRead:
                 predicate=self.predicate,
                 read_type=self.read_type,
                 split=split,
-                row_tracking_enabled=self.table.options.row_tracking_enabled()
+                row_tracking_enabled=self.table.options.row_tracking_enabled(),
+                nested_name_paths=self.nested_name_paths,
             )
 
     @staticmethod
diff --git a/paimon-python/pypaimon/tests/test_nested_projection_e2e.py b/paimon-python/pypaimon/tests/test_nested_projection_e2e.py
new file mode 100644
index 0000000000..87ab3935f4
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_nested_projection_e2e.py
@@ -0,0 +1,210 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class _AppendOnlyNestedBase(unittest.TestCase):
+    """Append-only table whose ``mv`` column is a nested struct, used to
+    exercise file-level Parquet/ORC pushdown of nested projection."""
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', False)
+
+        cls.pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('mv', pa.struct([
+                ('latest_version', pa.int64()),
+                ('latest_value', pa.string()),
+            ])),
+            ('val', pa.string()),
+        ])
+        cls.rows = [
+            {'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 'val': 'x'},
+            {'id': 2, 'mv': {'latest_version': 200, 'latest_value': 'b'}, 'val': 'y'},
+            {'id': 3, 'mv': {'latest_version': 300, 'latest_value': 'c'}, 'val': 'z'},
+        ]
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _create_table(self, name: str, file_format: str = 'parquet'):
+        identifier = 'default.{}'.format(name)
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            options={'bucket': '-1', 'file.format': file_format},
+        )
+        self.catalog.create_table(identifier, schema, False)
+        table = self.catalog.get_table(identifier)
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        w.write_arrow(pa.Table.from_pylist(self.rows, schema=self.pa_schema))
+        wb.new_commit().commit(w.prepare_commit())
+        w.close()
+        return table
+
+
+class AppendOnlyNestedParquetTest(_AppendOnlyNestedBase):
+    """Parquet path uses PyArrow's dict-form scanner with ``ds.field(*path)``
+    to push the nested column read into the engine."""
+
+    def test_dotted_name_returns_just_the_leaf(self):
+        table = self._create_table('ao_dotted_leaf')
+        rb = table.new_read_builder().with_projection(['mv.latest_version'])
+        got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist()
+        self.assertEqual(
+            got,
+            [{'mv_latest_version': 100},
+             {'mv_latest_version': 200},
+             {'mv_latest_version': 300}])
+
+    def test_mixed_nested_and_top_level_preserves_order(self):
+        table = self._create_table('ao_mixed_order')
+        rb = table.new_read_builder().with_projection(
+            ['mv.latest_version', 'val', 'mv.latest_value'])
+        got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist()
+        self.assertEqual(
+            got,
+            [{'mv_latest_version': 100, 'val': 'x', 'mv_latest_value': 'a'},
+             {'mv_latest_version': 200, 'val': 'y', 'mv_latest_value': 'b'},
+             {'mv_latest_version': 300, 'val': 'z', 'mv_latest_value': 'c'}])
+
+    def test_top_level_only_projection_unchanged(self):
+        """A projection without dots must keep the existing top-level
+        path — file-level pushdown still asks for plain column names,
+        no dict-form scanner."""
+        table = self._create_table('ao_top_level_unchanged')
+        rb = table.new_read_builder().with_projection(['val', 'id'])
+        got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist()
+        self.assertEqual(
+            got,
+            [{'val': 'x', 'id': 1},
+             {'val': 'y', 'id': 2},
+             {'val': 'z', 'id': 3}])
+
+    def test_partitioned_table_with_nested_projection(self):
+        """Partition-aware reads have a separate path-mapping helper from
+        the non-partitioned fast path; regress the case where it dropped
+        non-nested top-level columns alongside the projected leaf."""
+        identifier = 'default.ao_partitioned'
+        pa_schema = pa.schema([
+            ('part', pa.string()),
+            ('id', pa.int64()),
+            ('mv', pa.struct([
+                ('latest_version', pa.int64()),
+                ('latest_value', pa.string()),
+            ])),
+            ('val', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            partition_keys=['part'],
+            options={'bucket': '-1', 'file.format': 'parquet'},
+        )
+        self.catalog.create_table(identifier, schema, False)
+        table = self.catalog.get_table(identifier)
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        w.write_arrow(pa.Table.from_pylist([
+            {'part': 'A', 'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 'val': 'x'},
+            {'part': 'B', 'id': 2, 'mv': {'latest_version': 200, 'latest_value': 'b'}, 'val': 'y'},
+        ], schema=pa_schema))
+        wb.new_commit().commit(w.prepare_commit())
+        w.close()
+
+        # Mixed projection: nested leaf, a non-partition top-level column,
+        # and the partition column itself.
+        rb = table.new_read_builder().with_projection(
+            ['part', 'mv.latest_version', 'val'])
+        got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist()
+        got_sorted = sorted(got, key=lambda r: r['part'])
+        self.assertEqual(
+            got_sorted,
+            [{'part': 'A', 'mv_latest_version': 100, 'val': 'x'},
+             {'part': 'B', 'mv_latest_version': 200, 'val': 'y'}])
+
+    def test_avro_nested_projection_python_fallback(self):
+        """Avro has no native nested column pruning; the reader walks
+        each fastavro record dict by path and assembles the column
+        client-side."""
+        table = self._create_table('ao_avro_nested', file_format='avro')
+        rb = table.new_read_builder().with_projection(['mv.latest_version', 'val'])
+        got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist()
+        self.assertEqual(
+            got,
+            [{'mv_latest_version': 100, 'val': 'x'},
+             {'mv_latest_version': 200, 'val': 'y'},
+             {'mv_latest_version': 300, 'val': 'z'}])
+
+    def test_avro_top_level_projection_unchanged(self):
+        """Top-level-only projection on Avro stays on the existing
+        ``record.get(name)`` fast path."""
+        table = self._create_table('ao_avro_top', file_format='avro')
+        rb = table.new_read_builder().with_projection(['val', 'id'])
+        got = rb.new_read().to_arrow(rb.new_scan().plan().splits()).to_pylist()
+        self.assertEqual(
+            got,
+            [{'val': 'x', 'id': 1},
+             {'val': 'y', 'id': 2},
+             {'val': 'z', 'id': 3}])
+
+    def test_pk_table_merge_split_with_nested_projection_raises(self):
+        # Phase 2b lands the append-only path only; PK + nested needs an
+        # outer-projection wrapper that ships in a follow-up commit. Until
+        # then, the call must refuse loudly rather than silently corrupt
+        # the merge function input. Two commits on the same PK force the
+        # split out of the raw-convertible fast path into the merge
+        # reader, which is where the guard lives.
+        identifier = 'default.pk_nested_unsupported'
+        schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            primary_keys=['id'],
+            options={'bucket': '1', 'file.format': 'parquet'},
+        )
+        self.catalog.create_table(identifier, schema, False)
+        table = self.catalog.get_table(identifier)
+        for batch in (self.rows, self.rows):  # two overlapping commits
+            wb = table.new_batch_write_builder()
+            w = wb.new_write()
+            w.write_arrow(pa.Table.from_pylist(batch, schema=self.pa_schema))
+            wb.new_commit().commit(w.prepare_commit())
+            w.close()
+
+        rb = table.new_read_builder().with_projection(['mv.latest_version'])
+        splits = rb.new_scan().plan().splits()
+        # ``to_arrow`` materialises the split read; the merge path is what
+        # raises, so do it eagerly here rather than waiting for the first
+        # batch.
+        with self.assertRaises(NotImplementedError):
+            rb.new_read().to_arrow(splits)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_projection_utility.py b/paimon-python/pypaimon/tests/test_projection_utility.py
new file mode 100644
index 0000000000..4d1c43eb16
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_projection_utility.py
@@ -0,0 +1,205 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import unittest
+
+from pypaimon.schema.data_types import AtomicType, DataField, RowType
+from pypaimon.utils.projection import (NestedProjection, Projection,
+                                       TopLevelProjection)
+
+
+def _atomic(idx: int, name: str, type_name: str = 'BIGINT') -> DataField:
+    return DataField(idx, name, AtomicType(type_name))
+
+
+def _struct(idx: int, name: str, sub_fields) -> DataField:
+    return DataField(idx, name, RowType(False, list(sub_fields)))
+
+
+def _three_top_fields():
+    """schema: [pk: BIGINT, mv: ROW<latest_version BIGINT, latest_value STRING>, val: STRING]"""
+    return [
+        _atomic(1, 'pk'),
+        _struct(2, 'mv', [
+            _atomic(10, 'latest_version'),
+            _atomic(11, 'latest_value', 'STRING'),
+        ]),
+        _atomic(3, 'val', 'STRING'),
+    ]
+
+
+class TopLevelProjectionTest(unittest.TestCase):
+
+    def test_factory_produces_top_level(self):
+        p = Projection.of([2, 0])
+        self.assertIsInstance(p, TopLevelProjection)
+        self.assertFalse(p.is_nested())
+
+    def test_indexes_round_trip(self):
+        p = Projection.of([2, 0, 1])
+        self.assertEqual(p.to_top_level_indexes(), [2, 0, 1])
+        # nested form lifts each top-level index into a singleton path
+        self.assertEqual(p.to_nested_indexes(), [[2], [0], [1]])
+
+    def test_project_picks_fields_in_order(self):
+        fields = _three_top_fields()
+        res = Projection.of([2, 0]).project(fields)
+        self.assertEqual([f.name for f in res], ['val', 'pk'])
+        self.assertEqual([f.id for f in res], [3, 1])
+
+    def test_to_name_paths(self):
+        fields = _three_top_fields()
+        names = Projection.of([2, 0]).to_name_paths(fields)
+        self.assertEqual(names, [['val'], ['pk']])
+
+    def test_range_factory(self):
+        p = Projection.range(1, 4)
+        self.assertEqual(p.to_top_level_indexes(), [1, 2, 3])
+
+    def test_range_zero_or_negative_returns_empty(self):
+        self.assertFalse(Projection.range(2, 2).is_nested())
+        self.assertEqual(Projection.range(2, 2).to_top_level_indexes(), [])
+        self.assertEqual(Projection.range(5, 1).to_top_level_indexes(), [])
+
+
+class NestedProjectionTest(unittest.TestCase):
+
+    def test_factory_produces_nested(self):
+        p = Projection.of([[1, 0], [1, 1]])
+        self.assertIsInstance(p, NestedProjection)
+        self.assertTrue(p.is_nested())
+
+    def test_singleton_paths_reported_not_nested(self):
+        # paths of length 1 only — observable behaviour matches top level.
+        p = Projection.of([[2], [0]])
+        self.assertIsInstance(p, NestedProjection)
+        self.assertFalse(p.is_nested())
+
+    def test_top_level_indexes_dedup_in_path_order(self):
+        p = Projection.of([[1, 0], [1, 1], [0]])
+        self.assertEqual(p.to_top_level_indexes(), [1, 0])
+
+    def test_nested_indexes_round_trip(self):
+        p = Projection.of([[1, 0], [1, 1]])
+        self.assertEqual(p.to_nested_indexes(), [[1, 0], [1, 1]])
+
+    def test_to_name_paths_walks_into_struct(self):
+        fields = _three_top_fields()
+        names = Projection.of([[1, 0], [1, 1], [0]]).to_name_paths(fields)
+        self.assertEqual(
+            names,
+            [['mv', 'latest_version'], ['mv', 'latest_value'], ['pk']])
+
+    def test_project_flattens_with_underscore_join(self):
+        fields = _three_top_fields()
+        res = Projection.of([[1, 0], [1, 1], [0]]).project(fields)
+        self.assertEqual(
+            [f.name for f in res], ['mv_latest_version', 'mv_latest_value', 'pk'])
+
+    def test_project_preserves_leaf_field_id(self):
+        # Schema-evolution remapping is by field ID, so flattened nested
+        # fields must inherit the leaf's ID — not the parent struct's.
+        fields = _three_top_fields()
+        res = Projection.of([[1, 0], [1, 1]]).project(fields)
+        self.assertEqual([f.id for f in res], [10, 11])
+
+    def test_collision_dedup_via_dollar_suffix(self):
+        # Two leaves under different parents with the same final name.
+        sub_a = _atomic(20, 'x')
+        sub_b = _atomic(21, 'x')
+        fields = [
+            _struct(1, 'a', [sub_a]),
+            _struct(2, 'b', [sub_b]),
+        ]
+        # path [0,0] -> 'a_x', path [1,0] -> 'b_x' (no collision yet).
+        res = Projection.of([[0, 0], [1, 0]]).project(fields)
+        self.assertEqual([f.name for f in res], ['a_x', 'b_x'])
+
+        # When two collapse to the SAME name, the second gets `_$N`.
+        # Build two parents whose leaves have the same compound name.
+        sub_x_only = _atomic(30, 'x')
+        fields2 = [
+            _struct(1, 'a', [sub_x_only]),
+            _atomic(2, 'a_x'),  # plain top-level already named 'a_x'.
+        ]
+        res2 = Projection.of([[0, 0], [1]]).project(fields2)
+        # First path produces 'a_x'; second is already-existing 'a_x'.
+        # Collision → suffix on the second.
+        self.assertEqual(res2[0].name, 'a_x')
+        self.assertTrue(res2[1].name.startswith('a_x_$'))
+
+    def test_project_rejects_non_row_step(self):
+        # Trying to walk into an atomic field must fail loudly.
+        fields = _three_top_fields()
+        with self.assertRaises(ValueError):
+            Projection.of([[0, 0]]).project(fields)
+
+    def test_constructor_rejects_empty_paths_list(self):
+        with self.assertRaises(ValueError):
+            NestedProjection([])
+
+    def test_constructor_rejects_zero_length_path(self):
+        with self.assertRaises(ValueError):
+            NestedProjection([[]])
+
+    def test_dup_count_is_monotonic_across_distinct_collisions(self):
+        # ``_$N`` is a per-call monotonic counter — distinct collisions
+        # share the suffix space, they don't each restart at 0.
+        sub_x_1 = _atomic(20, 'x')
+        sub_x_2 = _atomic(21, 'x')
+        sub_y_1 = _atomic(30, 'y')
+        sub_y_2 = _atomic(31, 'y')
+        fields = [
+            _atomic(1, 'a_x'),
+            _struct(2, 'a', [sub_x_1]),
+            _atomic(3, 'a_y'),
+            _struct(4, 'a', [sub_y_1]),  # collides via path [3, 0] → a_y
+            _struct(5, 'a', [sub_x_2, sub_y_2]),  # second a.x collision
+        ]
+        # Order: a_x (top) → keeps; [1, 0] flatten → a_x (collision) → a_x_$0
+        # a_y (top) → keeps; [3, 0] flatten → a_y (collision) → a_y_$1
+        res = Projection.of(
+            [[0], [1, 0], [2], [3, 0]]).project(fields)
+        self.assertEqual(
+            [f.name for f in res], ['a_x', 'a_x_$0', 'a_y', 'a_y_$1'])
+
+    def test_of_rejects_mixed_int_and_path(self):
+        # Mixing top-level indexes and nested paths is a programming error;
+        # ``of`` should fail loudly at the call site instead of producing a
+        # broken projection that explodes downstream.
+        with self.assertRaises(TypeError):
+            Projection.of([1, [2, 3]])
+        with self.assertRaises(TypeError):
+            Projection.of([[1, 2], 3])
+
+
+class EmptyProjectionTest(unittest.TestCase):
+
+    def test_of_empty(self):
+        self.assertEqual(Projection.of([]).to_top_level_indexes(), [])
+        self.assertEqual(Projection.of([]).to_nested_indexes(), [])
+
+    def test_empty_factory(self):
+        p = Projection.empty()
+        self.assertEqual(p.project(_three_top_fields()), [])
+        self.assertFalse(p.is_nested())
+        self.assertEqual(p.to_name_paths(_three_top_fields()), [])
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/test_read_builder_nested_projection.py b/paimon-python/pypaimon/tests/test_read_builder_nested_projection.py
new file mode 100644
index 0000000000..a781f149fa
--- /dev/null
+++ b/paimon-python/pypaimon/tests/test_read_builder_nested_projection.py
@@ -0,0 +1,124 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+
+
+class _ReadBuilderTestBase(unittest.TestCase):
+    """Build a primary-key table whose value column is a ROW so we can
+    exercise both top-level and nested projection paths against the
+    actual ``ReadBuilder`` API."""
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', False)
+
+        struct_type = pa.struct([
+            ('latest_version', pa.int64()),
+            ('latest_value', pa.string()),
+        ])
+        cls.pa_schema = pa.schema([
+            pa.field('pk', pa.int64(), nullable=False),
+            ('mv', struct_type),
+            ('val', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            cls.pa_schema, primary_keys=['pk'],
+            options={'bucket': '1', 'file.format': 'parquet'})
+        cls.catalog.create_table('default.rb_nested', schema, False)
+        cls.table = cls.catalog.get_table('default.rb_nested')
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+
+class ReadBuilderProjectionStateTest(_ReadBuilderTestBase):
+
+    def test_no_projection_returns_full_schema(self):
+        rb = self.table.new_read_builder()
+        fields = rb.read_type()
+        names = [f.name for f in fields]
+        self.assertEqual(names, ['pk', 'mv', 'val'])
+        # Without an explicit projection the read_type must NOT inject
+        # row-tracking system columns; the raw table fields are returned
+        # verbatim.
+        self.assertNotIn('_ROW_ID', names)
+        self.assertNotIn('_SEQUENCE_NUMBER', names)
+
+    def test_top_level_projection_unchanged(self):
+        rb = self.table.new_read_builder().with_projection(['val', 'pk'])
+        names = [f.name for f in rb.read_type()]
+        self.assertEqual(names, ['val', 'pk'])
+        # No nested paths derived; only names are stored.
+        self.assertIsNone(rb._nested_paths)
+
+    def test_dotted_name_resolves_to_nested_path(self):
+        rb = self.table.new_read_builder().with_projection(
+            ['mv.latest_version', 'pk'])
+        # _nested_paths is populated; user-facing names are kept on _projection
+        self.assertIsNotNone(rb._nested_paths)
+        self.assertEqual(rb._nested_paths, [[1, 0], [0]])
+        names = [f.name for f in rb.read_type()]
+        # Nested leaves get flattened to underscore-joined names.
+        self.assertEqual(names, ['mv_latest_version', 'pk'])
+
+    def test_dotted_name_unknown_top_silently_skipped(self):
+        rb = self.table.new_read_builder().with_projection(
+            ['nope.x', 'val'])
+        # Only 'val' resolved; the dot trigger still populates _nested_paths.
+        self.assertEqual(rb._nested_paths, [[2]])
+        names = [f.name for f in rb.read_type()]
+        self.assertEqual(names, ['val'])
+
+    def test_dotted_name_unknown_subfield_silently_skipped(self):
+        rb = self.table.new_read_builder().with_projection(
+            ['mv.no_such_subfield', 'pk'])
+        # The bad path drops out, the plain name survives.
+        self.assertEqual(rb._nested_paths, [[0]])
+        names = [f.name for f in rb.read_type()]
+        self.assertEqual(names, ['pk'])
+
+
+class ReadBuilderProjectionFieldIdTest(_ReadBuilderTestBase):
+
+    def test_nested_leaves_inherit_leaf_field_id(self):
+        rb = self.table.new_read_builder().with_projection(
+            ['mv.latest_version', 'mv.latest_value'])
+        leaf_ids = [f.id for f in rb.read_type()]
+        # Look up the actual leaf IDs from the table schema for assertion
+        mv_field = next(f for f in self.table.fields if f.name == 'mv')
+        sub_v = next(f for f in mv_field.type.fields
+                     if f.name == 'latest_version')
+        sub_x = next(f for f in mv_field.type.fields
+                     if f.name == 'latest_value')
+        self.assertEqual(leaf_ids, [sub_v.id, sub_x.id])
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/utils/projection.py b/paimon-python/pypaimon/utils/projection.py
new file mode 100644
index 0000000000..c87da7ac2c
--- /dev/null
+++ b/paimon-python/pypaimon/utils/projection.py
@@ -0,0 +1,267 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Column projection utilities.
+
+A projection maps a source row type to a flat list of ``DataField``: the
+columns the user wants to read. Two flavours:
+
+* :class:`TopLevelProjection` selects fields by their top-level index.
+* :class:`NestedProjection` accepts paths that walk into ROW children, e.g.
+  ``[[1, 0], [1, 2]]`` means "the 0th and 2nd children of the field at top
+  level index 1". The result is flattened into top-level fields whose
+  names are the underscore-joined original path (``a_b`` for ``a.b``,
+  with a ``_$N`` suffix on collisions) and whose IDs are inherited from
+  the leaf so schema-evolution remapping by field ID still works.
+"""
+
+from abc import ABC, abstractmethod
+from typing import List, Optional, Sequence
+
+from pypaimon.schema.data_types import DataField, RowType
+
+
+class Projection(ABC):
+    """Abstract base for column projection."""
+
+    @abstractmethod
+    def project(self, row_type) -> List[DataField]:
+        """Apply the projection and return the resulting flat fields."""
+
+    @abstractmethod
+    def is_nested(self) -> bool:
+        """Whether any path goes deeper than the top level."""
+
+    @abstractmethod
+    def to_top_level_indexes(self) -> List[int]:
+        """Top-level positions touched by this projection.
+
+        For nested projections, returns unique top-level indexes in path
+        order. Useful for fallback paths that can only push down at the
+        top level.
+        """
+
+    @abstractmethod
+    def to_nested_indexes(self) -> List[List[int]]:
+        """Return the projection as a list of paths, one per output field."""
+
+    @abstractmethod
+    def to_name_paths(self, row_type) -> List[List[str]]:
+        """Translate integer paths to field-name paths against ``row_type``.
+
+        For a path ``[1, 0]`` against a row whose top-level field at index 1
+        is a struct ``mv_col`` with sub-fields ``[LATEST_VERSION, ...]``,
+        returns ``[["mv_col", "LATEST_VERSION"], ...]``. Used by format
+        readers to push nested projection down to the underlying engine
+        (e.g. PyArrow's ``ds.field(*name_path)``).
+        """
+
+    # ------------------------------------------------------------------
+    # Factories
+    # ------------------------------------------------------------------
+
+    @staticmethod
+    def empty() -> "Projection":
+        """The empty projection: no columns selected."""
+        return _EmptyProjection()
+
+    @staticmethod
+    def of(indexes_or_paths) -> "Projection":
+        """Build a projection from either ``int[]`` or ``int[][]``.
+
+        Empty input returns :func:`empty`. The input must be uniformly
+        shaped — either all integers or all sequences of integers; mixing
+        the two raises ``TypeError`` so the failure is reported at the
+        ``of`` call site rather than as an opaque error deep in
+        ``project``.
+        """
+        if not indexes_or_paths:
+            return _EmptyProjection()
+        first_is_path = isinstance(indexes_or_paths[0], (list, tuple))
+        for entry in indexes_or_paths[1:]:
+            entry_is_path = isinstance(entry, (list, tuple))
+            if entry_is_path != first_is_path:
+                raise TypeError(
+                    "Projection.of expects either all top-level indexes "
+                    "or all nested paths; got a mix")
+        if first_is_path:
+            return NestedProjection([list(p) for p in indexes_or_paths])
+        return TopLevelProjection(list(indexes_or_paths))
+
+    @staticmethod
+    def range(start_inclusive: int, end_exclusive: int) -> "Projection":
+        """Top-level projection over a contiguous index range."""
+        if end_exclusive <= start_inclusive:
+            return _EmptyProjection()
+        return TopLevelProjection(list(range(start_inclusive, end_exclusive)))
+
+
+class _EmptyProjection(Projection):
+
+    def project(self, row_type) -> List[DataField]:
+        return []
+
+    def is_nested(self) -> bool:
+        return False
+
+    def to_top_level_indexes(self) -> List[int]:
+        return []
+
+    def to_nested_indexes(self) -> List[List[int]]:
+        return []
+
+    def to_name_paths(self, row_type) -> List[List[str]]:
+        return []
+
+
+class TopLevelProjection(Projection):
+    """Single-level projection: pick fields by their top-level index."""
+
+    def __init__(self, indexes: Sequence[int]):
+        self.indexes = list(indexes)
+
+    def project(self, row_type) -> List[DataField]:
+        fields = _row_fields(row_type)
+        return [fields[i] for i in self.indexes]
+
+    def is_nested(self) -> bool:
+        return False
+
+    def to_top_level_indexes(self) -> List[int]:
+        return list(self.indexes)
+
+    def to_nested_indexes(self) -> List[List[int]]:
+        return [[i] for i in self.indexes]
+
+    def to_name_paths(self, row_type) -> List[List[str]]:
+        fields = _row_fields(row_type)
+        return [[fields[i].name] for i in self.indexes]
+
+
+class NestedProjection(Projection):
+    """Projection over paths that may walk into ROW children.
+
+    Each path navigates from a top-level field through successive ROW
+    children. A path of length 1 is equivalent to a top-level selection.
+    """
+
+    def __init__(self, paths: Sequence[Sequence[int]]):
+        if not paths:
+            raise ValueError("NestedProjection requires at least one path")
+        self.paths = [list(p) for p in paths]
+        for p in self.paths:
+            if len(p) == 0:
+                raise ValueError(
+                    "Each projection path must have at least one index")
+        self._has_nested = any(len(p) > 1 for p in self.paths)
+
+    def is_nested(self) -> bool:
+        return self._has_nested
+
+    def to_top_level_indexes(self) -> List[int]:
+        # Preserve order, deduplicate.
+        seen = set()
+        out: List[int] = []
+        for p in self.paths:
+            top = p[0]
+            if top not in seen:
+                seen.add(top)
+                out.append(top)
+        return out
+
+    def to_nested_indexes(self) -> List[List[int]]:
+        return [list(p) for p in self.paths]
+
+    def to_name_paths(self, row_type) -> List[List[str]]:
+        fields = _row_fields(row_type)
+        result: List[List[str]] = []
+        for path in self.paths:
+            field = fields[path[0]]
+            names = [field.name]
+            for idx in path[1:]:
+                child_type = field.type
+                if not _is_row_type(child_type):
+                    raise ValueError(
+                        "Nested projection step expected a ROW type but got %s 
"
+                        "for field '%s'" % (child_type, field.name))
+                child_fields = _row_fields(child_type)
+                field = child_fields[idx]
+                names.append(field.name)
+            result.append(names)
+        return result
+
+    def project(self, row_type) -> List[DataField]:
+        # ``dup_count`` is monotonic across the whole call: it increments
+        # whenever a name has to be suffixed, regardless of which base
+        # name caused the collision. The per-call counter only guarantees
+        # uniqueness, not that ``_$0`` always represents the first
+        # collision of any given base.
+        fields = _row_fields(row_type)
+        out: List[DataField] = []
+        seen_names = set()
+        dup_count = 0
+        for path in self.paths:
+            field = fields[path[0]]
+            name_parts = [field.name]
+            for idx in path[1:]:
+                child_type = field.type
+                if not _is_row_type(child_type):
+                    raise ValueError(
+                        "Nested projection step expected a ROW type but got %s 
"
+                        "for field '%s'" % (child_type, field.name))
+                child_fields = _row_fields(child_type)
+                field = child_fields[idx]
+                name_parts.append(field.name)
+            base_name = "_".join(name_parts)
+            final_name = base_name
+            while final_name in seen_names:
+                final_name = "%s_$%d" % (base_name, dup_count)
+                dup_count += 1
+            seen_names.add(final_name)
+            # Keep the leaf field's ID so downstream schema-evolution
+            # remapping by field ID still works after rename.
+            out.append(DataField(
+                id=field.id,
+                name=final_name,
+                type=field.type,
+                description=getattr(field, 'description', None),
+                default_value=getattr(field, 'default_value', None),
+            ))
+        return out
+
+
+def _row_fields(row_type) -> List[DataField]:
+    """Return the field list of a row-like type. Accepts a RowType, a plain
+    list of DataField, or anything else with a ``.fields`` attribute.
+    """
+    if isinstance(row_type, list):
+        return row_type
+    fields: Optional[List[DataField]] = getattr(row_type, 'fields', None)
+    if fields is None:
+        raise ValueError(
+            "Projection target must be a RowType or have a .fields attribute, "
+            "got %s" % type(row_type).__name__)
+    return list(fields)
+
+
+def _is_row_type(data_type) -> bool:
+    if isinstance(data_type, RowType):
+        return True
+    # Lightweight test stubs may report ``.fields`` without subclassing
+    # RowType — accept those too.
+    return getattr(data_type, 'fields', None) is not None

Reply via email to