This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.13.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit fb612bef734cbf8aef94f714f8bde24688b82add Author: voonhous <[email protected]> AuthorDate: Tue Jan 31 15:04:36 2023 +0800 [MINOR] Standardise schema concepts on Flink Engine (#7761) --- .../internal/schema/utils/InternalSchemaUtils.java | 4 +- .../hudi/table/format/InternalSchemaManager.java | 57 +++++++++++++--------- .../apache/hudi/table/format/RecordIterators.java | 8 +-- 3 files changed, 41 insertions(+), 28 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java index 4c926f9f293..cf66986e155 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java @@ -278,9 +278,9 @@ public class InternalSchemaUtils { public static Map<String, String> collectRenameCols(InternalSchema oldSchema, InternalSchema newSchema) { List<String> colNamesFromWriteSchema = oldSchema.getAllColsFullName(); return colNamesFromWriteSchema.stream().filter(f -> { - int filedIdFromWriteSchema = oldSchema.findIdByName(f); + int fieldIdFromWriteSchema = oldSchema.findIdByName(f); // try to find the cols which has the same id, but have different colName; - return newSchema.getAllIds().contains(filedIdFromWriteSchema) && !newSchema.findfullName(filedIdFromWriteSchema).equalsIgnoreCase(f); + return newSchema.getAllIds().contains(fieldIdFromWriteSchema) && !newSchema.findfullName(fieldIdFromWriteSchema).equalsIgnoreCase(f); }).collect(Collectors.toMap(e -> newSchema.findfullName(oldSchema.findIdByName(e)), e -> { int lastDotIndex = e.lastIndexOf("."); return e.substring(lastDotIndex == -1 ? 
0 : lastDotIndex + 1); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java index 7fa598bc834..3783e642c8d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java @@ -93,28 +93,39 @@ public class InternalSchemaManager implements Serializable { return querySchema; } - InternalSchema getFileSchema(String fileName) { + /** + * Attempts to merge the file and query schema to produce a mergeSchema, prioritising the use of fileSchema types. + * An emptySchema is returned if: + * <ul> + * <li>1. An empty querySchema is provided</li> + * <li>2. querySchema is equal to fileSchema</li> + * </ul> + * Note that this method returns an emptySchema if merging is not required to be performed. + * @param fileName Name of file to fetch commitTime/versionId for + * @return mergeSchema, i.e. 
the schema with which the file should be read + */ + InternalSchema getMergeSchema(String fileName) { if (querySchema.isEmptySchema()) { return querySchema; } long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(fileName)); - InternalSchema fileSchemaUnmerged = InternalSchemaCache.getInternalSchemaByVersionId( + InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId( commitInstantTime, tablePath, getHadoopConf(), validCommits); - if (querySchema.equals(fileSchemaUnmerged)) { + if (querySchema.equals(fileSchema)) { return InternalSchema.getEmptyInternalSchema(); } - return new InternalSchemaMerger(fileSchemaUnmerged, querySchema, true, true).mergeSchema(); + return new InternalSchemaMerger(fileSchema, querySchema, true, true).mergeSchema(); } /** - * This method returns a mapping of columns that have type inconsistencies between the fileSchema and querySchema. + * This method returns a mapping of columns that have type inconsistencies between the mergeSchema and querySchema. * This is done by: * <li>1. Finding the columns with type changes</li> * <li>2. Get a map storing the index of these columns with type changes; Map of -> (colIdxInQueryFieldNames, colIdxInQuerySchema)</li> * <li>3.
For each selectedField with type changes, build a castMap containing the cast/conversion details; * Map of -> (selectedPos, Cast([from] fileType, [to] queryType))</li> * - * @param fileSchema InternalSchema representation of the file's schema (acquired from commit/.schema metadata) + * @param mergeSchema InternalSchema representation of mergeSchema (prioritise use of fileSchemaType) that is used for reading base parquet files * @param queryFieldNames array containing the columns of a Hudi Flink table * @param queryFieldTypes array containing the field types of the columns of a Hudi Flink table * @param selectedFields array containing the index of the columns of interest required (indexes are based on queryFieldNames and queryFieldTypes) @@ -122,31 +133,33 @@ public class InternalSchemaManager implements Serializable { * * @see CastMap */ - CastMap getCastMap(InternalSchema fileSchema, String[] queryFieldNames, DataType[] queryFieldTypes, int[] selectedFields) { + CastMap getCastMap(InternalSchema mergeSchema, String[] queryFieldNames, DataType[] queryFieldTypes, int[] selectedFields) { Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema cannot be empty"); - Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema cannot be empty"); + Preconditions.checkArgument(!mergeSchema.isEmptySchema(), "mergeSchema cannot be empty"); CastMap castMap = new CastMap(); // map storing the indexes of columns with type changes Map of -> (colIdxInQueryFieldNames, colIdxInQuerySchema) - Map<Integer, Integer> posProxy = getPosProxy(fileSchema, queryFieldNames); + Map<Integer, Integer> posProxy = getPosProxy(mergeSchema, queryFieldNames); if (posProxy.isEmpty()) { // no type changes castMap.setFileFieldTypes(queryFieldTypes); return castMap; } List<Integer> selectedFieldList = IntStream.of(selectedFields).boxed().collect(Collectors.toList()); - List<DataType> fileSchemaAsDataTypes = AvroSchemaConverter.convertToDataType( - 
AvroInternalSchemaConverter.convert(fileSchema, "tableName")).getChildren(); + // mergeSchema is built with useColumnTypeFromFileSchema = true + List<DataType> mergeSchemaAsDataTypes = AvroSchemaConverter.convertToDataType( + AvroInternalSchemaConverter.convert(mergeSchema, "tableName")).getChildren(); DataType[] fileFieldTypes = new DataType[queryFieldTypes.length]; for (int i = 0; i < queryFieldTypes.length; i++) { + // position of ChangedType in querySchema Integer posOfChangedType = posProxy.get(i); if (posOfChangedType == null) { // no type change for column; fileFieldType == queryFieldType fileFieldTypes[i] = queryFieldTypes[i]; } else { // type change detected for column; - DataType fileType = fileSchemaAsDataTypes.get(posOfChangedType); - // update fileFieldType match the type found in fileSchema + DataType fileType = mergeSchemaAsDataTypes.get(posOfChangedType); + // update fileFieldType to match the type found in mergeSchema fileFieldTypes[i] = fileType; int selectedPos = selectedFieldList.indexOf(i); if (selectedPos != -1) { @@ -162,34 +175,34 @@ public class InternalSchemaManager implements Serializable { /** * For columns that have been modified via the column renaming operation, the column name might be inconsistent - * between querySchema and fileSchema. + * between querySchema and mergeSchema. * <p> * As such, this method will identify all columns that have been renamed, and return a string array of column names - * corresponding to the column names found in the fileSchema. + * corresponding to the column names found in the mergeSchema. * <p> * This is done by: * <li>1. Get the rename mapping of -> (colNameFromNewSchema, colNameLastPartFromOldSchema)</li> * <li>2.
For columns that have been renamed, replace them with the old column name</li> * - * @param fileSchema InternalSchema representation of the file's schema (acquired from commit/.schema metadata) + * @param mergeSchema InternalSchema representation of mergeSchema (prioritise use of fileSchemaType) that is used for reading base parquet files * @param queryFieldNames array containing the columns of a Hudi Flink table - * @return String array containing column names corresponding to the column names found in the fileSchema + * @return String array containing column names corresponding to the column names found in the mergeSchema * * @see InternalSchemaUtils#collectRenameCols(InternalSchema, InternalSchema) */ - String[] getFileFieldNames(InternalSchema fileSchema, String[] queryFieldNames) { + String[] getMergeFieldNames(InternalSchema mergeSchema, String[] queryFieldNames) { Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema cannot be empty"); - Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema cannot be empty"); + Preconditions.checkArgument(!mergeSchema.isEmptySchema(), "mergeSchema cannot be empty"); - Map<String, String> renamedCols = InternalSchemaUtils.collectRenameCols(fileSchema, querySchema); + Map<String, String> renamedCols = InternalSchemaUtils.collectRenameCols(mergeSchema, querySchema); if (renamedCols.isEmpty()) { return queryFieldNames; } return Arrays.stream(queryFieldNames).map(name -> renamedCols.getOrDefault(name, name)).toArray(String[]::new); } - private Map<Integer, Integer> getPosProxy(InternalSchema fileSchema, String[] queryFieldNames) { - Map<Integer, Pair<Type, Type>> changedCols = InternalSchemaUtils.collectTypeChangedCols(querySchema, fileSchema); + private Map<Integer, Integer> getPosProxy(InternalSchema mergeSchema, String[] queryFieldNames) { + Map<Integer, Pair<Type, Type>> changedCols = InternalSchemaUtils.collectTypeChangedCols(querySchema, mergeSchema); HashMap<Integer, Integer> posProxy = new 
HashMap<>(changedCols.size()); List<String> fieldNameList = Arrays.asList(queryFieldNames); List<Types.Field> columns = querySchema.columns(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java index 8657f16ddc9..1bc02bcad40 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java @@ -50,8 +50,8 @@ public abstract class RecordIterators { Path path, long splitStart, long splitLength) throws IOException { - InternalSchema fileSchema = internalSchemaManager.getFileSchema(path.getName()); - if (fileSchema.isEmptySchema()) { + InternalSchema mergeSchema = internalSchemaManager.getMergeSchema(path.getName()); + if (mergeSchema.isEmptySchema()) { return new ParquetSplitRecordIterator( ParquetSplitReaderUtil.genPartColumnarRowReader( utcTimestamp, @@ -66,14 +66,14 @@ public abstract class RecordIterators { splitStart, splitLength)); } else { - CastMap castMap = internalSchemaManager.getCastMap(fileSchema, fieldNames, fieldTypes, selectedFields); + CastMap castMap = internalSchemaManager.getCastMap(mergeSchema, fieldNames, fieldTypes, selectedFields); Option<RowDataProjection> castProjection = castMap.toRowDataProjection(selectedFields); ClosableIterator<RowData> itr = new ParquetSplitRecordIterator( ParquetSplitReaderUtil.genPartColumnarRowReader( utcTimestamp, caseSensitive, conf, - internalSchemaManager.getFileFieldNames(fileSchema, fieldNames), // the reconciled field names + internalSchemaManager.getMergeFieldNames(mergeSchema, fieldNames), // the reconciled field names castMap.getFileFieldTypes(), // the reconciled field types partitionSpec, selectedFields,
