This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit fb612bef734cbf8aef94f714f8bde24688b82add
Author: voonhous <[email protected]>
AuthorDate: Tue Jan 31 15:04:36 2023 +0800

    [MINOR] Standardise schema concepts on Flink Engine (#7761)
---
 .../internal/schema/utils/InternalSchemaUtils.java |  4 +-
 .../hudi/table/format/InternalSchemaManager.java   | 57 +++++++++++++---------
 .../apache/hudi/table/format/RecordIterators.java  |  8 +--
 3 files changed, 41 insertions(+), 28 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
index 4c926f9f293..cf66986e155 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
@@ -278,9 +278,9 @@ public class InternalSchemaUtils {
   public static Map<String, String> collectRenameCols(InternalSchema 
oldSchema, InternalSchema newSchema) {
     List<String> colNamesFromWriteSchema = oldSchema.getAllColsFullName();
     return colNamesFromWriteSchema.stream().filter(f -> {
-      int filedIdFromWriteSchema = oldSchema.findIdByName(f);
+      int fieldIdFromWriteSchema = oldSchema.findIdByName(f);
       // try to find the cols which has the same id, but have different 
colName;
-      return newSchema.getAllIds().contains(filedIdFromWriteSchema) && 
!newSchema.findfullName(filedIdFromWriteSchema).equalsIgnoreCase(f);
+      return newSchema.getAllIds().contains(fieldIdFromWriteSchema) && 
!newSchema.findfullName(fieldIdFromWriteSchema).equalsIgnoreCase(f);
     }).collect(Collectors.toMap(e -> 
newSchema.findfullName(oldSchema.findIdByName(e)), e -> {
       int lastDotIndex = e.lastIndexOf(".");
       return e.substring(lastDotIndex == -1 ? 0 : lastDotIndex + 1);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
index 7fa598bc834..3783e642c8d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
@@ -93,28 +93,39 @@ public class InternalSchemaManager implements Serializable {
     return querySchema;
   }
 
-  InternalSchema getFileSchema(String fileName) {
+  /**
+   * Attempts to merge the file and query schema to produce a mergeSchema, 
prioritising the use of fileSchema types.
+   * An emptySchema is returned if:
+   * <ul>
+   * <li>1. An empty querySchema is provided</li>
+   * <li>2. querySchema is equal to fileSchema</li>
+   * </ul>
+   * Note that this method returns an emptySchema if merging is not required 
to be performed.
+   * @param fileName Name of file to fetch commitTime/versionId for
+   * @return mergeSchema, i.e. the schema on which the file should be read with
+   */
+  InternalSchema getMergeSchema(String fileName) {
     if (querySchema.isEmptySchema()) {
       return querySchema;
     }
     long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(fileName));
-    InternalSchema fileSchemaUnmerged = 
InternalSchemaCache.getInternalSchemaByVersionId(
+    InternalSchema fileSchema = 
InternalSchemaCache.getInternalSchemaByVersionId(
         commitInstantTime, tablePath, getHadoopConf(), validCommits);
-    if (querySchema.equals(fileSchemaUnmerged)) {
+    if (querySchema.equals(fileSchema)) {
       return InternalSchema.getEmptyInternalSchema();
     }
-    return new InternalSchemaMerger(fileSchemaUnmerged, querySchema, true, 
true).mergeSchema();
+    return new InternalSchemaMerger(fileSchema, querySchema, true, 
true).mergeSchema();
   }
 
   /**
-   * This method returns a mapping of columns that have type inconsistencies 
between the fileSchema and querySchema.
+   * This method returns a mapping of columns that have type inconsistencies 
between the mergeSchema and querySchema.
    * This is done by:
    * <li>1. Finding the columns with type changes</li>
    * <li>2. Get a map storing the index of these columns with type changes; 
Map of -> (colIdxInQueryFieldNames, colIdxInQuerySchema)</li>
    * <li>3. For each selectedField with type changes, build a castMap 
containing the cast/conversion details;
    * Map of -> (selectedPos, Cast([from] fileType, [to] queryType))</li>
    *
-   * @param fileSchema InternalSchema representation of the file's schema 
(acquired from commit/.schema metadata)
+   * @param mergeSchema InternalSchema representation of mergeSchema 
(prioritise use of fileSchemaType) that is used for reading base parquet files
    * @param queryFieldNames array containing the columns of a Hudi Flink table
    * @param queryFieldTypes array containing the field types of the columns of 
a Hudi Flink table
    * @param selectedFields array containing the index of the columns of 
interest required (indexes are based on queryFieldNames and queryFieldTypes)
@@ -122,31 +133,33 @@ public class InternalSchemaManager implements Serializable {
    *
    * @see CastMap
    */
-  CastMap getCastMap(InternalSchema fileSchema, String[] queryFieldNames, 
DataType[] queryFieldTypes, int[] selectedFields) {
+  CastMap getCastMap(InternalSchema mergeSchema, String[] queryFieldNames, 
DataType[] queryFieldTypes, int[] selectedFields) {
     Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema 
cannot be empty");
-    Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema 
cannot be empty");
+    Preconditions.checkArgument(!mergeSchema.isEmptySchema(), "mergeSchema 
cannot be empty");
 
     CastMap castMap = new CastMap();
     // map storing the indexes of columns with type changes Map of -> 
(colIdxInQueryFieldNames, colIdxInQuerySchema)
-    Map<Integer, Integer> posProxy = getPosProxy(fileSchema, queryFieldNames);
+    Map<Integer, Integer> posProxy = getPosProxy(mergeSchema, queryFieldNames);
     if (posProxy.isEmpty()) {
       // no type changes
       castMap.setFileFieldTypes(queryFieldTypes);
       return castMap;
     }
     List<Integer> selectedFieldList = 
IntStream.of(selectedFields).boxed().collect(Collectors.toList());
-    List<DataType> fileSchemaAsDataTypes = 
AvroSchemaConverter.convertToDataType(
-        AvroInternalSchemaConverter.convert(fileSchema, 
"tableName")).getChildren();
+    // mergeSchema is built with useColumnTypeFromFileSchema = true
+    List<DataType> mergeSchemaAsDataTypes = 
AvroSchemaConverter.convertToDataType(
+        AvroInternalSchemaConverter.convert(mergeSchema, 
"tableName")).getChildren();
     DataType[] fileFieldTypes = new DataType[queryFieldTypes.length];
     for (int i = 0; i < queryFieldTypes.length; i++) {
+      // position of ChangedType in querySchema
       Integer posOfChangedType = posProxy.get(i);
       if (posOfChangedType == null) {
         // no type change for column; fileFieldType == queryFieldType
         fileFieldTypes[i] = queryFieldTypes[i];
       } else {
         // type change detected for column;
-        DataType fileType = fileSchemaAsDataTypes.get(posOfChangedType);
-        // update fileFieldType match the type found in fileSchema
+        DataType fileType = mergeSchemaAsDataTypes.get(posOfChangedType);
+        // update fileFieldType match the type found in mergeSchema
         fileFieldTypes[i] = fileType;
         int selectedPos = selectedFieldList.indexOf(i);
         if (selectedPos != -1) {
@@ -162,34 +175,34 @@ public class InternalSchemaManager implements Serializable {
 
   /**
    * For columns that have been modified via the column renaming operation, 
the column name might be inconsistent
-   * between querySchema and fileSchema.
+   * between querySchema and mergeSchema.
    * <p>
    * As such, this method will identify all columns that have been renamed, 
and return a string array of column names
-   * corresponding to the column names found in the fileSchema.
+   * corresponding to the column names found in the mergeSchema.
    * <p>
    * This is done by:
    * <li>1. Get the rename mapping of -> (colNameFromNewSchema, 
colNameLastPartFromOldSchema)</li>
    * <li>2. For columns that have been renamed, replace them with the old 
column name</li>
    *
-   * @param fileSchema InternalSchema representation of the file's schema 
(acquired from commit/.schema metadata)
+   * @param mergeSchema InternalSchema representation of mergeSchema 
(prioritise use of fileSchemaType) that is used for reading base parquet files
    * @param queryFieldNames array containing the columns of a Hudi Flink table
-   * @return String array containing column names corresponding to the column 
names found in the fileSchema
+   * @return String array containing column names corresponding to the column 
names found in the mergeSchema
    *
    * @see InternalSchemaUtils#collectRenameCols(InternalSchema, InternalSchema)
    */
-  String[] getFileFieldNames(InternalSchema fileSchema, String[] 
queryFieldNames) {
+  String[] getMergeFieldNames(InternalSchema mergeSchema, String[] 
queryFieldNames) {
     Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema 
cannot be empty");
-    Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema 
cannot be empty");
+    Preconditions.checkArgument(!mergeSchema.isEmptySchema(), "mergeSchema 
cannot be empty");
 
-    Map<String, String> renamedCols = 
InternalSchemaUtils.collectRenameCols(fileSchema, querySchema);
+    Map<String, String> renamedCols = 
InternalSchemaUtils.collectRenameCols(mergeSchema, querySchema);
     if (renamedCols.isEmpty()) {
       return queryFieldNames;
     }
     return Arrays.stream(queryFieldNames).map(name -> 
renamedCols.getOrDefault(name, name)).toArray(String[]::new);
   }
 
-  private Map<Integer, Integer> getPosProxy(InternalSchema fileSchema, 
String[] queryFieldNames) {
-    Map<Integer, Pair<Type, Type>> changedCols = 
InternalSchemaUtils.collectTypeChangedCols(querySchema, fileSchema);
+  private Map<Integer, Integer> getPosProxy(InternalSchema mergeSchema, 
String[] queryFieldNames) {
+    Map<Integer, Pair<Type, Type>> changedCols = 
InternalSchemaUtils.collectTypeChangedCols(querySchema, mergeSchema);
     HashMap<Integer, Integer> posProxy = new HashMap<>(changedCols.size());
     List<String> fieldNameList = Arrays.asList(queryFieldNames);
     List<Types.Field> columns = querySchema.columns();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
index 8657f16ddc9..1bc02bcad40 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
@@ -50,8 +50,8 @@ public abstract class RecordIterators {
       Path path,
       long splitStart,
       long splitLength) throws IOException {
-    InternalSchema fileSchema = 
internalSchemaManager.getFileSchema(path.getName());
-    if (fileSchema.isEmptySchema()) {
+    InternalSchema mergeSchema = 
internalSchemaManager.getMergeSchema(path.getName());
+    if (mergeSchema.isEmptySchema()) {
       return new ParquetSplitRecordIterator(
           ParquetSplitReaderUtil.genPartColumnarRowReader(
               utcTimestamp,
@@ -66,14 +66,14 @@ public abstract class RecordIterators {
               splitStart,
               splitLength));
     } else {
-      CastMap castMap = internalSchemaManager.getCastMap(fileSchema, 
fieldNames, fieldTypes, selectedFields);
+      CastMap castMap = internalSchemaManager.getCastMap(mergeSchema, 
fieldNames, fieldTypes, selectedFields);
       Option<RowDataProjection> castProjection = 
castMap.toRowDataProjection(selectedFields);
       ClosableIterator<RowData> itr = new ParquetSplitRecordIterator(
           ParquetSplitReaderUtil.genPartColumnarRowReader(
               utcTimestamp,
               caseSensitive,
               conf,
-              internalSchemaManager.getFileFieldNames(fileSchema, fieldNames), 
// the reconciled field names
+              internalSchemaManager.getMergeFieldNames(mergeSchema, 
fieldNames), // the reconciled field names
               castMap.getFileFieldTypes(),                                     
// the reconciled field types
               partitionSpec,
               selectedFields,

Reply via email to