This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ca4c40bc5a7 fix: Handle nested map and array columns in MDT (#17694)
6ca4c40bc5a7 is described below

commit 6ca4c40bc5a74d42fe844db0557ece598b5fdb5b
Author: Vinish Reddy <[email protected]>
AuthorDate: Sat Jan 24 01:51:20 2026 +0530

    fix: Handle nested map and array columns in MDT (#17694)
    
    * Handle nested map and array columns in MDT
    
    * Address comments from self review
    
    * Address comments and Add functional tests for nested fields
    
    * Handle nested array and map fields in HoodieSchemaUtils.scala as well
    
    * Remove duplicate code by moving nested schema handling to HoodieSchema, standardize to list.element for nested array columns
    
    * Address comments - 2
    
    * Address comments - Tim
---
 .../hudi/common/config/HoodieMetadataConfig.java   |   7 +-
 .../apache/hudi/common/schema/HoodieSchema.java    | 176 ++++++++++
 .../hudi/common/schema/HoodieSchemaUtils.java      |  44 +--
 .../hudi/common/schema/TestHoodieSchemaUtils.java  | 200 +++++++++++
 .../org/apache/hudi/common/util/ParquetUtils.java  |  41 ++-
 .../scala/org/apache/hudi/HoodieSchemaUtils.scala  |  43 ++-
 .../hudi/schema/TestHoodieSparkSchemaUtils.scala   | 132 +++++--
 .../hudi/functional/TestColumnStatsIndex.scala     | 379 ++++++++++++++++++++-
 8 files changed, 931 insertions(+), 91 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index ec688f7768d6..d4bc9dca8fc4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -219,7 +219,12 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       .noDefaultValue()
       .markAdvanced()
       .sinceVersion("0.11.0")
-      .withDocumentation("Comma-separated list of columns for which column 
stats index will be built. If not set, all columns will be indexed");
+      .withDocumentation("Comma-separated list of columns for which column 
stats index will be built. "
+          + "If not set, all columns will be indexed. "
+          + "For nested fields within ARRAY types, use: field.list.element "
+          + "(e.g., items.list.element or items.list.element.price). "
+          + "For nested fields within MAP types, use: field.key_value.key for 
keys or field.key_value.value for values "
+          + "(e.g., metadata.key_value.key, metadata.key_value.value, or 
metadata.key_value.value.nested_field).");
 
   public static final ConfigProperty<Integer> COLUMN_STATS_INDEX_MAX_COLUMNS = 
ConfigProperty
       .key(METADATA_PREFIX + ".index.column.stats.max.columns.to.index")
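
For reference, a minimal sketch (Scala, adapted from the functional tests added in this commit) of enabling column stats on nested MAP and ARRAY fields with the accessor patterns documented above; the field names used here (record_key, metadata, items, price) are illustrative only:

    import org.apache.hudi.common.config.HoodieMetadataConfig

    // Index the MAP values of "metadata" and the "price" field inside "items" array elements.
    val metadataOpts = Map(
      HoodieMetadataConfig.ENABLE.key -> "true",
      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
      HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key ->
        "record_key,metadata.key_value.value,items.list.element.price")
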
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index a8222670df1a..e13fdc17179e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -28,6 +28,8 @@ import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 
+import org.apache.hudi.common.util.collection.Pair;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -86,6 +88,25 @@ public class HoodieSchema implements Serializable {
   public static final Object NULL_VALUE = JsonProperties.NULL_VALUE;
   public static final HoodieSchema NULL_SCHEMA = 
HoodieSchema.create(HoodieSchemaType.NULL);
   private static final long serialVersionUID = 1L;
+
+  /**
+   * Constants for Parquet-style accessor patterns used in nested MAP and 
ARRAY navigation.
+   * These patterns are specifically used for column stats generation and 
differ from
+   * InternalSchema constants which are used in schema evolution contexts.
+   */
+  private static final String ARRAY_LIST = "list";
+  private static final String ARRAY_ELEMENT = "element";
+  private static final String MAP_KEY_VALUE = "key_value";
+  private static final String MAP_KEY = "key";
+  private static final String MAP_VALUE = "value";
+
+  private static final String ARRAY_LIST_ELEMENT = ARRAY_LIST + "." + 
ARRAY_ELEMENT;
+  private static final String MAP_KEY_VALUE_KEY = MAP_KEY_VALUE + "." + 
MAP_KEY;
+  private static final String MAP_KEY_VALUE_VALUE = MAP_KEY_VALUE + "." + 
MAP_VALUE;
+
+  public static final String PARQUET_ARRAY_SPARK = ".array";
+  public static final String PARQUET_ARRAY_AVRO = "." + ARRAY_LIST_ELEMENT;
+
   private Schema avroSchema;
   private HoodieSchemaType type;
 
@@ -754,6 +775,161 @@ public class HoodieSchema implements Serializable {
     return types.get(0).getType() != HoodieSchemaType.NULL ? types.get(0) : 
types.get(1);
   }
 
+  /**
+   * Gets a nested field using dot notation, supporting Parquet-style 
array/map accessors.
+   *
+   * <p>Supports nested field access using dot notation including MAP and 
ARRAY types
+   * using Parquet-style accessor patterns:</p>
+   *
+   * <ul>
+   *   <li><b>RECORD types:</b> Standard dot notation (e.g., {@code 
"user.profile.name"})</li>
+   *   <li><b>ARRAY types:</b> Use {@code ".list.element"} to access array 
elements
+   *       <ul>
+   *         <li>Example: {@code "items.list.element"} accesses element schema 
of array</li>
+   *         <li>Example: {@code "items.list.element.id"} accesses nested 
field within array elements</li>
+   *       </ul>
+   *   </li>
+   *   <li><b>MAP types:</b> Use {@code ".key_value.key"} or {@code 
".key_value.value"} to access map components
+   *       <ul>
+   *         <li>Example: {@code "metadata.key_value.key"} accesses map keys 
(always STRING)</li>
+   *         <li>Example: {@code "metadata.key_value.value"} accesses map 
value schema</li>
+   *         <li>Example: {@code "nested_map.key_value.value.field"} accesses 
nested field within map values</li>
+   *       </ul>
+   *   </li>
+   * </ul>
+   *
+   * <p>Note: Spark Parquet files may use {@code ".array"} format instead of 
{@code ".list.element"}.
+   * This translation is handled at the Parquet reading level in ParquetUtils, 
not here.</p>
+   *
+   * @param fieldName Field path (e.g., "user.profile.name", 
"items.list.element", "metadata.key_value.value")
+   * @return Option containing a pair of canonical field name and the 
HoodieSchemaField, or Option.empty() if not found
+   */
+  public Option<Pair<String, HoodieSchemaField>> getNestedField(String 
fieldName) {
+    ValidationUtils.checkArgument(fieldName != null && !fieldName.isEmpty(), 
"Field name cannot be null or empty");
+    return getNestedFieldInternal(this, fieldName, 0, "");
+  }
+
+  /**
+   * Internal helper method for recursively retrieving nested fields using 
offset-based navigation.
+   *
+   * @param schema   the current schema to search in
+   * @param fullPath the full field path string
+   * @param offset   current position in fullPath
+   * @param prefix   the accumulated field path prefix
+   * @return Option containing a pair of canonical field name and the 
HoodieSchemaField, or Option.empty() if field not found
+   */
+  private static Option<Pair<String, HoodieSchemaField>> 
getNestedFieldInternal(
+      HoodieSchema schema, String fullPath, int offset, String prefix) {
+    HoodieSchema nonNullableSchema = schema.getNonNullType();
+    int nextDot = fullPath.indexOf('.', offset);
+    // Terminal case: no more dots in this segment
+    if (nextDot == -1) {
+      String fieldName = fullPath.substring(offset);
+      // Handle RECORD terminal case
+      if (nonNullableSchema.getType() != HoodieSchemaType.RECORD) {
+        return Option.empty();
+      }
+      return nonNullableSchema.getField(fieldName)
+          .map(field -> Pair.of(prefix + fieldName, field));
+    }
+    // Recursive case: more nesting to explore
+    String rootFieldName = fullPath.substring(offset, nextDot);
+    int nextOffset = nextDot + 1;
+    // Handle RECORD: standard field navigation
+    if (nonNullableSchema.getType() == HoodieSchemaType.RECORD) {
+      return nonNullableSchema.getField(rootFieldName)
+          .flatMap(field -> getNestedFieldInternal(field.schema(), fullPath, 
nextOffset, prefix + rootFieldName + "."));
+    }
+    // Handle ARRAY: expect ".list.element"
+    if (nonNullableSchema.getType() == HoodieSchemaType.ARRAY && 
ARRAY_LIST.equals(rootFieldName)) {
+      return handleArrayNavigation(nonNullableSchema, fullPath, nextOffset, 
prefix);
+    }
+    // Handle MAP: expect ".key_value.key" or ".key_value.value"
+    if (nonNullableSchema.getType() == HoodieSchemaType.MAP && 
MAP_KEY_VALUE.equals(rootFieldName)) {
+      return handleMapNavigation(nonNullableSchema, fullPath, nextOffset, 
prefix);
+    }
+    return Option.empty();
+  }
+
+  /**
+   * Handles navigation into ARRAY types using the ".list.element" pattern.
+   *
+   * @param arraySchema the ARRAY schema to navigate into
+   * @param fullPath    the full field path string
+   * @param offset      current position in fullPath (should point to 
"element")
+   * @param prefix      the accumulated field path prefix
+   * @return Option containing the nested field, or Option.empty() if invalid 
path
+   */
+  private static Option<Pair<String, HoodieSchemaField>> handleArrayNavigation(
+      HoodieSchema arraySchema, String fullPath, int offset, String prefix) {
+    int nextPos = getNextOffset(fullPath, offset, ARRAY_ELEMENT);
+    if (nextPos == -1) {
+      return Option.empty();
+    }
+
+    HoodieSchema elementSchema = arraySchema.getElementType();
+    if (nextPos == fullPath.length()) {
+      return Option.of(Pair.of(prefix + ARRAY_LIST_ELEMENT,
+          HoodieSchemaField.of(ARRAY_ELEMENT, elementSchema, null, null)));
+    }
+    return getNestedFieldInternal(elementSchema, fullPath, nextPos, prefix + 
ARRAY_LIST_ELEMENT + ".");
+  }
+
+  /**
+   * Handles navigation into MAP types using the Parquet-style 
".key_value.key" or ".key_value.value" patterns.
+   *
+   * @param mapSchema the MAP schema to navigate into
+   * @param fullPath  the full field path string
+   * @param offset    current position in fullPath (should point to "key" or 
"value")
+   * @param prefix    the accumulated field path prefix
+   * @return Option containing the nested field, or Option.empty() if invalid 
path
+   */
+  private static Option<Pair<String, HoodieSchemaField>> handleMapNavigation(
+      HoodieSchema mapSchema, String fullPath, int offset, String prefix) {
+    // Check for "key" path
+    int keyPos = getNextOffset(fullPath, offset, MAP_KEY);
+    if (keyPos != -1) {
+      if (keyPos == fullPath.length()) {
+        return Option.of(Pair.of(prefix + MAP_KEY_VALUE_KEY,
+            HoodieSchemaField.of(MAP_KEY, mapSchema.getKeyType(), null, 
null)));
+      }
+      // Map keys are primitives, cannot navigate further
+      return Option.empty();
+    }
+
+    // Check for "value" path
+    int valuePos = getNextOffset(fullPath, offset, MAP_VALUE);
+    if (valuePos == -1) {
+      return Option.empty();
+    }
+
+    HoodieSchema valueSchema = mapSchema.getValueType();
+    if (valuePos == fullPath.length()) {
+      return Option.of(Pair.of(prefix + MAP_KEY_VALUE_VALUE,
+          HoodieSchemaField.of(MAP_VALUE, valueSchema, null, null)));
+    }
+    return getNestedFieldInternal(valueSchema, fullPath, valuePos, prefix + 
MAP_KEY_VALUE_VALUE + ".");
+  }
+
+  /**
+   * Advances offset past a component name in the path, handling end-of-path 
and dot separator.
+   *
+   * @param path      the full path string
+   * @param offset    current position in path
+   * @param component the component name to match (e.g., "element", "key", 
"value")
+   * @return new offset after component and dot, or path.length() if at end, 
or -1 if no match
+   */
+  private static int getNextOffset(String path, int offset, String component) {
+    if (!path.regionMatches(offset, component, 0, component.length())) {
+      return -1;
+    }
+    int next = offset + component.length();
+    if (next == path.length()) {
+      return next;
+    }
+    return (path.charAt(next) == '.') ? next + 1 : -1;
+  }
+
   /**
    * Returns the underlying Avro schema for compatibility purposes.
    *
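
A minimal sketch (Scala, adapted from the unit tests added in this commit) of the new HoodieSchema#getNestedField instance method resolving a MAP value schema:

    import java.util.Arrays
    import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaField, HoodieSchemaType}

    val schema = HoodieSchema.createRecord("TestRecord", null, null,
      Arrays.asList(
        HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.STRING)),
        HoodieSchemaField.of("metadata", HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT)))))

    // Map keys are always STRING; values resolve to the declared value schema (INT here).
    val mapValue = schema.getNestedField("metadata.key_value.value")
    // mapValue.get().getLeft                   == "metadata.key_value.value"
    // mapValue.get().getRight.schema().getType == HoodieSchemaType.INT
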
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
index c8244fc32e61..eea2ea6ca723 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
@@ -386,12 +386,15 @@ public final class HoodieSchemaUtils {
 
   /**
    * Gets a field (including nested fields) from the schema using dot notation.
-   * This is equivalent to HoodieAvroUtils.getSchemaForField() but operates on 
HoodieSchema.
+   * This method delegates to {@link HoodieSchema#getNestedField(String)}.
    * <p>
    * Supports nested field access using dot notation. For example:
    * <ul>
    *   <li>"name" - retrieves top-level field</li>
    *   <li>"user.profile.displayName" - retrieves nested field</li>
+   *   <li>"items.list.element" - retrieves array element schema </li>
+   *   <li>"metadata.key_value.key" - retrieves map key schema</li>
+   *   <li>"metadata.key_value.value" - retrieves map value schema</li>
    * </ul>
    *
    * @param schema    the schema to search in
@@ -403,44 +406,7 @@ public final class HoodieSchemaUtils {
   public static Option<Pair<String, HoodieSchemaField>> 
getNestedField(HoodieSchema schema, String fieldName) {
     ValidationUtils.checkArgument(schema != null, "Schema cannot be null");
     ValidationUtils.checkArgument(fieldName != null && !fieldName.isEmpty(), 
"Field name cannot be null or empty");
-    return getNestedFieldInternal(schema, fieldName, "");
-  }
-
-  /**
-   * Internal helper method for recursively retrieving nested fields.
-   *
-   * @param schema    the current schema to search in
-   * @param fieldName the remaining field path
-   * @param prefix    the accumulated field path prefix
-   * @return Option containing Pair of canonical field name and the 
HoodieSchemaField, or Option.empty() if field not found
-   */
-  private static Option<Pair<String, HoodieSchemaField>> 
getNestedFieldInternal(HoodieSchema schema, String fieldName, String prefix) {
-    HoodieSchema nonNullableSchema = getNonNullTypeFromUnion(schema);
-
-    if (!fieldName.contains(".")) {
-      // Base case: simple field name
-      if (nonNullableSchema.getType() != HoodieSchemaType.RECORD) {
-        return Option.empty();
-      }
-      return nonNullableSchema.getField(fieldName)
-          .map(field -> Pair.of(prefix + fieldName, field));
-    } else {
-      // Recursive case: nested field
-      if (nonNullableSchema.getType() != HoodieSchemaType.RECORD) {
-        return Option.empty();
-      }
-
-      int dotIndex = fieldName.indexOf(".");
-      String rootFieldName = fieldName.substring(0, dotIndex);
-      String remainingPath = fieldName.substring(dotIndex + 1);
-
-      return nonNullableSchema.getField(rootFieldName)
-          .flatMap(rootField -> getNestedFieldInternal(
-              rootField.schema(),
-              remainingPath,
-              prefix + rootFieldName + "."
-          ));
-    }
+    return schema.getNestedField(fieldName);
   }
 
   /**
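
The static utility keeps its existing signature and now just delegates; a short sketch (mirroring the new array test case) of resolving an ARRAY element through it:

    import java.util.Arrays
    import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaField, HoodieSchemaType, HoodieSchemaUtils}

    val schema = HoodieSchema.createRecord("TestRecord", null, null,
      Arrays.asList(
        HoodieSchemaField.of("items",
          HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING)))))

    // Resolves the array element schema; malformed paths such as "items.wrong.path" yield Option.empty().
    val element = HoodieSchemaUtils.getNestedField(schema, "items.list.element")
    // element.get().getLeft == "items.list.element", element schema type is STRING
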
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
index c35e9e11b78e..851d07e02cb0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
@@ -2175,4 +2175,204 @@ public class TestHoodieSchemaUtils {
     assertEquals("timestamp-micros", 
prunedCol0FieldOpt.get().schema().getProp(logicalTypeKey));
     assertEquals("uuid", 
prunedCol1FieldOpt.get().schema().getProp(logicalTypeKey));
   }
+
+  @Test
+  public void testGetNestedFieldWithArrayListElement() {
+    // Create schema with array field
+    HoodieSchema schema = HoodieSchema.createRecord(
+        "TestRecord",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("id", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+            HoodieSchemaField.of("items", 
HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING)))
+        )
+    );
+
+    // Test: items.list.element should resolve to STRING
+    Option<Pair<String, HoodieSchemaField>> result = 
HoodieSchemaUtils.getNestedField(schema, "items.list.element");
+    assertTrue(result.isPresent());
+    assertEquals("items.list.element", result.get().getLeft());
+    assertEquals("element", result.get().getRight().name());
+    assertEquals(HoodieSchemaType.STRING, 
result.get().getRight().schema().getType());
+  }
+
+  @Test
+  public void testGetNestedFieldWithArrayListElementAndNesting() {
+    // Create schema with array of records
+    HoodieSchema nestedSchema = HoodieSchema.createRecord(
+        "NestedRecord",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("nested_int", 
HoodieSchema.create(HoodieSchemaType.INT)),
+            HoodieSchemaField.of("nested_string", 
HoodieSchema.create(HoodieSchemaType.STRING))
+        )
+    );
+
+    HoodieSchema schema = HoodieSchema.createRecord(
+        "TestRecord",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("id", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+            HoodieSchemaField.of("items", 
HoodieSchema.createArray(nestedSchema))
+        )
+    );
+
+    // Test: items.list.element.nested_int should resolve to INT
+    Option<Pair<String, HoodieSchemaField>> result = 
HoodieSchemaUtils.getNestedField(schema, "items.list.element.nested_int");
+    assertTrue(result.isPresent());
+    assertEquals("items.list.element.nested_int", result.get().getLeft());
+    assertEquals("nested_int", result.get().getRight().name());
+    assertEquals(HoodieSchemaType.INT, 
result.get().getRight().schema().getType());
+  }
+
+  @Test
+  public void testGetNestedFieldWithMapKeyValueKey() {
+    // Create schema with map field
+    HoodieSchema schema = HoodieSchema.createRecord(
+        "TestRecord",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("id", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+            HoodieSchemaField.of("metadata", 
HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT)))
+        )
+    );
+
+    // Test: metadata.key_value.key should resolve to STRING (MAP keys are 
always STRING)
+    Option<Pair<String, HoodieSchemaField>> result = 
HoodieSchemaUtils.getNestedField(schema, "metadata.key_value.key");
+    assertTrue(result.isPresent());
+    assertEquals("metadata.key_value.key", result.get().getLeft());
+    assertEquals("key", result.get().getRight().name());
+    assertEquals(HoodieSchemaType.STRING, 
result.get().getRight().schema().getType());
+  }
+
+  @Test
+  public void testGetNestedFieldWithMapKeyValueValue() {
+    // Create schema with map field
+    HoodieSchema schema = HoodieSchema.createRecord(
+        "TestRecord",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("id", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+            HoodieSchemaField.of("metadata", 
HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT)))
+        )
+    );
+
+    // Test: metadata.key_value.value should resolve to INT
+    Option<Pair<String, HoodieSchemaField>> result = 
HoodieSchemaUtils.getNestedField(schema, "metadata.key_value.value");
+    assertTrue(result.isPresent());
+    assertEquals("metadata.key_value.value", result.get().getLeft());
+    assertEquals("value", result.get().getRight().name());
+    assertEquals(HoodieSchemaType.INT, 
result.get().getRight().schema().getType());
+  }
+
+  @Test
+  public void testGetNestedFieldWithMapKeyValueValueAndNesting() {
+    // Create schema with map of records (like the original failing case)
+    HoodieSchema nestedSchema = HoodieSchema.createRecord(
+        "NestedRecord",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("nested_int", 
HoodieSchema.create(HoodieSchemaType.INT)),
+            HoodieSchemaField.of("nested_string", 
HoodieSchema.create(HoodieSchemaType.STRING))
+        )
+    );
+
+    HoodieSchema schema = HoodieSchema.createRecord(
+        "TestRecord",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("id", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+            HoodieSchemaField.of("nested_map", 
HoodieSchema.createMap(nestedSchema))
+        )
+    );
+
+    // Test: nested_map.key_value.value.nested_int should resolve to INT
+    Option<Pair<String, HoodieSchemaField>> result = 
HoodieSchemaUtils.getNestedField(schema, 
"nested_map.key_value.value.nested_int");
+    assertTrue(result.isPresent());
+    assertEquals("nested_map.key_value.value.nested_int", 
result.get().getLeft());
+    assertEquals("nested_int", result.get().getRight().name());
+    assertEquals(HoodieSchemaType.INT, 
result.get().getRight().schema().getType());
+  }
+
+  @Test
+  public void testGetNestedFieldWithInvalidArrayPath() {
+    // Create schema with array field
+    HoodieSchema schema = HoodieSchema.createRecord(
+        "TestRecord",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("items", 
HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING)))
+        )
+    );
+
+    // Test: Invalid array path should return Option.empty()
+    Option<Pair<String, HoodieSchemaField>> result = 
HoodieSchemaUtils.getNestedField(schema, "items.wrong.path");
+    assertFalse(result.isPresent());
+
+    // Test: Missing "element" should return Option.empty()
+    result = HoodieSchemaUtils.getNestedField(schema, "items.list.missing");
+    assertFalse(result.isPresent());
+  }
+
+  @Test
+  public void testGetNestedFieldWithInvalidMapPath() {
+    // Create schema with map field
+    HoodieSchema schema = HoodieSchema.createRecord(
+        "TestRecord",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("metadata", 
HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT)))
+        )
+    );
+
+    // Test: Invalid map path should return Option.empty()
+    Option<Pair<String, HoodieSchemaField>> result = 
HoodieSchemaUtils.getNestedField(schema, "metadata.wrong.path");
+    assertFalse(result.isPresent());
+
+    // Test: Missing "key" or "value" should return Option.empty()
+    result = HoodieSchemaUtils.getNestedField(schema, 
"metadata.key_value.missing");
+    assertFalse(result.isPresent());
+  }
+
+  @Test
+  public void testGetNestedFieldComplexNestedMapAndArray() {
+    // Create complex schema: record with map of arrays of records
+    HoodieSchema innerRecord = HoodieSchema.createRecord(
+        "InnerRecord",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("value", 
HoodieSchema.create(HoodieSchemaType.LONG))
+        )
+    );
+
+    HoodieSchema arrayOfRecords = HoodieSchema.createArray(innerRecord);
+    HoodieSchema mapOfArrays = HoodieSchema.createMap(arrayOfRecords);
+
+    HoodieSchema schema = HoodieSchema.createRecord(
+        "TestRecord",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("complex_field", mapOfArrays)
+        )
+    );
+
+    // Test: complex_field.key_value.value.list.element.value should resolve 
to LONG
+    Option<Pair<String, HoodieSchemaField>> result = 
HoodieSchemaUtils.getNestedField(
+        schema, "complex_field.key_value.value.list.element.value");
+    assertTrue(result.isPresent());
+    assertEquals("complex_field.key_value.value.list.element.value", 
result.get().getLeft());
+    assertEquals("value", result.get().getRight().name());
+    assertEquals(HoodieSchemaType.LONG, 
result.get().getRight().schema().getType());
+  }
 }
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index e0fdbe5c849d..678c026e0089 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -294,17 +294,27 @@ public class ParquetUtils extends FileFormatUtils {
   }
 
   public List<HoodieColumnRangeMetadata<Comparable>> 
readColumnStatsFromMetadata(ParquetMetadata metadata, String filePath, 
Option<List<String>> columnList, HoodieIndexVersion indexVersion) {
+    // Create a set of columns to match, including both canonical 
(.list.element) and Parquet (.array) formats
+    // This handles files written by both Spark (uses .array) and Avro (uses 
.list.element) writers
+    Set<String> columnsToMatch = columnList.map(cols -> {
+      Set<String> set = new HashSet<>(cols);
+      cols.forEach(col -> set.add(convertListElementToArray(col)));
+      return set;
+    }).orElse(null);
+
     // Collect stats from all individual Parquet blocks
     Stream<HoodieColumnRangeMetadata<Comparable>> 
hoodieColumnRangeMetadataStream =
         metadata.getBlocks().stream().sequential().flatMap(blockMetaData ->
             blockMetaData.getColumns().stream()
-                .filter(f -> !columnList.isPresent() || 
columnList.get().contains(f.getPath().toDotString()))
+                .filter(f -> columnsToMatch == null || 
columnsToMatch.contains(f.getPath().toDotString()))
                 .map(columnChunkMetaData -> {
                   Statistics stats = columnChunkMetaData.getStatistics();
                   ValueMetadata valueMetadata = 
ValueMetadata.getValueMetadata(columnChunkMetaData.getPrimitiveType(), 
indexVersion);
+                  // Normalize column name to canonical .list.element format 
for consistent MDT storage
+                  String canonicalColumnName = 
convertArrayToListElement(columnChunkMetaData.getPath().toDotString());
                   return (HoodieColumnRangeMetadata<Comparable>) 
HoodieColumnRangeMetadata.<Comparable>create(
                       filePath,
-                      columnChunkMetaData.getPath().toDotString(),
+                      canonicalColumnName,
                       convertToNativeJavaType(
                           columnChunkMetaData.getPrimitiveType(),
                           stats.genericGetMin(),
@@ -549,4 +559,31 @@ public class ParquetUtils extends FileFormatUtils {
       throw new UnsupportedOperationException(String.format("Unsupported value 
type (%s)", val.getClass().getName()));
     }
   }
+
+  /**
+   * Converts Parquet array format (.array) to canonical format 
(.list.element).
+   * Parquet files written by Spark use ".array" while Avro uses 
".list.element".
+   * We normalize to ".list.element" as the canonical format for MDT storage.
+   *
+   * @param columnName the column name from Parquet metadata
+   * @return the column name with .array converted to .list.element
+   */
+  private static String convertArrayToListElement(String columnName) {
+    return columnName
+        .replace(HoodieSchema.PARQUET_ARRAY_SPARK + ".", 
HoodieSchema.PARQUET_ARRAY_AVRO + ".")
+        .replace(HoodieSchema.PARQUET_ARRAY_SPARK, 
HoodieSchema.PARQUET_ARRAY_AVRO);
+  }
+
+  /**
+   * Converts canonical format (.list.element) to Parquet array format 
(.array).
+   * Used to match user-specified columns against Parquet files that use 
".array" format.
+   *
+   * @param columnName the column name in canonical format
+   * @return the column name with .list.element converted to .array
+   */
+  private static String convertListElementToArray(String columnName) {
+    return columnName
+        .replace(HoodieSchema.PARQUET_ARRAY_AVRO + ".", 
HoodieSchema.PARQUET_ARRAY_SPARK + ".")
+        .replace(HoodieSchema.PARQUET_ARRAY_AVRO, 
HoodieSchema.PARQUET_ARRAY_SPARK);
+  }
 }
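
The two private helpers above amount to the string rewrites sketched below (a standalone approximation in Scala with the HoodieSchema constants ".array" and ".list.element" inlined, not the actual implementation):

    // Spark-written Parquet reports nested array columns as e.g. "items.array.price";
    // the MDT stores the canonical Avro-style form "items.list.element.price".
    def toCanonical(col: String): String =
      col.replace(".array.", ".list.element.").replace(".array", ".list.element")

    def toSparkParquet(col: String): String =
      col.replace(".list.element.", ".array.").replace(".list.element", ".array")

Both directions are needed: user-specified canonical names must also match Spark-written files, while the column names persisted in column stats stay canonical.
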
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
index 64abe3553cdb..bf3961744ef4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.config.{HoodieCommonConfig, 
HoodieConfig, TypedPro
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaCompatibility, 
HoodieSchemaUtils => HoodieCommonSchemaUtils}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
-import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
+import org.apache.hudi.common.util.ConfigUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.{HoodieException, 
SchemaCompatibilityException}
 import org.apache.hudi.internal.schema.InternalSchema
@@ -33,7 +33,8 @@ import 
org.apache.hudi.internal.schema.convert.InternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils
 import 
org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileSchemaRequirements
 
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.types.StructType
 import org.slf4j.LoggerFactory
 
 import java.util.Properties
@@ -46,22 +47,30 @@ import scala.collection.JavaConverters._
 object HoodieSchemaUtils {
   private val log = LoggerFactory.getLogger(getClass)
 
+  /**
+   * Gets a field (including nested fields) from the schema using dot notation.
+   * This method delegates to the consolidated implementation in 
[[HoodieSchema.getNestedField]].
+   *
+   * Supports nested field access using dot notation. For example:
+   * - "name" - retrieves top-level field
+   * - "user.profile.displayName" - retrieves nested field
+   * - "items.list.element" - retrieves array element schema
+   * - "metadata.key_value.key" - retrieves map key schema
+   * - "metadata.key_value.value" - retrieves map value schema
+   *
+   * @param schema    the Spark StructType to search in
+   * @param fieldName the field name (may contain dots for nested fields)
+   * @return Pair of canonical field name and the StructField
+   * @throws HoodieException if field is not found
+   */
   def getSchemaForField(schema: StructType, fieldName: String): 
org.apache.hudi.common.util.collection.Pair[String, StructField] = {
-    getSchemaForField(schema, fieldName, StringUtils.EMPTY_STRING)
-  }
-
-  def getSchemaForField(schema: StructType, fieldName: String, prefix: 
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {
-    if (!(fieldName.contains("."))) {
-      org.apache.hudi.common.util.collection.Pair.of(prefix + 
schema.fields(schema.fieldIndex(fieldName)).name, 
schema.fields(schema.fieldIndex(fieldName)))
-    }
-    else {
-      val rootFieldIndex: Int = fieldName.indexOf(".")
-      val rootField: StructField = 
schema.fields(schema.fieldIndex(fieldName.substring(0, rootFieldIndex)))
-      if (rootField == null) {
-        throw new HoodieException("Failed to find " + fieldName + " in the 
table schema ")
-      }
-      getSchemaForField(rootField.dataType.asInstanceOf[StructType], 
fieldName.substring(rootFieldIndex + 1), prefix + fieldName.substring(0, 
rootFieldIndex + 1))
-    }
+    val result = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(schema, 
"temp_schema", "")
+      .getNestedField(fieldName)
+      .orElseThrow(() => new HoodieException(s"Failed to find $fieldName in 
the table schema"))
+    org.apache.hudi.common.util.collection.Pair.of(result.getLeft,
+      StructField(result.getRight.name(),
+        
HoodieSchemaConversionUtils.convertHoodieSchemaToDataType(result.getRight.schema()),
+        result.getRight.isNullable))
   }
 
   /**
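
On the Spark side the same path syntax now works against a StructType; a minimal sketch adapted from the new TestHoodieSparkSchemaUtils cases:

    import org.apache.hudi.HoodieSchemaUtils
    import org.apache.spark.sql.types._

    val schema = StructType(
      StructField("id", StringType) ::
        StructField("metadata", MapType(StringType, IntegerType, valueContainsNull = true)) :: Nil)

    // Returns Pair("metadata.key_value.value", StructField("value", IntegerType, ...));
    // unresolvable paths throw HoodieException.
    val pair = HoodieSchemaUtils.getSchemaForField(schema, "metadata.key_value.value")
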
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/schema/TestHoodieSparkSchemaUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/schema/TestHoodieSparkSchemaUtils.scala
index 2e14ad2c08fa..d1a60a02b80e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/schema/TestHoodieSparkSchemaUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/schema/TestHoodieSparkSchemaUtils.scala
@@ -20,45 +20,117 @@
 package org.apache.hudi.schema
 
 import org.apache.hudi.HoodieSchemaUtils
+import org.apache.hudi.exception.HoodieException
 
-import org.apache.spark.sql.types.{DataType, IntegerType, LongType, 
StringType, StructField, StructType}
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
 
 /**
- * Tests {@link HoodieSparkSchemaUtils}
+ * Tests for {@link HoodieSchemaUtils#getSchemaForField}
  */
 class TestHoodieSparkSchemaUtils {
 
-  @Test
-  def testGetSchemaField(): Unit = {
-
-    val nestedStructType = StructType(
-        StructField("nested_long", LongType, nullable = true) ::
-        StructField("nested_int", IntegerType, nullable = true) ::
-          StructField("nested_string", StringType, nullable = true) :: Nil)
-
-    val schema = StructType(
-      StructField("_row_key", StringType, nullable = true) ::
-        StructField("first_name", StringType, nullable = false) ::
-        StructField("last_name", StringType, nullable = true) ::
-        StructField("nested_field", nestedStructType, nullable = true) ::
-        StructField("timestamp", IntegerType, nullable = true) ::
-        StructField("partition", IntegerType, nullable = true) :: Nil)
-
-    assertFieldType(schema, "first_name", StringType)
-    assertFieldType(schema, "timestamp", IntegerType)
-
-    // test nested fields.
-    assertFieldType(schema, "nested_field.nested_long", LongType)
-    assertFieldType(schema, "nested_field.nested_int", IntegerType)
-    assertFieldType(schema, "nested_field.nested_string", StringType)
+  // Test schemas
+  private val simpleSchema = StructType(
+    StructField("id", StringType) ::
+      StructField("name", StringType) ::
+      StructField("count", IntegerType) :: Nil)
+
+  private val nestedSchema = StructType(
+    StructField("id", StringType) ::
+      StructField("nested", StructType(
+        StructField("inner_string", StringType) ::
+          StructField("inner_int", IntegerType) :: Nil)) :: Nil)
+
+  private val arraySchema = StructType(
+    StructField("id", StringType) ::
+      StructField("items", ArrayType(StringType, containsNull = true)) :: Nil)
+
+  private val arrayOfStructSchema = StructType(
+    StructField("id", StringType) ::
+      StructField("items", ArrayType(StructType(
+        StructField("nested_int", IntegerType) ::
+          StructField("nested_string", StringType) :: Nil), containsNull = 
true)) :: Nil)
+
+  private val mapSchema = StructType(
+    StructField("id", StringType) ::
+      StructField("metadata", MapType(StringType, IntegerType, 
valueContainsNull = true)) :: Nil)
+
+  private val mapOfStructSchema = StructType(
+    StructField("id", StringType) ::
+      StructField("nested_map", MapType(StringType, StructType(
+        StructField("nested_int", IntegerType) ::
+          StructField("nested_string", StringType) :: Nil), valueContainsNull 
= true)) :: Nil)
+
+  private val complexSchema = StructType(
+    StructField("id", StringType) ::
+      StructField("top", StructType(
+        StructField("nested_array", ArrayType(StructType(
+          StructField("inner_field", StringType) :: Nil), containsNull = 
true)) ::
+          StructField("nested_map", MapType(StringType, IntegerType, 
valueContainsNull = true)) :: Nil)) :: Nil)
+
+  private def getSchema(name: String): StructType = name match {
+    case "simple" => simpleSchema
+    case "nested" => nestedSchema
+    case "array" => arraySchema
+    case "arrayOfStruct" => arrayOfStructSchema
+    case "map" => mapSchema
+    case "mapOfStruct" => mapOfStructSchema
+    case "complex" => complexSchema
   }
 
-  def assertFieldType(schema: StructType, fieldName: String, expectedDataType: 
DataType): Unit = {
-    val fieldNameSchemaPair = HoodieSchemaUtils.getSchemaForField(schema, 
fieldName)
-    assertEquals(fieldName, fieldNameSchemaPair.getKey)
-    assertEquals(expectedDataType, fieldNameSchemaPair.getValue.dataType)
+  @ParameterizedTest
+  @CsvSource(Array(
+    // Simple field tests
+    "simple,       id,                                            id,          
                                  string",
+    "simple,       name,                                          name,        
                                  string",
+    "simple,       count,                                         count,       
                                  int",
+    // Nested struct field tests
+    "nested,       nested.inner_string,                           
nested.inner_string,                           string",
+    "nested,       nested.inner_int,                              
nested.inner_int,                              int",
+    // Array element access using .list.element
+    "array,        items.list.element,                            
items.list.element,                            string",
+    // Array of struct - access nested fields within array elements
+    "arrayOfStruct, items.list.element.nested_int,                
items.list.element.nested_int,                 int",
+    "arrayOfStruct, items.list.element.nested_string,             
items.list.element.nested_string,              string",
+    // Map key/value access using .key_value.key and .key_value.value
+    "map,          metadata.key_value.key,                        
metadata.key_value.key,                        string",
+    "map,          metadata.key_value.value,                      
metadata.key_value.value,                      int",
+    // Map of struct - access nested fields within map values
+    "mapOfStruct,  nested_map.key_value.value.nested_int,         
nested_map.key_value.value.nested_int,         int",
+    "mapOfStruct,  nested_map.key_value.value.nested_string,      
nested_map.key_value.value.nested_string,      string",
+    // Complex nested: struct -> array -> struct
+    "complex,      top.nested_array.list.element.inner_field,     
top.nested_array.list.element.inner_field,     string",
+    // Complex nested: struct -> map
+    "complex,      top.nested_map.key_value.key,                  
top.nested_map.key_value.key,                  string",
+    "complex,      top.nested_map.key_value.value,                
top.nested_map.key_value.value,                int"
+  ))
+  def testGetSchemaForField(schemaName: String, inputPath: String, 
expectedKey: String, expectedType: String): Unit = {
+    val schema = getSchema(schemaName.trim)
+    val result = HoodieSchemaUtils.getSchemaForField(schema, inputPath.trim)
+    assertEquals(expectedKey.trim, result.getKey)
+    assertEquals(expectedType.trim, result.getValue.dataType.simpleString)
   }
 
+  @Test
+  def testInvalidPaths(): Unit = {
+    // Invalid array paths
+    assertThrows(classOf[HoodieException], () =>
+      HoodieSchemaUtils.getSchemaForField(arraySchema, "items.wrong.path"))
+    assertThrows(classOf[HoodieException], () =>
+      HoodieSchemaUtils.getSchemaForField(arraySchema, "items.list.missing"))
+
+    // Invalid map paths
+    assertThrows(classOf[HoodieException], () =>
+      HoodieSchemaUtils.getSchemaForField(mapSchema, "metadata.wrong.path"))
+    assertThrows(classOf[HoodieException], () =>
+      HoodieSchemaUtils.getSchemaForField(mapSchema, 
"metadata.key_value.key.invalid"))
+
+    // Non-existent field
+    assertThrows(classOf[HoodieException], () =>
+      HoodieSchemaUtils.getSchemaForField(simpleSchema, "nonexistent"))
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 5b6337189737..a3e0c5701b81 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport, 
DataSourceWriteOptions, HoodieSchemaConversionUtils}
+import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport, 
DataSourceReadOptions, DataSourceWriteOptions, HoodieSchemaConversionUtils}
 import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
 import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, 
RECORDKEY_FIELD}
 import org.apache.hudi.HoodieConversionUtils.toProperties
@@ -50,7 +50,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, 
AttributeReference, Great
 import 
org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api._
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, 
assertTrue}
+import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, 
assertNotNull, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource}
 
@@ -259,6 +259,381 @@ class TestColumnStatsIndex extends 
ColumnStatIndexTestBase {
       addNestedFiled = true)
   }
 
+  /**
+   * Tests data skipping with nested MAP and ARRAY fields in column stats 
index.
+   * This test verifies that queries can efficiently skip files based on 
nested field values
+   * within MAP and ARRAY types using the new Parquet-style accessor patterns.
+   */
+  @ParameterizedTest
+  @MethodSource(Array("testMetadataColumnStatsIndexParamsInMemory"))
+  def testMetadataColumnStatsIndexNestedMapArrayDataSkipping(testCase: 
ColumnStatsTestCase): Unit = {
+    // Define nested struct schema
+    val nestedSchema = new StructType()
+      .add("nested_int", IntegerType, false)
+      .add("level", StringType, true)
+
+    // Define full schema with MAP and ARRAY of nested structs
+    val testSchema = new StructType()
+      .add("record_key", StringType, false)
+      .add("partition_col", IntegerType, false)
+      .add("nullable_map_field", MapType(StringType, nestedSchema), true)
+      .add("array_field", ArrayType(nestedSchema), false)
+
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+      HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key ->
+        
"record_key,partition_col,nullable_map_field.key_value.value.nested_int,nullable_map_field.key_value.value.level,array_field.list.element.nested_int,array_field.list.element.level"
+    )
+
+    val commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test_nested_skipping",
+      DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+      RECORDKEY_FIELD.key -> "record_key",
+      HoodieTableConfig.ORDERING_FIELDS.key -> "record_key",
+      PARTITIONPATH_FIELD.key -> "partition_col",
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key -> "10240",
+      HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key -> "0",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> 
testCase.tableVersion.toString
+    ) ++ metadataOpts
+
+    // Batch 1 - Low Range Values (should be skipped when querying for 
nested_int > 200)
+    val batch1Data = Seq(
+      Row("key_001", 1,
+        Map("item1" -> Row(50, "low"), "item2" -> Row(75, "low")),
+        Array(Row(60, "low"), Row(80, "low"))),
+      Row("key_002", 1,
+        Map("item1" -> Row(30, "low"), "item2" -> Row(90, "low")),
+        Array(Row(40, "low"), Row(70, "low")))
+    )
+
+    // Batch 2 - High Range MAP and ARRAY (should be read when querying for 
nested_int > 200)
+    val batch2Data = Seq(
+      Row("key_003", 1,
+        Map("item1" -> Row(250, "high"), "item2" -> Row(275, "high")),
+        Array(Row(260, "high"), Row(280, "high"))),
+      Row("key_004", 1,
+        Map("item1" -> Row(230, "high"), "item2" -> Row(290, "high")),
+        Array(Row(240, "high"), Row(270, "high")))
+    )
+
+    // Batch 3 - Mixed Range (low MAP, high ARRAY)
+    val batch3Data = Seq(
+      Row("key_005", 2,
+        Map("item1" -> Row(50, "mixed"), "item2" -> Row(75, "mixed")),
+        Array(Row(260, "high"), Row(280, "high"))),
+      Row("key_006", 2,
+        Map("item1" -> Row(30, "mixed"), "item2" -> Row(90, "mixed")),
+        Array(Row(240, "high"), Row(270, "high")))
+    )
+
+    // Write Batch 1
+    val df1 = 
spark.createDataFrame(spark.sparkContext.parallelize(batch1Data), testSchema)
+    df1.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    // Write Batch 2
+    val df2 = 
spark.createDataFrame(spark.sparkContext.parallelize(batch2Data), testSchema)
+    df2.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    // Write Batch 3
+    val df3 = 
spark.createDataFrame(spark.sparkContext.parallelize(batch3Data), testSchema)
+    df3.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
+
+    // Query options with data skipping enabled
+    val queryOpts = Map(
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+    ) ++ metadataOpts
+
+    // Query 1: Filter on MAP nested_int (high range)
+    val resultDf1 = spark.read.format("hudi")
+      .options(queryOpts)
+      .load(basePath)
+      .filter("EXISTS(map_values(nullable_map_field), v -> v.nested_int > 
200)")
+
+    // Expected: 2 records from batch2 (key_003, key_004)
+    assertArrayEquals(
+      Array[Object]("key_003", "key_004"),
+      
resultDf1.select("record_key").collect().map(_.getString(0)).sorted.asInstanceOf[Array[Object]]
+    )
+
+    // Query 2: Filter on ARRAY nested_int (high range)
+    val resultDf2 = spark.read.format("hudi")
+      .options(queryOpts)
+      .load(basePath)
+      .filter("EXISTS(array_field, elem -> elem.nested_int > 200)")
+
+    // Expected: 4 records from batch2 and batch3 (key_003, key_004, key_005, 
key_006)
+    assertArrayEquals(
+      Array[Object]("key_003", "key_004", "key_005", "key_006"),
+      
resultDf2.select("record_key").collect().map(_.getString(0)).sorted.asInstanceOf[Array[Object]]
+    )
+
+    // Query 3: Filter on MAP level field (string)
+    val resultDf3 = spark.read.format("hudi")
+      .options(queryOpts)
+      .load(basePath)
+      .filter("EXISTS(map_values(nullable_map_field), v -> v.level = 'high')")
+
+    // Expected: 2 records from batch2
+    assertArrayEquals(
+      Array[Object]("key_003", "key_004"),
+      
resultDf3.select("record_key").collect().map(_.getString(0)).sorted.asInstanceOf[Array[Object]]
+    )
+
+    // Query 4: Combined filter (both MAP and ARRAY conditions)
+    val resultDf4 = spark.read.format("hudi")
+      .options(queryOpts)
+      .load(basePath)
+      .filter("EXISTS(map_values(nullable_map_field), v -> v.nested_int > 200) 
" +
+        "AND EXISTS(array_field, elem -> elem.nested_int > 200)")
+
+    // Expected: 2 records from batch2 only (both MAP and ARRAY have high 
values)
+    assertArrayEquals(
+      Array[Object]("key_003", "key_004"),
+      
resultDf4.select("record_key").collect().map(_.getString(0)).sorted.asInstanceOf[Array[Object]]
+    )
+
+    // Validate column stats were created for nested fields
+    val metadataConfig = HoodieMetadataConfig.newBuilder()
+      .fromProperties(toProperties(metadataOpts))
+      .build()
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(testSchema, 
"record", "")
+    val columnStatsIndex = new ColumnStatsIndexSupport(
+      spark,
+      testSchema,
+      hoodieSchema,
+      metadataConfig,
+      metaClient
+    )
+
+    val indexedColumns = Seq(
+      "nullable_map_field.key_value.value.nested_int",
+      "array_field.list.element.nested_int"
+    )
+
+    columnStatsIndex.loadTransposed(indexedColumns, 
testCase.shouldReadInMemory) { transposedDF =>
+      // Verify we have stats for all 3 file groups (may have more files for 
MOR due to updates)
+      val fileCount = transposedDF.select("fileName").distinct().count()
+      assertTrue(fileCount >= 3, s"Expected at least 3 files with column 
stats, got $fileCount")
+
+      // Verify min/max ranges for MAP field
+      val mapStats = transposedDF.select(
+        "`nullable_map_field.key_value.value.nested_int_minValue`",
+        "`nullable_map_field.key_value.value.nested_int_maxValue`"
+      ).collect().map(row => (row.getInt(0), row.getInt(1))).sorted
+
+      // Expected stats: Batch1[30,90], Batch2[230,290], Batch3[30,90]
+      // We should have exactly one file with [230,290] (high range) and at 
least two with [30,90] (low range)
+      val mapHighRangeCount = mapStats.count(stat => stat._1 == 230 && stat._2 
== 290)
+      val mapLowRangeCount = mapStats.count(stat => stat._1 == 30 && stat._2 
== 90)
+      assertEquals(1, mapHighRangeCount, "Expected exactly 1 file with MAP 
range [230,290]")
+      assertTrue(mapLowRangeCount >= 2, s"Expected at least 2 files with MAP 
range [30,90], got $mapLowRangeCount")
+
+      // Verify min/max ranges for ARRAY field
+      val arrayStats = transposedDF.select(
+        "`array_field.list.element.nested_int_minValue`",
+        "`array_field.list.element.nested_int_maxValue`"
+      ).collect().map(row => (row.getInt(0), row.getInt(1))).sorted
+
+      // Expected stats: Batch1[40,80], Batch2[260,280], Batch3[240,280]
+      // We should have exactly one file with [40,80] (low range) and at least 
two with high ranges
+      val arrayLowRangeCount = arrayStats.count(stat => stat._1 == 40 && 
stat._2 == 80)
+      val arrayHighRangeCount = arrayStats.count(stat => stat._1 >= 240 && 
stat._2 == 280)
+      assertEquals(1, arrayLowRangeCount, "Expected exactly 1 file with ARRAY 
range [40,80]")
+      assertTrue(arrayHighRangeCount >= 2, s"Expected at least 2 files with 
ARRAY high ranges, got $arrayHighRangeCount")
+    }
+    // Validate that indexed columns are registered correctly
+    validateColumnsToIndex(metaClient, Seq(
+      HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+      HoodieRecord.RECORD_KEY_METADATA_FIELD,
+      HoodieRecord.PARTITION_PATH_METADATA_FIELD,
+      "record_key",
+      "partition_col",
+      "nullable_map_field.key_value.value.nested_int",
+      "nullable_map_field.key_value.value.level",
+      "array_field.list.element.nested_int",
+      "array_field.list.element.level"
+    ))
+  }
+
+  /**
+   * Tests column stats index with multi-level nested ARRAY fields (2 and 3 
levels deep).
+   * This verifies that the .list.element pattern works correctly for deeply 
nested arrays.
+   */
+  @ParameterizedTest
+  @MethodSource(Array("testMetadataColumnStatsIndexParamsInMemory"))
+  def testMetadataColumnStatsIndexMultiLevelNestedArrays(testCase: 
ColumnStatsTestCase): Unit = {
+    // Define nested struct for innermost level
+    val innerStruct = new StructType()
+      .add("value", IntegerType, false)
+
+    // Define schema with 2-level and 3-level nested arrays
+    val testSchema = new StructType()
+      .add("record_key", StringType, false)
+      .add("partition_col", IntegerType, false)
+      // 2-level: Array of Array of Int
+      .add("level2_array", ArrayType(ArrayType(IntegerType)), false)
+      // 3-level: Array of Array of Array of Int
+      .add("level3_array", ArrayType(ArrayType(ArrayType(IntegerType))), false)
+      // 2-level array of struct: Array of Array of Struct
+      .add("level2_array_struct", ArrayType(ArrayType(innerStruct)), false)
+
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+      HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key ->
+        ("record_key,partition_col," +
+          "level2_array.list.element.list.element," +
+          "level3_array.list.element.list.element.list.element," +
+          "level2_array_struct.list.element.list.element.value")
+    )
+
+    val commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test_multi_level_arrays",
+      DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+      RECORDKEY_FIELD.key -> "record_key",
+      HoodieTableConfig.ORDERING_FIELDS.key -> "record_key",
+      PARTITIONPATH_FIELD.key -> "partition_col",
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+      HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key -> "10240",
+      HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key -> "0",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> 
testCase.tableVersion.toString
+    ) ++ metadataOpts
+
+    // Batch 1 - Low range values
+    val batch1Data = Seq(
+      Row("key_001", 1,
+        Array(Array(10, 20), Array(30, 40)),           // level2_array: 
min=10, max=40
+        Array(Array(Array(5, 10), Array(15, 20))),    // level3_array: min=5, 
max=20
+        Array(Array(Row(100), Row(150)))),            // 
level2_array_struct.value: min=100, max=150
+      Row("key_002", 1,
+        Array(Array(15, 25), Array(35, 45)),           // level2_array: 
min=15, max=45
+        Array(Array(Array(8, 12), Array(18, 22))),    // level3_array: min=8, 
max=22
+        Array(Array(Row(110), Row(160))))             // 
level2_array_struct.value: min=110, max=160
+    )
+
+    // Batch 2 - High range values
+    val batch2Data = Seq(
+      Row("key_003", 1,
+        Array(Array(500, 600), Array(700, 800)),       // level2_array: 
min=500, max=800
+        Array(Array(Array(300, 350), Array(400, 450))), // level3_array: 
min=300, max=450
+        Array(Array(Row(1000), Row(1500)))),          // 
level2_array_struct.value: min=1000, max=1500
+      Row("key_004", 1,
+        Array(Array(550, 650), Array(750, 850)),       // level2_array: 
min=550, max=850
+        Array(Array(Array(320, 370), Array(420, 470))), // level3_array: 
min=320, max=470
+        Array(Array(Row(1100), Row(1600))))           // 
level2_array_struct.value: min=1100, max=1600
+    )
+
+    // Write Batch 1
+    val df1 = 
spark.createDataFrame(spark.sparkContext.parallelize(batch1Data), testSchema)
+    df1.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    // Write Batch 2
+    val df2 = 
spark.createDataFrame(spark.sparkContext.parallelize(batch2Data), testSchema)
+    df2.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
+
+    // Validate column stats were created for multi-level nested fields
+    val metadataConfig = HoodieMetadataConfig.newBuilder()
+      .fromProperties(toProperties(metadataOpts))
+      .build()
+
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(testSchema, 
"record", "")
+    val columnStatsIndex = new ColumnStatsIndexSupport(
+      spark,
+      testSchema,
+      hoodieSchema,
+      metadataConfig,
+      metaClient
+    )
+
+    val indexedColumns = Seq(
+      "level2_array.list.element.list.element",
+      "level3_array.list.element.list.element.list.element",
+      "level2_array_struct.list.element.list.element.value"
+    )
+
+    columnStatsIndex.loadTransposed(indexedColumns, 
testCase.shouldReadInMemory) { transposedDF =>
+      // Verify we have stats for both file groups
+      val fileCount = transposedDF.select("fileName").distinct().count()
+      assertTrue(fileCount >= 2, s"Expected at least 2 files with column 
stats, got $fileCount")
+
+      // Verify 2-level array stats
+      val level2Stats = transposedDF.select(
+        "`level2_array.list.element.list.element_minValue`",
+        "`level2_array.list.element.list.element_maxValue`"
+      ).collect().map(row => (row.getInt(0), row.getInt(1))).sorted
+
+      val level2LowCount = level2Stats.count(stat => stat._1 >= 10 && stat._2 
<= 50)
+      val level2HighCount = level2Stats.count(stat => stat._1 >= 500 && 
stat._2 >= 800)
+      assertTrue(level2LowCount >= 1, s"Expected at least 1 file with level2 
low range, got $level2LowCount")
+      assertTrue(level2HighCount >= 1, s"Expected at least 1 file with level2 
high range, got $level2HighCount")
+
+      // Verify 3-level array stats
+      val level3Stats = transposedDF.select(
+        "`level3_array.list.element.list.element.list.element_minValue`",
+        "`level3_array.list.element.list.element.list.element_maxValue`"
+      ).collect().map(row => (row.getInt(0), row.getInt(1))).sorted
+
+      val level3LowCount = level3Stats.count(stat => stat._1 >= 5 && stat._2 
<= 25)
+      val level3HighCount = level3Stats.count(stat => stat._1 >= 300 && 
stat._2 >= 450)
+      assertTrue(level3LowCount >= 1, s"Expected at least 1 file with level3 
low range, got $level3LowCount")
+      assertTrue(level3HighCount >= 1, s"Expected at least 1 file with level3 
high range, got $level3HighCount")
+
+      // Verify 2-level array of struct stats
+      val level2StructStats = transposedDF.select(
+        "`level2_array_struct.list.element.list.element.value_minValue`",
+        "`level2_array_struct.list.element.list.element.value_maxValue`"
+      ).collect().map(row => (row.getInt(0), row.getInt(1))).sorted
+
+      val structLowCount = level2StructStats.count(stat => stat._1 >= 100 && 
stat._2 <= 200)
+      val structHighCount = level2StructStats.count(stat => stat._1 >= 1000 && 
stat._2 >= 1500)
+      assertTrue(structLowCount >= 1, s"Expected at least 1 file with struct 
low range, got $structLowCount")
+      assertTrue(structHighCount >= 1, s"Expected at least 1 file with struct 
high range, got $structHighCount")
+    }
+
+    // Validate that indexed columns are registered correctly
+    validateColumnsToIndex(metaClient, Seq(
+      HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+      HoodieRecord.RECORD_KEY_METADATA_FIELD,
+      HoodieRecord.PARTITION_PATH_METADATA_FIELD,
+      "record_key",
+      "partition_col",
+      "level2_array.list.element.list.element",
+      "level3_array.list.element.list.element.list.element",
+      "level2_array_struct.list.element.list.element.value"
+    ))
+  }
+
   @ParameterizedTest
   @MethodSource(Array("testTableTypePartitionTypeParams"))
   def testMetadataColumnStatsIndexInitializationWithUpserts(tableType: 
HoodieTableType, partitionCol : String, tableVersion: Int): Unit = {
