xiarixiaoyao commented on code in PR #4910:
URL: https://github.com/apache/hudi/pull/4910#discussion_r1016055346
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -78,12 +90,41 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
     BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
     HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
+
+    Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
+    boolean needToReWriteRecord = false;
+    // TODO support bootstrap
+    if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
+      // check implicitly added columns, and position reordering (Spark SQL may change column order)
+      InternalSchema querySchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(readSchema, querySchemaOpt.get(), true);
+      long commitInstantTime = Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
+      InternalSchema writeInternalSchema = InternalSchemaCache.searchSchemaAndCache(commitInstantTime, table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
+      if (writeInternalSchema.isEmptySchema()) {
+        throw new HoodieException(String.format("cannot find file schema for current commit %s", commitInstantTime));
+      }
+      List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
+      List<String> colNamesFromWriteSchema = writeInternalSchema.getAllColsFullName();
+      List<String> sameCols = colNamesFromWriteSchema.stream()
+          .filter(f -> colNamesFromQuerySchema.contains(f)
+              && writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f)
+              && writeInternalSchema.findIdByName(f) != -1
+              && writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
+      readSchema = AvroInternalSchemaConverter.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false).mergeSchema(), readSchema.getName());
+      Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
Review Comment:
good question!
question 1:
S4 is not the same as S1. S4 is the actual schema read from the parquet file; if we perform lots of DDL operations on the current table, S4 and S1 may differ greatly.
e.g.:
tableA: a int, b string, c double, and there exist three files in this table: f1, f2, f3
1) drop column c from tableA and add a new column d, then update tableA, but we only update f2 and f3; f1 is not touched.
The schemas are now:
```
tableA: a int, b string, d long.
S1: a int, b string, d long
S4 from f1 is: a int, b string, c double
```
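The drift between the table schema S1 and the untouched file's schema S4 can be sketched as a simple column-set diff. This is a hypothetical illustration, not Hudi code; the class and method names are invented for the example:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: after dropping c and adding d, the current table
// schema S1 and the untouched file f1's on-disk schema S4 diverge.
public class SchemaDrift {
  // Columns present in a but not in b.
  static List<String> minus(List<String> a, List<String> b) {
    List<String> out = new ArrayList<>(a);
    out.removeAll(b);
    return out;
  }

  public static void main(String[] args) {
    List<String> s1 = Arrays.asList("a", "b", "d"); // current table schema
    List<String> s4 = Arrays.asList("a", "b", "c"); // schema stored in f1
    System.out.println("dropped since f1 was written: " + minus(s4, s1)); // [c]
    System.out.println("added since f1 was written:   " + minus(s1, s4)); // [d]
  }
}
```

This is why the write-time schema has to be looked up per commit (`InternalSchemaCache.searchSchemaAndCache` above): each file may have been written under a different historical schema.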
question 2:
We do support deletion of columns. Let's use the above example to illustrate:
at line 112, we merge S3 and S4 to get the final read schema:
```
tableA: a int, b string, d long.
S3: a int, b string, d long
S4 from f1 is: a int, b string, c double
merge of S3 and S4: a int, b string, d long (column c is dropped)
```
the values read from parquet file f1 will be
```
a  b       d
1  'test'  null
```
d is null, since f1 does not contain column d; column c is dropped, since the current table no longer contains column c.
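The read-time behavior described above can be sketched as projecting a file record onto the merged read schema. Again a hypothetical illustration, not the actual Hudi merge logic; the names are invented for the example:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: project a record read from an old file onto the
// merged read schema. Columns missing from the file come back as null;
// columns dropped from the table are filtered out entirely.
public class MergedRead {
  static Map<String, Object> project(List<String> readSchema, Map<String, Object> fileRecord) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (String col : readSchema) {
      out.put(col, fileRecord.get(col)); // null when the file lacks the column
    }
    return out;
  }

  public static void main(String[] args) {
    // record from f1, whose on-disk schema is (a int, b string, c double)
    Map<String, Object> fromF1 = new HashMap<>();
    fromF1.put("a", 1);
    fromF1.put("b", "test");
    fromF1.put("c", 2.5);
    // merged read schema after the DDL: (a, b, d); c was dropped
    System.out.println(project(Arrays.asList("a", "b", "d"), fromF1));
    // prints {a=1, b=test, d=null}
  }
}
```

Note that in the real implementation the projection is driven by column IDs (`findIdByName`), not names, so a column that is dropped and later re-added with the same name is still treated as a different column.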
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]