JingsongLi commented on code in PR #8187:
URL: https://github.com/apache/paimon/pull/8187#discussion_r3393438569


##########
paimon-python/pypaimon/read/reader/data_file_batch_reader.py:
##########
@@ -57,55 +122,105 @@ def __init__(self, format_reader: RecordBatchReader, 
index_mapping: List[int], p
         self.file_io = file_io
         # Per-file field-id normalization: map the physically-read columns
         # (the file's own field order/names) onto the latest read target by
-        # field id, padding missing ids with NULL. ``None`` when there is no
-        # evolution to reconcile (identity) -- the common path stays zero-copy.
-        self._normalize_positions, self._normalize_names = \
-            self._build_normalize_plan(file_data_fields, target_data_fields)
+        # field id, padding missing ids with NULL and recursing into nested
+        # ROW / ARRAY<ROW> / MAP<.,ROW> sub-fields the same way. ``None`` when
+        # there is no evolution to reconcile -- the common path stays 
zero-copy.
+        self._normalize_plan = self._build_normalize_plan(file_data_fields, 
target_data_fields)
 
     @staticmethod
     def _build_normalize_plan(file_data_fields, target_data_fields):
         """Build a per-file field-id alignment plan.
 
-        Returns ``(positions, names)`` where ``positions[i]`` is the column
-        index in the physically-read batch carrying ``target_data_fields[i]``
-        (matched by field id), or -1 if the file does not contain that id (pad
-        NULL). ``names[i]`` is the latest target name. Returns ``(None, None)``
-        when the plan is the identity (no evolution), so the caller skips
-        normalization and stays zero-copy.
+        Returns a list of ``(pos, file_field, target_field)`` -- one per target
+        field, in target order -- where ``pos`` is the column index in the
+        physically-read batch carrying ``target_field`` (matched by field id),
+        or -1 if the file does not contain that id (pad NULL). Returns ``None``
+        when the file already matches the target exactly (no evolution), so the
+        caller stays zero-copy.
         """
         if file_data_fields is None or target_data_fields is None:
-            return None, None
+            return None
+        # Recursive equality covers nested sub-field changes too: any rename /
+        # add / drop / type change at any depth makes the file != target.
+        if file_data_fields == target_data_fields:
+            return None
         file_id_to_pos = {f.id: i for i, f in enumerate(file_data_fields)}
-        positions = []
-        names = []
-        # Identity only when every target maps to the same physical position
-        # AND already carries the same name -- a rename keeps the position but
-        # changes the name, which still requires a relabel pass.
-        identity = len(file_data_fields) == len(target_data_fields)
-        for i, target in enumerate(target_data_fields):
+        plan = []
+        for target in target_data_fields:
             pos = file_id_to_pos.get(target.id, -1)
-            positions.append(pos)
-            names.append(target.name)
-            if pos != i or (pos >= 0 and file_data_fields[pos].name != 
target.name):
-                identity = False
-        if identity:
-            return None, None
-        return positions, names
+            file_field = file_data_fields[pos] if pos >= 0 else None
+            plan.append((pos, file_field, target))
+        return plan
 
     def _normalize_batch(self, record_batch: RecordBatch) -> RecordBatch:
         """Reorder/pad the physically-read batch onto the latest read target by
-        field id, and relabel columns to the latest names. Missing ids become
-        all-NULL columns; types are reconciled later by 
_align_batch_to_read_schema."""
-        if self._normalize_positions is None:
+        field id, relabel columns to the latest names, and align nested ROW
+        sub-fields by id. Missing ids become typed all-NULL columns."""
+        if self._normalize_plan is None:
             return record_batch
         num_rows = record_batch.num_rows
         arrays = []
-        for pos in self._normalize_positions:
+        names = []
+        for pos, file_field, target_field in self._normalize_plan:
+            target_pa_type = 
PyarrowFieldParser.from_paimon_type(target_field.type)
             if pos < 0:
-                arrays.append(pa.nulls(num_rows))
+                arrays.append(pa.nulls(num_rows, type=target_pa_type))
             else:
-                arrays.append(record_batch.column(pos))
-        return pa.RecordBatch.from_arrays(arrays, names=self._normalize_names)
+                arrays.append(self._align_array_by_id(
+                    record_batch.column(pos), file_field.type, 
target_field.type))
+            names.append(target_field.name)
+        return pa.RecordBatch.from_arrays(arrays, names=names)
+
+    def _align_array_by_id(self, array, file_type, target_type):
+        """Return *array* converted to *target_type*, matching ROW sub-fields 
by
+        field id (reorder, pad missing with NULL, follow renames, cast changed
+        types) recursively, transparently through ARRAY/MAP wrappers."""
+        if isinstance(target_type, RowType) and isinstance(file_type, RowType):
+            n = len(array)
+            file_id_to_pos = {f.id: i for i, f in enumerate(file_type.fields)}
+            children = []
+            pa_fields = []
+            for tsub in target_type.fields:
+                p = file_id_to_pos.get(tsub.id, -1)
+                if p < 0:
+                    child = pa.nulls(n, 
type=PyarrowFieldParser.from_paimon_type(tsub.type))
+                else:
+                    child = self._align_array_by_id(
+                        array.field(p), file_type.fields[p].type, tsub.type)
+                children.append(child)
+                pa_fields.append(pa.field(tsub.name, child.type, 
nullable=tsub.type.nullable))
+            # Preserve the struct's own null mask; child values under a null
+            # struct are irrelevant.
+            return pa.StructArray.from_arrays(
+                children, fields=pa_fields, mask=pc.is_null(array))
+        if isinstance(target_type, ArrayType) and isinstance(file_type, 
ArrayType):
+            aligned_values = self._align_array_by_id(
+                array.values, file_type.element, target_type.element)
+            return pa.ListArray.from_arrays(
+                array.offsets, aligned_values, mask=pc.is_null(array))
+        if isinstance(target_type, MapType) and isinstance(file_type, MapType):
+            aligned_items = self._align_array_by_id(
+                array.items, file_type.value, target_type.value)
+            # MapArray.from_arrays cannot carry a null mask (a null map would
+            # collapse to an empty one), so rebuild from buffers, reusing the
+            # original validity/offset buffers and only swapping the value 
child.
+            target_pa = PyarrowFieldParser.from_paimon_type(target_type)
+            entries = pa.StructArray.from_arrays(
+                [array.keys, aligned_items],
+                fields=[target_pa.key_field, target_pa.item_field])
+            return pa.Array.from_buffers(
+                target_pa, len(array), array.buffers()[:2], children=[entries])
+        # A constructed type changed to a character string: pyarrow cannot
+        # cast struct/list/map to utf8 directly, so render the engine's
+        # string form instead.
+        if (isinstance(file_type, (RowType, ArrayType, MapType))

Review Comment:
   This still leaves VECTOR -> STRING accepted but unreadable. `supports_cast` 
includes VECTOR in `CONSTRUCTED`, so `update_column_type("embed", STRING)` 
succeeds for a vector column, but old files reach this fallback and PyArrow 
raises `ArrowNotImplementedError: Unsupported cast from fixed_size_list<...> to 
utf8`. I reproduced it with an inline `pa.list_(pa.float32(), 3)` vector 
column. Please either include `VectorType` in the read-time string rendering 
path (fixed-size lists do not have offsets like normal lists), or narrow the 
cast rule so VECTOR -> CHAR/VARCHAR is rejected until it is supported.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to