nsivabalan commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1027179438
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -72,93 +70,116 @@ public static HoodieMergeHelper newInstance() {
   }
 
   @Override
-  public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
-                       HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException {
-    final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
-    Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
+  public void runMerge(HoodieTable<?, ?, ?, ?> table,
+                       HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws IOException {
+    HoodieWriteConfig writeConfig = table.getConfig();
     HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
-    final GenericDatumWriter<GenericRecord> gWriter;
-    final GenericDatumReader<GenericRecord> gReader;
-    Schema readSchema;
-    if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
-      readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
-      gWriter = new GenericDatumWriter<>(readSchema);
-      gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());
-    } else {
-      gReader = null;
-      gWriter = null;
-      readSchema = mergeHandle.getWriterSchemaWithMetaFields();
-    }
+    Configuration hadoopConf = new Configuration(table.getHadoopConf());
+    HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
-    HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-    HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
+    Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
+    Schema readerSchema = reader.getSchema();
-    Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
-    boolean needToReWriteRecord = false;
-    Map<String, String> renameCols = new HashMap<>();
-    // TODO support bootstrap
-    if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
-      // check implicitly add columns, and position reorder(spark sql may change cols order)
-      InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(readSchema, querySchemaOpt.get());
-      long commitInstantTime = Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
-      InternalSchema writeInternalSchema = InternalSchemaCache.searchSchemaAndCache(commitInstantTime, table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
-      if (writeInternalSchema.isEmptySchema()) {
-        throw new HoodieException(String.format("cannot find file schema for current commit %s", commitInstantTime));
-      }
-      List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
-      List<String> colNamesFromWriteSchema = writeInternalSchema.getAllColsFullName();
-      List<String> sameCols = colNamesFromWriteSchema.stream()
-          .filter(f -> colNamesFromQuerySchema.contains(f)
-              && writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f)
-              && writeInternalSchema.findIdByName(f) != -1
-              && writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
-      readSchema = AvroInternalSchemaConverter
-          .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema(), readSchema.getName());
-      Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
-      needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
-          || SchemaCompatibility.checkReaderWriterCompatibility(readSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
-      if (needToReWriteRecord) {
-        renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
-      }
-    }
+    // In case Advanced Schema Evolution is enabled we might need to rewrite currently
+    // persisted records to adhere to an evolved schema
+    Option<Function<GenericRecord, GenericRecord>> schemaEvolutionTransformerOpt =
+        composeSchemaEvolutionTransformer(writerSchema, baseFile, writeConfig, table.getMetaClient());
+
+    // Check whether the writer schema is simply a projection of the file's one, ie
+    //   - Its field-set is a proper subset (of the reader schema)
+    //   - There's no schema evolution transformation necessary
+    boolean isPureProjection = isProjectionOf(readerSchema, writerSchema)
+        && !schemaEvolutionTransformerOpt.isPresent();
+    // Check whether we will need to rewrite target (already merged) records into the
+    // writer's schema
+    boolean shouldRewriteInWriterSchema = writeConfig.shouldUseExternalSchemaTransformation()
+        || !isPureProjection
+        || baseFile.getBootstrapBaseFile().isPresent();
+
+    HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
     try {
-      final Iterator<GenericRecord> readerIterator;
+      Iterator<GenericRecord> recordIterator;
+
+      // In case writer's schema is simply a projection of the reader's one we can read
+      // the records in the projected schema directly
+      ClosableIterator<GenericRecord> baseFileRecordIterator =
+          reader.getRecordIterator(isPureProjection ? writerSchema : readerSchema);
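
For context on the `isPureProjection` check above, here is a minimal sketch of what a projection test like `isProjectionOf(readerSchema, writerSchema)` could verify. This is an illustration only (the PR's actual helper may also account for nested fields and field ordering), not the implementation from this change:

```java
import org.apache.avro.Schema;

public class ProjectionCheckSketch {
  // Returns true if every field of the target (writer) schema is also present in the
  // source (reader) schema with an identical type, i.e. the target schema is a plain
  // column projection of the source schema.
  static boolean isProjectionOf(Schema sourceSchema, Schema targetSchema) {
    for (Schema.Field targetField : targetSchema.getFields()) {
      Schema.Field sourceField = sourceSchema.getField(targetField.name());
      if (sourceField == null || !sourceField.schema().equals(targetField.schema())) {
        return false;
      }
    }
    return true;
  }
}
```

Under a check like this, reading the base file directly in the writer schema is safe, since no field present in the writer schema requires a type conversion or rename.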
Review Comment:
If we maintain the same terminology, we are good.
I thought we might have some confusion in HoodieMergeHandle, since we read from one file and write into another file. I guess the read from the old file is handled within MergeHelper.
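
To make the responsibility split the comment refers to concrete, here is a rough, hypothetical sketch (the names below are illustrative stand-ins, not Hudi's actual APIs): the merge helper owns reading records from the old base file, while the merge handle owns merging them and writing the new base file.

```java
// Illustrative only: stand-ins for the HoodieMergeHelper / HoodieMergeHandle split.
interface MergeOutputHandle {
  void write(Object recordFromOldFile); // merge the old record with any update and write it to the NEW file
  void close();
}

class MergeHelperSketch {
  // Reading from the OLD file is handled here, in the helper ...
  void runMerge(Iterable<Object> oldFileRecords, MergeOutputHandle mergeHandle) {
    for (Object record : oldFileRecords) {
      // ... while writing into the NEW file is delegated to the handle
      mergeHandle.write(record);
    }
    mergeHandle.close();
  }
}
```

If that split holds, keeping the "reader schema = old file, writer schema = new file" terminology inside the helper should remain unambiguous.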