This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b513232449 [HUDI-4458] Add a converter cache for flink ColumnStatsIndices (#6205)
b513232449 is described below

commit b513232449a4d85648dfb4675c0bdb1073efbcf2
Author: Danny Chan <[email protected]>
AuthorDate: Mon Jul 25 17:49:01 2022 +0800

    [HUDI-4458] Add a converter cache for flink ColumnStatsIndices (#6205)
---
 .../hudi/source/stats/ColumnStatsIndices.java      | 26 +++++++++++++++-------
 1 file changed, 18 insertions(+), 8 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
index d033c1d874..8ec0eafde3 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
@@ -53,6 +53,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -161,6 +162,7 @@ public class ColumnStatsIndices {
         .filter(indexedColumns::contains)
         .collect(Collectors.toCollection(TreeSet::new));
 
+    final Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters = new ConcurrentHashMap<>();
 Map<StringData, List<RowData>> fileNameToRows = colStats.stream().parallel()
        .filter(row -> sortedTargetColumns.contains(row.getString(ORD_COL_NAME).toString()))
         .map(row -> {
@@ -172,7 +174,7 @@ public class ColumnStatsIndices {
           } else {
             String colName = row.getString(ORD_COL_NAME).toString();
             LogicalType colType = tableFieldTypeMap.get(colName);
-            return unpackMinMaxVal(row, colType);
+            return unpackMinMaxVal(row, colType, converters);
           }
        }).collect(Collectors.groupingBy(rowData -> rowData.getString(ORD_FILE_NAME)));
 
@@ -222,7 +224,8 @@ public class ColumnStatsIndices {
 
   private static RowData unpackMinMaxVal(
       RowData row,
-      LogicalType colType) {
+      LogicalType colType,
+      Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters) {
 
     RowData minValueStruct = row.getRow(ORD_MIN_VAL, 1);
     RowData maxValueStruct = row.getRow(ORD_MAX_VAL, 1);
@@ -230,8 +233,8 @@ public class ColumnStatsIndices {
     checkState(minValueStruct != null && maxValueStruct != null,
        "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null");
 
-    Object minValue = tryUnpackNonNullVal(minValueStruct, colType);
-    Object maxValue = tryUnpackNonNullVal(maxValueStruct, colType);
+    Object minValue = tryUnpackNonNullVal(minValueStruct, colType, converters);
+    Object maxValue = tryUnpackNonNullVal(maxValueStruct, colType, converters);
 
     // the column schema:
     // |- file_name: string
@@ -252,18 +255,24 @@ public class ColumnStatsIndices {
     return unpackedRow;
   }
 
-  private static Object tryUnpackNonNullVal(RowData rowData, LogicalType colType) {
+  private static Object tryUnpackNonNullVal(
+      RowData rowData,
+      LogicalType colType,
+      Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters) {
     for (int i = 0; i < rowData.getArity(); i++) {
       // row data converted from avro is definitely generic.
       Object nested = ((GenericRowData) rowData).getField(i);
       if (nested != null) {
-        return doUnpack(nested, colType);
+        return doUnpack(nested, colType, converters);
       }
     }
     return null;
   }
 
-  private static Object doUnpack(Object rawVal, LogicalType logicalType) {
+  private static Object doUnpack(
+      Object rawVal,
+      LogicalType logicalType,
+      Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters) {
     // fix time unit
     switch (logicalType.getTypeRoot()) {
       case TIME_WITHOUT_TIME_ZONE:
@@ -287,7 +296,8 @@ public class ColumnStatsIndices {
       default:
         // no operation
     }
-    AvroToRowDataConverters.AvroToRowDataConverter converter = AvroToRowDataConverters.createConverter(logicalType);
+    AvroToRowDataConverters.AvroToRowDataConverter converter =
+        converters.computeIfAbsent(logicalType, k -> AvroToRowDataConverters.createConverter(logicalType));
     return converter.convert(rawVal);
   }
 

Reply via email to