This is an automated email from the ASF dual-hosted git repository.

voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 87019a332db7 fix(hive): Tolerate pruned ArrayWritable in nested BLOB projection (#18581)
87019a332db7 is described below

commit 87019a332db78e88ccb7968739a91a45f8f1a06f
Author: voonhous <[email protected]>
AuthorDate: Thu May 7 17:29:41 2026 +0800

    fix(hive): Tolerate pruned ArrayWritable in nested BLOB projection (#18581)
    
    Fixes issue: #18577
    
    When Hive's FetchOperator pushes nested column projection (e.g.
    `SELECT blob_data.reference.external_path`) through Parquet via
    `hive.io.file.readNestedColumn.paths`, the reader returns a compacted
    ArrayWritable holding only the projected sub-fields in low slots,
    while oldSchema stays the full 3-field canonical BLOB
    (BlobLogicalType.validate rejects partial field lists; pruneDataSchema
    deliberately preserves the canonical shape). Positional indexing into
    the compacted array throws ArrayIndexOutOfBoundsException, and even
    with a bounds guard the surviving slots hold the wrong fields: Hive's
    downstream ObjectInspector is built from the projected schema and
    expects values at their compacted positions - the rewrite must read
    and write by physical slot, not just survive.
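
    Concretely (slot shapes are illustrative; the canonical 3-field BLOB
    layout {type, data, reference} is the one described above):

        // canonical:  blob_data = [type, data, reference]   -> 3 slots
        // compacted:  blob_data = [reference]                -> 1 slot
        //             reference = [external_path]            -> 1 slot
        // oldSchema's reference field has pos() == 2, which indexes past
        // the end of the 1-slot compacted array.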
    
    Introduce a projection-aware rewrite path:
    
    - HoodieProjectionMask (new) - immutable per-level descriptor of
      physical layout. isCanonicalAtThisLevel() means schema positions
      apply; otherwise physicalIndexOf / physicalOrder map field names
      to physical slots.
    - HoodieColumnProjectionUtils.buildNestedProjectionMask() - parses
      hive.io.file.readNestedColumn.paths, walks RECORD / BLOB / VARIANT,
      returns the matching mask (or all() when projection is absent).
    - HiveHoodieReaderContext threads the mask into a new 5-arg
      rewriteRecordWithNewSchema overload.
    - HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchemaInternal
      branches on the mask:
        - rewriteCanonicalRecord - legacy positional logic with a
          defensive oldField.pos() < arrayLength guard.
    - rewriteCompactedRecord - iterates physicalOrder() and keeps
      each projected value at its compacted physical slot, converting
      it in place, so the downstream ObjectInspector (built from the
      projected schema) finds fields where it expects them.
    
    The compacted path is the primary fix; the canonical-path bounds
    guard is a defensive fallback.
    
    Tests: TestHoodieColumnProjectionUtils covers mask construction;
    TestHoodieArrayWritableSchemaUtils covers the AIOBE reproducer,
    compacted round-trip, and a canonical-shape regression.
    HoodieSchemaTestUtils gains createPlainBlobRecord and
    createPlainVariantRecord helpers (variant helper for upcoming
    VARIANT parity).
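
    For reviewers, a minimal sketch of how the new pieces compose
    (class and method names are from this patch; the conf setup and the
    `dataSchema` variable, a row schema containing blob_data, are
    illustrative only):

        // Hive's ProjectionPusher normally sets this conf before the
        // record reader runs; set it by hand here for the sketch.
        Configuration conf = new Configuration();
        conf.set("hive.io.file.readNestedColumn.paths",
            "blob_data.reference.external_path");

        HoodieProjectionMask mask =
            HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, dataSchema);

        mask.isCanonicalAtThisLevel();            // true: top level is padded, never compacted
        mask.childOrAll("blob_data")
            .physicalIndexOf("reference");        // OptionalInt.of(0)
        mask.childOrAll("blob_data")
            .childOrAll("reference")
            .physicalIndexOf("external_path");    // OptionalInt.of(0)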
---
 .../hudi/common/schema/HoodieProjectionMask.java   | 199 +++++++++++++++++++++
 .../hudi/common/schema/HoodieSchemaTestUtils.java  |   4 +-
 .../hudi/hadoop/HiveHoodieReaderContext.java       |  12 +-
 .../hudi/hadoop/HoodieColumnProjectionUtils.java   |  99 ++++++++++
 .../utils/HoodieArrayWritableSchemaUtils.java      | 179 ++++++++++++------
 .../hadoop/TestHoodieColumnProjectionUtils.java    |  92 ++++++++++
 .../utils/TestHoodieArrayWritableSchemaUtils.java  | 108 ++++++++++-
 7 files changed, 630 insertions(+), 63 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieProjectionMask.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieProjectionMask.java
new file mode 100644
index 000000000000..0f0a8de07816
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieProjectionMask.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+
+/**
+ * Describes the physical field layout of an incoming columnar record when an upstream
+ * reader (e.g. Hive's Parquet reader) has compacted nested struct projection — that is,
+ * dropped non-selected sub-fields and shifted the survivors into low slots.
+ *
+ * <p>The canonical (Hudi-side) schema declares the full ordered field list. When the
+ * upstream reader projects only a subset of a struct, the resulting array's slots no
+ * longer correspond to canonical positions. This mask captures, per record level, the
+ * physical layout, so position-based access can be remapped.
+ *
+ * <p>A mask carries two pieces of information for the level it describes:
+ * <ol>
+ *   <li>Whether THIS level is canonical-shaped or compacted. Hive's Parquet reader pads
+ *       non-projected top-level columns with nulls (canonical), but compacts non-projected
+ *       sub-fields of struct columns (compacted).
+ *   <li>Optional per-child masks. A canonical level can still descend into a child whose
+ *       sub-record is compacted — for example, the top-level row is canonical but the
+ *       blob_data sub-record's interior is compacted by Hive's nested-projection pushdown.
+ * </ol>
+ *
+ * <p>{@link #all()} is the no-op identity used everywhere outside the projected-record
+ * path. {@link #canonicalWith(Map)} preserves canonical positions at this level while
+ * supplying compacted descent for specific children.
+ */
+public final class HoodieProjectionMask {
+
+  private static final HoodieProjectionMask ALL = new HoodieProjectionMask(false, Collections.emptyMap(), Collections.emptyMap());
+
+  // True when THIS level's ArrayWritable is compacted to only the projected fields.
+  // False when this level is canonical-shaped (full positions per schema, with nulls for unprojected).
+  private final boolean compactedAtThisLevel;
+  // Field name -> position in the projected ArrayWritable. Only meaningful when compactedAtThisLevel.
+  private final Map<String, Integer> physicalIndex;
+  // Per-child mask used during descent. May be non-empty even when this level is canonical.
+  private final Map<String, HoodieProjectionMask> childMasks;
+
+  private HoodieProjectionMask(boolean compactedAtThisLevel, Map<String, Integer> physicalIndex, Map<String, HoodieProjectionMask> childMasks) {
+    this.compactedAtThisLevel = compactedAtThisLevel;
+    this.physicalIndex = physicalIndex;
+    this.childMasks = childMasks;
+  }
+
+  public static HoodieProjectionMask all() {
+    return ALL;
+  }
+
+  /**
+   * Mask whose THIS level is canonical-shaped but whose listed children carry their own
+   * (typically compacted) descent masks. Use this at boundaries where the outer record
+   * is full but specific sub-records have been compacted by the reader (the typical
+   * Hive nested-projection-on-BLOB shape).
+   */
+  public static HoodieProjectionMask canonicalWith(Map<String, HoodieProjectionMask> childMasks) {
+    if (childMasks == null || childMasks.isEmpty()) {
+      return ALL;
+    }
+    return new HoodieProjectionMask(false, Collections.emptyMap(), Collections.unmodifiableMap(new LinkedHashMap<>(childMasks)));
+  }
+
+  /**
+   * True when no remapping or descent override applies — the rewrite can use canonical
+   * positions everywhere below this point.
+   */
+  public boolean isAll() {
+    return !compactedAtThisLevel && childMasks.isEmpty();
+  }
+
+  /**
+   * True when the level this mask describes is canonical-shaped (positions match the
+   * schema). Children may still carry compacted sub-masks.
+   */
+  public boolean isCanonicalAtThisLevel() {
+    return !compactedAtThisLevel;
+  }
+
+  /**
+   * Position of {@code fieldName} in the compacted ArrayWritable at this level. Empty
+   * when this level is canonical or the field was not projected.
+   */
+  public OptionalInt physicalIndexOf(String fieldName) {
+    if (!compactedAtThisLevel) {
+      return OptionalInt.empty();
+    }
+    Integer idx = physicalIndex.get(fieldName);
+    return idx == null ? OptionalInt.empty() : OptionalInt.of(idx);
+  }
+
+  /**
+   * Field names of this compacted level, in physical-position order. Empty when the
+   * level is canonical.
+   */
+  public List<String> physicalOrder() {
+    if (!compactedAtThisLevel) {
+      return Collections.emptyList();
+    }
+    return new ArrayList<>(physicalIndex.keySet());
+  }
+
+  /**
+   * Mask to use when recursing into {@code fieldName}'s sub-record. Falls back to
+   * {@link #all()} when no override is registered for this child.
+   */
+  public HoodieProjectionMask childOrAll(String fieldName) {
+    HoodieProjectionMask child = childMasks.get(fieldName);
+    return child == null ? ALL : child;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Builds a mask describing a compacted level. Insertion order defines the physical
+   * field order in the projected ArrayWritable.
+   */
+  public static final class Builder {
+    private final LinkedHashMap<String, HoodieProjectionMask> children = new LinkedHashMap<>();
+
+    public Builder field(String name) {
+      return field(name, ALL);
+    }
+
+    public Builder field(String name, HoodieProjectionMask child) {
+      children.put(name, Objects.requireNonNull(child, "child mask"));
+      return this;
+    }
+
+    public HoodieProjectionMask build() {
+      if (children.isEmpty()) {
+        return ALL;
+      }
+      LinkedHashMap<String, HoodieProjectionMask> childrenCopy = new LinkedHashMap<>(children);
+      LinkedHashMap<String, Integer> index = new LinkedHashMap<>(children.size());
+      int i = 0;
+      for (String name : childrenCopy.keySet()) {
+        index.put(name, i++);
+      }
+      return new HoodieProjectionMask(
+          true,
+          Collections.unmodifiableMap(index),
+          Collections.unmodifiableMap(childrenCopy));
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof HoodieProjectionMask)) {
+      return false;
+    }
+    HoodieProjectionMask other = (HoodieProjectionMask) o;
+    return compactedAtThisLevel == other.compactedAtThisLevel
+        && physicalIndex.equals(other.physicalIndex)
+        && childMasks.equals(other.childMasks);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(compactedAtThisLevel, physicalIndex, childMasks);
+  }
+
+  @Override
+  public String toString() {
+    if (isAll()) {
+      return "HoodieProjectionMask{ALL}";
+    }
+    return "HoodieProjectionMask{compacted=" + compactedAtThisLevel + 
",children=" + childMasks + "}";
+  }
+}
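
A minimal usage sketch of the mask class above (not part of the patch); it mirrors
the shape the new tests build for `SELECT blob_data.reference.external_path`:

    // blob_data's interior is compacted to {reference}, whose interior is
    // in turn compacted to {external_path}.
    HoodieProjectionMask mask = HoodieProjectionMask.builder()
        .field("reference",
            HoodieProjectionMask.builder().field("external_path").build())
        .build();

    mask.isCanonicalAtThisLevel();      // false: this level is compacted
    mask.physicalOrder();               // ["reference"]
    mask.physicalIndexOf("reference");  // OptionalInt.of(0)
    mask.childOrAll("reference").physicalIndexOf("external_path");  // OptionalInt.of(0)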
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java
index faba63d66690..6052639d209e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java
@@ -75,7 +75,7 @@ public class HoodieSchemaTestUtils {
    * was stripped by Spark's TableOutputResolver Cast.
    */
   public static HoodieSchema createPlainBlobRecord(String recordName) {
-    HoodieSchema reference = HoodieSchema.createRecord("reference", null, null, false, Arrays.asList(
+    HoodieSchema referenceSchema = HoodieSchema.createRecord("reference", null, null, false, Arrays.asList(
         HoodieSchemaField.of("external_path", HoodieSchema.create(HoodieSchemaType.STRING), null, null),
         HoodieSchemaField.of("offset", HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.LONG)), null, null),
         HoodieSchemaField.of("length", HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.LONG)), null, null),
@@ -83,7 +83,7 @@ public class HoodieSchemaTestUtils {
     return HoodieSchema.createRecord(recordName, null, null, false, Arrays.asList(
         HoodieSchemaField.of("type", HoodieSchema.create(HoodieSchemaType.STRING), null, null),
         HoodieSchemaField.of("data", HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.BYTES)), null, JsonProperties.NULL_VALUE),
-        HoodieSchemaField.of("reference", HoodieSchema.createNullable(reference), null, JsonProperties.NULL_VALUE)));
+        HoodieSchemaField.of("reference", HoodieSchema.createNullable(referenceSchema), null, JsonProperties.NULL_VALUE)));
   }
 
   /**
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index 9e7994f0147b..d9e99742674d 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.schema.HoodieProjectionMask;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaCompatibility;
 import org.apache.hudi.common.schema.HoodieSchemaField;
@@ -36,6 +37,7 @@ import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieAvroSchemaException;
 import org.apache.hudi.hadoop.utils.HiveTypeUtils;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableSchemaUtils;
 import org.apache.hudi.internal.schema.HoodieSchemaException;
 import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
@@ -174,11 +176,15 @@ public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable>
       firstRecordReader = recordReader;
     }
     ClosableIterator<ArrayWritable> recordIterator = new RecordReaderValueIterator<>(recordReader);
-    if (HoodieSchemaCompatibility.areSchemasProjectionEquivalent(modifiedDataSchema, requiredSchema)) {
+    HoodieProjectionMask physicalMask = HoodieColumnProjectionUtils.buildNestedProjectionMask(jobConfCopy, modifiedDataSchema);
+    if (physicalMask.isAll() && HoodieSchemaCompatibility.areSchemasProjectionEquivalent(modifiedDataSchema, requiredSchema)) {
       return recordIterator;
     }
-    // record reader puts the required columns in the positions of the data schema and nulls the rest of the columns
-    return new CloseableMappingIterator<>(recordIterator, recordContext.projectRecord(modifiedDataSchema, requiredSchema));
+    // record reader puts the required columns in the positions of the data schema and nulls the rest of the columns;
+    // physicalMask additionally tells the rewrite where struct sub-fields landed when Hive's
+    // Parquet reader compacted nested-column projection.
+    return new CloseableMappingIterator<>(recordIterator,
+        record -> HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(record, modifiedDataSchema, requiredSchema, Collections.emptyMap(), physicalMask));
   }
 
   @Override
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java
index 8971220d1330..c89d37f1c306 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.hadoop;
 
+import org.apache.hudi.common.schema.HoodieProjectionMask;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 
@@ -37,7 +41,10 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Locale;
+import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -59,6 +66,13 @@ public class HoodieColumnProjectionUtils {
    * the column a's path is c.a and b's path is c.b
    */
   public static final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names";
+  /**
+   * Hive's Parquet reader (DataWritableReadSupport) reads this conf to compact nested
+   * struct projection. ProjectionPusher in Hive's FetchOperator sets it before invoking
+   * the record reader; Hudi only needs to consume it. Format: comma-separated dotted
+   * paths from root to leaf, e.g. {@code blob_data.reference,blob_data.reference.external_path}.
+   */
+  public static final String READ_NESTED_COLUMN_PATH_CONF_STR = "hive.io.file.readNestedColumn.paths";
   private static final String READ_COLUMN_IDS_CONF_STR_DEFAULT = "";
   private static final String READ_COLUMN_NAMES_CONF_STR_DEFAULT = "";
 
@@ -165,4 +179,89 @@ public class HoodieColumnProjectionUtils {
         return false;
     }
   }
+
+  /**
+   * Build a {@link HoodieProjectionMask} reflecting Hive's nested-column projection for
+   * {@code dataSchema}. Hive's Parquet reader pads non-projected top-level columns with
+   * nulls (canonical layout) but compacts non-projected sub-fields of struct columns.
+   * The returned mask preserves canonical positions at the row level and supplies a
+   * compacted descent mask for each top-level column whose interior was pruned.
+   *
+   * <p>Returns {@link HoodieProjectionMask#all()} when the conf is empty or no struct
+   * column has a sub-field projection.
+   */
+  public static HoodieProjectionMask buildNestedProjectionMask(Configuration conf, HoodieSchema dataSchema) {
+    String paths = conf.get(READ_NESTED_COLUMN_PATH_CONF_STR, "");
+    if (paths.isEmpty()) {
+      return HoodieProjectionMask.all();
+    }
+    // Group nested paths by their top-level column. Top-level-only paths (single
+    // component, e.g. "blob_data") are ignored here — Hive does not compact at the
+    // top level, so canonical positions still apply.
+    Map<String, List<List<String>>> pathsByField = new LinkedHashMap<>();
+    for (String path : paths.split(",")) {
+      String trimmed = path.trim();
+      if (trimmed.isEmpty()) {
+        continue;
+      }
+      String[] components = trimmed.split("\\.");
+      if (components.length < 2) {
+        continue;
+      }
+      List<String> tail = new ArrayList<>(components.length - 1);
+      for (int i = 1; i < components.length; i++) {
+        tail.add(components[i].toLowerCase(Locale.ROOT));
+      }
+      String head = components[0].toLowerCase(Locale.ROOT);
+      pathsByField.computeIfAbsent(head, k -> new ArrayList<>()).add(tail);
+    }
+    if (pathsByField.isEmpty()) {
+      return HoodieProjectionMask.all();
+    }
+    HoodieSchema rowSchema = dataSchema.getNonNullType();
+    Map<String, HoodieProjectionMask> childMasks = new LinkedHashMap<>();
+    for (HoodieSchemaField topField : rowSchema.getFields()) {
+      String key = topField.name().toLowerCase(Locale.ROOT);
+      List<List<String>> nestedPaths = pathsByField.get(key);
+      if (nestedPaths == null) {
+        continue;
+      }
+      HoodieProjectionMask childMask = buildMaskForRecord(topField.schema(), nestedPaths);
+      if (!childMask.isAll()) {
+        childMasks.put(topField.name(), childMask);
+      }
+    }
+    return HoodieProjectionMask.canonicalWith(childMasks);
+  }
+
+  private static HoodieProjectionMask buildMaskForRecord(HoodieSchema schema, List<List<String>> paths) {
+    HoodieSchema recordSchema = schema.getNonNullType();
+    HoodieSchemaType type = recordSchema.getType();
+    if (type != HoodieSchemaType.RECORD && type != HoodieSchemaType.BLOB && type != HoodieSchemaType.VARIANT) {
+      return HoodieProjectionMask.all();
+    }
+    // Group remaining components by first-level field name; lower-case to match
+    // Hive's lowercased column names.
+    Map<String, List<List<String>>> pathsByField = new LinkedHashMap<>();
+    for (List<String> path : paths) {
+      if (path.isEmpty()) {
+        // Whole sub-record projected as-is — no compaction below this point.
+        return HoodieProjectionMask.all();
+      }
+      String head = path.get(0);
+      List<String> tail = path.subList(1, path.size());
+      pathsByField.computeIfAbsent(head, k -> new ArrayList<>()).add(tail);
+    }
+    HoodieProjectionMask.Builder builder = HoodieProjectionMask.builder();
+    for (HoodieSchemaField field : recordSchema.getFields()) {
+      String key = field.name().toLowerCase(Locale.ROOT);
+      List<List<String>> childPaths = pathsByField.get(key);
+      if (childPaths == null) {
+        continue;
+      }
+      HoodieProjectionMask childMask = buildMaskForRecord(field.schema(), childPaths);
+      builder.field(field.name(), childMask);
+    }
+    return builder.build();
+  }
 }
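
To make the parsing above concrete, a sketch against a hypothetical row schema
{id, blob_data} (the `rowSchema` variable is assumed; the expected results match
the new tests below):

    Configuration conf = new Configuration();
    // Mixed paths: one top-level-only entry plus two nested entries.
    conf.set(HoodieColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR,
        "id,blob_data.type,blob_data.reference");

    HoodieProjectionMask mask =
        HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, rowSchema);

    // "id" has a single component, so it is skipped: Hive never compacts
    // the top level.
    mask.childOrAll("id").isAll();       // true
    // Physical order follows the declared schema ({type, data, reference}),
    // not the conf order.
    HoodieProjectionMask blob = mask.childOrAll("blob_data");
    blob.physicalIndexOf("type");        // OptionalInt.of(0)
    blob.physicalIndexOf("reference");   // OptionalInt.of(1)
    blob.physicalIndexOf("data");        // OptionalInt.empty()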
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java
index a468de4f52a5..365f7a44cc3a 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.hadoop.utils;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.schema.HoodieProjectionMask;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaCompatibility;
 import org.apache.hudi.common.schema.HoodieSchemaField;
@@ -63,22 +64,40 @@ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 public class HoodieArrayWritableSchemaUtils {
 
   public static ArrayWritable rewriteRecordWithNewSchema(ArrayWritable writable, HoodieSchema oldSchema, HoodieSchema newSchema, Map<String, String> renameCols) {
-    return (ArrayWritable) rewriteRecordWithNewSchema(writable, oldSchema, newSchema, renameCols, new LinkedList<>());
+    return rewriteRecordWithNewSchema(writable, oldSchema, newSchema, renameCols, HoodieProjectionMask.all());
   }
 
-  private static Writable rewriteRecordWithNewSchema(Writable writable, HoodieSchema oldSchema, HoodieSchema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
+  /**
+   * Rewrite variant that is aware of an upstream reader having compacted nested struct projection.
+   *
+   * <p>{@code physicalMask} describes the actual ArrayWritable layout per record level:
+   * which sub-fields are present and at what physical index. Pass {@link HoodieProjectionMask#all()}
+   * when the input is canonical-shaped — the rewrite then reduces to position-by-canonical-pos
+   * access exactly like the legacy 4-arg overload.
+   */
+  public static ArrayWritable rewriteRecordWithNewSchema(ArrayWritable writable, HoodieSchema oldSchema, HoodieSchema newSchema,
+                                                          Map<String, String> renameCols, HoodieProjectionMask physicalMask) {
+    return (ArrayWritable) rewriteRecordWithNewSchema(writable, oldSchema, newSchema, renameCols, new LinkedList<>(), physicalMask);
+  }
+
+  private static Writable rewriteRecordWithNewSchema(Writable writable, HoodieSchema oldSchema, HoodieSchema newSchema,
+                                                     Map<String, String> renameCols, Deque<String> fieldNames, HoodieProjectionMask physicalMask) {
     if (writable == null) {
       return null;
     }
     HoodieSchema oldSchemaNonNull = oldSchema.getNonNullType();
     HoodieSchema newSchemaNonNull = newSchema.getNonNullType();
-    if (HoodieSchemaCompatibility.areSchemasProjectionEquivalent(oldSchemaNonNull, newSchemaNonNull)) {
+    // Only short-circuit when the input is in canonical shape; an upstream-projected
+    // ArrayWritable needs the recursive rewrite to remap slots, even when the schema
+    // pair would otherwise be equivalent.
+    if (physicalMask.isAll() && HoodieSchemaCompatibility.areSchemasProjectionEquivalent(oldSchemaNonNull, newSchemaNonNull)) {
       return writable;
     }
-    return rewriteRecordWithNewSchemaInternal(writable, oldSchemaNonNull, newSchemaNonNull, renameCols, fieldNames);
+    return rewriteRecordWithNewSchemaInternal(writable, oldSchemaNonNull, newSchemaNonNull, renameCols, fieldNames, physicalMask);
   }
 
-  private static Writable rewriteRecordWithNewSchemaInternal(Writable writable, HoodieSchema oldSchema, HoodieSchema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
+  private static Writable rewriteRecordWithNewSchemaInternal(Writable writable, HoodieSchema oldSchema, HoodieSchema newSchema,
+                                                             Map<String, String> renameCols, Deque<String> fieldNames, HoodieProjectionMask physicalMask) {
     switch (newSchema.getType()) {
       // BLOB/VARIANT are physically records; share the RECORD field-by-name rewrite.
       case BLOB:
@@ -87,55 +106,10 @@ public class HoodieArrayWritableSchemaUtils {
         if (!(writable instanceof ArrayWritable)) {
           throw new SchemaCompatibilityException(String.format("Cannot rewrite %s as a record", writable.getClass().getName()));
         }
-
-        ArrayWritable arrayWritable = (ArrayWritable) writable;
-        List<HoodieSchemaField> fields = newSchema.getFields();
-        // projection will keep the size from the "from" schema because it gets recycled
-        // and if the size changes the reader will fail
-        boolean noFieldsRenaming = renameCols.isEmpty();
-        String namePrefix = createNamePrefix(noFieldsRenaming, fieldNames);
-        Writable[] values = new Writable[Math.max(fields.size(), arrayWritable.get().length)];
-        for (int i = 0; i < fields.size(); i++) {
-          HoodieSchemaField newField = newSchema.getFields().get(i);
-          String newFieldName = newField.name();
-          fieldNames.push(newFieldName);
-          Option<HoodieSchemaField> oldFieldOpt = noFieldsRenaming
-              ? oldSchema.getField(newFieldName)
-              : oldSchema.getField(getOldFieldNameWithRenaming(namePrefix, newFieldName, renameCols));
-          if (oldFieldOpt.isPresent()) {
-            HoodieSchemaField oldField = oldFieldOpt.get();
-            values[i] = rewriteRecordWithNewSchema(arrayWritable.get()[oldField.pos()], oldField.schema(), newField.schema(), renameCols, fieldNames);
-          } else if (newField.defaultVal().isPresent() && newField.defaultVal().get().equals(HoodieSchema.NULL_VALUE)) {
-            values[i] = NullWritable.get();
-          } else if (!newField.schema().isNullable() && newField.defaultVal().isEmpty()) {
-            throw new SchemaCompatibilityException("Field " + createFullName(fieldNames) + " has no default value and is non-nullable");
-          } else if (newField.defaultVal().isPresent()) {
-            switch (newField.getNonNullSchema().getType()) {
-              case BOOLEAN:
-                values[i] = new BooleanWritable((Boolean) newField.defaultVal().get());
-                break;
-              case INT:
-                values[i] = new IntWritable((Integer) newField.defaultVal().get());
-                break;
-              case LONG:
-                values[i] = new LongWritable((Long) newField.defaultVal().get());
-                break;
-              case FLOAT:
-                values[i] = new FloatWritable((Float) newField.defaultVal().get());
-                break;
-              case DOUBLE:
-                values[i] = new DoubleWritable((Double) newField.defaultVal().get());
-                break;
-              case STRING:
-                values[i] = new Text(newField.defaultVal().toString());
-                break;
-              default:
-                throw new SchemaCompatibilityException("Field " + createFullName(fieldNames) + " has no default value");
-            }
-          }
-          fieldNames.pop();
+        if (physicalMask.isCanonicalAtThisLevel()) {
+          return rewriteCanonicalRecord((ArrayWritable) writable, oldSchema, newSchema, renameCols, fieldNames, physicalMask);
         }
-        return new ArrayWritable(Writable.class, values);
+        return rewriteCompactedRecord((ArrayWritable) writable, oldSchema, newSchema, renameCols, fieldNames, physicalMask);
 
       case ENUM:
         if ((writable instanceof BytesWritable)) {
@@ -155,7 +129,7 @@ public class HoodieArrayWritableSchemaUtils {
         ArrayWritable array = (ArrayWritable) writable;
         fieldNames.push("element");
         for (int i = 0; i < array.get().length; i++) {
-          array.get()[i] = rewriteRecordWithNewSchema(array.get()[i], oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames);
+          array.get()[i] = rewriteRecordWithNewSchema(array.get()[i], oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames, HoodieProjectionMask.all());
         }
         fieldNames.pop();
         return array;
@@ -167,7 +141,8 @@ public class HoodieArrayWritableSchemaUtils {
         fieldNames.push("value");
         for (int i = 0; i < map.get().length; i++) {
           Writable mapEntry = map.get()[i];
-          ((ArrayWritable) mapEntry).get()[1] = rewriteRecordWithNewSchema(((ArrayWritable) mapEntry).get()[1], oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames);
+          ((ArrayWritable) mapEntry).get()[1] =
+              rewriteRecordWithNewSchema(((ArrayWritable) mapEntry).get()[1], oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames, HoodieProjectionMask.all());
         }
         return map;
 
@@ -179,6 +154,100 @@ public class HoodieArrayWritableSchemaUtils {
     }
   }
 
+  /**
+   * Canonical record rewrite: input ArrayWritable matches {@code oldSchema}'s field
+   * positions (with possibly trailing nulls); output is sized to
+   * {@code max(newSchema field count, input array length)} so any trailing slots Hive
+   * left in a recycled row buffer are preserved, while schema-evolution semantics still
+   * apply to the leading {@code newSchema}-sized region (default values, null padding,
+   * non-nullable-without-default throws).
+   */
+  private static Writable rewriteCanonicalRecord(ArrayWritable arrayWritable, HoodieSchema oldSchema, HoodieSchema newSchema,
+                                                 Map<String, String> renameCols, Deque<String> fieldNames, HoodieProjectionMask physicalMask) {
+    List<HoodieSchemaField> fields = newSchema.getFields();
+    boolean noFieldsRenaming = renameCols.isEmpty();
+    String namePrefix = createNamePrefix(noFieldsRenaming, fieldNames);
+    Writable[] values = new Writable[Math.max(fields.size(), arrayWritable.get().length)];
+    for (int i = 0; i < fields.size(); i++) {
+      HoodieSchemaField newField = fields.get(i);
+      String newFieldName = newField.name();
+      fieldNames.push(newFieldName);
+      Option<HoodieSchemaField> oldFieldOpt = noFieldsRenaming
+          ? oldSchema.getField(newFieldName)
+          : oldSchema.getField(getOldFieldNameWithRenaming(namePrefix, newFieldName, renameCols));
+
+      // Bounds-check because some upstream readers may still hand back arrays shorter
+      // than the schema declares (no projection mask supplied).
+      if (oldFieldOpt.isPresent() && oldFieldOpt.get().pos() < arrayWritable.get().length) {
+        HoodieSchemaField oldField = oldFieldOpt.get();
+        HoodieProjectionMask childMask = physicalMask.childOrAll(oldField.name());
+        values[i] = rewriteRecordWithNewSchema(arrayWritable.get()[oldField.pos()], oldField.schema(), newField.schema(), renameCols, fieldNames, childMask);
+      } else if (newField.defaultVal().isPresent() && newField.defaultVal().get().equals(HoodieSchema.NULL_VALUE)) {
+        values[i] = NullWritable.get();
+      } else if (!newField.schema().isNullable() && newField.defaultVal().isEmpty()) {
+        throw new SchemaCompatibilityException("Field " + createFullName(fieldNames) + " has no default value and is non-nullable");
+      } else if (newField.defaultVal().isPresent()) {
+        switch (newField.getNonNullSchema().getType()) {
+          case BOOLEAN:
+            values[i] = new BooleanWritable((Boolean) newField.defaultVal().get());
+            break;
+          case INT:
+            values[i] = new IntWritable((Integer) newField.defaultVal().get());
+            break;
+          case LONG:
+            values[i] = new LongWritable((Long) newField.defaultVal().get());
+            break;
+          case FLOAT:
+            values[i] = new FloatWritable((Float) newField.defaultVal().get());
+            break;
+          case DOUBLE:
+            values[i] = new DoubleWritable((Double) newField.defaultVal().get());
+            break;
+          case STRING:
+            values[i] = new Text(newField.defaultVal().toString());
+            break;
+          default:
+            throw new SchemaCompatibilityException("Field " + createFullName(fieldNames) + " has no default value");
+        }
+      }
+      fieldNames.pop();
+    }
+    return new ArrayWritable(Writable.class, values);
+  }
+
+  /**
+   * Compacted record rewrite: input ArrayWritable matches the upstream reader's
+   * projected schema (only the projected sub-fields, in declared order). The output
+   * preserves the same physical layout — Hive's downstream {@code ArrayWritableObjectInspector}
+   * was constructed from the projected schema and expects compacted positions.
+   * Per-element schema conversion still applies (e.g. plain STRING → canonical ENUM).
+   */
+  private static Writable rewriteCompactedRecord(ArrayWritable arrayWritable, HoodieSchema oldSchema, HoodieSchema newSchema,
+                                                 Map<String, String> renameCols, Deque<String> fieldNames, HoodieProjectionMask physicalMask) {
+    Writable[] inputs = arrayWritable.get();
+    Writable[] values = new Writable[inputs.length];
+    List<String> order = physicalMask.physicalOrder();
+    for (int physIdx = 0; physIdx < order.size(); physIdx++) {
+      if (physIdx >= inputs.length) {
+        // Mask claims a field at a position the reader didn't fill; defensive — leave null.
+        continue;
+      }
+      String physicalFieldName = order.get(physIdx);
+      Option<HoodieSchemaField> oldFieldOpt = oldSchema.getField(physicalFieldName);
+      Option<HoodieSchemaField> newFieldOpt = newSchema.getField(physicalFieldName);
+      if (oldFieldOpt.isEmpty() || newFieldOpt.isEmpty()) {
+        // The reader returned a sub-field neither side knows about — pass through.
+        values[physIdx] = inputs[physIdx];
+        continue;
+      }
+      fieldNames.push(physicalFieldName);
+      HoodieProjectionMask childMask = physicalMask.childOrAll(physicalFieldName);
+      values[physIdx] = rewriteRecordWithNewSchema(inputs[physIdx], oldFieldOpt.get().schema(), newFieldOpt.get().schema(), renameCols, fieldNames, childMask);
+      fieldNames.pop();
+    }
+    return new ArrayWritable(Writable.class, values);
+  }
+
   public static Writable rewritePrimaryType(Writable writable, HoodieSchema oldSchema, HoodieSchema newSchema) {
     if (oldSchema.getType() == newSchema.getType()) {
       switch (oldSchema.getType()) {
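
A side-by-side sketch of the two record paths on the same logical BLOB value
(writable literals and the `referenceStruct`, `oldSchema`, `newSchema` variables
are illustrative; the canonical {type, data, reference} layout is from the patch):

    // Canonical shape: slots line up with the schema, so mask = all().
    ArrayWritable canonical = new ArrayWritable(Writable.class,
        new Writable[] {new Text("OUT_OF_LINE"), NullWritable.get(), referenceStruct});
    HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(
        canonical, oldSchema, newSchema, Collections.emptyMap(), HoodieProjectionMask.all());
    // -> 3 slots; reference stays at its canonical slot 2

    // Compacted shape: only {reference} survived, at physical slot 0.
    ArrayWritable compacted = new ArrayWritable(Writable.class, new Writable[] {referenceStruct});
    HoodieProjectionMask mask = HoodieProjectionMask.builder().field("reference").build();
    HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(
        compacted, oldSchema, newSchema, Collections.emptyMap(), mask);
    // -> 1 slot; reference kept at slot 0, where the projected-schema
    //    ObjectInspector expects it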
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieColumnProjectionUtils.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieColumnProjectionUtils.java
index e7467acef5d0..fcf5ac44efba 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieColumnProjectionUtils.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieColumnProjectionUtils.java
@@ -18,10 +18,21 @@
 
 package org.apache.hudi.hadoop;
 
+import org.apache.hudi.common.schema.HoodieProjectionMask;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaTestUtils;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
+import java.util.OptionalInt;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -73,4 +84,85 @@ public class TestHoodieColumnProjectionUtils {
     TypeInfo typeInfo11 = TypeInfoUtils.getTypeInfosFromTypeString(col11).get(0);
     assertTrue(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo11));
   }
+
+  @Test
+  void testBuildNestedProjectionMaskEmptyConfYieldsAll() {
+    HoodieSchema row = rowSchemaWithBlobColumn();
+    Configuration conf = new Configuration();
+    HoodieProjectionMask mask = HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row);
+    assertTrue(mask.isAll());
+  }
+
+  @Test
+  void testBuildNestedProjectionMaskTopLevelOnlyYieldsAll() {
+    // Top-level pruning never compacts; only nested paths produce a mask.
+    HoodieSchema row = rowSchemaWithBlobColumn();
+    Configuration conf = new Configuration();
+    conf.set(HoodieColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR, "blob_data,id");
+    HoodieProjectionMask mask = HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row);
+    assertTrue(mask.isAll());
+  }
+
+  @Test
+  void testBuildNestedProjectionMaskForBlobReferenceProjection() {
+    HoodieSchema row = rowSchemaWithBlobColumn();
+    Configuration conf = new Configuration();
+    conf.set(HoodieColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR, "blob_data.reference");
+    HoodieProjectionMask mask = HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row);
+
+    assertFalse(mask.isAll());
+    assertTrue(mask.isCanonicalAtThisLevel());
+    HoodieProjectionMask blobChild = mask.childOrAll("blob_data");
+    assertFalse(blobChild.isAll());
+    assertFalse(blobChild.isCanonicalAtThisLevel());
+    assertEquals(OptionalInt.of(0), blobChild.physicalIndexOf("reference"));
+    assertEquals(OptionalInt.empty(), blobChild.physicalIndexOf("type"));
+    assertEquals(OptionalInt.empty(), blobChild.physicalIndexOf("data"));
+    assertTrue(blobChild.childOrAll("reference").isAll());
+  }
+
+  @Test
+  void testBuildNestedProjectionMaskForExternalPathProjection() {
+    HoodieSchema row = rowSchemaWithBlobColumn();
+    Configuration conf = new Configuration();
+    conf.set(HoodieColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR, "blob_data.reference.external_path");
+    HoodieProjectionMask mask = HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row);
+
+    HoodieProjectionMask blobChild = mask.childOrAll("blob_data");
+    assertEquals(OptionalInt.of(0), blobChild.physicalIndexOf("reference"));
+    HoodieProjectionMask refChild = blobChild.childOrAll("reference");
+    assertFalse(refChild.isAll());
+    assertEquals(OptionalInt.of(0), refChild.physicalIndexOf("external_path"));
+    assertEquals(OptionalInt.empty(), refChild.physicalIndexOf("offset"));
+  }
+
+  @Test
+  void testBuildNestedProjectionMaskOrdersByDeclaredSchema() {
+    // Physical order follows the schema, not the conf-path order.
+    HoodieSchema row = rowSchemaWithBlobColumn();
+    Configuration conf = new Configuration();
+    conf.set(HoodieColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR, "blob_data.reference,blob_data.type");
+    HoodieProjectionMask mask = HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row);
+
+    HoodieProjectionMask blobChild = mask.childOrAll("blob_data");
+    assertEquals(OptionalInt.of(0), blobChild.physicalIndexOf("type"));
+    assertEquals(OptionalInt.of(1), blobChild.physicalIndexOf("reference"));
+  }
+
+  @Test
+  void testBuildNestedProjectionMaskCaseInsensitive() {
+    HoodieSchema row = rowSchemaWithBlobColumn();
+    Configuration conf = new Configuration();
+    conf.set(HoodieColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR, "BLOB_DATA.Reference.External_Path");
+    HoodieProjectionMask mask = HoodieColumnProjectionUtils.buildNestedProjectionMask(conf, row);
+
+    assertFalse(mask.childOrAll("blob_data").isAll());
+    assertEquals(OptionalInt.of(0), mask.childOrAll("blob_data").physicalIndexOf("reference"));
+  }
+
+  private static HoodieSchema rowSchemaWithBlobColumn() {
+    HoodieSchemaField idField = HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.LONG), null, null);
+    HoodieSchemaField blobField = HoodieSchemaField.of("blob_data", HoodieSchemaTestUtils.createPlainBlobRecord("blob_data"), null, null);
+    return HoodieSchema.createRecord("test_row", null, null, false, Arrays.asList(idField, blobField));
+  }
 }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
index 16188b520074..ff5bdbc18694 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.hadoop.utils;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.engine.RecordContext;
+import org.apache.hudi.common.schema.HoodieProjectionMask;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaTestUtils;
 import org.apache.hudi.common.schema.HoodieSchemaType;
@@ -334,6 +335,107 @@ public class TestHoodieArrayWritableSchemaUtils {
     assertSame(bytes, rewritten);
   }
 
+  @Test
+  void testRewriteBlobWithPrunedArrayWritableFillsMissingFieldsWithNull() {
+    HoodieSchema oldSchema = HoodieSchemaTestUtils.createPlainBlobRecord("blob_data");
+    HoodieSchema newSchema = HoodieSchema.createBlob();
+    ArrayWritable prunedRecord = new ArrayWritable(Writable.class, new Writable[] {
+        new Text("INLINE")
+    });
+
+    ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(
+        prunedRecord, oldSchema, newSchema, Collections.emptyMap());
+
+    assertEquals(3, rewritten.get().length);
+    assertInstanceOf(BytesWritable.class, rewritten.get()[0]);
+    assertEquals("INLINE", new String(((BytesWritable) rewritten.get()[0]).copyBytes()));
+    assertEquals(NullWritable.get(), rewritten.get()[1]);
+    assertEquals(NullWritable.get(), rewritten.get()[2]);
+  }
+
+  @Test
+  void testRewriteBlobWithPrunedReferenceProjection() {
+    // SELECT blob_data.reference: compacted-shape input must round-trip unchanged so
+    // Hive's projected-schema ObjectInspector finds reference at slot 0.
+    HoodieSchema oldSchema = HoodieSchemaTestUtils.createPlainBlobRecord("blob_data");
+    HoodieSchema newSchema = HoodieSchema.createBlob();
+    ArrayWritable referenceSubstruct = new ArrayWritable(Writable.class, new Writable[] {
+        new Text("blobs/updated-1"),
+        new LongWritable(0L),
+        new LongWritable(11L),
+        new BooleanWritable(false)
+    });
+    ArrayWritable prunedRecord = new ArrayWritable(Writable.class, new Writable[] {
+        referenceSubstruct
+    });
+    HoodieProjectionMask mask = HoodieProjectionMask.builder().field("reference").build();
+
+    ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(
+        prunedRecord, oldSchema, newSchema, Collections.emptyMap(), mask);
+
+    assertEquals(1, rewritten.get().length);
+    ArrayWritable rewrittenRef = (ArrayWritable) rewritten.get()[0];
+    assertEquals(4, rewrittenRef.get().length);
+    assertEquals(new Text("blobs/updated-1"), rewrittenRef.get()[0]);
+    assertEquals(new LongWritable(0L), rewrittenRef.get()[1]);
+    assertEquals(new LongWritable(11L), rewrittenRef.get()[2]);
+    assertEquals(new BooleanWritable(false), rewrittenRef.get()[3]);
+  }
+
+  @Test
+  void testRewriteBlobWithPrunedReferenceExternalPathProjection() {
+    // SELECT blob_data.reference.external_path — reproducer for the CCE at :149.
+    HoodieSchema oldSchema = HoodieSchemaTestUtils.createPlainBlobRecord("blob_data");
+    HoodieSchema newSchema = HoodieSchema.createBlob();
+    ArrayWritable prunedReference = new ArrayWritable(Writable.class, new Writable[] {
+        new Text("blobs/updated-1")
+    });
+    ArrayWritable prunedRecord = new ArrayWritable(Writable.class, new Writable[] {
+        prunedReference
+    });
+    HoodieProjectionMask mask = HoodieProjectionMask.builder()
+        .field("reference", HoodieProjectionMask.builder().field("external_path").build())
+        .build();
+
+    ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(
+        prunedRecord, oldSchema, newSchema, Collections.emptyMap(), mask);
+
+    assertEquals(1, rewritten.get().length);
+    ArrayWritable rewrittenRef = (ArrayWritable) rewritten.get()[0];
+    assertEquals(1, rewrittenRef.get().length);
+    assertEquals(new Text("blobs/updated-1"), rewrittenRef.get()[0]);
+  }
+
+  @Test
+  void testRewriteBlobWithCanonicalShapeStillWorksAfterMaskWiring() {
+    // Regression guard: mask=all() must keep the legacy canonical-shape behavior.
+    HoodieSchema oldSchema = HoodieSchemaTestUtils.createPlainBlobRecord("blob_data");
+    HoodieSchema newSchema = HoodieSchema.createBlob();
+    ArrayWritable referenceSubstruct = new ArrayWritable(Writable.class, new Writable[] {
+        new Text("blobs/path-1"),
+        new LongWritable(0L),
+        new LongWritable(11L),
+        new BooleanWritable(false)
+    });
+    ArrayWritable record = new ArrayWritable(Writable.class, new Writable[] {
+        new Text("OUT_OF_LINE"),
+        NullWritable.get(),
+        referenceSubstruct
+    });
+
+    ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(
+        record, oldSchema, newSchema, Collections.emptyMap(), HoodieProjectionMask.all());
+
+    assertInstanceOf(BytesWritable.class, rewritten.get()[0]);
+    assertEquals("OUT_OF_LINE", new String(((BytesWritable) rewritten.get()[0]).copyBytes()));
+    assertEquals(NullWritable.get(), rewritten.get()[1]);
+    ArrayWritable rewrittenRef = (ArrayWritable) rewritten.get()[2];
+    assertEquals(new Text("blobs/path-1"), rewrittenRef.get()[0]);
+    assertEquals(new LongWritable(0L), rewrittenRef.get()[1]);
+    assertEquals(new LongWritable(11L), rewrittenRef.get()[2]);
+    assertEquals(new BooleanWritable(false), rewrittenRef.get()[3]);
+  }
+
   private void validateRewriteWithAvro(
       Writable oldWritable,
       HoodieSchema oldSchema,
@@ -452,9 +554,9 @@ public class TestHoodieArrayWritableSchemaUtils {
   void testRewritePlainVariantRecordToCanonicalVariantSchema() {
     HoodieSchema oldSchema = HoodieSchemaTestUtils.createPlainVariantRecord("variant_data");
     HoodieSchema newSchema = HoodieSchema.createVariant();
-    BytesWritable metadata = new BytesWritable(new byte[]{1, 2, 3});
-    BytesWritable value = new BytesWritable(new byte[]{4, 5, 6});
-    ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{metadata, value});
+    BytesWritable metadata = new BytesWritable(new byte[] {1, 2, 3});
+    BytesWritable value = new BytesWritable(new byte[] {4, 5, 6});
+    ArrayWritable record = new ArrayWritable(Writable.class, new Writable[] {metadata, value});
 
     ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(
         record, oldSchema, newSchema, Collections.emptyMap());

