This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b513232449 [HUDI-4458] Add a converter cache for flink ColumnStatsIndices (#6205)
b513232449 is described below
commit b513232449a4d85648dfb4675c0bdb1073efbcf2
Author: Danny Chan <[email protected]>
AuthorDate: Mon Jul 25 17:49:01 2022 +0800
[HUDI-4458] Add a converter cache for flink ColumnStatsIndices (#6205)
---
.../hudi/source/stats/ColumnStatsIndices.java | 26 +++++++++++++++-------
1 file changed, 18 insertions(+), 8 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
index d033c1d874..8ec0eafde3 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
@@ -53,6 +53,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -161,6 +162,7 @@ public class ColumnStatsIndices {
.filter(indexedColumns::contains)
.collect(Collectors.toCollection(TreeSet::new));
+ final Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter>
converters = new ConcurrentHashMap<>();
Map<StringData, List<RowData>> fileNameToRows =
colStats.stream().parallel()
.filter(row ->
sortedTargetColumns.contains(row.getString(ORD_COL_NAME).toString()))
.map(row -> {
@@ -172,7 +174,7 @@ public class ColumnStatsIndices {
} else {
String colName = row.getString(ORD_COL_NAME).toString();
LogicalType colType = tableFieldTypeMap.get(colName);
- return unpackMinMaxVal(row, colType);
+ return unpackMinMaxVal(row, colType, converters);
}
}).collect(Collectors.groupingBy(rowData ->
rowData.getString(ORD_FILE_NAME)));
@@ -222,7 +224,8 @@ public class ColumnStatsIndices {
private static RowData unpackMinMaxVal(
RowData row,
- LogicalType colType) {
+ LogicalType colType,
+ Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter>
converters) {
RowData minValueStruct = row.getRow(ORD_MIN_VAL, 1);
RowData maxValueStruct = row.getRow(ORD_MAX_VAL, 1);
@@ -230,8 +233,8 @@ public class ColumnStatsIndices {
checkState(minValueStruct != null && maxValueStruct != null,
"Invalid Column Stats record: either both min/max have to be null, or
both have to be non-null");
- Object minValue = tryUnpackNonNullVal(minValueStruct, colType);
- Object maxValue = tryUnpackNonNullVal(maxValueStruct, colType);
+ Object minValue = tryUnpackNonNullVal(minValueStruct, colType, converters);
+ Object maxValue = tryUnpackNonNullVal(maxValueStruct, colType, converters);
// the column schema:
// |- file_name: string
@@ -252,18 +255,24 @@ public class ColumnStatsIndices {
return unpackedRow;
}
- private static Object tryUnpackNonNullVal(RowData rowData, LogicalType
colType) {
+ private static Object tryUnpackNonNullVal(
+ RowData rowData,
+ LogicalType colType,
+ Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter>
converters) {
for (int i = 0; i < rowData.getArity(); i++) {
// row data converted from avro is definitely generic.
Object nested = ((GenericRowData) rowData).getField(i);
if (nested != null) {
- return doUnpack(nested, colType);
+ return doUnpack(nested, colType, converters);
}
}
return null;
}
- private static Object doUnpack(Object rawVal, LogicalType logicalType) {
+ private static Object doUnpack(
+ Object rawVal,
+ LogicalType logicalType,
+ Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter>
converters) {
// fix time unit
switch (logicalType.getTypeRoot()) {
case TIME_WITHOUT_TIME_ZONE:
@@ -287,7 +296,8 @@ public class ColumnStatsIndices {
default:
// no operation
}
- AvroToRowDataConverters.AvroToRowDataConverter converter =
AvroToRowDataConverters.createConverter(logicalType);
+ AvroToRowDataConverters.AvroToRowDataConverter converter =
+ converters.computeIfAbsent(logicalType, k ->
AvroToRowDataConverters.createConverter(logicalType));
return converter.convert(rawVal);
}