alexeykudinkin commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1030809543


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -72,93 +70,116 @@ public static HoodieMergeHelper newInstance() {
   }
 
   @Override
-  public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, 
HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
-                       HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>, 
HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException 
{
-    final boolean externalSchemaTransformation = 
table.getConfig().shouldUseExternalSchemaTransformation();
-    Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
+  public void runMerge(HoodieTable<?, ?, ?, ?> table,
+                       HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws 
IOException {
+    HoodieWriteConfig writeConfig = table.getConfig();
     HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
 
-    final GenericDatumWriter<GenericRecord> gWriter;
-    final GenericDatumReader<GenericRecord> gReader;
-    Schema readSchema;
-    if (externalSchemaTransformation || 
baseFile.getBootstrapBaseFile().isPresent()) {
-      readSchema = 
HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), 
mergeHandle.getOldFilePath()).getSchema();
-      gWriter = new GenericDatumWriter<>(readSchema);
-      gReader = new GenericDatumReader<>(readSchema, 
mergeHandle.getWriterSchemaWithMetaFields());
-    } else {
-      gReader = null;
-      gWriter = null;
-      readSchema = mergeHandle.getWriterSchemaWithMetaFields();
-    }
+    Configuration hadoopConf = new Configuration(table.getHadoopConf());
+    HoodieFileReader<GenericRecord> reader = 
HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
 
-    HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-    HoodieFileReader<GenericRecord> reader = 
HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, 
mergeHandle.getOldFilePath());
+    Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
+    Schema readerSchema = reader.getSchema();
 
-    Option<InternalSchema> querySchemaOpt = 
SerDeHelper.fromJson(table.getConfig().getInternalSchema());
-    boolean needToReWriteRecord = false;
-    Map<String, String> renameCols = new HashMap<>();
-    // TODO support bootstrap
-    if (querySchemaOpt.isPresent() && 
!baseFile.getBootstrapBaseFile().isPresent()) {
-      // check implicitly add columns, and position reorder(spark sql may 
change cols order)
-      InternalSchema querySchema = 
AvroSchemaEvolutionUtils.reconcileSchema(readSchema, querySchemaOpt.get());
-      long commitInstantTime = 
Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
-      InternalSchema writeInternalSchema = 
InternalSchemaCache.searchSchemaAndCache(commitInstantTime, 
table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
-      if (writeInternalSchema.isEmptySchema()) {
-        throw new HoodieException(String.format("cannot find file schema for 
current commit %s", commitInstantTime));
-      }
-      List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
-      List<String> colNamesFromWriteSchema = 
writeInternalSchema.getAllColsFullName();
-      List<String> sameCols = colNamesFromWriteSchema.stream()
-              .filter(f -> colNamesFromQuerySchema.contains(f)
-                      && writeInternalSchema.findIdByName(f) == 
querySchema.findIdByName(f)
-                      && writeInternalSchema.findIdByName(f) != -1
-                      && 
writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
-      readSchema = AvroInternalSchemaConverter
-          .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, 
true, false, false).mergeSchema(), readSchema.getName());
-      Schema writeSchemaFromFile = 
AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
-      needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
-              || 
SchemaCompatibility.checkReaderWriterCompatibility(readSchema, 
writeSchemaFromFile).getType() == 
org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
-      if (needToReWriteRecord) {
-        renameCols = 
InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
-      }
-    }
+    // In case Advanced Schema Evolution is enabled we might need to rewrite 
currently
+    // persisted records to adhere to an evolved schema
+    Option<Function<GenericRecord, GenericRecord>> 
schemaEvolutionTransformerOpt =
+        composeSchemaEvolutionTransformer(writerSchema, baseFile, writeConfig, 
table.getMetaClient());
+
+    // Check whether the writer schema is simply a projection of the file's 
one, ie
+    //   - Its field-set is a proper subset (of the reader schema)
+    //   - There's no schema evolution transformation necessary
+    boolean isPureProjection = isStrictProjectionOf(readerSchema, writerSchema)
+        && !schemaEvolutionTransformerOpt.isPresent();
+    // Check whether we will need to rewrite target (already merged) records 
into the
+    // writer's schema
+    boolean shouldRewriteInWriterSchema = 
writeConfig.shouldUseExternalSchemaTransformation()
+        || !isPureProjection
+        || baseFile.getBootstrapBaseFile().isPresent();
+
+    HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
 
     try {
-      final Iterator<GenericRecord> readerIterator;
+      Iterator<GenericRecord> recordIterator;
+
+      // In case writer's schema is simply a projection of the reader's one we 
can read
+      // the records in the projected schema directly
+      ClosableIterator<GenericRecord> baseFileRecordIterator =
+          reader.getRecordIterator(isPureProjection ? writerSchema : 
readerSchema);
       if (baseFile.getBootstrapBaseFile().isPresent()) {
-        readerIterator = getMergingIterator(table, mergeHandle, baseFile, 
reader, readSchema, externalSchemaTransformation);
+        Path bootstrapFilePath = new 
Path(baseFile.getBootstrapBaseFile().get().getPath());
+        recordIterator = getMergingIterator(table, mergeHandle, 
bootstrapFilePath, baseFileRecordIterator);
+      } else if (schemaEvolutionTransformerOpt.isPresent()) {
+        recordIterator = new MappingIterator<>(baseFileRecordIterator,
+            schemaEvolutionTransformerOpt.get());
       } else {
-        if (needToReWriteRecord) {
-          readerIterator = 
HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), 
readSchema, renameCols);
-        } else {
-          readerIterator = reader.getRecordIterator(readSchema);
-        }
+        recordIterator = baseFileRecordIterator;
       }
 
-      ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
-      ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
-
-      wrapper = QueueBasedExecutorFactory.create(table.getConfig(), 
readerIterator, new UpdateHandler(mergeHandle), record -> {
-        if (!externalSchemaTransformation) {
+      wrapper = QueueBasedExecutorFactory.create(writeConfig, recordIterator, 
new UpdateHandler(mergeHandle), record -> {
+        if (shouldRewriteInWriterSchema) {
+          return rewriteRecordWithNewSchema(record, writerSchema);
+        } else {
           return record;
         }
-        return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, 
decoderCache, record);
       }, table.getPreExecuteRunnable());
-      
+
       wrapper.execute();
     } catch (Exception e) {
       throw new HoodieException(e);
     } finally {
       // HUDI-2875: mergeHandle is not thread safe, we should totally 
terminate record inputting
       // and executor firstly and then close mergeHandle.
-      if (reader != null) {
-        reader.close();
-      }
+      reader.close();
       if (null != wrapper) {
         wrapper.shutdownNow();
         wrapper.awaitTermination();
       }
       mergeHandle.close();
     }
   }
+
+  private Option<Function<GenericRecord, GenericRecord>> 
composeSchemaEvolutionTransformer(Schema writerSchema,
+                                                                               
            HoodieBaseFile baseFile,

Review Comment:
   This method is how Schema Evolution is handled



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to