This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f11727ae35 [python] Adjust nested-field projection support
f11727ae35 is described below
commit f11727ae3581860c7ebc544054f6f63c2ea67a4f
Author: JingsongLi <[email protected]>
AuthorDate: Sat May 9 20:34:43 2026 +0800
[python] Adjust nested-field projection support
---
docs/content/pypaimon/python-api.md | 18 ++++++++++++
paimon-python/pypaimon/read/read_builder.py | 14 ++++-----
.../pypaimon/read/reader/format_avro_reader.py | 11 ++-----
.../pypaimon/read/reader/format_pyarrow_reader.py | 24 +--------------
paimon-python/pypaimon/read/split_read.py | 34 +++++-----------------
paimon-python/pypaimon/read/table_read.py | 12 --------
.../pypaimon/tests/test_projection_utility.py | 12 ++++----
paimon-python/pypaimon/utils/projection.py | 17 ++++-------
8 files changed, 45 insertions(+), 97 deletions(-)
diff --git a/docs/content/pypaimon/python-api.md
b/docs/content/pypaimon/python-api.md
index 6e4f96d5dc..50a0117891 100644
--- a/docs/content/pypaimon/python-api.md
+++ b/docs/content/pypaimon/python-api.md
@@ -322,6 +322,24 @@ You can also pushdown projection by `ReadBuilder`:
read_builder = read_builder.with_projection(['f3', 'f2'])
```
+For tables with nested struct columns, you can project individual sub-fields
using dotted names:
+
+```python
+# Given a table with schema: id BIGINT, info ROW<name STRING, age INT>, val
STRING
+
+# Select a nested sub-field and a top-level field
+read_builder = read_builder.with_projection(['info.name', 'val'])
+
+# The result columns are flattened with underscore-joined names:
+# info_name, val
+```
+
+This pushes the nested column pruning down to the file reader. Parquet/ORC use
PyArrow's native nested projection, so only the requested leaves are read from
disk; Avro falls back to Python-side extraction from the decoded record.
+
+Limitations:
+- Nested projection is only supported on append-only tables. Primary-key
tables that require a merge read, data-evolution tables, and Lance-format files
will raise `NotImplementedError`.
+- `ARRAY<ROW>` and `MAP` nested paths are not yet supported.
+
### Generate Splits
Then you can step into Scan Plan stage to get `splits`:
diff --git a/paimon-python/pypaimon/read/read_builder.py
b/paimon-python/pypaimon/read/read_builder.py
index ce95115431..77f050a22f 100644
--- a/paimon-python/pypaimon/read/read_builder.py
+++ b/paimon-python/pypaimon/read/read_builder.py
@@ -24,7 +24,7 @@ from pypaimon.read.table_read import TableRead
from pypaimon.read.table_scan import TableScan
from pypaimon.schema.data_types import DataField
from pypaimon.table.special_fields import SpecialFields
-from pypaimon.utils.projection import Projection, _is_row_type
+from pypaimon.utils.projection import Projection, is_row_type
class ReadBuilder:
@@ -35,12 +35,10 @@ class ReadBuilder:
self.table: FileStoreTable = table
self._predicate: Optional[Predicate] = None
- # ``_projection`` is the user-facing list of names from
- # :meth:`with_projection`. ``_nested_paths`` is the canonical
- # integer-path form (length-1 for top level, longer for nested
- # ROW children) used by every consumer in this module.
- # ``with_nested_projection`` populates only the latter; the two
- # never both hold meaningful state simultaneously.
+ # ``_projection`` stores the user-facing name list from
+ # :meth:`with_projection`. When dotted names are present,
+ # ``_nested_paths`` is also populated and takes precedence
+ # in ``read_type()`` and downstream consumers.
self._projection: Optional[List[str]] = None
self._nested_paths: Optional[List[List[int]]] = None
self._limit: Optional[int] = None
@@ -145,7 +143,7 @@ class ReadBuilder:
current_field = table_fields[path[0]]
ok = True
for part in parts[1:]:
- if not _is_row_type(current_field.type):
+ if not is_row_type(current_field.type):
ok = False
break
child_fields = current_field.type.fields
diff --git a/paimon-python/pypaimon/read/reader/format_avro_reader.py
b/paimon-python/pypaimon/read/reader/format_avro_reader.py
index 1fe41468e8..8ba967af28 100644
--- a/paimon-python/pypaimon/read/reader/format_avro_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_avro_reader.py
@@ -48,11 +48,6 @@ class FormatAvroReader(RecordBatchReader):
projected_data_fields = [full_fields_map[name] for name in read_fields]
self._schema =
PyarrowFieldParser.from_paimon_schema(projected_data_fields)
- # ``nested_name_paths`` is parallel to ``read_fields``. Top-level
- # entries are length-1 paths and use the existing ``record.get``
- # fast path; longer paths walk the record dict step-by-step. The
- # path's first segment must be a real top-level Avro field —
- # ``_get_fields_and_predicate`` upstream guarantees this.
if nested_name_paths is not None and len(nested_name_paths) !=
len(read_fields):
raise ValueError(
"nested_name_paths length {} does not match read_fields length
{}".format(
@@ -97,10 +92,8 @@ class FormatAvroReader(RecordBatchReader):
def _walk_avro_record(record, path: List[str]):
- """Walk a list of field names through an avro record dict, returning
- the leaf value or ``None`` if any segment is missing or hits a
- non-dict value. ``record`` is the top-level fastavro dict; nested
- record fields surface as nested dicts.
+ """Walk field names through a nested avro record dict, returning the
+ leaf value or ``None`` if any segment is missing or hits a non-dict value.
"""
current = record
for name in path:
diff --git a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
index 0c12d27e4f..3fdf202c7c 100644
--- a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
@@ -56,9 +56,6 @@ class FormatPyArrowReader(RecordBatchReader):
self.read_fields = read_fields
self._read_field_names = [f.name for f in read_fields]
- # ``nested_name_paths`` is parallel to ``read_fields``; when
- # any path has length > 1 the scanner is invoked with a
- # ``{flat_name: ds.field(*path)}`` column dict.
if nested_name_paths is not None and len(nested_name_paths) !=
len(read_fields):
raise ValueError(
"nested_name_paths length {} does not match read_fields length
{}".format(
@@ -67,11 +64,6 @@ class FormatPyArrowReader(RecordBatchReader):
has_nested_path = bool(
nested_name_paths and any(len(p) > 1 for p in nested_name_paths))
- # Identify which fields exist in the file and which are missing.
- # For nested projection, "exists" is determined by walking the
- # whole path against the file schema; sub-field schema evolution
- # (a leaf renamed or removed) shows up as ``missing`` and is
- # served as a NULL column, mirroring the top-level handling.
file_schema = self.dataset.schema
if has_nested_path:
self.existing_fields = []
@@ -86,11 +78,6 @@ class FormatPyArrowReader(RecordBatchReader):
self.existing_fields = [f.name for f in read_fields if f.name in
file_schema_names]
self.missing_fields = [f.name for f in read_fields if f.name not
in file_schema_names]
- # column name → VariantSchema for shredded columns that need assembly.
- # In nested mode we still want to reassemble shredded VARIANTs
- # that were projected at the top level — only the columns actually
- # reached via a length>1 path are skipped (those are sub-fields of
- # some other struct, not VARIANTs themselves).
self._shredded_schemas: Dict[str, VariantSchema] = {}
if options is None or options.variant_shredding_enabled():
top_level_names = set(file_schema.names)
@@ -102,10 +89,6 @@ class FormatPyArrowReader(RecordBatchReader):
self._shredded_schemas[name] =
build_variant_schema(field_type)
if has_nested_path:
- # Dict-form columns let PyArrow read leaf fields out of nested
- # structs without materialising the parent. The dict keys
- # become the output column names — they're already flattened
- # to ``a_b`` form by the upstream projection utility.
existing_set = set(self.existing_fields)
columns_dict = {}
for f, path in zip(read_fields, nested_name_paths):
@@ -220,12 +203,7 @@ class FormatPyArrowReader(RecordBatchReader):
def _path_exists_in_arrow_schema(schema: pa.Schema, path: List[str]) -> bool:
- """Walk ``path`` (a list of field names) through a PyArrow schema and
- return whether every step exists. The first step is a top-level field
- name; subsequent steps are struct child names. Missing leaves at any
- depth (e.g. a renamed sub-field) yield ``False`` so the caller can
- fall back to a NULL column instead of raising during scan setup.
- """
+ """Check whether a name path is fully resolvable in the given schema."""
if not path:
return False
if path[0] not in schema.names:
diff --git a/paimon-python/pypaimon/read/split_read.py
b/paimon-python/pypaimon/read/split_read.py
index b68d2f9e36..e5903958d1 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -99,14 +99,13 @@ class SplitRead(ABC):
self.split = split
self.row_tracking_enabled = row_tracking_enabled
self.value_arity = len(read_type)
- # Parallel to ``read_type``; each entry is the original-schema
- # name path. ``None`` when no projection or top-level only.
self.nested_name_paths = nested_name_paths
self.trimmed_primary_key = self.table.trimmed_primary_keys
self.read_fields = read_type
if isinstance(self, MergeFileSplitRead):
self.read_fields = self._create_key_value_fields(read_type)
+ self._cached_nested_path_by_name = self._compute_nested_path_by_name()
self.schema_id_2_fields = {}
self.deletion_file_readers = {}
# Apply filter only when all predicate columns are read by this scan,
@@ -131,11 +130,7 @@ class SplitRead(ABC):
else:
self.predicate_for_reader = None
- def _nested_path_by_name(self) -> Optional[Dict[str, List[str]]]:
- """``{flat_name: original_path}`` for the rows of ``read_type``
- that go through a nested path (``len > 1``). ``None`` when no
- such path exists, so callers stay on the top-level fast path.
- """
+ def _compute_nested_path_by_name(self) -> Optional[Dict[str, List[str]]]:
if not self.nested_name_paths:
return None
if not any(len(p) > 1 for p in self.nested_name_paths):
@@ -146,6 +141,9 @@ class SplitRead(ABC):
out[f.name] = path
return out
+ def _nested_path_by_name(self) -> Optional[Dict[str, List[str]]]:
+ return self._cached_nested_path_by_name
+
def _push_down_predicate(self) -> Optional[Predicate]:
if self.predicate is None:
return None
@@ -187,11 +185,7 @@ class SplitRead(ABC):
end = r.to - file.first_row_id
row_indices.extend(range(start, end + 1))
- # Map the parallel ``self.nested_name_paths`` (aligned with
- # ``self.read_fields``) into the same order the format reader
- # will see, indexed by output column name. Set to ``None`` when
- # no path has length > 1 so the reader stays on its top-level
- # fast path.
+ # Map nested paths into the order the format reader will see.
nested_path_by_name = self._nested_path_by_name()
has_nested = nested_path_by_name is not None
@@ -214,8 +208,6 @@ class SplitRead(ABC):
batch_size=batch_size)
elif file_format == CoreOptions.FILE_FORMAT_LANCE:
if has_nested:
- # Lance has no nested column pruning today; project the
- # parent struct in full and extract sub-fields client-side.
raise NotImplementedError(
"Nested-field projection is not supported on Lance files")
name_to_field = {f.name: f for f in self.read_fields}
@@ -299,12 +291,9 @@ class SplitRead(ABC):
return reader
def _get_fields_and_predicate(self, schema_id: int, read_fields):
- # In nested mode the flat name (``mv_latest_version``) never
- # appears in the file schema; reachability is decided by the
- # path's top-level entry instead.
- nested_path_by_name = self._nested_path_by_name()
key = (schema_id, tuple(read_fields))
if key not in self.schema_id_2_fields:
+ nested_path_by_name = self._nested_path_by_name()
schema = self.table.schema_manager.get_schema(schema_id)
schema_fields = (
SpecialFields.row_type_with_row_tracking(schema.fields)
@@ -366,8 +355,6 @@ class SplitRead(ABC):
def create_index_mapping(self):
if self._nested_path_by_name() is not None:
- # Format reader already emits flat columns aligned with
- # ``read_fields``; the id-based remap can't see leaf IDs.
return None
base_index_mapping = self._create_base_index_mapping(self.read_fields,
self._get_read_data_fields())
trimmed_key_mapping, _ =
self._get_trimmed_fields(self._get_read_data_fields(),
self._get_all_data_fields())
@@ -411,8 +398,6 @@ class SplitRead(ABC):
def _get_final_read_data_fields(self) -> List[str]:
if self._nested_path_by_name() is not None:
- # Trimmed-fields filters by ID and drops nested leaves;
- # hand the format reader the user-facing flat names directly.
return self._remove_partition_fields(list(self.read_fields))
_, trimmed_fields = self._get_trimmed_fields(
self._get_read_data_fields(), self._get_all_data_fields()
@@ -469,11 +454,6 @@ class SplitRead(ABC):
def _construct_partition_mapping(self) -> List[int]:
if self._nested_path_by_name() is not None:
- # Nested fields carry leaf IDs that don't match top-level
- # data-field IDs, so the trimmed-fields machinery can't see
- # them. Build the mapping directly from ``self.read_fields``:
- # entries whose flat name equals a partition key get a
- # negative index, the rest get a sequential read index.
partition_names = self.table.partition_keys
mapping = [0] * (len(self.read_fields) + 1)
p_count = 0
diff --git a/paimon-python/pypaimon/read/table_read.py
b/paimon-python/pypaimon/read/table_read.py
index 4f53e49126..488c964bcd 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -49,9 +49,6 @@ class TableRead:
self.predicate = predicate
self.read_type = read_type
self.include_row_kind = include_row_kind
- # Parallel to ``read_type``; each entry is the original-schema
- # name path for the field. ``None`` when no projection (or only
- # top-level projection) is set.
self.nested_name_paths = nested_name_paths
def to_iterator(self, splits: List[Split]) -> Iterator:
@@ -275,10 +272,6 @@ class TableRead:
if self.table.is_primary_key_table and not split.raw_convertible:
if self.nested_name_paths and any(
len(p) > 1 for p in self.nested_name_paths):
- # The merge function needs full parent structs; outer
- # projection that walks the path to recover leaves is a
- # separate change. Project the parent struct and extract
- # client-side until then.
raise NotImplementedError(
"Nested-field projection on primary-key tables that "
"require a merge read is not yet supported")
@@ -292,11 +285,6 @@ class TableRead:
elif self.table.options.data_evolution_enabled():
if self.nested_name_paths and any(
len(p) > 1 for p in self.nested_name_paths):
- # Multi-file union for data-evolution tables matches files
- # by top-level field ID; a nested ``read_field`` carries
- # its leaf ID, which never matches and would silently
- # produce all-NULL columns. Refuse loudly until the
- # union path is taught to walk paths.
raise NotImplementedError(
"Nested-field projection on data-evolution tables is "
"not yet supported")
diff --git a/paimon-python/pypaimon/tests/test_projection_utility.py
b/paimon-python/pypaimon/tests/test_projection_utility.py
index 4d1c43eb16..660c2c2dd6 100644
--- a/paimon-python/pypaimon/tests/test_projection_utility.py
+++ b/paimon-python/pypaimon/tests/test_projection_utility.py
@@ -130,7 +130,7 @@ class NestedProjectionTest(unittest.TestCase):
res = Projection.of([[0, 0], [1, 0]]).project(fields)
self.assertEqual([f.name for f in res], ['a_x', 'b_x'])
- # When two collapse to the SAME name, the second gets `_$N`.
+ # When two collapse to the SAME name, the second gets `__N`.
# Build two parents whose leaves have the same compound name.
sub_x_only = _atomic(30, 'x')
fields2 = [
@@ -141,7 +141,7 @@ class NestedProjectionTest(unittest.TestCase):
# First path produces 'a_x'; second is already-existing 'a_x'.
# Collision → suffix on the second.
self.assertEqual(res2[0].name, 'a_x')
- self.assertTrue(res2[1].name.startswith('a_x_$'))
+ self.assertTrue(res2[1].name.startswith('a_x__'))
def test_project_rejects_non_row_step(self):
# Trying to walk into an atomic field must fail loudly.
@@ -158,7 +158,7 @@ class NestedProjectionTest(unittest.TestCase):
NestedProjection([[]])
def test_dup_count_is_monotonic_across_distinct_collisions(self):
- # ``_$N`` is a per-call monotonic counter — distinct collisions
+ # ``__N`` is a per-call monotonic counter — distinct collisions
# share the suffix space, they don't each restart at 0.
sub_x_1 = _atomic(20, 'x')
sub_x_2 = _atomic(21, 'x')
@@ -171,12 +171,12 @@ class NestedProjectionTest(unittest.TestCase):
_struct(4, 'a', [sub_y_1]), # collides via path [3, 0] → a_y
_struct(5, 'a', [sub_x_2, sub_y_2]), # second a.x collision
]
- # Order: a_x (top) → keeps; [1, 0] flatten → a_x (collision) → a_x_$0
- # a_y (top) → keeps; [3, 0] flatten → a_y (collision) → a_y_$1
+ # Order: a_x (top) → keeps; [1, 0] flatten → a_x (collision) → a_x__0
+ # a_y (top) → keeps; [3, 0] flatten → a_y (collision) → a_y__1
res = Projection.of(
[[0], [1, 0], [2], [3, 0]]).project(fields)
self.assertEqual(
- [f.name for f in res], ['a_x', 'a_x_$0', 'a_y', 'a_y_$1'])
+ [f.name for f in res], ['a_x', 'a_x__0', 'a_y', 'a_y__1'])
def test_of_rejects_mixed_int_and_path(self):
# Mixing top-level indexes and nested paths is a programming error;
diff --git a/paimon-python/pypaimon/utils/projection.py
b/paimon-python/pypaimon/utils/projection.py
index c87da7ac2c..53d3b02235 100644
--- a/paimon-python/pypaimon/utils/projection.py
+++ b/paimon-python/pypaimon/utils/projection.py
@@ -26,7 +26,7 @@ columns the user wants to read. Two flavours:
``[[1, 0], [1, 2]]`` means "the 0th and 2nd children of the field at top
level index 1". The result is flattened into top-level fields whose
names are the underscore-joined original path (``a_b`` for ``a.b``,
- with a ``_$N`` suffix on collisions) and whose IDs are inherited from
+ with a ``__N`` suffix on collisions) and whose IDs are inherited from
the leaf so schema-evolution remapping by field ID still works.
"""
@@ -195,7 +195,7 @@ class NestedProjection(Projection):
names = [field.name]
for idx in path[1:]:
child_type = field.type
- if not _is_row_type(child_type):
+ if not is_row_type(child_type):
raise ValueError(
"Nested projection step expected a ROW type but got %s
"
"for field '%s'" % (child_type, field.name))
@@ -206,11 +206,6 @@ class NestedProjection(Projection):
return result
def project(self, row_type) -> List[DataField]:
- # ``dup_count`` is monotonic across the whole call: it increments
- # whenever a name has to be suffixed, regardless of which base
- # name caused the collision. The per-call counter only guarantees
- # uniqueness, not that ``_$0`` always represents the first
- # collision of any given base.
fields = _row_fields(row_type)
out: List[DataField] = []
seen_names = set()
@@ -220,7 +215,7 @@ class NestedProjection(Projection):
name_parts = [field.name]
for idx in path[1:]:
child_type = field.type
- if not _is_row_type(child_type):
+ if not is_row_type(child_type):
raise ValueError(
"Nested projection step expected a ROW type but got %s
"
"for field '%s'" % (child_type, field.name))
@@ -230,7 +225,7 @@ class NestedProjection(Projection):
base_name = "_".join(name_parts)
final_name = base_name
while final_name in seen_names:
- final_name = "%s_$%d" % (base_name, dup_count)
+ final_name = "%s__%d" % (base_name, dup_count)
dup_count += 1
seen_names.add(final_name)
# Keep the leaf field's ID so downstream schema-evolution
@@ -259,9 +254,7 @@ def _row_fields(row_type) -> List[DataField]:
return list(fields)
-def _is_row_type(data_type) -> bool:
+def is_row_type(data_type) -> bool:
if isinstance(data_type, RowType):
return True
- # Lightweight test stubs may report ``.fields`` without subclassing
- # RowType — accept those too.
return getattr(data_type, 'fields', None) is not None