yihua commented on code in PR #13711:
URL: https://github.com/apache/hudi/pull/13711#discussion_r2352774263
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java:
##########
@@ -56,17 +68,59 @@ public BaseSparkInternalRecordContext(HoodieTableConfig
tableConfig) {
}
public static Object getFieldValueFromInternalRow(InternalRow row, Schema
recordSchema, String fieldName) {
+ return getFieldValueFromInternalRowInternal(row, recordSchema, fieldName,
false);
+ }
+
+ public static Object getFieldValueFromInternalRowAsJava(InternalRow row,
Schema recordSchema, String fieldName) {
+ return getFieldValueFromInternalRowInternal(row, recordSchema, fieldName,
true);
+ }
+
+ private static Object getFieldValueFromInternalRowInternal(InternalRow row,
Schema recordSchema, String fieldName, boolean convertToJavaType) {
StructType structType = getCachedSchema(recordSchema);
scala.Option<HoodieUnsafeRowUtils.NestedFieldPath> cachedNestedFieldPath =
HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
if (cachedNestedFieldPath.isDefined()) {
HoodieUnsafeRowUtils.NestedFieldPath nestedFieldPath =
cachedNestedFieldPath.get();
- return HoodieUnsafeRowUtils.getNestedInternalRowValue(row,
nestedFieldPath);
+ Object value = HoodieUnsafeRowUtils.getNestedInternalRowValue(row,
nestedFieldPath);
+ if (!convertToJavaType) {
+ return value;
+ }
+ return sparkTypeToJavaType(value,
nestedFieldPath.parts()[nestedFieldPath.parts().length - 1]._2.dataType());
} else {
return null;
}
}
+
+ public static Object sparkTypeToJavaType(Object value, DataType dataType) {
+ if (value == null) {
+ return null;
+ } else if (value instanceof UTF8String) {
+ return ((UTF8String) value).toString();
+ } else if (value instanceof Decimal) {
+ return ((Decimal) value).toJavaBigDecimal();
+ } else if (value instanceof byte[]) {
+ return ByteBuffer.wrap((byte[]) value);
+ } else if (value instanceof org.apache.spark.sql.catalyst.util.ArrayData) {
+ Comparable[] values =
Arrays.stream(((org.apache.spark.sql.catalyst.util.ArrayData) value)
+ .toObjectArray(((org.apache.spark.sql.types.ArrayType)
dataType).elementType()))
+ .map(v -> {
+ if (v instanceof ArrayData) {
+ throw new UnsupportedOperationException("cannot have nested
arrays?");
+ } else {
+ return (Comparable) sparkTypeToJavaType(v, null);
+ }
+ }).toArray(Comparable[]::new);
+ return new ArrayComparable(values);
Review Comment:
I think there is no standard way to store min and max values for Array or
List type fields, nor a standard way to leverage the min and max when doing
data skipping (i.e., what the expected ordering should be when comparing
arrays or lists), so I suggest that for the column stats index, such fields
should be excluded from min and max generation to avoid confusion.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java:
##########
@@ -56,17 +68,59 @@ public BaseSparkInternalRecordContext(HoodieTableConfig
tableConfig) {
}
public static Object getFieldValueFromInternalRow(InternalRow row, Schema
recordSchema, String fieldName) {
+ return getFieldValueFromInternalRowInternal(row, recordSchema, fieldName,
false);
+ }
+
+ public static Object getFieldValueFromInternalRowAsJava(InternalRow row,
Schema recordSchema, String fieldName) {
Review Comment:
Is `getFieldValueFromInternalRowAsJava` only used by the column stats index,
the secondary index, and ordering value / event time extraction?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java:
##########
@@ -56,17 +68,59 @@ public BaseSparkInternalRecordContext(HoodieTableConfig
tableConfig) {
}
public static Object getFieldValueFromInternalRow(InternalRow row, Schema
recordSchema, String fieldName) {
+ return getFieldValueFromInternalRowInternal(row, recordSchema, fieldName,
false);
+ }
+
+ public static Object getFieldValueFromInternalRowAsJava(InternalRow row,
Schema recordSchema, String fieldName) {
+ return getFieldValueFromInternalRowInternal(row, recordSchema, fieldName,
true);
+ }
+
+ private static Object getFieldValueFromInternalRowInternal(InternalRow row,
Schema recordSchema, String fieldName, boolean convertToJavaType) {
StructType structType = getCachedSchema(recordSchema);
scala.Option<HoodieUnsafeRowUtils.NestedFieldPath> cachedNestedFieldPath =
HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
if (cachedNestedFieldPath.isDefined()) {
HoodieUnsafeRowUtils.NestedFieldPath nestedFieldPath =
cachedNestedFieldPath.get();
- return HoodieUnsafeRowUtils.getNestedInternalRowValue(row,
nestedFieldPath);
+ Object value = HoodieUnsafeRowUtils.getNestedInternalRowValue(row,
nestedFieldPath);
+ if (!convertToJavaType) {
+ return value;
+ }
+ return sparkTypeToJavaType(value,
nestedFieldPath.parts()[nestedFieldPath.parts().length - 1]._2.dataType());
Review Comment:
Could `HoodieUnsafeRowUtils.getNestedInternalRowValue(row, nestedFieldPath)`,
or a new method, return both the value and the data type, so that invoking
`nestedFieldPath.parts()[nestedFieldPath.parts().length - 1]._2.dataType()`
can be avoided?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]