xiarixiaoyao commented on code in PR #4910:
URL: https://github.com/apache/hudi/pull/4910#discussion_r1014978749
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -78,12 +90,41 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
     BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
     HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
+
+    Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
+    boolean needToReWriteRecord = false;
+    // TODO support bootstrap
+    if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
+      // check implicitly added columns and position reordering (Spark SQL may change column order)
+      InternalSchema querySchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(readSchema, querySchemaOpt.get(), true);
+      long commitInstantTime = Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
+      InternalSchema writeInternalSchema = InternalSchemaCache.searchSchemaAndCache(commitInstantTime, table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
+      if (writeInternalSchema.isEmptySchema()) {
+        throw new HoodieException(String.format("cannot find file schema for current commit %s", commitInstantTime));
+      }
+      List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
+      List<String> colNamesFromWriteSchema = writeInternalSchema.getAllColsFullName();
+      List<String> sameCols = colNamesFromWriteSchema.stream()
+          .filter(f -> colNamesFromQuerySchema.contains(f)
+              && writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f)
+              && writeInternalSchema.findIdByName(f) != -1
+              && writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f))))
+          .collect(Collectors.toList());
+      readSchema = AvroInternalSchemaConverter.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false).mergeSchema(), readSchema.getName());
+      Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
Review Comment:
   line112: we need to convert the internalSchema to an Avro schema and pass it to readSchema; HoodieAvroUtils.rewriteRecordWithNewSchema will then use readSchema to get the correct GenericRecord from the parquet file.
   e.g.: the old parquet schema is: a int, b double, and genericRecord1 is the data read from the old parquet file,
   but the incoming schema is now: a long, c int, b string, and genericRecord2 is the incoming data.
   We cannot merge genericRecord1 and genericRecord2 directly, so we need to rewrite genericRecord1 with the new schema: a long, c int, b string.
   line113 is only used to check the SchemaCompatibilityType.
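   The rewrite described above can be sketched in plain Java. This is a simplified stand-in, not the real code path: records are modeled as ordered maps instead of Avro GenericRecords, the `rewriteWithNewSchema` helper and the `"name:type"` schema encoding are hypothetical, and the actual implementation lives in HoodieAvroUtils.rewriteRecordWithNewSchema.

   ```java
   import java.util.Arrays;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   public class RewriteSketch {

     // Hypothetical helper mirroring the idea of rewriteRecordWithNewSchema:
     // walk the NEW schema in order, copy values that exist in the old record
     // (promoting types where the evolution allows it), and leave newly added
     // columns null.
     static Map<String, Object> rewriteWithNewSchema(Map<String, Object> oldRecord,
                                                     List<String> newSchema) {
       Map<String, Object> out = new LinkedHashMap<>();
       for (String field : newSchema) {
         String[] parts = field.split(":");
         String name = parts[0];
         String type = parts[1];
         Object oldVal = oldRecord.get(name);
         if (oldVal == null) {
           out.put(name, null);                              // newly added column, e.g. c
         } else if (type.equals("long") && oldVal instanceof Integer) {
           out.put(name, ((Integer) oldVal).longValue());    // int -> long promotion
         } else if (type.equals("string")) {
           out.put(name, String.valueOf(oldVal));            // e.g. double -> string
         } else {
           out.put(name, oldVal);                            // unchanged column
         }
       }
       return out;
     }

     public static void main(String[] args) {
       // genericRecord1: data read with the old parquet schema "a int, b double"
       Map<String, Object> genericRecord1 = new LinkedHashMap<>();
       genericRecord1.put("a", 1);
       genericRecord1.put("b", 2.5);

       // rewrite it against the incoming schema "a long, c int, b string"
       Map<String, Object> rewritten =
           rewriteWithNewSchema(genericRecord1, Arrays.asList("a:long", "c:int", "b:string"));
       System.out.println(rewritten); // {a=1, c=null, b=2.5}
     }
   }
   ```

   After the rewrite, genericRecord1 has the same field order and field types as the incoming genericRecord2, so the two can be merged field by field.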