This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f11727ae35 [python] Adjust nested-field projection support
f11727ae35 is described below

commit f11727ae3581860c7ebc544054f6f63c2ea67a4f
Author: JingsongLi <[email protected]>
AuthorDate: Sat May 9 20:34:43 2026 +0800

    [python] Adjust nested-field projection support
---
 docs/content/pypaimon/python-api.md                | 18 ++++++++++++
 paimon-python/pypaimon/read/read_builder.py        | 14 ++++-----
 .../pypaimon/read/reader/format_avro_reader.py     | 11 ++-----
 .../pypaimon/read/reader/format_pyarrow_reader.py  | 24 +--------------
 paimon-python/pypaimon/read/split_read.py          | 34 +++++-----------------
 paimon-python/pypaimon/read/table_read.py          | 12 --------
 .../pypaimon/tests/test_projection_utility.py      | 12 ++++----
 paimon-python/pypaimon/utils/projection.py         | 17 ++++-------
 8 files changed, 45 insertions(+), 97 deletions(-)

diff --git a/docs/content/pypaimon/python-api.md b/docs/content/pypaimon/python-api.md
index 6e4f96d5dc..50a0117891 100644
--- a/docs/content/pypaimon/python-api.md
+++ b/docs/content/pypaimon/python-api.md
@@ -322,6 +322,24 @@ You can also pushdown projection by `ReadBuilder`:
 read_builder = read_builder.with_projection(['f3', 'f2'])
 ```
 
+For tables with nested struct columns, you can project individual sub-fields using dotted names:
+
+```python
+# Given a table with schema: id BIGINT, info ROW<name STRING, age INT>, val STRING
+
+# Select a nested sub-field and a top-level field
+read_builder = read_builder.with_projection(['info.name', 'val'])
+
+# The result columns are flattened with underscore-joined names:
+# info_name, val
+```
+
+This pushes the nested column pruning down to the file reader (Parquet/ORC use PyArrow's native nested projection; Avro falls back to Python-side extraction). Only the requested leaves are read from disk.
+
+Limitations:
+- Nested projection is only supported on append-only tables. Primary-key tables that require a merge read will raise `NotImplementedError`.
+- `ARRAY<ROW>` and `MAP` nested paths are not yet supported.
+
 ### Generate Splits
 
 Then you can step into Scan Plan stage to get `splits`:
diff --git a/paimon-python/pypaimon/read/read_builder.py b/paimon-python/pypaimon/read/read_builder.py
index ce95115431..77f050a22f 100644
--- a/paimon-python/pypaimon/read/read_builder.py
+++ b/paimon-python/pypaimon/read/read_builder.py
@@ -24,7 +24,7 @@ from pypaimon.read.table_read import TableRead
 from pypaimon.read.table_scan import TableScan
 from pypaimon.schema.data_types import DataField
 from pypaimon.table.special_fields import SpecialFields
-from pypaimon.utils.projection import Projection, _is_row_type
+from pypaimon.utils.projection import Projection, is_row_type
 
 
 class ReadBuilder:
@@ -35,12 +35,10 @@ class ReadBuilder:
 
         self.table: FileStoreTable = table
         self._predicate: Optional[Predicate] = None
-        # ``_projection`` is the user-facing list of names from
-        # :meth:`with_projection`. ``_nested_paths`` is the canonical
-        # integer-path form (length-1 for top level, longer for nested
-        # ROW children) used by every consumer in this module.
-        # ``with_nested_projection`` populates only the latter; the two
-        # never both hold meaningful state simultaneously.
+        # ``_projection`` stores the user-facing name list from
+        # :meth:`with_projection`. When dotted names are present,
+        # ``_nested_paths`` is also populated and takes precedence
+        # in ``read_type()`` and downstream consumers.
         self._projection: Optional[List[str]] = None
         self._nested_paths: Optional[List[List[int]]] = None
         self._limit: Optional[int] = None
@@ -145,7 +143,7 @@ class ReadBuilder:
             current_field = table_fields[path[0]]
             ok = True
             for part in parts[1:]:
-                if not _is_row_type(current_field.type):
+                if not is_row_type(current_field.type):
                     ok = False
                     break
                 child_fields = current_field.type.fields
diff --git a/paimon-python/pypaimon/read/reader/format_avro_reader.py b/paimon-python/pypaimon/read/reader/format_avro_reader.py
index 1fe41468e8..8ba967af28 100644
--- a/paimon-python/pypaimon/read/reader/format_avro_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_avro_reader.py
@@ -48,11 +48,6 @@ class FormatAvroReader(RecordBatchReader):
         projected_data_fields = [full_fields_map[name] for name in read_fields]
         self._schema = PyarrowFieldParser.from_paimon_schema(projected_data_fields)
 
-        # ``nested_name_paths`` is parallel to ``read_fields``. Top-level
-        # entries are length-1 paths and use the existing ``record.get``
-        # fast path; longer paths walk the record dict step-by-step. The
-        # path's first segment must be a real top-level Avro field —
-        # ``_get_fields_and_predicate`` upstream guarantees this.
         if nested_name_paths is not None and len(nested_name_paths) != len(read_fields):
             raise ValueError(
                 "nested_name_paths length {} does not match read_fields length {}".format(
@@ -97,10 +92,8 @@ class FormatAvroReader(RecordBatchReader):
 
 
 def _walk_avro_record(record, path: List[str]):
-    """Walk a list of field names through an avro record dict, returning
-    the leaf value or ``None`` if any segment is missing or hits a
-    non-dict value. ``record`` is the top-level fastavro dict; nested
-    record fields surface as nested dicts.
+    """Walk field names through a nested avro record dict, returning the
+    leaf value or ``None`` if any segment is missing.
     """
     current = record
     for name in path:
diff --git a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
index 0c12d27e4f..3fdf202c7c 100644
--- a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
@@ -56,9 +56,6 @@ class FormatPyArrowReader(RecordBatchReader):
         self.read_fields = read_fields
         self._read_field_names = [f.name for f in read_fields]
 
-        # ``nested_name_paths`` is parallel to ``read_fields``; when
-        # any path has length > 1 the scanner is invoked with a
-        # ``{flat_name: ds.field(*path)}`` column dict.
         if nested_name_paths is not None and len(nested_name_paths) != len(read_fields):
             raise ValueError(
                 "nested_name_paths length {} does not match read_fields length {}".format(
@@ -67,11 +64,6 @@ class FormatPyArrowReader(RecordBatchReader):
         has_nested_path = bool(
             nested_name_paths and any(len(p) > 1 for p in nested_name_paths))
 
-        # Identify which fields exist in the file and which are missing.
-        # For nested projection, "exists" is determined by walking the
-        # whole path against the file schema; sub-field schema evolution
-        # (a leaf renamed or removed) shows up as ``missing`` and is
-        # served as a NULL column, mirroring the top-level handling.
         file_schema = self.dataset.schema
         if has_nested_path:
             self.existing_fields = []
@@ -86,11 +78,6 @@ class FormatPyArrowReader(RecordBatchReader):
             self.existing_fields = [f.name for f in read_fields if f.name in file_schema_names]
             self.missing_fields = [f.name for f in read_fields if f.name not in file_schema_names]
 
-        # column name → VariantSchema for shredded columns that need assembly.
-        # In nested mode we still want to reassemble shredded VARIANTs
-        # that were projected at the top level — only the columns actually
-        # reached via a length>1 path are skipped (those are sub-fields of
-        # some other struct, not VARIANTs themselves).
         self._shredded_schemas: Dict[str, VariantSchema] = {}
         if options is None or options.variant_shredding_enabled():
             top_level_names = set(file_schema.names)
@@ -102,10 +89,6 @@ class FormatPyArrowReader(RecordBatchReader):
                     self._shredded_schemas[name] = build_variant_schema(field_type)
 
         if has_nested_path:
-            # Dict-form columns let PyArrow read leaf fields out of nested
-            # structs without materialising the parent. The dict keys
-            # become the output column names — they're already flattened
-            # to ``a_b`` form by the upstream projection utility.
             existing_set = set(self.existing_fields)
             columns_dict = {}
             for f, path in zip(read_fields, nested_name_paths):
@@ -220,12 +203,7 @@ class FormatPyArrowReader(RecordBatchReader):
 
 
 def _path_exists_in_arrow_schema(schema: pa.Schema, path: List[str]) -> bool:
-    """Walk ``path`` (a list of field names) through a PyArrow schema and
-    return whether every step exists. The first step is a top-level field
-    name; subsequent steps are struct child names. Missing leaves at any
-    depth (e.g. a renamed sub-field) yield ``False`` so the caller can
-    fall back to a NULL column instead of raising during scan setup.
-    """
+    """Check whether a name path is fully resolvable in the given schema."""
     if not path:
         return False
     if path[0] not in schema.names:
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index b68d2f9e36..e5903958d1 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -99,14 +99,13 @@ class SplitRead(ABC):
         self.split = split
         self.row_tracking_enabled = row_tracking_enabled
         self.value_arity = len(read_type)
-        # Parallel to ``read_type``; each entry is the original-schema
-        # name path. ``None`` when no projection or top-level only.
         self.nested_name_paths = nested_name_paths
 
         self.trimmed_primary_key = self.table.trimmed_primary_keys
         self.read_fields = read_type
         if isinstance(self, MergeFileSplitRead):
             self.read_fields = self._create_key_value_fields(read_type)
+        self._cached_nested_path_by_name = self._compute_nested_path_by_name()
         self.schema_id_2_fields = {}
         self.deletion_file_readers = {}
         # Apply filter only when all predicate columns are read by this scan,
@@ -131,11 +130,7 @@ class SplitRead(ABC):
         else:
             self.predicate_for_reader = None
 
-    def _nested_path_by_name(self) -> Optional[Dict[str, List[str]]]:
-        """``{flat_name: original_path}`` for the rows of ``read_type``
-        that go through a nested path (``len > 1``). ``None`` when no
-        such path exists, so callers stay on the top-level fast path.
-        """
+    def _compute_nested_path_by_name(self) -> Optional[Dict[str, List[str]]]:
         if not self.nested_name_paths:
             return None
         if not any(len(p) > 1 for p in self.nested_name_paths):
@@ -146,6 +141,9 @@ class SplitRead(ABC):
             out[f.name] = path
         return out
 
+    def _nested_path_by_name(self) -> Optional[Dict[str, List[str]]]:
+        return self._cached_nested_path_by_name
+
     def _push_down_predicate(self) -> Optional[Predicate]:
         if self.predicate is None:
             return None
@@ -187,11 +185,7 @@ class SplitRead(ABC):
                     end = r.to - file.first_row_id
                     row_indices.extend(range(start, end + 1))
 
-        # Map the parallel ``self.nested_name_paths`` (aligned with
-        # ``self.read_fields``) into the same order the format reader
-        # will see, indexed by output column name. Set to ``None`` when
-        # no path has length > 1 so the reader stays on its top-level
-        # fast path.
+        # Map nested paths into the order the format reader will see.
         nested_path_by_name = self._nested_path_by_name()
         has_nested = nested_path_by_name is not None
 
@@ -214,8 +208,6 @@ class SplitRead(ABC):
                                              batch_size=batch_size)
         elif file_format == CoreOptions.FILE_FORMAT_LANCE:
             if has_nested:
-                # Lance has no nested column pruning today; project the
-                # parent struct in full and extract sub-fields client-side.
                 raise NotImplementedError(
                     "Nested-field projection is not supported on Lance files")
             name_to_field = {f.name: f for f in self.read_fields}
@@ -299,12 +291,9 @@ class SplitRead(ABC):
         return reader
 
     def _get_fields_and_predicate(self, schema_id: int, read_fields):
-        # In nested mode the flat name (``mv_latest_version``) never
-        # appears in the file schema; reachability is decided by the
-        # path's top-level entry instead.
-        nested_path_by_name = self._nested_path_by_name()
         key = (schema_id, tuple(read_fields))
         if key not in self.schema_id_2_fields:
+            nested_path_by_name = self._nested_path_by_name()
             schema = self.table.schema_manager.get_schema(schema_id)
             schema_fields = (
                 SpecialFields.row_type_with_row_tracking(schema.fields)
@@ -366,8 +355,6 @@ class SplitRead(ABC):
 
     def create_index_mapping(self):
         if self._nested_path_by_name() is not None:
-            # Format reader already emits flat columns aligned with
-            # ``read_fields``; the id-based remap can't see leaf IDs.
             return None
         base_index_mapping = self._create_base_index_mapping(self.read_fields, self._get_read_data_fields())
         trimmed_key_mapping, _ = self._get_trimmed_fields(self._get_read_data_fields(), self._get_all_data_fields())
@@ -411,8 +398,6 @@ class SplitRead(ABC):
 
     def _get_final_read_data_fields(self) -> List[str]:
         if self._nested_path_by_name() is not None:
-            # Trimmed-fields filters by ID and drops nested leaves;
-            # hand the format reader the user-facing flat names directly.
             return self._remove_partition_fields(list(self.read_fields))
         _, trimmed_fields = self._get_trimmed_fields(
             self._get_read_data_fields(), self._get_all_data_fields()
@@ -469,11 +454,6 @@ class SplitRead(ABC):
 
     def _construct_partition_mapping(self) -> List[int]:
         if self._nested_path_by_name() is not None:
-            # Nested fields carry leaf IDs that don't match top-level
-            # data-field IDs, so the trimmed-fields machinery can't see
-            # them. Build the mapping directly from ``self.read_fields``:
-            # entries whose flat name equals a partition key get a
-            # negative index, the rest get a sequential read index.
             partition_names = self.table.partition_keys
             mapping = [0] * (len(self.read_fields) + 1)
             p_count = 0
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 4f53e49126..488c964bcd 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -49,9 +49,6 @@ class TableRead:
         self.predicate = predicate
         self.read_type = read_type
         self.include_row_kind = include_row_kind
-        # Parallel to ``read_type``; each entry is the original-schema
-        # name path for the field. ``None`` when no projection (or only
-        # top-level projection) is set.
         self.nested_name_paths = nested_name_paths
 
     def to_iterator(self, splits: List[Split]) -> Iterator:
@@ -275,10 +272,6 @@ class TableRead:
         if self.table.is_primary_key_table and not split.raw_convertible:
             if self.nested_name_paths and any(
                     len(p) > 1 for p in self.nested_name_paths):
-                # The merge function needs full parent structs; outer
-                # projection that walks the path to recover leaves is a
-                # separate change. Project the parent struct and extract
-                # client-side until then.
                 raise NotImplementedError(
                     "Nested-field projection on primary-key tables that "
                     "require a merge read is not yet supported")
@@ -292,11 +285,6 @@ class TableRead:
         elif self.table.options.data_evolution_enabled():
             if self.nested_name_paths and any(
                     len(p) > 1 for p in self.nested_name_paths):
-                # Multi-file union for data-evolution tables matches files
-                # by top-level field ID; a nested ``read_field`` carries
-                # its leaf ID, which never matches and would silently
-                # produce all-NULL columns. Refuse loudly until the
-                # union path is taught to walk paths.
                 raise NotImplementedError(
                     "Nested-field projection on data-evolution tables is "
                     "not yet supported")
diff --git a/paimon-python/pypaimon/tests/test_projection_utility.py b/paimon-python/pypaimon/tests/test_projection_utility.py
index 4d1c43eb16..660c2c2dd6 100644
--- a/paimon-python/pypaimon/tests/test_projection_utility.py
+++ b/paimon-python/pypaimon/tests/test_projection_utility.py
@@ -130,7 +130,7 @@ class NestedProjectionTest(unittest.TestCase):
         res = Projection.of([[0, 0], [1, 0]]).project(fields)
         self.assertEqual([f.name for f in res], ['a_x', 'b_x'])
 
-        # When two collapse to the SAME name, the second gets `_$N`.
+        # When two collapse to the SAME name, the second gets `__N`.
         # Build two parents whose leaves have the same compound name.
         sub_x_only = _atomic(30, 'x')
         fields2 = [
@@ -141,7 +141,7 @@ class NestedProjectionTest(unittest.TestCase):
         # First path produces 'a_x'; second is already-existing 'a_x'.
         # Collision → suffix on the second.
         self.assertEqual(res2[0].name, 'a_x')
-        self.assertTrue(res2[1].name.startswith('a_x_$'))
+        self.assertTrue(res2[1].name.startswith('a_x__'))
 
     def test_project_rejects_non_row_step(self):
         # Trying to walk into an atomic field must fail loudly.
@@ -158,7 +158,7 @@ class NestedProjectionTest(unittest.TestCase):
             NestedProjection([[]])
 
     def test_dup_count_is_monotonic_across_distinct_collisions(self):
-        # ``_$N`` is a per-call monotonic counter — distinct collisions
+        # ``__N`` is a per-call monotonic counter — distinct collisions
         # share the suffix space, they don't each restart at 0.
         sub_x_1 = _atomic(20, 'x')
         sub_x_2 = _atomic(21, 'x')
@@ -171,12 +171,12 @@ class NestedProjectionTest(unittest.TestCase):
             _struct(4, 'a', [sub_y_1]),  # collides via path [3, 0] → a_y
             _struct(5, 'a', [sub_x_2, sub_y_2]),  # second a.x collision
         ]
-        # Order: a_x (top) → keeps; [1, 0] flatten → a_x (collision) → a_x_$0
-        # a_y (top) → keeps; [3, 0] flatten → a_y (collision) → a_y_$1
+        # Order: a_x (top) → keeps; [1, 0] flatten → a_x (collision) → a_x__0
+        # a_y (top) → keeps; [3, 0] flatten → a_y (collision) → a_y__1
         res = Projection.of(
             [[0], [1, 0], [2], [3, 0]]).project(fields)
         self.assertEqual(
-            [f.name for f in res], ['a_x', 'a_x_$0', 'a_y', 'a_y_$1'])
+            [f.name for f in res], ['a_x', 'a_x__0', 'a_y', 'a_y__1'])
 
     def test_of_rejects_mixed_int_and_path(self):
         # Mixing top-level indexes and nested paths is a programming error;
diff --git a/paimon-python/pypaimon/utils/projection.py b/paimon-python/pypaimon/utils/projection.py
index c87da7ac2c..53d3b02235 100644
--- a/paimon-python/pypaimon/utils/projection.py
+++ b/paimon-python/pypaimon/utils/projection.py
@@ -26,7 +26,7 @@ columns the user wants to read. Two flavours:
   ``[[1, 0], [1, 2]]`` means "the 0th and 2nd children of the field at top
   level index 1". The result is flattened into top-level fields whose
   names are the underscore-joined original path (``a_b`` for ``a.b``,
-  with a ``_$N`` suffix on collisions) and whose IDs are inherited from
+  with a ``__N`` suffix on collisions) and whose IDs are inherited from
   the leaf so schema-evolution remapping by field ID still works.
 """
 
@@ -195,7 +195,7 @@ class NestedProjection(Projection):
             names = [field.name]
             for idx in path[1:]:
                 child_type = field.type
-                if not _is_row_type(child_type):
+                if not is_row_type(child_type):
                     raise ValueError(
                         "Nested projection step expected a ROW type but got %s "
                         "for field '%s'" % (child_type, field.name))
@@ -206,11 +206,6 @@ class NestedProjection(Projection):
         return result
 
     def project(self, row_type) -> List[DataField]:
-        # ``dup_count`` is monotonic across the whole call: it increments
-        # whenever a name has to be suffixed, regardless of which base
-        # name caused the collision. The per-call counter only guarantees
-        # uniqueness, not that ``_$0`` always represents the first
-        # collision of any given base.
         fields = _row_fields(row_type)
         out: List[DataField] = []
         seen_names = set()
@@ -220,7 +215,7 @@ class NestedProjection(Projection):
             name_parts = [field.name]
             for idx in path[1:]:
                 child_type = field.type
-                if not _is_row_type(child_type):
+                if not is_row_type(child_type):
                     raise ValueError(
                         "Nested projection step expected a ROW type but got %s "
                         "for field '%s'" % (child_type, field.name))
@@ -230,7 +225,7 @@ class NestedProjection(Projection):
             base_name = "_".join(name_parts)
             final_name = base_name
             while final_name in seen_names:
-                final_name = "%s_$%d" % (base_name, dup_count)
+                final_name = "%s__%d" % (base_name, dup_count)
                 dup_count += 1
             seen_names.add(final_name)
             # Keep the leaf field's ID so downstream schema-evolution
@@ -259,9 +254,7 @@ def _row_fields(row_type) -> List[DataField]:
     return list(fields)
 
 
-def _is_row_type(data_type) -> bool:
+def is_row_type(data_type) -> bool:
     if isinstance(data_type, RowType):
         return True
-    # Lightweight test stubs may report ``.fields`` without subclassing
-    # RowType — accept those too.
     return getattr(data_type, 'fields', None) is not None

Reply via email to