voonhous commented on code in PR #7761:
URL: https://github.com/apache/hudi/pull/7761#discussion_r1090132321
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java:
##########
@@ -93,60 +93,62 @@ public InternalSchema getQuerySchema() {
    return querySchema;
  }
-  InternalSchema getFileSchema(String fileName) {
+  InternalSchema getMergeSchema(String fileName) {
    if (querySchema.isEmptySchema()) {
      return querySchema;
    }
    long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(fileName));
-    InternalSchema fileSchemaUnmerged = InternalSchemaCache.getInternalSchemaByVersionId(
+    InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(
        commitInstantTime, tablePath, getHadoopConf(), validCommits);
-    if (querySchema.equals(fileSchemaUnmerged)) {
+    if (querySchema.equals(fileSchema)) {
      return InternalSchema.getEmptyInternalSchema();
    }
-    return new InternalSchemaMerger(fileSchemaUnmerged, querySchema, true, true).mergeSchema();
+    return new InternalSchemaMerger(fileSchema, querySchema, true, true).mergeSchema();
  }
  /**
-   * This method returns a mapping of columns that have type inconsistencies between the fileSchema and querySchema.
+   * This method returns a mapping of columns that have type inconsistencies between the mergeSchema and querySchema.
   * This is done by:
   * <li>1. Finding the columns with type changes</li>
   * <li>2. Get a map storing the index of these columns with type changes; Map of -> (colIdxInQueryFieldNames, colIdxInQuerySchema)</li>
   * <li>3. For each selectedField with type changes, build a castMap containing the cast/conversion details;
   *        Map of -> (selectedPos, Cast([from] fileType, [to] queryType))</li>
   *
-   * @param fileSchema InternalSchema representation of the file's schema (acquired from commit/.schema metadata)
+   * @param mergeSchema InternalSchema representation of mergeSchema (prioritise use of fileSchemaType) that is used for reading base parquet files
   * @param queryFieldNames array containing the columns of a Hudi Flink table
   * @param queryFieldTypes array containing the field types of the columns of a Hudi Flink table
   * @param selectedFields array containing the index of the columns of interest required (indexes are based on queryFieldNames and queryFieldTypes)
   * @return a castMap containing the information of how to cast a selectedField from the fileType to queryType.
   *
   * @see CastMap
   */
-  CastMap getCastMap(InternalSchema fileSchema, String[] queryFieldNames, DataType[] queryFieldTypes, int[] selectedFields) {
+  CastMap getCastMap(InternalSchema mergeSchema, String[] queryFieldNames, DataType[] queryFieldTypes, int[] selectedFields) {
    Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema cannot be empty");
-    Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema cannot be empty");
+    Preconditions.checkArgument(!mergeSchema.isEmptySchema(), "mergeSchema cannot be empty");
    CastMap castMap = new CastMap();
    // map storing the indexes of columns with type changes Map of -> (colIdxInQueryFieldNames, colIdxInQuerySchema)
-    Map<Integer, Integer> posProxy = getPosProxy(fileSchema, queryFieldNames);
+    Map<Integer, Integer> posProxy = getPosProxy(mergeSchema, queryFieldNames);
    if (posProxy.isEmpty()) {
      // no type changes
      castMap.setFileFieldTypes(queryFieldTypes);
      return castMap;
    }
    List<Integer> selectedFieldList = IntStream.of(selectedFields).boxed().collect(Collectors.toList());
-    List<DataType> fileSchemaAsDataTypes = AvroSchemaConverter.convertToDataType(
-        AvroInternalSchemaConverter.convert(fileSchema, "tableName")).getChildren();
+    // mergeSchema is built with useColumnTypeFromFileSchema = true
Review Comment:
Hmmm, I added this comment while tracing the code, to remind myself WHY fetching `fileFieldTypes` from `mergeSchema` works.
On top of that, from line 106 it is already pretty clear that the code builds the `mergeSchema` with `useColumnTypeFromFileSchema = true`.
Moving this comment from line 138 to line 106 would change the intention behind it: at line 106 it would tell readers WHAT the code is doing, whereas at line 138 it tells readers WHY fetching file types from `mergeSchema` works.
So, I don't think moving this comment is necessary.
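To make the WHY concrete: because the merge is built with `useColumnTypeFromFileSchema = true`, the merged schema keeps the file's column types, so the cast map can be keyed off it. Below is a minimal, self-contained sketch of the cast-map idea only — this is NOT the Hudi `CastMap` API; the class name `CastMapSketch`, its methods, and the int-to-long widening are illustrative assumptions. It models a map from a selected field's position to a conversion from the file's type to the query's type, i.e. `(selectedPos, Cast([from] fileType, [to] queryType))`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative model (not the Hudi CastMap class): selectedPos -> cast function
// that converts a value from the file's type to the query schema's type.
public class CastMapSketch {
  private final Map<Integer, Function<Object, Object>> casts = new HashMap<>();

  // Register a conversion for a selected field whose type changed.
  void add(int selectedPos, Function<Object, Object> cast) {
    casts.put(selectedPos, cast);
  }

  // Apply the cast if this column's type changed; pass the value through otherwise.
  Object castIfNeeded(int selectedPos, Object value) {
    return casts.getOrDefault(selectedPos, Function.identity()).apply(value);
  }

  public static void main(String[] args) {
    CastMapSketch castMap = new CastMapSketch();
    // Hypothetical type change: column 2 was written as int, query schema says long.
    castMap.add(2, v -> ((Integer) v).longValue());

    Object widened = castMap.castIfNeeded(2, 42);      // int widened to long
    Object untouched = castMap.castIfNeeded(0, "key"); // no type change, identity

    System.out.println(widened.getClass().getSimpleName() + " " + untouched);
  }
}
```

A field with no registered entry passes through unchanged, which mirrors the `posProxy.isEmpty()` short-circuit in the diff: when no columns changed type, the query field types are used as-is.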
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]