TheR1sing3un commented on code in PR #8187:
URL: https://github.com/apache/paimon/pull/8187#discussion_r3394325649
##########
paimon-python/pypaimon/schema/schema_manager.py:
##########
@@ -44,6 +46,108 @@ def _find_field_index(fields: List[DataField], field_name: str) -> Optional[int]
return None
+def _extract_row_data_fields(data_type, out_fields: List[DataField]) -> int:
+ """Collect the immediate sub-fields reachable from *data_type* into
+ *out_fields* and return the path depth consumed. A ROW contributes its
+ fields (depth 1); an ARRAY/MAP is transparent and descends into its
+ element/value (consuming the ``element``/``value`` path token); anything
+ else contributes nothing (depth 1)."""
+ if isinstance(data_type, RowType):
+ out_fields.extend(data_type.fields)
+ return 1
+ if isinstance(data_type, ArrayType):
+ return _extract_row_data_fields(data_type.element, out_fields) + 1
+ if isinstance(data_type, MapType):
+ return _extract_row_data_fields(data_type.value, out_fields) + 1
+ return 1
+
+
+def _wrap_new_row_type(data_type, nested_fields: List[DataField]):
+ """Rebuild *data_type* substituting *nested_fields* at its innermost ROW,
+ preserving any ARRAY/MAP wrappers."""
+ if isinstance(data_type, RowType):
+ return RowType(data_type.nullable, nested_fields)
+ if isinstance(data_type, ArrayType):
+ return ArrayType(data_type.nullable, _wrap_new_row_type(data_type.element, nested_fields))
+ if isinstance(data_type, MapType):
+ return MapType(
+ data_type.nullable, data_type.key,
+ _wrap_new_row_type(data_type.value, nested_fields))
+ return data_type
+
+
+def _get_root_type(data_type, curr_depth: int, max_depth: int):
+ """Return the type sitting at ``max_depth`` when walking ARRAY/MAP wrappers
+ from *data_type* (e.g. the INT in ARRAY<MAP<STRING, ARRAY<INT>>>)."""
+ if curr_depth == max_depth - 1:
+ return data_type
+ if isinstance(data_type, ArrayType):
+ return _get_root_type(data_type.element, curr_depth + 1, max_depth)
+ if isinstance(data_type, MapType):
+ return _get_root_type(data_type.value, curr_depth + 1, max_depth)
+ return data_type
+
+
+def _get_array_map_type_with_target_type_root(source, target, curr_depth: int, max_depth: int):
+ """Rebuild *source* with *target* substituted at ``max_depth``, keeping the
+ ARRAY/MAP wrappers around it intact."""
+ if curr_depth == max_depth - 1:
+ return target
+ if isinstance(source, ArrayType):
+ return ArrayType(
+ source.nullable,
+ _get_array_map_type_with_target_type_root(
+ source.element, target, curr_depth + 1, max_depth))
+ if isinstance(source, MapType):
+ return MapType(
+ source.nullable, source.key,
+ _get_array_map_type_with_target_type_root(
+ source.value, target, curr_depth + 1, max_depth))
+ return target
+
+
+def _update_intermediate_column(new_fields, previous_fields, depth, prev_depth,
+ field_names, update_last_fn):
+ """Walk *field_names* into nested ROW (transparently through ARRAY/MAP),
+ then run *update_last_fn(depth, fields, name)* on the field list that
+ directly contains the final path element, rebuilding parent types upward."""
+ if depth == len(field_names) - 1:
+ update_last_fn(depth, new_fields, field_names[depth])
+ return
+ if depth >= len(field_names):
+ # Path descended through ARRAY/MAP past the last ROW; operate on the
+ # field that owns the wrapper at the previous depth.
+ update_last_fn(prev_depth, previous_fields, field_names[prev_depth])
+ return
+ for i, field in enumerate(new_fields):
+ if field.name != field_names[depth]:
+ continue
+ nested_fields: List[DataField] = []
+ new_depth = depth + _extract_row_data_fields(field.type, nested_fields)
Review Comment:
fixed
##########
paimon-python/pypaimon/read/reader/data_file_batch_reader.py:
##########
@@ -57,55 +122,105 @@ def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], p
self.file_io = file_io
# Per-file field-id normalization: map the physically-read columns
# (the file's own field order/names) onto the latest read target by
- # field id, padding missing ids with NULL. ``None`` when there is no
- # evolution to reconcile (identity) -- the common path stays zero-copy.
- self._normalize_positions, self._normalize_names = \
- self._build_normalize_plan(file_data_fields, target_data_fields)
+ # field id, padding missing ids with NULL and recursing into nested
+ # ROW / ARRAY<ROW> / MAP<.,ROW> sub-fields the same way. ``None`` when
+ # there is no evolution to reconcile -- the common path stays zero-copy.
+ self._normalize_plan = self._build_normalize_plan(file_data_fields, target_data_fields)
@staticmethod
def _build_normalize_plan(file_data_fields, target_data_fields):
"""Build a per-file field-id alignment plan.
- Returns ``(positions, names)`` where ``positions[i]`` is the column
- index in the physically-read batch carrying ``target_data_fields[i]``
- (matched by field id), or -1 if the file does not contain that id (pad
- NULL). ``names[i]`` is the latest target name. Returns ``(None, None)``
- when the plan is the identity (no evolution), so the caller skips
- normalization and stays zero-copy.
+ Returns a list of ``(pos, file_field, target_field)`` -- one per target
+ field, in target order -- where ``pos`` is the column index in the
+ physically-read batch carrying ``target_field`` (matched by field id),
+ or -1 if the file does not contain that id (pad NULL). Returns ``None``
+ when the file already matches the target exactly (no evolution), so the
+ caller stays zero-copy.
"""
if file_data_fields is None or target_data_fields is None:
- return None, None
+ return None
+ # Recursive equality covers nested sub-field changes too: any rename /
+ # add / drop / type change at any depth makes the file != target.
+ if file_data_fields == target_data_fields:
+ return None
file_id_to_pos = {f.id: i for i, f in enumerate(file_data_fields)}
- positions = []
- names = []
- # Identity only when every target maps to the same physical position
- # AND already carries the same name -- a rename keeps the position but
- # changes the name, which still requires a relabel pass.
- identity = len(file_data_fields) == len(target_data_fields)
- for i, target in enumerate(target_data_fields):
+ plan = []
+ for target in target_data_fields:
pos = file_id_to_pos.get(target.id, -1)
- positions.append(pos)
- names.append(target.name)
- if pos != i or (pos >= 0 and file_data_fields[pos].name != target.name):
- identity = False
- if identity:
- return None, None
- return positions, names
+ file_field = file_data_fields[pos] if pos >= 0 else None
+ plan.append((pos, file_field, target))
+ return plan
def _normalize_batch(self, record_batch: RecordBatch) -> RecordBatch:
"""Reorder/pad the physically-read batch onto the latest read target by
- field id, and relabel columns to the latest names. Missing ids become
- all-NULL columns; types are reconciled later by _align_batch_to_read_schema."""
- if self._normalize_positions is None:
+ field id, relabel columns to the latest names, and align nested ROW
+ sub-fields by id. Missing ids become typed all-NULL columns."""
+ if self._normalize_plan is None:
return record_batch
num_rows = record_batch.num_rows
arrays = []
- for pos in self._normalize_positions:
+ names = []
+ for pos, file_field, target_field in self._normalize_plan:
+ target_pa_type = PyarrowFieldParser.from_paimon_type(target_field.type)
if pos < 0:
- arrays.append(pa.nulls(num_rows))
+ arrays.append(pa.nulls(num_rows, type=target_pa_type))
else:
- arrays.append(record_batch.column(pos))
- return pa.RecordBatch.from_arrays(arrays, names=self._normalize_names)
+ arrays.append(self._align_array_by_id(
+ record_batch.column(pos), file_field.type, target_field.type))
+ names.append(target_field.name)
+ return pa.RecordBatch.from_arrays(arrays, names=names)
+
+ def _align_array_by_id(self, array, file_type, target_type):
+ """Return *array* converted to *target_type*, matching ROW sub-fields by
+ field id (reorder, pad missing with NULL, follow renames, cast changed
+ types) recursively, transparently through ARRAY/MAP wrappers."""
+ if isinstance(target_type, RowType) and isinstance(file_type, RowType):
+ n = len(array)
+ file_id_to_pos = {f.id: i for i, f in enumerate(file_type.fields)}
+ children = []
+ pa_fields = []
+ for tsub in target_type.fields:
+ p = file_id_to_pos.get(tsub.id, -1)
+ if p < 0:
+ child = pa.nulls(n, type=PyarrowFieldParser.from_paimon_type(tsub.type))
+ else:
+ child = self._align_array_by_id(
+ array.field(p), file_type.fields[p].type, tsub.type)
+ children.append(child)
+ pa_fields.append(pa.field(tsub.name, child.type, nullable=tsub.type.nullable))
+ # Preserve the struct's own null mask; child values under a null
+ # struct are irrelevant.
+ return pa.StructArray.from_arrays(
+ children, fields=pa_fields, mask=pc.is_null(array))
+ if isinstance(target_type, ArrayType) and isinstance(file_type, ArrayType):
+ aligned_values = self._align_array_by_id(
+ array.values, file_type.element, target_type.element)
+ return pa.ListArray.from_arrays(
+ array.offsets, aligned_values, mask=pc.is_null(array))
+ if isinstance(target_type, MapType) and isinstance(file_type, MapType):
+ aligned_items = self._align_array_by_id(
+ array.items, file_type.value, target_type.value)
+ # MapArray.from_arrays cannot carry a null mask (a null map would
+ # collapse to an empty one), so rebuild from buffers, reusing the
+ # original validity/offset buffers and only swapping the value child.
+ target_pa = PyarrowFieldParser.from_paimon_type(target_type)
+ entries = pa.StructArray.from_arrays(
+ [array.keys, aligned_items],
+ fields=[target_pa.key_field, target_pa.item_field])
+ return pa.Array.from_buffers(
+ target_pa, len(array), array.buffers()[:2], children=[entries])
+ # A constructed type changed to a character string: pyarrow cannot
+ # cast struct/list/map to utf8 directly, so render the engine's
+ # string form instead.
+ if (isinstance(file_type, (RowType, ArrayType, MapType))
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]