This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6ca4c40bc5a7 fix: Handle nested map and array columns in MDT (#17694)
6ca4c40bc5a7 is described below
commit 6ca4c40bc5a74d42fe844db0557ece598b5fdb5b
Author: Vinish Reddy <[email protected]>
AuthorDate: Sat Jan 24 01:51:20 2026 +0530
fix: Handle nested map and array columns in MDT (#17694)
* Handle nested map and array columns in MDT
* Address comments from self review
* Address comments and Add functional tests for nested fields
* Handle nested array and map fields in HoodieSchemaUtils.scala as well
* Remove duplicate code by moving nested schema handling to HoodieSchema,
standardize on list.element for nested array columns
* Address comments - 2
* Address comments - Tim
---
.../hudi/common/config/HoodieMetadataConfig.java | 7 +-
.../apache/hudi/common/schema/HoodieSchema.java | 176 ++++++++++
.../hudi/common/schema/HoodieSchemaUtils.java | 44 +--
.../hudi/common/schema/TestHoodieSchemaUtils.java | 200 +++++++++++
.../org/apache/hudi/common/util/ParquetUtils.java | 41 ++-
.../scala/org/apache/hudi/HoodieSchemaUtils.scala | 43 ++-
.../hudi/schema/TestHoodieSparkSchemaUtils.scala | 132 +++++--
.../hudi/functional/TestColumnStatsIndex.scala | 379 ++++++++++++++++++++-
8 files changed, 931 insertions(+), 91 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index ec688f7768d6..d4bc9dca8fc4 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -219,7 +219,12 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
.noDefaultValue()
.markAdvanced()
.sinceVersion("0.11.0")
- .withDocumentation("Comma-separated list of columns for which column
stats index will be built. If not set, all columns will be indexed");
+ .withDocumentation("Comma-separated list of columns for which column
stats index will be built. "
+ + "If not set, all columns will be indexed. "
+ + "For nested fields within ARRAY types, use: field.list.element "
+ + "(e.g., items.list.element or items.list.element.price). "
+ + "For nested fields within MAP types, use: field.key_value.key for
keys or field.key_value.value for values "
+ + "(e.g., metadata.key_value.key, metadata.key_value.value, or
metadata.key_value.value.nested_field).");
public static final ConfigProperty<Integer> COLUMN_STATS_INDEX_MAX_COLUMNS =
ConfigProperty
.key(METADATA_PREFIX + ".index.column.stats.max.columns.to.index")
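Illustrative usage (not part of this diff): a minimal sketch of enabling column stats for nested ARRAY/MAP fields with the accessor patterns documented above. It assumes only the config keys exercised by the functional tests further below; the column names (items, metadata) are placeholders.

  import org.apache.hudi.common.config.HoodieMetadataConfig;

  import java.util.HashMap;
  import java.util.Map;

  public class NestedColumnStatsConfigSketch {
    // Build write options that enable the column stats index and register nested
    // ARRAY/MAP columns using field.list.element and field.key_value.{key,value}.
    // Column names below are hypothetical placeholders.
    public static Map<String, String> options() {
      Map<String, String> opts = new HashMap<>();
      opts.put(HoodieMetadataConfig.ENABLE.key(), "true");
      opts.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
      opts.put(HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key(),
          "items.list.element.price,metadata.key_value.key,metadata.key_value.value.nested_field");
      return opts;
    }
  }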
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index a8222670df1a..e13fdc17179e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -28,6 +28,8 @@ import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
+import org.apache.hudi.common.util.collection.Pair;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
@@ -86,6 +88,25 @@ public class HoodieSchema implements Serializable {
public static final Object NULL_VALUE = JsonProperties.NULL_VALUE;
public static final HoodieSchema NULL_SCHEMA =
HoodieSchema.create(HoodieSchemaType.NULL);
private static final long serialVersionUID = 1L;
+
+ /**
+ * Constants for Parquet-style accessor patterns used in nested MAP and
ARRAY navigation.
+ * These patterns are specifically used for column stats generation and
differ from
+ * InternalSchema constants which are used in schema evolution contexts.
+ */
+ private static final String ARRAY_LIST = "list";
+ private static final String ARRAY_ELEMENT = "element";
+ private static final String MAP_KEY_VALUE = "key_value";
+ private static final String MAP_KEY = "key";
+ private static final String MAP_VALUE = "value";
+
+ private static final String ARRAY_LIST_ELEMENT = ARRAY_LIST + "." +
ARRAY_ELEMENT;
+ private static final String MAP_KEY_VALUE_KEY = MAP_KEY_VALUE + "." +
MAP_KEY;
+ private static final String MAP_KEY_VALUE_VALUE = MAP_KEY_VALUE + "." +
MAP_VALUE;
+
+ public static final String PARQUET_ARRAY_SPARK = ".array";
+ public static final String PARQUET_ARRAY_AVRO = "." + ARRAY_LIST_ELEMENT;
+
private Schema avroSchema;
private HoodieSchemaType type;
@@ -754,6 +775,161 @@ public class HoodieSchema implements Serializable {
return types.get(0).getType() != HoodieSchemaType.NULL ? types.get(0) :
types.get(1);
}
+ /**
+ * Gets a nested field using dot notation, supporting Parquet-style
array/map accessors.
+ *
+ * <p>Supports nested field access using dot notation including MAP and
ARRAY types
+ * using Parquet-style accessor patterns:</p>
+ *
+ * <ul>
+ * <li><b>RECORD types:</b> Standard dot notation (e.g., {@code
"user.profile.name"})</li>
+ * <li><b>ARRAY types:</b> Use {@code ".list.element"} to access array
elements
+ * <ul>
+ * <li>Example: {@code "items.list.element"} accesses element schema
of array</li>
+ * <li>Example: {@code "items.list.element.id"} accesses nested
field within array elements</li>
+ * </ul>
+ * </li>
+ * <li><b>MAP types:</b> Use {@code ".key_value.key"} or {@code
".key_value.value"} to access map components
+ * <ul>
+ * <li>Example: {@code "metadata.key_value.key"} accesses map keys
(always STRING)</li>
+ * <li>Example: {@code "metadata.key_value.value"} accesses map
value schema</li>
+ * <li>Example: {@code "nested_map.key_value.value.field"} accesses
nested field within map values</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * <p>Note: Spark Parquet files may use {@code ".array"} format instead of
{@code ".list.element"}.
+ * This translation is handled at the Parquet reading level in ParquetUtils,
not here.</p>
+ *
+ * @param fieldName Field path (e.g., "user.profile.name",
"items.list.element", "metadata.key_value.value")
+ * @return Option containing a pair of canonical field name and the
HoodieSchemaField, or Option.empty() if not found
+ */
+ public Option<Pair<String, HoodieSchemaField>> getNestedField(String
fieldName) {
+ ValidationUtils.checkArgument(fieldName != null && !fieldName.isEmpty(),
"Field name cannot be null or empty");
+ return getNestedFieldInternal(this, fieldName, 0, "");
+ }
+
+ /**
+ * Internal helper method for recursively retrieving nested fields using
offset-based navigation.
+ *
+ * @param schema the current schema to search in
+ * @param fullPath the full field path string
+ * @param offset current position in fullPath
+ * @param prefix the accumulated field path prefix
+ * @return Option containing a pair of canonical field name and the
HoodieSchemaField, or Option.empty() if field not found
+ */
+ private static Option<Pair<String, HoodieSchemaField>>
getNestedFieldInternal(
+ HoodieSchema schema, String fullPath, int offset, String prefix) {
+ HoodieSchema nonNullableSchema = schema.getNonNullType();
+ int nextDot = fullPath.indexOf('.', offset);
+ // Terminal case: no more dots in this segment
+ if (nextDot == -1) {
+ String fieldName = fullPath.substring(offset);
+ // Handle RECORD terminal case
+ if (nonNullableSchema.getType() != HoodieSchemaType.RECORD) {
+ return Option.empty();
+ }
+ return nonNullableSchema.getField(fieldName)
+ .map(field -> Pair.of(prefix + fieldName, field));
+ }
+ // Recursive case: more nesting to explore
+ String rootFieldName = fullPath.substring(offset, nextDot);
+ int nextOffset = nextDot + 1;
+ // Handle RECORD: standard field navigation
+ if (nonNullableSchema.getType() == HoodieSchemaType.RECORD) {
+ return nonNullableSchema.getField(rootFieldName)
+ .flatMap(field -> getNestedFieldInternal(field.schema(), fullPath,
nextOffset, prefix + rootFieldName + "."));
+ }
+ // Handle ARRAY: expect ".list.element"
+ if (nonNullableSchema.getType() == HoodieSchemaType.ARRAY &&
ARRAY_LIST.equals(rootFieldName)) {
+ return handleArrayNavigation(nonNullableSchema, fullPath, nextOffset,
prefix);
+ }
+ // Handle MAP: expect ".key_value.key" or ".key_value.value"
+ if (nonNullableSchema.getType() == HoodieSchemaType.MAP &&
MAP_KEY_VALUE.equals(rootFieldName)) {
+ return handleMapNavigation(nonNullableSchema, fullPath, nextOffset,
prefix);
+ }
+ return Option.empty();
+ }
+
+ /**
+ * Handles navigation into ARRAY types using the ".list.element" pattern.
+ *
+ * @param arraySchema the ARRAY schema to navigate into
+ * @param fullPath the full field path string
+ * @param offset current position in fullPath (should point to
"element")
+ * @param prefix the accumulated field path prefix
+ * @return Option containing the nested field, or Option.empty() if invalid
path
+ */
+ private static Option<Pair<String, HoodieSchemaField>> handleArrayNavigation(
+ HoodieSchema arraySchema, String fullPath, int offset, String prefix) {
+ int nextPos = getNextOffset(fullPath, offset, ARRAY_ELEMENT);
+ if (nextPos == -1) {
+ return Option.empty();
+ }
+
+ HoodieSchema elementSchema = arraySchema.getElementType();
+ if (nextPos == fullPath.length()) {
+ return Option.of(Pair.of(prefix + ARRAY_LIST_ELEMENT,
+ HoodieSchemaField.of(ARRAY_ELEMENT, elementSchema, null, null)));
+ }
+ return getNestedFieldInternal(elementSchema, fullPath, nextPos, prefix +
ARRAY_LIST_ELEMENT + ".");
+ }
+
+ /**
+ * Handles navigation into MAP types using the Parquet-style
".key_value.key" or ".key_value.value" patterns.
+ *
+ * @param mapSchema the MAP schema to navigate into
+ * @param fullPath the full field path string
+ * @param offset current position in fullPath (should point to "key" or
"value")
+ * @param prefix the accumulated field path prefix
+ * @return Option containing the nested field, or Option.empty() if invalid
path
+ */
+ private static Option<Pair<String, HoodieSchemaField>> handleMapNavigation(
+ HoodieSchema mapSchema, String fullPath, int offset, String prefix) {
+ // Check for "key" path
+ int keyPos = getNextOffset(fullPath, offset, MAP_KEY);
+ if (keyPos != -1) {
+ if (keyPos == fullPath.length()) {
+ return Option.of(Pair.of(prefix + MAP_KEY_VALUE_KEY,
+ HoodieSchemaField.of(MAP_KEY, mapSchema.getKeyType(), null,
null)));
+ }
+ // Map keys are primitives, cannot navigate further
+ return Option.empty();
+ }
+
+ // Check for "value" path
+ int valuePos = getNextOffset(fullPath, offset, MAP_VALUE);
+ if (valuePos == -1) {
+ return Option.empty();
+ }
+
+ HoodieSchema valueSchema = mapSchema.getValueType();
+ if (valuePos == fullPath.length()) {
+ return Option.of(Pair.of(prefix + MAP_KEY_VALUE_VALUE,
+ HoodieSchemaField.of(MAP_VALUE, valueSchema, null, null)));
+ }
+ return getNestedFieldInternal(valueSchema, fullPath, valuePos, prefix +
MAP_KEY_VALUE_VALUE + ".");
+ }
+
+ /**
+ * Advances offset past a component name in the path, handling end-of-path
and dot separator.
+ *
+ * @param path the full path string
+ * @param offset current position in path
+ * @param component the component name to match (e.g., "element", "key",
"value")
+ * @return new offset after component and dot, or path.length() if at end,
or -1 if no match
+ */
+ private static int getNextOffset(String path, int offset, String component) {
+ if (!path.regionMatches(offset, component, 0, component.length())) {
+ return -1;
+ }
+ int next = offset + component.length();
+ if (next == path.length()) {
+ return next;
+ }
+ return (path.charAt(next) == '.') ? next + 1 : -1;
+ }
+
/**
* Returns the underlying Avro schema for compatibility purposes.
*
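Illustrative usage of the new accessor (not part of this diff), mirroring the unit tests added below; imports and builder calls are the same ones used in TestHoodieSchemaUtils:

  // Resolve a nested field inside a MAP of records via the Parquet-style accessors.
  HoodieSchema valueSchema = HoodieSchema.createRecord("Nested", null, null,
      Arrays.asList(HoodieSchemaField.of("nested_int", HoodieSchema.create(HoodieSchemaType.INT))));
  HoodieSchema schema = HoodieSchema.createRecord("Rec", null, null,
      Arrays.asList(HoodieSchemaField.of("nested_map", HoodieSchema.createMap(valueSchema))));

  Option<Pair<String, HoodieSchemaField>> field =
      schema.getNestedField("nested_map.key_value.value.nested_int");
  // field.get().getLeft()                     -> "nested_map.key_value.value.nested_int"
  // field.get().getRight().schema().getType() -> HoodieSchemaType.INT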
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
index c8244fc32e61..eea2ea6ca723 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
@@ -386,12 +386,15 @@ public final class HoodieSchemaUtils {
/**
* Gets a field (including nested fields) from the schema using dot notation.
- * This is equivalent to HoodieAvroUtils.getSchemaForField() but operates on
HoodieSchema.
+ * This method delegates to {@link HoodieSchema#getNestedField(String)}.
* <p>
* Supports nested field access using dot notation. For example:
* <ul>
* <li>"name" - retrieves top-level field</li>
* <li>"user.profile.displayName" - retrieves nested field</li>
+ * <li>"items.list.element" - retrieves array element schema </li>
+ * <li>"metadata.key_value.key" - retrieves map key schema</li>
+ * <li>"metadata.key_value.value" - retrieves map value schema</li>
* </ul>
*
* @param schema the schema to search in
@@ -403,44 +406,7 @@ public final class HoodieSchemaUtils {
public static Option<Pair<String, HoodieSchemaField>>
getNestedField(HoodieSchema schema, String fieldName) {
ValidationUtils.checkArgument(schema != null, "Schema cannot be null");
ValidationUtils.checkArgument(fieldName != null && !fieldName.isEmpty(),
"Field name cannot be null or empty");
- return getNestedFieldInternal(schema, fieldName, "");
- }
-
- /**
- * Internal helper method for recursively retrieving nested fields.
- *
- * @param schema the current schema to search in
- * @param fieldName the remaining field path
- * @param prefix the accumulated field path prefix
- * @return Option containing Pair of canonical field name and the
HoodieSchemaField, or Option.empty() if field not found
- */
- private static Option<Pair<String, HoodieSchemaField>>
getNestedFieldInternal(HoodieSchema schema, String fieldName, String prefix) {
- HoodieSchema nonNullableSchema = getNonNullTypeFromUnion(schema);
-
- if (!fieldName.contains(".")) {
- // Base case: simple field name
- if (nonNullableSchema.getType() != HoodieSchemaType.RECORD) {
- return Option.empty();
- }
- return nonNullableSchema.getField(fieldName)
- .map(field -> Pair.of(prefix + fieldName, field));
- } else {
- // Recursive case: nested field
- if (nonNullableSchema.getType() != HoodieSchemaType.RECORD) {
- return Option.empty();
- }
-
- int dotIndex = fieldName.indexOf(".");
- String rootFieldName = fieldName.substring(0, dotIndex);
- String remainingPath = fieldName.substring(dotIndex + 1);
-
- return nonNullableSchema.getField(rootFieldName)
- .flatMap(rootField -> getNestedFieldInternal(
- rootField.schema(),
- remainingPath,
- prefix + rootFieldName + "."
- ));
- }
+ return schema.getNestedField(fieldName);
}
/**
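The static utility keeps its existing signature, so callers are unchanged; for example (field name illustrative, not part of this diff):

  Option<Pair<String, HoodieSchemaField>> element =
      HoodieSchemaUtils.getNestedField(schema, "items.list.element");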
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
index c35e9e11b78e..851d07e02cb0 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
@@ -2175,4 +2175,204 @@ public class TestHoodieSchemaUtils {
assertEquals("timestamp-micros",
prunedCol0FieldOpt.get().schema().getProp(logicalTypeKey));
assertEquals("uuid",
prunedCol1FieldOpt.get().schema().getProp(logicalTypeKey));
}
+
+ @Test
+ public void testGetNestedFieldWithArrayListElement() {
+ // Create schema with array field
+ HoodieSchema schema = HoodieSchema.createRecord(
+ "TestRecord",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("id",
HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("items",
HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING)))
+ )
+ );
+
+ // Test: items.list.element should resolve to STRING
+ Option<Pair<String, HoodieSchemaField>> result =
HoodieSchemaUtils.getNestedField(schema, "items.list.element");
+ assertTrue(result.isPresent());
+ assertEquals("items.list.element", result.get().getLeft());
+ assertEquals("element", result.get().getRight().name());
+ assertEquals(HoodieSchemaType.STRING,
result.get().getRight().schema().getType());
+ }
+
+ @Test
+ public void testGetNestedFieldWithArrayListElementAndNesting() {
+ // Create schema with array of records
+ HoodieSchema nestedSchema = HoodieSchema.createRecord(
+ "NestedRecord",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("nested_int",
HoodieSchema.create(HoodieSchemaType.INT)),
+ HoodieSchemaField.of("nested_string",
HoodieSchema.create(HoodieSchemaType.STRING))
+ )
+ );
+
+ HoodieSchema schema = HoodieSchema.createRecord(
+ "TestRecord",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("id",
HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("items",
HoodieSchema.createArray(nestedSchema))
+ )
+ );
+
+ // Test: items.list.element.nested_int should resolve to INT
+ Option<Pair<String, HoodieSchemaField>> result =
HoodieSchemaUtils.getNestedField(schema, "items.list.element.nested_int");
+ assertTrue(result.isPresent());
+ assertEquals("items.list.element.nested_int", result.get().getLeft());
+ assertEquals("nested_int", result.get().getRight().name());
+ assertEquals(HoodieSchemaType.INT,
result.get().getRight().schema().getType());
+ }
+
+ @Test
+ public void testGetNestedFieldWithMapKeyValueKey() {
+ // Create schema with map field
+ HoodieSchema schema = HoodieSchema.createRecord(
+ "TestRecord",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("id",
HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("metadata",
HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT)))
+ )
+ );
+
+ // Test: metadata.key_value.key should resolve to STRING (MAP keys are
always STRING)
+ Option<Pair<String, HoodieSchemaField>> result =
HoodieSchemaUtils.getNestedField(schema, "metadata.key_value.key");
+ assertTrue(result.isPresent());
+ assertEquals("metadata.key_value.key", result.get().getLeft());
+ assertEquals("key", result.get().getRight().name());
+ assertEquals(HoodieSchemaType.STRING,
result.get().getRight().schema().getType());
+ }
+
+ @Test
+ public void testGetNestedFieldWithMapKeyValueValue() {
+ // Create schema with map field
+ HoodieSchema schema = HoodieSchema.createRecord(
+ "TestRecord",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("id",
HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("metadata",
HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT)))
+ )
+ );
+
+ // Test: metadata.key_value.value should resolve to INT
+ Option<Pair<String, HoodieSchemaField>> result =
HoodieSchemaUtils.getNestedField(schema, "metadata.key_value.value");
+ assertTrue(result.isPresent());
+ assertEquals("metadata.key_value.value", result.get().getLeft());
+ assertEquals("value", result.get().getRight().name());
+ assertEquals(HoodieSchemaType.INT,
result.get().getRight().schema().getType());
+ }
+
+ @Test
+ public void testGetNestedFieldWithMapKeyValueValueAndNesting() {
+ // Create schema with map of records (like the original failing case)
+ HoodieSchema nestedSchema = HoodieSchema.createRecord(
+ "NestedRecord",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("nested_int",
HoodieSchema.create(HoodieSchemaType.INT)),
+ HoodieSchemaField.of("nested_string",
HoodieSchema.create(HoodieSchemaType.STRING))
+ )
+ );
+
+ HoodieSchema schema = HoodieSchema.createRecord(
+ "TestRecord",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("id",
HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("nested_map",
HoodieSchema.createMap(nestedSchema))
+ )
+ );
+
+ // Test: nested_map.key_value.value.nested_int should resolve to INT
+ Option<Pair<String, HoodieSchemaField>> result =
HoodieSchemaUtils.getNestedField(schema,
"nested_map.key_value.value.nested_int");
+ assertTrue(result.isPresent());
+ assertEquals("nested_map.key_value.value.nested_int",
result.get().getLeft());
+ assertEquals("nested_int", result.get().getRight().name());
+ assertEquals(HoodieSchemaType.INT,
result.get().getRight().schema().getType());
+ }
+
+ @Test
+ public void testGetNestedFieldWithInvalidArrayPath() {
+ // Create schema with array field
+ HoodieSchema schema = HoodieSchema.createRecord(
+ "TestRecord",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("items",
HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING)))
+ )
+ );
+
+ // Test: Invalid array path should return Option.empty()
+ Option<Pair<String, HoodieSchemaField>> result =
HoodieSchemaUtils.getNestedField(schema, "items.wrong.path");
+ assertFalse(result.isPresent());
+
+ // Test: Missing "element" should return Option.empty()
+ result = HoodieSchemaUtils.getNestedField(schema, "items.list.missing");
+ assertFalse(result.isPresent());
+ }
+
+ @Test
+ public void testGetNestedFieldWithInvalidMapPath() {
+ // Create schema with map field
+ HoodieSchema schema = HoodieSchema.createRecord(
+ "TestRecord",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("metadata",
HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT)))
+ )
+ );
+
+ // Test: Invalid map path should return Option.empty()
+ Option<Pair<String, HoodieSchemaField>> result =
HoodieSchemaUtils.getNestedField(schema, "metadata.wrong.path");
+ assertFalse(result.isPresent());
+
+ // Test: Missing "key" or "value" should return Option.empty()
+ result = HoodieSchemaUtils.getNestedField(schema,
"metadata.key_value.missing");
+ assertFalse(result.isPresent());
+ }
+
+ @Test
+ public void testGetNestedFieldComplexNestedMapAndArray() {
+ // Create complex schema: record with map of arrays of records
+ HoodieSchema innerRecord = HoodieSchema.createRecord(
+ "InnerRecord",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("value",
HoodieSchema.create(HoodieSchemaType.LONG))
+ )
+ );
+
+ HoodieSchema arrayOfRecords = HoodieSchema.createArray(innerRecord);
+ HoodieSchema mapOfArrays = HoodieSchema.createMap(arrayOfRecords);
+
+ HoodieSchema schema = HoodieSchema.createRecord(
+ "TestRecord",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("complex_field", mapOfArrays)
+ )
+ );
+
+ // Test: complex_field.key_value.value.list.element.value should resolve
to LONG
+ Option<Pair<String, HoodieSchemaField>> result =
HoodieSchemaUtils.getNestedField(
+ schema, "complex_field.key_value.value.list.element.value");
+ assertTrue(result.isPresent());
+ assertEquals("complex_field.key_value.value.list.element.value",
result.get().getLeft());
+ assertEquals("value", result.get().getRight().name());
+ assertEquals(HoodieSchemaType.LONG,
result.get().getRight().schema().getType());
+ }
}
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index e0fdbe5c849d..678c026e0089 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -294,17 +294,27 @@ public class ParquetUtils extends FileFormatUtils {
}
public List<HoodieColumnRangeMetadata<Comparable>>
readColumnStatsFromMetadata(ParquetMetadata metadata, String filePath,
Option<List<String>> columnList, HoodieIndexVersion indexVersion) {
+ // Create a set of columns to match, including both canonical
(.list.element) and Parquet (.array) formats
+ // This handles files written by both Spark (uses .array) and Avro (uses
.list.element) writers
+ Set<String> columnsToMatch = columnList.map(cols -> {
+ Set<String> set = new HashSet<>(cols);
+ cols.forEach(col -> set.add(convertListElementToArray(col)));
+ return set;
+ }).orElse(null);
+
// Collect stats from all individual Parquet blocks
Stream<HoodieColumnRangeMetadata<Comparable>>
hoodieColumnRangeMetadataStream =
metadata.getBlocks().stream().sequential().flatMap(blockMetaData ->
blockMetaData.getColumns().stream()
- .filter(f -> !columnList.isPresent() ||
columnList.get().contains(f.getPath().toDotString()))
+ .filter(f -> columnsToMatch == null ||
columnsToMatch.contains(f.getPath().toDotString()))
.map(columnChunkMetaData -> {
Statistics stats = columnChunkMetaData.getStatistics();
ValueMetadata valueMetadata =
ValueMetadata.getValueMetadata(columnChunkMetaData.getPrimitiveType(),
indexVersion);
+ // Normalize column name to canonical .list.element format
for consistent MDT storage
+ String canonicalColumnName =
convertArrayToListElement(columnChunkMetaData.getPath().toDotString());
return (HoodieColumnRangeMetadata<Comparable>)
HoodieColumnRangeMetadata.<Comparable>create(
filePath,
- columnChunkMetaData.getPath().toDotString(),
+ canonicalColumnName,
convertToNativeJavaType(
columnChunkMetaData.getPrimitiveType(),
stats.genericGetMin(),
@@ -549,4 +559,31 @@ public class ParquetUtils extends FileFormatUtils {
throw new UnsupportedOperationException(String.format("Unsupported value
type (%s)", val.getClass().getName()));
}
}
+
+ /**
+ * Converts Parquet array format (.array) to canonical format
(.list.element).
+ * Parquet files written by Spark use ".array" while Avro uses
".list.element".
+ * We normalize to ".list.element" as the canonical format for MDT storage.
+ *
+ * @param columnName the column name from Parquet metadata
+ * @return the column name with .array converted to .list.element
+ */
+ private static String convertArrayToListElement(String columnName) {
+ return columnName
+ .replace(HoodieSchema.PARQUET_ARRAY_SPARK + ".",
HoodieSchema.PARQUET_ARRAY_AVRO + ".")
+ .replace(HoodieSchema.PARQUET_ARRAY_SPARK,
HoodieSchema.PARQUET_ARRAY_AVRO);
+ }
+
+ /**
+ * Converts canonical format (.list.element) to Parquet array format
(.array).
+ * Used to match user-specified columns against Parquet files that use
".array" format.
+ *
+ * @param columnName the column name in canonical format
+ * @return the column name with .list.element converted to .array
+ */
+ private static String convertListElementToArray(String columnName) {
+ return columnName
+ .replace(HoodieSchema.PARQUET_ARRAY_AVRO + ".",
HoodieSchema.PARQUET_ARRAY_SPARK + ".")
+ .replace(HoodieSchema.PARQUET_ARRAY_AVRO,
HoodieSchema.PARQUET_ARRAY_SPARK);
+ }
}
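For reference, the effect of the two private helpers above on sample paths (derived directly from their replace rules; paths are illustrative):

  // convertArrayToListElement("items.array")              -> "items.list.element"
  // convertArrayToListElement("items.array.price")        -> "items.list.element.price"
  // convertListElementToArray("items.list.element.price") -> "items.array.price"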
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
index 64abe3553cdb..bf3961744ef4 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.config.{HoodieCommonConfig,
HoodieConfig, TypedPro
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaCompatibility,
HoodieSchemaUtils => HoodieCommonSchemaUtils}
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
-import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
+import org.apache.hudi.common.util.ConfigUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.{HoodieException,
SchemaCompatibilityException}
import org.apache.hudi.internal.schema.InternalSchema
@@ -33,7 +33,8 @@ import
org.apache.hudi.internal.schema.convert.InternalSchemaConverter
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils
import
org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileSchemaRequirements
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.types.StructType
import org.slf4j.LoggerFactory
import java.util.Properties
@@ -46,22 +47,30 @@ import scala.collection.JavaConverters._
object HoodieSchemaUtils {
private val log = LoggerFactory.getLogger(getClass)
+ /**
+ * Gets a field (including nested fields) from the schema using dot notation.
+ * This method delegates to the consolidated implementation in
[[HoodieSchema.getNestedField]].
+ *
+ * Supports nested field access using dot notation. For example:
+ * - "name" - retrieves top-level field
+ * - "user.profile.displayName" - retrieves nested field
+ * - "items.list.element" - retrieves array element schema
+ * - "metadata.key_value.key" - retrieves map key schema
+ * - "metadata.key_value.value" - retrieves map value schema
+ *
+ * @param schema the Spark StructType to search in
+ * @param fieldName the field name (may contain dots for nested fields)
+ * @return Pair of canonical field name and the StructField
+ * @throws HoodieException if field is not found
+ */
def getSchemaForField(schema: StructType, fieldName: String):
org.apache.hudi.common.util.collection.Pair[String, StructField] = {
- getSchemaForField(schema, fieldName, StringUtils.EMPTY_STRING)
- }
-
- def getSchemaForField(schema: StructType, fieldName: String, prefix:
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {
- if (!(fieldName.contains("."))) {
- org.apache.hudi.common.util.collection.Pair.of(prefix +
schema.fields(schema.fieldIndex(fieldName)).name,
schema.fields(schema.fieldIndex(fieldName)))
- }
- else {
- val rootFieldIndex: Int = fieldName.indexOf(".")
- val rootField: StructField =
schema.fields(schema.fieldIndex(fieldName.substring(0, rootFieldIndex)))
- if (rootField == null) {
- throw new HoodieException("Failed to find " + fieldName + " in the
table schema ")
- }
- getSchemaForField(rootField.dataType.asInstanceOf[StructType],
fieldName.substring(rootFieldIndex + 1), prefix + fieldName.substring(0,
rootFieldIndex + 1))
- }
+ val result =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(schema,
"temp_schema", "")
+ .getNestedField(fieldName)
+ .orElseThrow(() => new HoodieException(s"Failed to find $fieldName in
the table schema"))
+ org.apache.hudi.common.util.collection.Pair.of(result.getLeft,
+ StructField(result.getRight.name(),
+
HoodieSchemaConversionUtils.convertHoodieSchemaToDataType(result.getRight.schema()),
+ result.getRight.isNullable))
}
/**
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/schema/TestHoodieSparkSchemaUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/schema/TestHoodieSparkSchemaUtils.scala
index 2e14ad2c08fa..d1a60a02b80e 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/schema/TestHoodieSparkSchemaUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/schema/TestHoodieSparkSchemaUtils.scala
@@ -20,45 +20,117 @@
package org.apache.hudi.schema
import org.apache.hudi.HoodieSchemaUtils
+import org.apache.hudi.exception.HoodieException
-import org.apache.spark.sql.types.{DataType, IntegerType, LongType,
StringType, StructField, StructType}
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
/**
- * Tests {@link HoodieSparkSchemaUtils}
+ * Tests for {@link HoodieSchemaUtils#getSchemaForField}
*/
class TestHoodieSparkSchemaUtils {
- @Test
- def testGetSchemaField(): Unit = {
-
- val nestedStructType = StructType(
- StructField("nested_long", LongType, nullable = true) ::
- StructField("nested_int", IntegerType, nullable = true) ::
- StructField("nested_string", StringType, nullable = true) :: Nil)
-
- val schema = StructType(
- StructField("_row_key", StringType, nullable = true) ::
- StructField("first_name", StringType, nullable = false) ::
- StructField("last_name", StringType, nullable = true) ::
- StructField("nested_field", nestedStructType, nullable = true) ::
- StructField("timestamp", IntegerType, nullable = true) ::
- StructField("partition", IntegerType, nullable = true) :: Nil)
-
- assertFieldType(schema, "first_name", StringType)
- assertFieldType(schema, "timestamp", IntegerType)
-
- // test nested fields.
- assertFieldType(schema, "nested_field.nested_long", LongType)
- assertFieldType(schema, "nested_field.nested_int", IntegerType)
- assertFieldType(schema, "nested_field.nested_string", StringType)
+ // Test schemas
+ private val simpleSchema = StructType(
+ StructField("id", StringType) ::
+ StructField("name", StringType) ::
+ StructField("count", IntegerType) :: Nil)
+
+ private val nestedSchema = StructType(
+ StructField("id", StringType) ::
+ StructField("nested", StructType(
+ StructField("inner_string", StringType) ::
+ StructField("inner_int", IntegerType) :: Nil)) :: Nil)
+
+ private val arraySchema = StructType(
+ StructField("id", StringType) ::
+ StructField("items", ArrayType(StringType, containsNull = true)) :: Nil)
+
+ private val arrayOfStructSchema = StructType(
+ StructField("id", StringType) ::
+ StructField("items", ArrayType(StructType(
+ StructField("nested_int", IntegerType) ::
+ StructField("nested_string", StringType) :: Nil), containsNull =
true)) :: Nil)
+
+ private val mapSchema = StructType(
+ StructField("id", StringType) ::
+ StructField("metadata", MapType(StringType, IntegerType,
valueContainsNull = true)) :: Nil)
+
+ private val mapOfStructSchema = StructType(
+ StructField("id", StringType) ::
+ StructField("nested_map", MapType(StringType, StructType(
+ StructField("nested_int", IntegerType) ::
+ StructField("nested_string", StringType) :: Nil), valueContainsNull
= true)) :: Nil)
+
+ private val complexSchema = StructType(
+ StructField("id", StringType) ::
+ StructField("top", StructType(
+ StructField("nested_array", ArrayType(StructType(
+ StructField("inner_field", StringType) :: Nil), containsNull =
true)) ::
+ StructField("nested_map", MapType(StringType, IntegerType,
valueContainsNull = true)) :: Nil)) :: Nil)
+
+ private def getSchema(name: String): StructType = name match {
+ case "simple" => simpleSchema
+ case "nested" => nestedSchema
+ case "array" => arraySchema
+ case "arrayOfStruct" => arrayOfStructSchema
+ case "map" => mapSchema
+ case "mapOfStruct" => mapOfStructSchema
+ case "complex" => complexSchema
}
- def assertFieldType(schema: StructType, fieldName: String, expectedDataType:
DataType): Unit = {
- val fieldNameSchemaPair = HoodieSchemaUtils.getSchemaForField(schema,
fieldName)
- assertEquals(fieldName, fieldNameSchemaPair.getKey)
- assertEquals(expectedDataType, fieldNameSchemaPair.getValue.dataType)
+ @ParameterizedTest
+ @CsvSource(Array(
+ // Simple field tests
+ "simple, id, id,
string",
+ "simple, name, name,
string",
+ "simple, count, count,
int",
+ // Nested struct field tests
+ "nested, nested.inner_string,
nested.inner_string, string",
+ "nested, nested.inner_int,
nested.inner_int, int",
+ // Array element access using .list.element
+ "array, items.list.element,
items.list.element, string",
+ // Array of struct - access nested fields within array elements
+ "arrayOfStruct, items.list.element.nested_int,
items.list.element.nested_int, int",
+ "arrayOfStruct, items.list.element.nested_string,
items.list.element.nested_string, string",
+ // Map key/value access using .key_value.key and .key_value.value
+ "map, metadata.key_value.key,
metadata.key_value.key, string",
+ "map, metadata.key_value.value,
metadata.key_value.value, int",
+ // Map of struct - access nested fields within map values
+ "mapOfStruct, nested_map.key_value.value.nested_int,
nested_map.key_value.value.nested_int, int",
+ "mapOfStruct, nested_map.key_value.value.nested_string,
nested_map.key_value.value.nested_string, string",
+ // Complex nested: struct -> array -> struct
+ "complex, top.nested_array.list.element.inner_field,
top.nested_array.list.element.inner_field, string",
+ // Complex nested: struct -> map
+ "complex, top.nested_map.key_value.key,
top.nested_map.key_value.key, string",
+ "complex, top.nested_map.key_value.value,
top.nested_map.key_value.value, int"
+ ))
+ def testGetSchemaForField(schemaName: String, inputPath: String,
expectedKey: String, expectedType: String): Unit = {
+ val schema = getSchema(schemaName.trim)
+ val result = HoodieSchemaUtils.getSchemaForField(schema, inputPath.trim)
+ assertEquals(expectedKey.trim, result.getKey)
+ assertEquals(expectedType.trim, result.getValue.dataType.simpleString)
}
+ @Test
+ def testInvalidPaths(): Unit = {
+ // Invalid array paths
+ assertThrows(classOf[HoodieException], () =>
+ HoodieSchemaUtils.getSchemaForField(arraySchema, "items.wrong.path"))
+ assertThrows(classOf[HoodieException], () =>
+ HoodieSchemaUtils.getSchemaForField(arraySchema, "items.list.missing"))
+
+ // Invalid map paths
+ assertThrows(classOf[HoodieException], () =>
+ HoodieSchemaUtils.getSchemaForField(mapSchema, "metadata.wrong.path"))
+ assertThrows(classOf[HoodieException], () =>
+ HoodieSchemaUtils.getSchemaForField(mapSchema,
"metadata.key_value.key.invalid"))
+
+ // Non-existent field
+ assertThrows(classOf[HoodieException], () =>
+ HoodieSchemaUtils.getSchemaForField(simpleSchema, "nonexistent"))
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 5b6337189737..a3e0c5701b81 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -18,7 +18,7 @@
package org.apache.hudi.functional
-import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport,
DataSourceWriteOptions, HoodieSchemaConversionUtils}
+import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport,
DataSourceReadOptions, DataSourceWriteOptions, HoodieSchemaConversionUtils}
import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD,
RECORDKEY_FIELD}
import org.apache.hudi.HoodieConversionUtils.toProperties
@@ -50,7 +50,7 @@ import org.apache.spark.sql.catalyst.expressions.{And,
AttributeReference, Great
import
org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
import org.apache.spark.sql.types._
import org.junit.jupiter.api._
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull,
assertTrue}
+import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals,
assertNotNull, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource}
@@ -259,6 +259,381 @@ class TestColumnStatsIndex extends
ColumnStatIndexTestBase {
addNestedFiled = true)
}
+ /**
+ * Tests data skipping with nested MAP and ARRAY fields in column stats
index.
+ * This test verifies that queries can efficiently skip files based on
nested field values
+ * within MAP and ARRAY types using the new Parquet-style accessor patterns.
+ */
+ @ParameterizedTest
+ @MethodSource(Array("testMetadataColumnStatsIndexParamsInMemory"))
+ def testMetadataColumnStatsIndexNestedMapArrayDataSkipping(testCase:
ColumnStatsTestCase): Unit = {
+ // Define nested struct schema
+ val nestedSchema = new StructType()
+ .add("nested_int", IntegerType, false)
+ .add("level", StringType, true)
+
+ // Define full schema with MAP and ARRAY of nested structs
+ val testSchema = new StructType()
+ .add("record_key", StringType, false)
+ .add("partition_col", IntegerType, false)
+ .add("nullable_map_field", MapType(StringType, nestedSchema), true)
+ .add("array_field", ArrayType(nestedSchema), false)
+
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+ HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key ->
+
"record_key,partition_col,nullable_map_field.key_value.value.nested_int,nullable_map_field.key_value.value.level,array_field.list.element.nested_int,array_field.list.element.level"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test_nested_skipping",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "record_key",
+ HoodieTableConfig.ORDERING_FIELDS.key -> "record_key",
+ PARTITIONPATH_FIELD.key -> "partition_col",
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+ HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key -> "10240",
+ HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key -> "0",
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key() ->
testCase.tableVersion.toString
+ ) ++ metadataOpts
+
+ // Batch 1 - Low Range Values (should be skipped when querying for
nested_int > 200)
+ val batch1Data = Seq(
+ Row("key_001", 1,
+ Map("item1" -> Row(50, "low"), "item2" -> Row(75, "low")),
+ Array(Row(60, "low"), Row(80, "low"))),
+ Row("key_002", 1,
+ Map("item1" -> Row(30, "low"), "item2" -> Row(90, "low")),
+ Array(Row(40, "low"), Row(70, "low")))
+ )
+
+ // Batch 2 - High Range MAP and ARRAY (should be read when querying for
nested_int > 200)
+ val batch2Data = Seq(
+ Row("key_003", 1,
+ Map("item1" -> Row(250, "high"), "item2" -> Row(275, "high")),
+ Array(Row(260, "high"), Row(280, "high"))),
+ Row("key_004", 1,
+ Map("item1" -> Row(230, "high"), "item2" -> Row(290, "high")),
+ Array(Row(240, "high"), Row(270, "high")))
+ )
+
+ // Batch 3 - Mixed Range (low MAP, high ARRAY)
+ val batch3Data = Seq(
+ Row("key_005", 2,
+ Map("item1" -> Row(50, "mixed"), "item2" -> Row(75, "mixed")),
+ Array(Row(260, "high"), Row(280, "high"))),
+ Row("key_006", 2,
+ Map("item1" -> Row(30, "mixed"), "item2" -> Row(90, "mixed")),
+ Array(Row(240, "high"), Row(270, "high")))
+ )
+
+ // Write Batch 1
+ val df1 =
spark.createDataFrame(spark.sparkContext.parallelize(batch1Data), testSchema)
+ df1.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ // Write Batch 2
+ val df2 =
spark.createDataFrame(spark.sparkContext.parallelize(batch2Data), testSchema)
+ df2.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ // Write Batch 3
+ val df3 =
spark.createDataFrame(spark.sparkContext.parallelize(batch3Data), testSchema)
+ df3.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ metaClient =
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
+
+ // Query options with data skipping enabled
+ val queryOpts = Map(
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+ ) ++ metadataOpts
+
+ // Query 1: Filter on MAP nested_int (high range)
+ val resultDf1 = spark.read.format("hudi")
+ .options(queryOpts)
+ .load(basePath)
+ .filter("EXISTS(map_values(nullable_map_field), v -> v.nested_int >
200)")
+
+ // Expected: 2 records from batch2 (key_003, key_004)
+ assertArrayEquals(
+ Array[Object]("key_003", "key_004"),
+
resultDf1.select("record_key").collect().map(_.getString(0)).sorted.asInstanceOf[Array[Object]]
+ )
+
+ // Query 2: Filter on ARRAY nested_int (high range)
+ val resultDf2 = spark.read.format("hudi")
+ .options(queryOpts)
+ .load(basePath)
+ .filter("EXISTS(array_field, elem -> elem.nested_int > 200)")
+
+ // Expected: 4 records from batch2 and batch3 (key_003, key_004, key_005,
key_006)
+ assertArrayEquals(
+ Array[Object]("key_003", "key_004", "key_005", "key_006"),
+
resultDf2.select("record_key").collect().map(_.getString(0)).sorted.asInstanceOf[Array[Object]]
+ )
+
+ // Query 3: Filter on MAP level field (string)
+ val resultDf3 = spark.read.format("hudi")
+ .options(queryOpts)
+ .load(basePath)
+ .filter("EXISTS(map_values(nullable_map_field), v -> v.level = 'high')")
+
+ // Expected: 2 records from batch2
+ assertArrayEquals(
+ Array[Object]("key_003", "key_004"),
+
resultDf3.select("record_key").collect().map(_.getString(0)).sorted.asInstanceOf[Array[Object]]
+ )
+
+ // Query 4: Combined filter (both MAP and ARRAY conditions)
+ val resultDf4 = spark.read.format("hudi")
+ .options(queryOpts)
+ .load(basePath)
+ .filter("EXISTS(map_values(nullable_map_field), v -> v.nested_int > 200)
" +
+ "AND EXISTS(array_field, elem -> elem.nested_int > 200)")
+
+ // Expected: 2 records from batch2 only (both MAP and ARRAY have high
values)
+ assertArrayEquals(
+ Array[Object]("key_003", "key_004"),
+
resultDf4.select("record_key").collect().map(_.getString(0)).sorted.asInstanceOf[Array[Object]]
+ )
+
+ // Validate column stats were created for nested fields
+ val metadataConfig = HoodieMetadataConfig.newBuilder()
+ .fromProperties(toProperties(metadataOpts))
+ .build()
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(testSchema,
"record", "")
+ val columnStatsIndex = new ColumnStatsIndexSupport(
+ spark,
+ testSchema,
+ hoodieSchema,
+ metadataConfig,
+ metaClient
+ )
+
+ val indexedColumns = Seq(
+ "nullable_map_field.key_value.value.nested_int",
+ "array_field.list.element.nested_int"
+ )
+
+ columnStatsIndex.loadTransposed(indexedColumns,
testCase.shouldReadInMemory) { transposedDF =>
+ // Verify we have stats for all 3 file groups (may have more files for
MOR due to updates)
+ val fileCount = transposedDF.select("fileName").distinct().count()
+ assertTrue(fileCount >= 3, s"Expected at least 3 files with column
stats, got $fileCount")
+
+ // Verify min/max ranges for MAP field
+ val mapStats = transposedDF.select(
+ "`nullable_map_field.key_value.value.nested_int_minValue`",
+ "`nullable_map_field.key_value.value.nested_int_maxValue`"
+ ).collect().map(row => (row.getInt(0), row.getInt(1))).sorted
+
+ // Expected stats: Batch1[30,90], Batch2[230,290], Batch3[30,90]
+ // We should have exactly one file with [230,290] (high range) and at
least two with [30,90] (low range)
+ val mapHighRangeCount = mapStats.count(stat => stat._1 == 230 && stat._2
== 290)
+ val mapLowRangeCount = mapStats.count(stat => stat._1 == 30 && stat._2
== 90)
+ assertEquals(1, mapHighRangeCount, "Expected exactly 1 file with MAP
range [230,290]")
+ assertTrue(mapLowRangeCount >= 2, s"Expected at least 2 files with MAP
range [30,90], got $mapLowRangeCount")
+
+ // Verify min/max ranges for ARRAY field
+ val arrayStats = transposedDF.select(
+ "`array_field.list.element.nested_int_minValue`",
+ "`array_field.list.element.nested_int_maxValue`"
+ ).collect().map(row => (row.getInt(0), row.getInt(1))).sorted
+
+ // Expected stats: Batch1[40,80], Batch2[260,280], Batch3[240,280]
+ // We should have exactly one file with [40,80] (low range) and at least
two with high ranges
+ val arrayLowRangeCount = arrayStats.count(stat => stat._1 == 40 &&
stat._2 == 80)
+ val arrayHighRangeCount = arrayStats.count(stat => stat._1 >= 240 &&
stat._2 == 280)
+ assertEquals(1, arrayLowRangeCount, "Expected exactly 1 file with ARRAY
range [40,80]")
+ assertTrue(arrayHighRangeCount >= 2, s"Expected at least 2 files with
ARRAY high ranges, got $arrayHighRangeCount")
+ }
+ // Validate that indexed columns are registered correctly
+ validateColumnsToIndex(metaClient, Seq(
+ HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ HoodieRecord.RECORD_KEY_METADATA_FIELD,
+ HoodieRecord.PARTITION_PATH_METADATA_FIELD,
+ "record_key",
+ "partition_col",
+ "nullable_map_field.key_value.value.nested_int",
+ "nullable_map_field.key_value.value.level",
+ "array_field.list.element.nested_int",
+ "array_field.list.element.level"
+ ))
+ }
+
+ /**
+ * Tests column stats index with multi-level nested ARRAY fields (2 and 3
levels deep).
+ * This verifies that the .list.element pattern works correctly for deeply
nested arrays.
+ */
+ @ParameterizedTest
+ @MethodSource(Array("testMetadataColumnStatsIndexParamsInMemory"))
+ def testMetadataColumnStatsIndexMultiLevelNestedArrays(testCase:
ColumnStatsTestCase): Unit = {
+ // Define nested struct for innermost level
+ val innerStruct = new StructType()
+ .add("value", IntegerType, false)
+
+ // Define schema with 2-level and 3-level nested arrays
+ val testSchema = new StructType()
+ .add("record_key", StringType, false)
+ .add("partition_col", IntegerType, false)
+ // 2-level: Array of Array of Int
+ .add("level2_array", ArrayType(ArrayType(IntegerType)), false)
+ // 3-level: Array of Array of Array of Int
+ .add("level3_array", ArrayType(ArrayType(ArrayType(IntegerType))), false)
+ // 2-level array of struct: Array of Array of Struct
+ .add("level2_array_struct", ArrayType(ArrayType(innerStruct)), false)
+
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+ HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key ->
+ ("record_key,partition_col," +
+ "level2_array.list.element.list.element," +
+ "level3_array.list.element.list.element.list.element," +
+ "level2_array_struct.list.element.list.element.value")
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test_multi_level_arrays",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "record_key",
+ HoodieTableConfig.ORDERING_FIELDS.key -> "record_key",
+ PARTITIONPATH_FIELD.key -> "partition_col",
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
+ HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key -> "10240",
+ HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key -> "0",
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key() ->
testCase.tableVersion.toString
+ ) ++ metadataOpts
+
+ // Batch 1 - Low range values
+ val batch1Data = Seq(
+ Row("key_001", 1,
+ Array(Array(10, 20), Array(30, 40)), // level2_array:
min=10, max=40
+ Array(Array(Array(5, 10), Array(15, 20))), // level3_array: min=5,
max=20
+ Array(Array(Row(100), Row(150)))), //
level2_array_struct.value: min=100, max=150
+ Row("key_002", 1,
+ Array(Array(15, 25), Array(35, 45)), // level2_array:
min=15, max=45
+ Array(Array(Array(8, 12), Array(18, 22))), // level3_array: min=8,
max=22
+ Array(Array(Row(110), Row(160)))) //
level2_array_struct.value: min=110, max=160
+ )
+
+ // Batch 2 - High range values
+ val batch2Data = Seq(
+ Row("key_003", 1,
+ Array(Array(500, 600), Array(700, 800)), // level2_array:
min=500, max=800
+ Array(Array(Array(300, 350), Array(400, 450))), // level3_array:
min=300, max=450
+ Array(Array(Row(1000), Row(1500)))), //
level2_array_struct.value: min=1000, max=1500
+ Row("key_004", 1,
+ Array(Array(550, 650), Array(750, 850)), // level2_array:
min=550, max=850
+ Array(Array(Array(320, 370), Array(420, 470))), // level3_array:
min=320, max=470
+ Array(Array(Row(1100), Row(1600)))) //
level2_array_struct.value: min=1100, max=1600
+ )
+
+ // Write Batch 1
+ val df1 =
spark.createDataFrame(spark.sparkContext.parallelize(batch1Data), testSchema)
+ df1.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ // Write Batch 2
+ val df2 =
spark.createDataFrame(spark.sparkContext.parallelize(batch2Data), testSchema)
+ df2.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ metaClient =
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
+
+ // Validate column stats were created for multi-level nested fields
+ val metadataConfig = HoodieMetadataConfig.newBuilder()
+ .fromProperties(toProperties(metadataOpts))
+ .build()
+
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(testSchema,
"record", "")
+ val columnStatsIndex = new ColumnStatsIndexSupport(
+ spark,
+ testSchema,
+ hoodieSchema,
+ metadataConfig,
+ metaClient
+ )
+
+ val indexedColumns = Seq(
+ "level2_array.list.element.list.element",
+ "level3_array.list.element.list.element.list.element",
+ "level2_array_struct.list.element.list.element.value"
+ )
+
+ columnStatsIndex.loadTransposed(indexedColumns,
testCase.shouldReadInMemory) { transposedDF =>
+ // Verify we have stats for both file groups
+ val fileCount = transposedDF.select("fileName").distinct().count()
+ assertTrue(fileCount >= 2, s"Expected at least 2 files with column
stats, got $fileCount")
+
+ // Verify 2-level array stats
+ val level2Stats = transposedDF.select(
+ "`level2_array.list.element.list.element_minValue`",
+ "`level2_array.list.element.list.element_maxValue`"
+ ).collect().map(row => (row.getInt(0), row.getInt(1))).sorted
+
+ val level2LowCount = level2Stats.count(stat => stat._1 >= 10 && stat._2
<= 50)
+ val level2HighCount = level2Stats.count(stat => stat._1 >= 500 &&
stat._2 >= 800)
+ assertTrue(level2LowCount >= 1, s"Expected at least 1 file with level2
low range, got $level2LowCount")
+ assertTrue(level2HighCount >= 1, s"Expected at least 1 file with level2
high range, got $level2HighCount")
+
+ // Verify 3-level array stats
+ val level3Stats = transposedDF.select(
+ "`level3_array.list.element.list.element.list.element_minValue`",
+ "`level3_array.list.element.list.element.list.element_maxValue`"
+ ).collect().map(row => (row.getInt(0), row.getInt(1))).sorted
+
+ val level3LowCount = level3Stats.count(stat => stat._1 >= 5 && stat._2
<= 25)
+ val level3HighCount = level3Stats.count(stat => stat._1 >= 300 &&
stat._2 >= 450)
+ assertTrue(level3LowCount >= 1, s"Expected at least 1 file with level3
low range, got $level3LowCount")
+ assertTrue(level3HighCount >= 1, s"Expected at least 1 file with level3
high range, got $level3HighCount")
+
+ // Verify 2-level array of struct stats
+ val level2StructStats = transposedDF.select(
+ "`level2_array_struct.list.element.list.element.value_minValue`",
+ "`level2_array_struct.list.element.list.element.value_maxValue`"
+ ).collect().map(row => (row.getInt(0), row.getInt(1))).sorted
+
+ val structLowCount = level2StructStats.count(stat => stat._1 >= 100 &&
stat._2 <= 200)
+ val structHighCount = level2StructStats.count(stat => stat._1 >= 1000 &&
stat._2 >= 1500)
+ assertTrue(structLowCount >= 1, s"Expected at least 1 file with struct
low range, got $structLowCount")
+ assertTrue(structHighCount >= 1, s"Expected at least 1 file with struct
high range, got $structHighCount")
+ }
+
+ // Validate that indexed columns are registered correctly
+ validateColumnsToIndex(metaClient, Seq(
+ HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ HoodieRecord.RECORD_KEY_METADATA_FIELD,
+ HoodieRecord.PARTITION_PATH_METADATA_FIELD,
+ "record_key",
+ "partition_col",
+ "level2_array.list.element.list.element",
+ "level3_array.list.element.list.element.list.element",
+ "level2_array_struct.list.element.list.element.value"
+ ))
+ }
+
@ParameterizedTest
@MethodSource(Array("testTableTypePartitionTypeParams"))
def testMetadataColumnStatsIndexInitializationWithUpserts(tableType:
HoodieTableType, partitionCol : String, tableVersion: Int): Unit = {