voonhous commented on code in PR #17694:
URL: https://github.com/apache/hudi/pull/17694#discussion_r2674963220
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -46,24 +46,226 @@ import scala.collection.JavaConverters._
object HoodieSchemaUtils {
private val log = LoggerFactory.getLogger(getClass)
- def getSchemaForField(schema: StructType, fieldName: String):
org.apache.hudi.common.util.collection.Pair[String, StructField] = {
- getSchemaForField(schema, fieldName, StringUtils.EMPTY_STRING)
+ /**
+ * Constants for Parquet-style accessor patterns used in nested MAP and
ARRAY navigation.
+ * These patterns are specifically used for column stats generation and
differ from
+ * InternalSchema constants which are used in schema evolution contexts.
+ */
+ private final val ARRAY_LIST = "list"
+ private final val ARRAY_ELEMENT = "element"
+ private final val ARRAY_SPARK = "array" // Spark writer uses this
+ private final val MAP_KEY_VALUE = "key_value"
+ private final val MAP_KEY = "key"
+ private final val MAP_VALUE = "value"
+
+ private final val ARRAY_LIST_ELEMENT = ARRAY_LIST + "." + ARRAY_ELEMENT
+ private final val MAP_KEY_VALUE_KEY = MAP_KEY_VALUE + "." + MAP_KEY
+ private final val MAP_KEY_VALUE_VALUE = MAP_KEY_VALUE + "." + MAP_VALUE
+
+ /**
+ * Advances offset past a component name in the path, handling end-of-path
and dot separator.
+ *
+ * @param path the full path string
+ * @param offset current position in path
+ * @param component the component name to match (e.g., "element", "key",
"value")
+ * @return new offset after component and dot, or path.length() if at end,
or -1 if no match
+ */
+ private def getNextOffset(path: String, offset: Int, component: String): Int
= {
+ if (!path.regionMatches(offset, component, 0, component.length)) {
+ -1
+ } else {
+ val next = offset + component.length
+ if (next == path.length) {
+ next
+ } else {
+ if (path.charAt(next) == '.') next + 1 else -1
+ }
+ }
}
- def getSchemaForField(schema: StructType, fieldName: String, prefix:
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {
- if (!(fieldName.contains("."))) {
- org.apache.hudi.common.util.collection.Pair.of(prefix +
schema.fields(schema.fieldIndex(fieldName)).name,
schema.fields(schema.fieldIndex(fieldName)))
+ /**
+ * Handles navigation into ARRAY types using the Avro ".list.element"
pattern.
+ *
+ * @param arrayType the ArrayType to navigate into
+ * @param fullPath the full field path string
+ * @param offset current position in fullPath (should point to start of
"list")
+ * @param prefix the accumulated field path prefix
+ * @param rootFieldName the name of the array field
+ * @return Pair of canonical field name and the StructField
+ */
+ private def handleArrayNavigationAvro(arrayType: ArrayType, fullPath:
String, offset: Int,
+ prefix: String, rootFieldName:
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {
+ val elementType = arrayType.elementType
+ val listOffset = getNextOffset(fullPath, offset, ARRAY_LIST)
+
+ if (listOffset == -1) {
+ throw new HoodieException(s"Array field requires
.$ARRAY_LIST.$ARRAY_ELEMENT accessor pattern: $fullPath")
+ }
+
+ val elementOffset = getNextOffset(fullPath, listOffset, ARRAY_ELEMENT)
+ if (elementOffset == -1) {
+ throw new HoodieException(s"Array field requires
.$ARRAY_LIST.$ARRAY_ELEMENT accessor pattern: $fullPath")
+ }
+
+ if (elementOffset == fullPath.length) {
+ // Terminal case: just accessing the element type
+ val elementField = StructField(ARRAY_ELEMENT, elementType,
arrayType.containsNull)
+ org.apache.hudi.common.util.collection.Pair.of(prefix + rootFieldName +
"." + ARRAY_LIST_ELEMENT, elementField)
+ } else if (elementType.isInstanceOf[StructType]) {
+ // Recursive case: accessing fields within array elements
+ getSchemaForFieldInternal(elementType.asInstanceOf[StructType],
fullPath, elementOffset,
+ prefix + rootFieldName + "." + ARRAY_LIST_ELEMENT + ".")
+ } else {
+ throw new HoodieException(s"Invalid array navigation pattern: $fullPath")
+ }
+ }
+
+ /**
+ * Handles navigation into ARRAY types using the Spark ".array" pattern.
+ *
+ * @param arrayType the ArrayType to navigate into
+ * @param fullPath the full field path string
+ * @param offset current position in fullPath (should point to start of
"array")
+ * @param prefix the accumulated field path prefix
+ * @param rootFieldName the name of the array field
+ * @return Pair of canonical field name and the StructField
+ */
+ private def handleArrayNavigationSpark(arrayType: ArrayType, fullPath:
String, offset: Int,
+ prefix: String, rootFieldName:
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {
+ val elementType = arrayType.elementType
+ val sparkArrayOffset = getNextOffset(fullPath, offset, ARRAY_SPARK)
+
+ if (sparkArrayOffset == -1) {
+ throw new HoodieException(s"Array field requires .$ARRAY_SPARK accessor
pattern: $fullPath")
+ }
+
+ if (sparkArrayOffset == fullPath.length) {
+ // Terminal case: just accessing the element type
+ val elementField = StructField(ARRAY_SPARK, elementType,
arrayType.containsNull)
+ org.apache.hudi.common.util.collection.Pair.of(prefix + rootFieldName +
"." + ARRAY_SPARK, elementField)
+ } else if (elementType.isInstanceOf[StructType]) {
+ // Recursive case: accessing fields within array elements
+ getSchemaForFieldInternal(elementType.asInstanceOf[StructType],
fullPath, sparkArrayOffset,
Review Comment:
Let's normalize the returned path string. We are currently returning the `Spark-style`
path. Should we map this to the Avro-style path so the stored index key is
consistent across engines?
Since we are supporting 2 formats/structures, I feel we should pick one and
normalize all others into it.
After that, we can add a test case to verify that `items.array.id` returns
`items.list.element.id`, where `items.list.element.id` is the canonical path.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -46,24 +46,226 @@ import scala.collection.JavaConverters._
object HoodieSchemaUtils {
private val log = LoggerFactory.getLogger(getClass)
- def getSchemaForField(schema: StructType, fieldName: String):
org.apache.hudi.common.util.collection.Pair[String, StructField] = {
- getSchemaForField(schema, fieldName, StringUtils.EMPTY_STRING)
+ /**
+ * Constants for Parquet-style accessor patterns used in nested MAP and
ARRAY navigation.
+ * These patterns are specifically used for column stats generation and
differ from
+ * InternalSchema constants which are used in schema evolution contexts.
+ */
+ private final val ARRAY_LIST = "list"
+ private final val ARRAY_ELEMENT = "element"
+ private final val ARRAY_SPARK = "array" // Spark writer uses this
+ private final val MAP_KEY_VALUE = "key_value"
+ private final val MAP_KEY = "key"
+ private final val MAP_VALUE = "value"
+
+ private final val ARRAY_LIST_ELEMENT = ARRAY_LIST + "." + ARRAY_ELEMENT
+ private final val MAP_KEY_VALUE_KEY = MAP_KEY_VALUE + "." + MAP_KEY
+ private final val MAP_KEY_VALUE_VALUE = MAP_KEY_VALUE + "." + MAP_VALUE
+
+ /**
+ * Advances offset past a component name in the path, handling end-of-path
and dot separator.
+ *
+ * @param path the full path string
+ * @param offset current position in path
+ * @param component the component name to match (e.g., "element", "key",
"value")
+ * @return new offset after component and dot, or path.length() if at end,
or -1 if no match
+ */
+ private def getNextOffset(path: String, offset: Int, component: String): Int
= {
Review Comment:
Would it be possible for us to import this directly from Java?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -46,24 +46,226 @@ import scala.collection.JavaConverters._
object HoodieSchemaUtils {
private val log = LoggerFactory.getLogger(getClass)
- def getSchemaForField(schema: StructType, fieldName: String):
org.apache.hudi.common.util.collection.Pair[String, StructField] = {
- getSchemaForField(schema, fieldName, StringUtils.EMPTY_STRING)
+ /**
+ * Constants for Parquet-style accessor patterns used in nested MAP and
ARRAY navigation.
+ * These patterns are specifically used for column stats generation and
differ from
+ * InternalSchema constants which are used in schema evolution contexts.
+ */
+ private final val ARRAY_LIST = "list"
+ private final val ARRAY_ELEMENT = "element"
+ private final val ARRAY_SPARK = "array" // Spark writer uses this
+ private final val MAP_KEY_VALUE = "key_value"
+ private final val MAP_KEY = "key"
+ private final val MAP_VALUE = "value"
+
+ private final val ARRAY_LIST_ELEMENT = ARRAY_LIST + "." + ARRAY_ELEMENT
+ private final val MAP_KEY_VALUE_KEY = MAP_KEY_VALUE + "." + MAP_KEY
+ private final val MAP_KEY_VALUE_VALUE = MAP_KEY_VALUE + "." + MAP_VALUE
Review Comment:
These are reserved keywords now. Let's include a test case where users
are using these reserved keywords as column/field names.
For example:
```
// A struct containing an array of structs, where the inner struct has a
field named "array"
StructField("items", ArrayType(
StructType(Seq(
StructField("array", StringType) // <--- Field named "array"
))
))
```
We need to ensure that `items.array.array` is properly handled. The test
will most likely pass as we are only doing special handling for `ArrayTypes`,
but let's err on the side of caution to guard against this.
Let's also apply this to the other reserved keywords.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -46,24 +46,226 @@ import scala.collection.JavaConverters._
object HoodieSchemaUtils {
private val log = LoggerFactory.getLogger(getClass)
- def getSchemaForField(schema: StructType, fieldName: String):
org.apache.hudi.common.util.collection.Pair[String, StructField] = {
- getSchemaForField(schema, fieldName, StringUtils.EMPTY_STRING)
+ /**
+ * Constants for Parquet-style accessor patterns used in nested MAP and
ARRAY navigation.
+ * These patterns are specifically used for column stats generation and
differ from
+ * InternalSchema constants which are used in schema evolution contexts.
+ */
+ private final val ARRAY_LIST = "list"
+ private final val ARRAY_ELEMENT = "element"
+ private final val ARRAY_SPARK = "array" // Spark writer uses this
+ private final val MAP_KEY_VALUE = "key_value"
+ private final val MAP_KEY = "key"
+ private final val MAP_VALUE = "value"
+
+ private final val ARRAY_LIST_ELEMENT = ARRAY_LIST + "." + ARRAY_ELEMENT
+ private final val MAP_KEY_VALUE_KEY = MAP_KEY_VALUE + "." + MAP_KEY
+ private final val MAP_KEY_VALUE_VALUE = MAP_KEY_VALUE + "." + MAP_VALUE
+
+ /**
+ * Advances offset past a component name in the path, handling end-of-path
and dot separator.
+ *
+ * @param path the full path string
+ * @param offset current position in path
+ * @param component the component name to match (e.g., "element", "key",
"value")
+ * @return new offset after component and dot, or path.length() if at end,
or -1 if no match
+ */
+ private def getNextOffset(path: String, offset: Int, component: String): Int
= {
+ if (!path.regionMatches(offset, component, 0, component.length)) {
+ -1
+ } else {
+ val next = offset + component.length
+ if (next == path.length) {
+ next
+ } else {
+ if (path.charAt(next) == '.') next + 1 else -1
+ }
+ }
}
- def getSchemaForField(schema: StructType, fieldName: String, prefix:
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {
- if (!(fieldName.contains("."))) {
- org.apache.hudi.common.util.collection.Pair.of(prefix +
schema.fields(schema.fieldIndex(fieldName)).name,
schema.fields(schema.fieldIndex(fieldName)))
+ /**
+ * Handles navigation into ARRAY types using the Avro ".list.element"
pattern.
+ *
+ * @param arrayType the ArrayType to navigate into
+ * @param fullPath the full field path string
+ * @param offset current position in fullPath (should point to start of
"list")
+ * @param prefix the accumulated field path prefix
+ * @param rootFieldName the name of the array field
+ * @return Pair of canonical field name and the StructField
+ */
+ private def handleArrayNavigationAvro(arrayType: ArrayType, fullPath:
String, offset: Int,
+ prefix: String, rootFieldName:
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {
+ val elementType = arrayType.elementType
+ val listOffset = getNextOffset(fullPath, offset, ARRAY_LIST)
+
+ if (listOffset == -1) {
+ throw new HoodieException(s"Array field requires
.$ARRAY_LIST.$ARRAY_ELEMENT accessor pattern: $fullPath")
+ }
+
+ val elementOffset = getNextOffset(fullPath, listOffset, ARRAY_ELEMENT)
+ if (elementOffset == -1) {
+ throw new HoodieException(s"Array field requires
.$ARRAY_LIST.$ARRAY_ELEMENT accessor pattern: $fullPath")
+ }
+
+ if (elementOffset == fullPath.length) {
+ // Terminal case: just accessing the element type
+ val elementField = StructField(ARRAY_ELEMENT, elementType,
arrayType.containsNull)
+ org.apache.hudi.common.util.collection.Pair.of(prefix + rootFieldName +
"." + ARRAY_LIST_ELEMENT, elementField)
+ } else if (elementType.isInstanceOf[StructType]) {
+ // Recursive case: accessing fields within array elements
+ getSchemaForFieldInternal(elementType.asInstanceOf[StructType],
fullPath, elementOffset,
+ prefix + rootFieldName + "." + ARRAY_LIST_ELEMENT + ".")
+ } else {
+ throw new HoodieException(s"Invalid array navigation pattern: $fullPath")
+ }
+ }
+
+ /**
+ * Handles navigation into ARRAY types using the Spark ".array" pattern.
+ *
+ * @param arrayType the ArrayType to navigate into
+ * @param fullPath the full field path string
+ * @param offset current position in fullPath (should point to start of
"array")
+ * @param prefix the accumulated field path prefix
+ * @param rootFieldName the name of the array field
+ * @return Pair of canonical field name and the StructField
+ */
+ private def handleArrayNavigationSpark(arrayType: ArrayType, fullPath:
String, offset: Int,
+ prefix: String, rootFieldName:
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {
+ val elementType = arrayType.elementType
+ val sparkArrayOffset = getNextOffset(fullPath, offset, ARRAY_SPARK)
+
+ if (sparkArrayOffset == -1) {
+ throw new HoodieException(s"Array field requires .$ARRAY_SPARK accessor
pattern: $fullPath")
+ }
+
+ if (sparkArrayOffset == fullPath.length) {
+ // Terminal case: just accessing the element type
+ val elementField = StructField(ARRAY_SPARK, elementType,
arrayType.containsNull)
+ org.apache.hudi.common.util.collection.Pair.of(prefix + rootFieldName +
"." + ARRAY_SPARK, elementField)
+ } else if (elementType.isInstanceOf[StructType]) {
+ // Recursive case: accessing fields within array elements
+ getSchemaForFieldInternal(elementType.asInstanceOf[StructType],
fullPath, sparkArrayOffset,
+ prefix + rootFieldName + "." + ARRAY_SPARK + ".")
+ } else {
+ throw new HoodieException(s"Invalid array navigation pattern: $fullPath")
+ }
+ }
+
+ /**
+ * Handles navigation into MAP types using the Parquet-style
".key_value.key" or ".key_value.value" patterns.
+ *
+ * @param mapType the MapType to navigate into
+ * @param fullPath the full field path string
+ * @param offset current position in fullPath (should point to start of
"key_value")
+ * @param prefix the accumulated field path prefix
+ * @param rootFieldName the name of the map field
+ * @return Pair of canonical field name and the StructField
+ */
+ private def handleMapNavigation(mapType: MapType, fullPath: String, offset:
Int,
+ prefix: String, rootFieldName: String):
org.apache.hudi.common.util.collection.Pair[String, StructField] = {
+ val keyValueOffset = getNextOffset(fullPath, offset, MAP_KEY_VALUE)
+ if (keyValueOffset == -1) {
+ throw new HoodieException(s"Invalid map navigation pattern: $fullPath.
Expected .$MAP_KEY_VALUE_KEY or .$MAP_KEY_VALUE_VALUE")
}
- else {
- val rootFieldIndex: Int = fieldName.indexOf(".")
- val rootField: StructField =
schema.fields(schema.fieldIndex(fieldName.substring(0, rootFieldIndex)))
+
+ // Check for .key pattern
+ val keyOffset = getNextOffset(fullPath, keyValueOffset, MAP_KEY)
+ if (keyOffset != -1) {
+ // Accessing map keys (always the key type)
+ val keyField = StructField(MAP_KEY, mapType.keyType, false)
+ if (keyOffset == fullPath.length) {
+ org.apache.hudi.common.util.collection.Pair.of(prefix + rootFieldName
+ "." + MAP_KEY_VALUE_KEY, keyField)
+ } else {
+ throw new HoodieException(s"Cannot navigate beyond map key: $fullPath")
+ }
+ } else {
+ // Check for .value pattern
+ val valueOffset = getNextOffset(fullPath, keyValueOffset, MAP_VALUE)
+ if (valueOffset == -1) {
+ throw new HoodieException(s"Invalid map navigation pattern: $fullPath.
Expected .$MAP_KEY_VALUE_KEY or .$MAP_KEY_VALUE_VALUE")
+ }
+
+ val valueType = mapType.valueType
+ if (valueOffset == fullPath.length) {
+ // Terminal case: just accessing the value type
+ val valueField = StructField(MAP_VALUE, valueType,
mapType.valueContainsNull)
+ org.apache.hudi.common.util.collection.Pair.of(prefix + rootFieldName
+ "." + MAP_KEY_VALUE_VALUE, valueField)
+ } else if (valueType.isInstanceOf[StructType]) {
+ // Recursive case: accessing fields within map values
+ getSchemaForFieldInternal(valueType.asInstanceOf[StructType],
fullPath, valueOffset,
+ prefix + rootFieldName + "." + MAP_KEY_VALUE_VALUE + ".")
+ } else {
+ throw new HoodieException(s"Invalid map value navigation pattern:
$fullPath")
+ }
+ }
+ }
+
+ /**
+ * Internal helper method for retrieving nested fields using offset-based
navigation.
+ * This method uses offset arithmetic to navigate through the path string
instead of
+ * creating intermediate substring objects, improving performance and memory
efficiency.
+ *
+ * @param schema the schema to search in
+ * @param fullPath the full field path string
+ * @param offset current position in fullPath
+ * @param prefix the accumulated field path prefix
+ * @return Pair of canonical field name and the StructField
+ * @throws HoodieException if field is not found or navigation pattern is
invalid
+ */
+ private def getSchemaForFieldInternal(schema: StructType, fullPath: String,
offset: Int, prefix: String):
org.apache.hudi.common.util.collection.Pair[String, StructField] = {
Review Comment:
We can make this code less java-like and more scala-like. Scala's `match`
expression is more readable and safer.
```scala
private def getSchemaForFieldInternal(schema: StructType, fullPath: String,
offset: Int, prefix: String): Pair[String, StructField] = {
val nextDot = fullPath.indexOf('.', offset)
// Terminal case
if (nextDot == -1) {
val field = schema(fullPath.substring(offset)) // StructType.apply
handles lookup
return Pair.of(prefix + field.name, field)
}
val rootFieldName = fullPath.substring(offset, nextDot)
val rootField = schema(rootFieldName)
val nextOffset = nextDot + 1
rootField.dataType match {
case at: ArrayType =>
// Cleanly separating Spark vs Avro logic using Pattern Matching
if (isNextToken(fullPath, nextOffset, SPARK_ARRAY_ACCESSOR)) {
handleArraySpark(at, fullPath, nextOffset, prefix, rootFieldName)
} else if (isNextToken(fullPath, nextOffset, ARRAY_LIST_FIELD_NAME)) {
handleArrayAvro(at, fullPath, nextOffset, prefix, rootFieldName)
} else {
throw new HoodieException(s"Invalid array path: $fullPath")
}
case mt: MapType =>
handleMapNavigation(mt, fullPath, nextOffset, prefix, rootFieldName)
case st: StructType =>
getSchemaForFieldInternal(st, fullPath, nextOffset, prefix +
rootFieldName + ".")
case _ =>
throw new HoodieException(s"Unsupported type: ${rootField.dataType}")
}
}
```
Above is just an example.
##########
hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/schema/TestHoodieSparkSchemaUtils.scala:
##########
@@ -55,6 +56,177 @@ class TestHoodieSparkSchemaUtils {
assertFieldType(schema, "nested_field.nested_string", StringType)
}
+ @Test
+ def testGetNestedFieldWithArraySpark(): Unit = {
Review Comment:
We can parameterize the tests here. Instead of writing so many tests, we can
make this more maintainable like so:
```scala
@ParameterizedTest
@CsvSource(value = Array(
"items.array, items.list.element, string", // Input
Spark -> Expect Avro Key
"items.list.element, items.list.element, string", // Input Avro
-> Expect Avro Key
"items.array.nested, items.list.element.nested, int", // Nested
Spark
"items.list.element.nested, items.list.element.nested, int" // Nested Avro
))
def testPathNormalization(inputPath: String, expectedKey: String,
expectedType: String): Unit = {
val result = HoodieSchemaUtils.getSchemaForField(complexSchema, inputPath)
assertEquals(expectedKey, result.getKey) // Verifies normalization!
assertEquals(expectedType, result.getValue.dataType.simpleString)
}
```
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -46,24 +46,226 @@ import scala.collection.JavaConverters._
object HoodieSchemaUtils {
private val log = LoggerFactory.getLogger(getClass)
- def getSchemaForField(schema: StructType, fieldName: String):
org.apache.hudi.common.util.collection.Pair[String, StructField] = {
- getSchemaForField(schema, fieldName, StringUtils.EMPTY_STRING)
+ /**
+ * Constants for Parquet-style accessor patterns used in nested MAP and
ARRAY navigation.
+ * These patterns are specifically used for column stats generation and
differ from
+ * InternalSchema constants which are used in schema evolution contexts.
+ */
+ private final val ARRAY_LIST = "list"
+ private final val ARRAY_ELEMENT = "element"
+ private final val ARRAY_SPARK = "array" // Spark writer uses this
+ private final val MAP_KEY_VALUE = "key_value"
+ private final val MAP_KEY = "key"
+ private final val MAP_VALUE = "value"
+
+ private final val ARRAY_LIST_ELEMENT = ARRAY_LIST + "." + ARRAY_ELEMENT
+ private final val MAP_KEY_VALUE_KEY = MAP_KEY_VALUE + "." + MAP_KEY
+ private final val MAP_KEY_VALUE_VALUE = MAP_KEY_VALUE + "." + MAP_VALUE
+
+ /**
+ * Advances offset past a component name in the path, handling end-of-path
and dot separator.
+ *
+ * @param path the full path string
+ * @param offset current position in path
+ * @param component the component name to match (e.g., "element", "key",
"value")
+ * @return new offset after component and dot, or path.length() if at end,
or -1 if no match
+ */
+ private def getNextOffset(path: String, offset: Int, component: String): Int
= {
+ if (!path.regionMatches(offset, component, 0, component.length)) {
+ -1
+ } else {
+ val next = offset + component.length
+ if (next == path.length) {
+ next
+ } else {
+ if (path.charAt(next) == '.') next + 1 else -1
+ }
+ }
}
- def getSchemaForField(schema: StructType, fieldName: String, prefix:
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {
- if (!(fieldName.contains("."))) {
- org.apache.hudi.common.util.collection.Pair.of(prefix +
schema.fields(schema.fieldIndex(fieldName)).name,
schema.fields(schema.fieldIndex(fieldName)))
+ /**
+ * Handles navigation into ARRAY types using the Avro ".list.element"
pattern.
+ *
+ * @param arrayType the ArrayType to navigate into
+ * @param fullPath the full field path string
+ * @param offset current position in fullPath (should point to start of
"list")
+ * @param prefix the accumulated field path prefix
+ * @param rootFieldName the name of the array field
+ * @return Pair of canonical field name and the StructField
+ */
+ private def handleArrayNavigationAvro(arrayType: ArrayType, fullPath:
String, offset: Int,
+ prefix: String, rootFieldName:
String): org.apache.hudi.common.util.collection.Pair[String, StructField] = {
Review Comment:
Would it be possible for us to import `Pair`?
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java:
##########
@@ -59,6 +59,41 @@ public final class HoodieSchemaUtils {
public static final HoodieSchema METADATA_FIELD_SCHEMA =
HoodieSchema.createNullable(HoodieSchemaType.STRING);
public static final HoodieSchema RECORD_KEY_SCHEMA = initRecordKeySchema();
+ /**
+ * Constants for Parquet-style accessor patterns used in nested MAP and
ARRAY navigation.
+ * These patterns are specifically used for column stats generation and
differ from
+ * InternalSchema constants which are used in schema evolution contexts.
+ */
+ private static final String ARRAY_LIST = "list";
+ private static final String ARRAY_ELEMENT = "element";
+ private static final String ARRAY_SPARK = "array"; // Spark writer uses this
+ private static final String MAP_KEY_VALUE = "key_value";
+ private static final String MAP_KEY = "key";
+ private static final String MAP_VALUE = "value";
+
+ private static final String ARRAY_LIST_ELEMENT = ARRAY_LIST + "." +
ARRAY_ELEMENT;
Review Comment:
We can collapse this too so that `HoodieSchemaUtils` in Spark can use it.
```java
/**
* Returns the canonical path segment for a given input segment.
* e.g., input "array" -> returns "list.element"
* e.g., input "list.element" -> returns "list.element"
*/
public static String getCanonicalArrayPath(String segment) {
if (segment.equals(SPARK_ARRAY)) {
return ARRAY_LIST + "." + ARRAY_ELEMENT;
}
return segment;
}
```
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -46,24 +46,226 @@ import scala.collection.JavaConverters._
object HoodieSchemaUtils {
private val log = LoggerFactory.getLogger(getClass)
- def getSchemaForField(schema: StructType, fieldName: String):
org.apache.hudi.common.util.collection.Pair[String, StructField] = {
- getSchemaForField(schema, fieldName, StringUtils.EMPTY_STRING)
+ /**
+ * Constants for Parquet-style accessor patterns used in nested MAP and
ARRAY navigation.
+ * These patterns are specifically used for column stats generation and
differ from
+ * InternalSchema constants which are used in schema evolution contexts.
+ */
+ private final val ARRAY_LIST = "list"
+ private final val ARRAY_ELEMENT = "element"
+ private final val ARRAY_SPARK = "array" // Spark writer uses this
+ private final val MAP_KEY_VALUE = "key_value"
+ private final val MAP_KEY = "key"
+ private final val MAP_VALUE = "value"
+
+ private final val ARRAY_LIST_ELEMENT = ARRAY_LIST + "." + ARRAY_ELEMENT
+ private final val MAP_KEY_VALUE_KEY = MAP_KEY_VALUE + "." + MAP_KEY
+ private final val MAP_KEY_VALUE_VALUE = MAP_KEY_VALUE + "." + MAP_VALUE
Review Comment:
Let's also centralize these constants in Java and import them directly from
`HoodieSchemaUtils.java`.
We should split them up accordingly like this for better readability:
```java
// Physical Storage Constants (Parquet/Avro)
public static final String ARRAY_LIST_FIELD_NAME = "list";
public static final String ARRAY_ELEMENT_FIELD_NAME = "element";
public static final String MAP_KEY_VALUE_FIELD_NAME = "key_value";
public static final String MAP_KEY_FIELD_NAME = "key";
public static final String MAP_VALUE_FIELD_NAME = "value";
// Logical Accessor Constants (Spark)
public static final String SPARK_ARRAY_ACCESSOR = "array";
// Canonical Paths (For Output consistency)
public static final String CANONICAL_ARRAY_PATH = ARRAY_LIST_FIELD_NAME +
"." + ARRAY_ELEMENT_FIELD_NAME;
public static final String CANONICAL_MAP_KEY_PATH = MAP_KEY_VALUE_FIELD_NAME
+ "." + MAP_KEY_FIELD_NAME;
public static final String CANONICAL_MAP_VALUE_PATH =
MAP_KEY_VALUE_FIELD_NAME + "." + MAP_VALUE_FIELD_NAME;
```
I understand that we need to have separate traversal logic here. That's fine, as
we can have different traversal logic in different areas, but we should reduce
code smell by centralizing the rules, e.g. common reserved keywords and the
format of fields.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]