voonhous commented on code in PR #17694:
URL: https://github.com/apache/hudi/pull/17694#discussion_r2674963220


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -46,24 +46,226 @@ import scala.collection.JavaConverters._
 object HoodieSchemaUtils {
   private val log = LoggerFactory.getLogger(getClass)
 
-  def getSchemaForField(schema: StructType, fieldName: String): 
org.apache.hudi.common.util.collection.Pair[String, StructField] = {
-    getSchemaForField(schema, fieldName, StringUtils.EMPTY_STRING)
+  /**
+   * Constants for Parquet-style accessor patterns used in nested MAP and 
ARRAY navigation.
+   * These patterns are specifically used for column stats generation and 
differ from
+   * InternalSchema constants which are used in schema evolution contexts.
+   */
+  private final val ARRAY_LIST = "list"
+  private final val ARRAY_ELEMENT = "element"
+  private final val ARRAY_SPARK = "array" // Spark writer uses this
+  private final val MAP_KEY_VALUE = "key_value"
+  private final val MAP_KEY = "key"
+  private final val MAP_VALUE = "value"
+
+  private final val ARRAY_LIST_ELEMENT = ARRAY_LIST + "." + ARRAY_ELEMENT
+  private final val MAP_KEY_VALUE_KEY = MAP_KEY_VALUE + "." + MAP_KEY
+  private final val MAP_KEY_VALUE_VALUE = MAP_KEY_VALUE + "." + MAP_VALUE
+
+  /**
+   * Advances offset past a component name in the path, handling end-of-path 
and dot separator.
+   *
+   * @param path      the full path string
+   * @param offset    current position in path
+   * @param component the component name to match (e.g., "element", "key", 
"value")
+   * @return new offset after component and dot, or path.length() if at end, 
or -1 if no match
+   */
+  private def getNextOffset(path: String, offset: Int, component: String): Int 
= {
+    if (!path.regionMatches(offset, component, 0, component.length)) {
+      -1
+    } else {
+      val next = offset + component.length
+      if (next == path.length) {
+        next
+      } else {
+        if (path.charAt(next) == '.') next + 1 else -1
+      }
+    }
   }
 
-  def getSchemaForField(schema: StructType, fieldName: String, prefix: 
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {
-    if (!(fieldName.contains("."))) {
-      org.apache.hudi.common.util.collection.Pair.of(prefix + 
schema.fields(schema.fieldIndex(fieldName)).name, 
schema.fields(schema.fieldIndex(fieldName)))
+  /**
+   * Handles navigation into ARRAY types using the Avro ".list.element" 
pattern.
+   *
+   * @param arrayType  the ArrayType to navigate into
+   * @param fullPath   the full field path string
+   * @param offset     current position in fullPath (should point to start of 
"list")
+   * @param prefix     the accumulated field path prefix
+   * @param rootFieldName the name of the array field
+   * @return Pair of canonical field name and the StructField
+   */
+  private def handleArrayNavigationAvro(arrayType: ArrayType, fullPath: 
String, offset: Int,
+                                        prefix: String, rootFieldName: 
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {
+    val elementType = arrayType.elementType
+    val listOffset = getNextOffset(fullPath, offset, ARRAY_LIST)
+
+    if (listOffset == -1) {
+      throw new HoodieException(s"Array field requires 
.$ARRAY_LIST.$ARRAY_ELEMENT accessor pattern: $fullPath")
+    }
+
+    val elementOffset = getNextOffset(fullPath, listOffset, ARRAY_ELEMENT)
+    if (elementOffset == -1) {
+      throw new HoodieException(s"Array field requires 
.$ARRAY_LIST.$ARRAY_ELEMENT accessor pattern: $fullPath")
+    }
+
+    if (elementOffset == fullPath.length) {
+      // Terminal case: just accessing the element type
+      val elementField = StructField(ARRAY_ELEMENT, elementType, 
arrayType.containsNull)
+      org.apache.hudi.common.util.collection.Pair.of(prefix + rootFieldName + 
"." + ARRAY_LIST_ELEMENT, elementField)
+    } else if (elementType.isInstanceOf[StructType]) {
+      // Recursive case: accessing fields within array elements
+      getSchemaForFieldInternal(elementType.asInstanceOf[StructType], 
fullPath, elementOffset,
+        prefix + rootFieldName + "." + ARRAY_LIST_ELEMENT + ".")
+    } else {
+      throw new HoodieException(s"Invalid array navigation pattern: $fullPath")
+    }
+  }
+
+  /**
+   * Handles navigation into ARRAY types using the Spark ".array" pattern.
+   *
+   * @param arrayType  the ArrayType to navigate into
+   * @param fullPath   the full field path string
+   * @param offset     current position in fullPath (should point to start of 
"array")
+   * @param prefix     the accumulated field path prefix
+   * @param rootFieldName the name of the array field
+   * @return Pair of canonical field name and the StructField
+   */
+  private def handleArrayNavigationSpark(arrayType: ArrayType, fullPath: 
String, offset: Int,
+                                         prefix: String, rootFieldName: 
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {
+    val elementType = arrayType.elementType
+    val sparkArrayOffset = getNextOffset(fullPath, offset, ARRAY_SPARK)
+
+    if (sparkArrayOffset == -1) {
+      throw new HoodieException(s"Array field requires .$ARRAY_SPARK accessor 
pattern: $fullPath")
+    }
+
+    if (sparkArrayOffset == fullPath.length) {
+      // Terminal case: just accessing the element type
+      val elementField = StructField(ARRAY_SPARK, elementType, 
arrayType.containsNull)
+      org.apache.hudi.common.util.collection.Pair.of(prefix + rootFieldName + 
"." + ARRAY_SPARK, elementField)
+    } else if (elementType.isInstanceOf[StructType]) {
+      // Recursive case: accessing fields within array elements
+      getSchemaForFieldInternal(elementType.asInstanceOf[StructType], 
fullPath, sparkArrayOffset,

Review Comment:
   Let's normalize the returned path string. We are currently returning the 
`Spark-style` path. Should we map this to the Avro-style path so that the 
stored index key is consistent across engines?
   
   Since we are supporting 2 formats/structures, I feel we should pick one 
canonical format and normalize all others into it. 
   
   After which, we can add a test case to verify that `items.array.id` returns 
`items.list.element.id`, where `items.list.element.id` is the canonical path.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -46,24 +46,226 @@ import scala.collection.JavaConverters._
 object HoodieSchemaUtils {
   private val log = LoggerFactory.getLogger(getClass)
 
-  def getSchemaForField(schema: StructType, fieldName: String): 
org.apache.hudi.common.util.collection.Pair[String, StructField] = {
-    getSchemaForField(schema, fieldName, StringUtils.EMPTY_STRING)
+  /**
+   * Constants for Parquet-style accessor patterns used in nested MAP and 
ARRAY navigation.
+   * These patterns are specifically used for column stats generation and 
differ from
+   * InternalSchema constants which are used in schema evolution contexts.
+   */
+  private final val ARRAY_LIST = "list"
+  private final val ARRAY_ELEMENT = "element"
+  private final val ARRAY_SPARK = "array" // Spark writer uses this
+  private final val MAP_KEY_VALUE = "key_value"
+  private final val MAP_KEY = "key"
+  private final val MAP_VALUE = "value"
+
+  private final val ARRAY_LIST_ELEMENT = ARRAY_LIST + "." + ARRAY_ELEMENT
+  private final val MAP_KEY_VALUE_KEY = MAP_KEY_VALUE + "." + MAP_KEY
+  private final val MAP_KEY_VALUE_VALUE = MAP_KEY_VALUE + "." + MAP_VALUE
+
+  /**
+   * Advances offset past a component name in the path, handling end-of-path 
and dot separator.
+   *
+   * @param path      the full path string
+   * @param offset    current position in path
+   * @param component the component name to match (e.g., "element", "key", 
"value")
+   * @return new offset after component and dot, or path.length() if at end, 
or -1 if no match
+   */
+  private def getNextOffset(path: String, offset: Int, component: String): Int 
= {

Review Comment:
   Would it be possible for us to import this directly from Java?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -46,24 +46,226 @@ import scala.collection.JavaConverters._
 object HoodieSchemaUtils {
   private val log = LoggerFactory.getLogger(getClass)
 
-  def getSchemaForField(schema: StructType, fieldName: String): 
org.apache.hudi.common.util.collection.Pair[String, StructField] = {
-    getSchemaForField(schema, fieldName, StringUtils.EMPTY_STRING)
+  /**
+   * Constants for Parquet-style accessor patterns used in nested MAP and 
ARRAY navigation.
+   * These patterns are specifically used for column stats generation and 
differ from
+   * InternalSchema constants which are used in schema evolution contexts.
+   */
+  private final val ARRAY_LIST = "list"
+  private final val ARRAY_ELEMENT = "element"
+  private final val ARRAY_SPARK = "array" // Spark writer uses this
+  private final val MAP_KEY_VALUE = "key_value"
+  private final val MAP_KEY = "key"
+  private final val MAP_VALUE = "value"
+
+  private final val ARRAY_LIST_ELEMENT = ARRAY_LIST + "." + ARRAY_ELEMENT
+  private final val MAP_KEY_VALUE_KEY = MAP_KEY_VALUE + "." + MAP_KEY
+  private final val MAP_KEY_VALUE_VALUE = MAP_KEY_VALUE + "." + MAP_VALUE

Review Comment:
   These are reserved keywords now. Let's include a test case where users 
are using these reserved keywords as column/field names.
   
   For example:
   ```
   // A struct containing an array of structs, where the inner struct has a 
field named "array"
   StructField("items", ArrayType(
     StructType(Seq(
       StructField("array", StringType) // <--- Field named "array"
     ))
   ))
   ```
   
   We need to ensure that `items.array.array` is properly handled. The test 
will most likely pass as we are only doing special handling for `ArrayTypes`, 
but let's err on the side of caution to guard against this.
   
   Let's also apply this to the other reserved keywords.
   
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -46,24 +46,226 @@ import scala.collection.JavaConverters._
 object HoodieSchemaUtils {
   private val log = LoggerFactory.getLogger(getClass)
 
-  def getSchemaForField(schema: StructType, fieldName: String): 
org.apache.hudi.common.util.collection.Pair[String, StructField] = {
-    getSchemaForField(schema, fieldName, StringUtils.EMPTY_STRING)
+  /**
+   * Constants for Parquet-style accessor patterns used in nested MAP and 
ARRAY navigation.
+   * These patterns are specifically used for column stats generation and 
differ from
+   * InternalSchema constants which are used in schema evolution contexts.
+   */
+  private final val ARRAY_LIST = "list"
+  private final val ARRAY_ELEMENT = "element"
+  private final val ARRAY_SPARK = "array" // Spark writer uses this
+  private final val MAP_KEY_VALUE = "key_value"
+  private final val MAP_KEY = "key"
+  private final val MAP_VALUE = "value"
+
+  private final val ARRAY_LIST_ELEMENT = ARRAY_LIST + "." + ARRAY_ELEMENT
+  private final val MAP_KEY_VALUE_KEY = MAP_KEY_VALUE + "." + MAP_KEY
+  private final val MAP_KEY_VALUE_VALUE = MAP_KEY_VALUE + "." + MAP_VALUE
+
+  /**
+   * Advances offset past a component name in the path, handling end-of-path 
and dot separator.
+   *
+   * @param path      the full path string
+   * @param offset    current position in path
+   * @param component the component name to match (e.g., "element", "key", 
"value")
+   * @return new offset after component and dot, or path.length() if at end, 
or -1 if no match
+   */
+  private def getNextOffset(path: String, offset: Int, component: String): Int 
= {
+    if (!path.regionMatches(offset, component, 0, component.length)) {
+      -1
+    } else {
+      val next = offset + component.length
+      if (next == path.length) {
+        next
+      } else {
+        if (path.charAt(next) == '.') next + 1 else -1
+      }
+    }
   }
 
-  def getSchemaForField(schema: StructType, fieldName: String, prefix: 
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {
-    if (!(fieldName.contains("."))) {
-      org.apache.hudi.common.util.collection.Pair.of(prefix + 
schema.fields(schema.fieldIndex(fieldName)).name, 
schema.fields(schema.fieldIndex(fieldName)))
+  /**
+   * Handles navigation into ARRAY types using the Avro ".list.element" 
pattern.
+   *
+   * @param arrayType  the ArrayType to navigate into
+   * @param fullPath   the full field path string
+   * @param offset     current position in fullPath (should point to start of 
"list")
+   * @param prefix     the accumulated field path prefix
+   * @param rootFieldName the name of the array field
+   * @return Pair of canonical field name and the StructField
+   */
+  private def handleArrayNavigationAvro(arrayType: ArrayType, fullPath: 
String, offset: Int,
+                                        prefix: String, rootFieldName: 
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {
+    val elementType = arrayType.elementType
+    val listOffset = getNextOffset(fullPath, offset, ARRAY_LIST)
+
+    if (listOffset == -1) {
+      throw new HoodieException(s"Array field requires 
.$ARRAY_LIST.$ARRAY_ELEMENT accessor pattern: $fullPath")
+    }
+
+    val elementOffset = getNextOffset(fullPath, listOffset, ARRAY_ELEMENT)
+    if (elementOffset == -1) {
+      throw new HoodieException(s"Array field requires 
.$ARRAY_LIST.$ARRAY_ELEMENT accessor pattern: $fullPath")
+    }
+
+    if (elementOffset == fullPath.length) {
+      // Terminal case: just accessing the element type
+      val elementField = StructField(ARRAY_ELEMENT, elementType, 
arrayType.containsNull)
+      org.apache.hudi.common.util.collection.Pair.of(prefix + rootFieldName + 
"." + ARRAY_LIST_ELEMENT, elementField)
+    } else if (elementType.isInstanceOf[StructType]) {
+      // Recursive case: accessing fields within array elements
+      getSchemaForFieldInternal(elementType.asInstanceOf[StructType], 
fullPath, elementOffset,
+        prefix + rootFieldName + "." + ARRAY_LIST_ELEMENT + ".")
+    } else {
+      throw new HoodieException(s"Invalid array navigation pattern: $fullPath")
+    }
+  }
+
+  /**
+   * Handles navigation into ARRAY types using the Spark ".array" pattern.
+   *
+   * @param arrayType  the ArrayType to navigate into
+   * @param fullPath   the full field path string
+   * @param offset     current position in fullPath (should point to start of 
"array")
+   * @param prefix     the accumulated field path prefix
+   * @param rootFieldName the name of the array field
+   * @return Pair of canonical field name and the StructField
+   */
+  private def handleArrayNavigationSpark(arrayType: ArrayType, fullPath: 
String, offset: Int,
+                                         prefix: String, rootFieldName: 
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {
+    val elementType = arrayType.elementType
+    val sparkArrayOffset = getNextOffset(fullPath, offset, ARRAY_SPARK)
+
+    if (sparkArrayOffset == -1) {
+      throw new HoodieException(s"Array field requires .$ARRAY_SPARK accessor 
pattern: $fullPath")
+    }
+
+    if (sparkArrayOffset == fullPath.length) {
+      // Terminal case: just accessing the element type
+      val elementField = StructField(ARRAY_SPARK, elementType, 
arrayType.containsNull)
+      org.apache.hudi.common.util.collection.Pair.of(prefix + rootFieldName + 
"." + ARRAY_SPARK, elementField)
+    } else if (elementType.isInstanceOf[StructType]) {
+      // Recursive case: accessing fields within array elements
+      getSchemaForFieldInternal(elementType.asInstanceOf[StructType], 
fullPath, sparkArrayOffset,
+        prefix + rootFieldName + "." + ARRAY_SPARK + ".")
+    } else {
+      throw new HoodieException(s"Invalid array navigation pattern: $fullPath")
+    }
+  }
+
+  /**
+   * Handles navigation into MAP types using the Parquet-style 
".key_value.key" or ".key_value.value" patterns.
+   *
+   * @param mapType    the MapType to navigate into
+   * @param fullPath   the full field path string
+   * @param offset     current position in fullPath (should point to start of 
"key_value")
+   * @param prefix     the accumulated field path prefix
+   * @param rootFieldName the name of the map field
+   * @return Pair of canonical field name and the StructField
+   */
+  private def handleMapNavigation(mapType: MapType, fullPath: String, offset: 
Int,
+                                   prefix: String, rootFieldName: String): 
org.apache.hudi.common.util.collection.Pair[String, StructField] = {
+    val keyValueOffset = getNextOffset(fullPath, offset, MAP_KEY_VALUE)
+    if (keyValueOffset == -1) {
+      throw new HoodieException(s"Invalid map navigation pattern: $fullPath. 
Expected .$MAP_KEY_VALUE_KEY or .$MAP_KEY_VALUE_VALUE")
     }
-    else {
-      val rootFieldIndex: Int = fieldName.indexOf(".")
-      val rootField: StructField = 
schema.fields(schema.fieldIndex(fieldName.substring(0, rootFieldIndex)))
+
+    // Check for .key pattern
+    val keyOffset = getNextOffset(fullPath, keyValueOffset, MAP_KEY)
+    if (keyOffset != -1) {
+      // Accessing map keys (always the key type)
+      val keyField = StructField(MAP_KEY, mapType.keyType, false)
+      if (keyOffset == fullPath.length) {
+        org.apache.hudi.common.util.collection.Pair.of(prefix + rootFieldName 
+ "." + MAP_KEY_VALUE_KEY, keyField)
+      } else {
+        throw new HoodieException(s"Cannot navigate beyond map key: $fullPath")
+      }
+    } else {
+      // Check for .value pattern
+      val valueOffset = getNextOffset(fullPath, keyValueOffset, MAP_VALUE)
+      if (valueOffset == -1) {
+        throw new HoodieException(s"Invalid map navigation pattern: $fullPath. 
Expected .$MAP_KEY_VALUE_KEY or .$MAP_KEY_VALUE_VALUE")
+      }
+
+      val valueType = mapType.valueType
+      if (valueOffset == fullPath.length) {
+        // Terminal case: just accessing the value type
+        val valueField = StructField(MAP_VALUE, valueType, 
mapType.valueContainsNull)
+        org.apache.hudi.common.util.collection.Pair.of(prefix + rootFieldName 
+ "." + MAP_KEY_VALUE_VALUE, valueField)
+      } else if (valueType.isInstanceOf[StructType]) {
+        // Recursive case: accessing fields within map values
+        getSchemaForFieldInternal(valueType.asInstanceOf[StructType], 
fullPath, valueOffset,
+          prefix + rootFieldName + "." + MAP_KEY_VALUE_VALUE + ".")
+      } else {
+        throw new HoodieException(s"Invalid map value navigation pattern: 
$fullPath")
+      }
+    }
+  }
+
+  /**
+   * Internal helper method for retrieving nested fields using offset-based 
navigation.
+   * This method uses offset arithmetic to navigate through the path string 
instead of
+   * creating intermediate substring objects, improving performance and memory 
efficiency.
+   *
+   * @param schema   the schema to search in
+   * @param fullPath the full field path string
+   * @param offset   current position in fullPath
+   * @param prefix   the accumulated field path prefix
+   * @return Pair of canonical field name and the StructField
+   * @throws HoodieException if field is not found or navigation pattern is 
invalid
+   */
+  private def getSchemaForFieldInternal(schema: StructType, fullPath: String, 
offset: Int, prefix: String): 
org.apache.hudi.common.util.collection.Pair[String, StructField] = {

Review Comment:
   We can make this code less java-like and more scala-like. Scala's `match` 
expression is more readable and safer.
   
   ```scala
   private def getSchemaForFieldInternal(schema: StructType, fullPath: String, 
offset: Int, prefix: String): Pair[String, StructField] = {
     val nextDot = fullPath.indexOf('.', offset)
     
     // Terminal case
     if (nextDot == -1) {
       val field = schema(fullPath.substring(offset)) // StructType.apply 
handles lookup
       return Pair.of(prefix + field.name, field)
     }
   
     val rootFieldName = fullPath.substring(offset, nextDot)
     val rootField = schema(rootFieldName)
     val nextOffset = nextDot + 1
   
     rootField.dataType match {
       case at: ArrayType =>
         // Cleanly separating Spark vs Avro logic using Pattern Matching
         if (isNextToken(fullPath, nextOffset, SPARK_ARRAY_ACCESSOR)) {
            handleArraySpark(at, fullPath, nextOffset, prefix, rootFieldName)
         } else if (isNextToken(fullPath, nextOffset, ARRAY_LIST_FIELD_NAME)) {
            handleArrayAvro(at, fullPath, nextOffset, prefix, rootFieldName)
         } else {
            throw new HoodieException(s"Invalid array path: $fullPath")
         }
   
       case mt: MapType =>
         handleMapNavigation(mt, fullPath, nextOffset, prefix, rootFieldName)
   
       case st: StructType =>
         getSchemaForFieldInternal(st, fullPath, nextOffset, prefix + 
rootFieldName + ".")
   
       case _ =>
         throw new HoodieException(s"Unsupported type: ${rootField.dataType}")
     }
   }
   ```
   
   Above is just an example.



##########
hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/schema/TestHoodieSparkSchemaUtils.scala:
##########
@@ -55,6 +56,177 @@ class TestHoodieSparkSchemaUtils {
     assertFieldType(schema, "nested_field.nested_string", StringType)
   }
 
+  @Test
+  def testGetNestedFieldWithArraySpark(): Unit = {

Review Comment:
   We can parameterize the tests here. Instead of writing so many individual 
tests, we can make them more maintainable like this:
   
   ```scala
   @ParameterizedTest
   @CsvSource(value = Array(
     "items.array,             items.list.element,       string", // Input 
Spark -> Expect Avro Key
     "items.list.element,      items.list.element,       string", // Input Avro 
 -> Expect Avro Key
     "items.array.nested,      items.list.element.nested, int",   // Nested 
Spark
     "items.list.element.nested, items.list.element.nested, int"  // Nested Avro
   ))
   def testPathNormalization(inputPath: String, expectedKey: String, 
expectedType: String): Unit = {
     val result = HoodieSchemaUtils.getSchemaForField(complexSchema, inputPath)
     assertEquals(expectedKey, result.getKey) // Verifies normalization!
     assertEquals(expectedType, result.getValue.dataType.simpleString)
   }
   ```



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -46,24 +46,226 @@ import scala.collection.JavaConverters._
 object HoodieSchemaUtils {
   private val log = LoggerFactory.getLogger(getClass)
 
-  def getSchemaForField(schema: StructType, fieldName: String): 
org.apache.hudi.common.util.collection.Pair[String, StructField] = {
-    getSchemaForField(schema, fieldName, StringUtils.EMPTY_STRING)
+  /**
+   * Constants for Parquet-style accessor patterns used in nested MAP and 
ARRAY navigation.
+   * These patterns are specifically used for column stats generation and 
differ from
+   * InternalSchema constants which are used in schema evolution contexts.
+   */
+  private final val ARRAY_LIST = "list"
+  private final val ARRAY_ELEMENT = "element"
+  private final val ARRAY_SPARK = "array" // Spark writer uses this
+  private final val MAP_KEY_VALUE = "key_value"
+  private final val MAP_KEY = "key"
+  private final val MAP_VALUE = "value"
+
+  private final val ARRAY_LIST_ELEMENT = ARRAY_LIST + "." + ARRAY_ELEMENT
+  private final val MAP_KEY_VALUE_KEY = MAP_KEY_VALUE + "." + MAP_KEY
+  private final val MAP_KEY_VALUE_VALUE = MAP_KEY_VALUE + "." + MAP_VALUE
+
+  /**
+   * Advances offset past a component name in the path, handling end-of-path 
and dot separator.
+   *
+   * @param path      the full path string
+   * @param offset    current position in path
+   * @param component the component name to match (e.g., "element", "key", 
"value")
+   * @return new offset after component and dot, or path.length() if at end, 
or -1 if no match
+   */
+  private def getNextOffset(path: String, offset: Int, component: String): Int 
= {
+    if (!path.regionMatches(offset, component, 0, component.length)) {
+      -1
+    } else {
+      val next = offset + component.length
+      if (next == path.length) {
+        next
+      } else {
+        if (path.charAt(next) == '.') next + 1 else -1
+      }
+    }
   }
 
-  def getSchemaForField(schema: StructType, fieldName: String, prefix: 
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {
-    if (!(fieldName.contains("."))) {
-      org.apache.hudi.common.util.collection.Pair.of(prefix + 
schema.fields(schema.fieldIndex(fieldName)).name, 
schema.fields(schema.fieldIndex(fieldName)))
+  /**
+   * Handles navigation into ARRAY types using the Avro ".list.element" 
pattern.
+   *
+   * @param arrayType  the ArrayType to navigate into
+   * @param fullPath   the full field path string
+   * @param offset     current position in fullPath (should point to start of 
"list")
+   * @param prefix     the accumulated field path prefix
+   * @param rootFieldName the name of the array field
+   * @return Pair of canonical field name and the StructField
+   */
+  private def handleArrayNavigationAvro(arrayType: ArrayType, fullPath: 
String, offset: Int,
+                                        prefix: String, rootFieldName: 
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {

Review Comment:
   Would it be possible for us to import `Pair` directly instead of using the fully-qualified name?



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java:
##########
@@ -59,6 +59,41 @@ public final class HoodieSchemaUtils {
   public static final HoodieSchema METADATA_FIELD_SCHEMA = 
HoodieSchema.createNullable(HoodieSchemaType.STRING);
   public static final HoodieSchema RECORD_KEY_SCHEMA = initRecordKeySchema();
 
+  /**
+   * Constants for Parquet-style accessor patterns used in nested MAP and 
ARRAY navigation.
+   * These patterns are specifically used for column stats generation and 
differ from
+   * InternalSchema constants which are used in schema evolution contexts.
+   */
+  private static final String ARRAY_LIST = "list";
+  private static final String ARRAY_ELEMENT = "element";
+  private static final String ARRAY_SPARK = "array"; // Spark writer uses this
+  private static final String MAP_KEY_VALUE = "key_value";
+  private static final String MAP_KEY = "key";
+  private static final String MAP_VALUE = "value";
+
+  private static final String ARRAY_LIST_ELEMENT = ARRAY_LIST + "." + 
ARRAY_ELEMENT;

Review Comment:
   We can collapse this too so that `HoodieSchemaUtils` in Spark can reuse it.
   
   ```java
   /**
    * Returns the canonical path segment for a given input segment.
    * e.g., input "array" -> returns "list.element"
    * e.g., input "list.element" -> returns "list.element"
    */
   public static String getCanonicalArrayPath(String segment) {
       if (segment.equals(SPARK_ARRAY)) {
           return ARRAY_LIST + "." + ARRAY_ELEMENT;
       }
       return segment;
   }
   ```



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -46,24 +46,226 @@ import scala.collection.JavaConverters._
 object HoodieSchemaUtils {
   private val log = LoggerFactory.getLogger(getClass)
 
-  def getSchemaForField(schema: StructType, fieldName: String): 
org.apache.hudi.common.util.collection.Pair[String, StructField] = {
-    getSchemaForField(schema, fieldName, StringUtils.EMPTY_STRING)
+  /**
+   * Constants for Parquet-style accessor patterns used in nested MAP and 
ARRAY navigation.
+   * These patterns are specifically used for column stats generation and 
differ from
+   * InternalSchema constants which are used in schema evolution contexts.
+   */
+  private final val ARRAY_LIST = "list"
+  private final val ARRAY_ELEMENT = "element"
+  private final val ARRAY_SPARK = "array" // Spark writer uses this
+  private final val MAP_KEY_VALUE = "key_value"
+  private final val MAP_KEY = "key"
+  private final val MAP_VALUE = "value"
+
+  private final val ARRAY_LIST_ELEMENT = ARRAY_LIST + "." + ARRAY_ELEMENT
+  private final val MAP_KEY_VALUE_KEY = MAP_KEY_VALUE + "." + MAP_KEY
+  private final val MAP_KEY_VALUE_VALUE = MAP_KEY_VALUE + "." + MAP_VALUE

Review Comment:
   Let's also centralize these constants in Java and import them directly from 
`HoodieSchemaUtils.java`.
   
   We should split them up accordingly like this for better readability:
   
   ```java
   // Physical Storage Constants (Parquet/Avro)
   public static final String ARRAY_LIST_FIELD_NAME = "list";
   public static final String ARRAY_ELEMENT_FIELD_NAME = "element";
   public static final String MAP_KEY_VALUE_FIELD_NAME = "key_value";
   public static final String MAP_KEY_FIELD_NAME = "key";
   public static final String MAP_VALUE_FIELD_NAME = "value";
   
   // Logical Accessor Constants (Spark)
   public static final String SPARK_ARRAY_ACCESSOR = "array";
   
   // Canonical Paths (For Output consistency)
   public static final String CANONICAL_ARRAY_PATH = ARRAY_LIST_FIELD_NAME + 
"." + ARRAY_ELEMENT_FIELD_NAME;
   public static final String CANONICAL_MAP_KEY_PATH = MAP_KEY_VALUE_FIELD_NAME 
+ "." + MAP_KEY_FIELD_NAME;
   public static final String CANONICAL_MAP_VALUE_PATH = 
MAP_KEY_VALUE_FIELD_NAME + "." + MAP_VALUE_FIELD_NAME;
   ```
   
   I understand that we need separate traversal logic here. That's fine, as 
we can have different traversal logic in different areas, but we should reduce 
code smell by centralizing the rules, e.g. common reserved keywords and the 
format of fields.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to