alexeykudinkin commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1030809543


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -72,93 +70,116 @@ public static HoodieMergeHelper newInstance() {
   }
 
   @Override
-  public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, 
HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
-                       HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>, 
HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException 
{
-    final boolean externalSchemaTransformation = 
table.getConfig().shouldUseExternalSchemaTransformation();
-    Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
+  public void runMerge(HoodieTable<?, ?, ?, ?> table,
+                       HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws 
IOException {
+    HoodieWriteConfig writeConfig = table.getConfig();
     HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
 
-    final GenericDatumWriter<GenericRecord> gWriter;
-    final GenericDatumReader<GenericRecord> gReader;
-    Schema readSchema;
-    if (externalSchemaTransformation || 
baseFile.getBootstrapBaseFile().isPresent()) {
-      readSchema = 
HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), 
mergeHandle.getOldFilePath()).getSchema();
-      gWriter = new GenericDatumWriter<>(readSchema);
-      gReader = new GenericDatumReader<>(readSchema, 
mergeHandle.getWriterSchemaWithMetaFields());
-    } else {
-      gReader = null;
-      gWriter = null;
-      readSchema = mergeHandle.getWriterSchemaWithMetaFields();
-    }
+    Configuration hadoopConf = new Configuration(table.getHadoopConf());
+    HoodieFileReader<GenericRecord> reader = 
HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
 
-    HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-    HoodieFileReader<GenericRecord> reader = 
HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, 
mergeHandle.getOldFilePath());
+    Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
+    Schema readerSchema = reader.getSchema();
 
-    Option<InternalSchema> querySchemaOpt = 
SerDeHelper.fromJson(table.getConfig().getInternalSchema());
-    boolean needToReWriteRecord = false;
-    Map<String, String> renameCols = new HashMap<>();
-    // TODO support bootstrap
-    if (querySchemaOpt.isPresent() && 
!baseFile.getBootstrapBaseFile().isPresent()) {
-      // check implicitly add columns, and position reorder(spark sql may 
change cols order)
-      InternalSchema querySchema = 
AvroSchemaEvolutionUtils.reconcileSchema(readSchema, querySchemaOpt.get());
-      long commitInstantTime = 
Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
-      InternalSchema writeInternalSchema = 
InternalSchemaCache.searchSchemaAndCache(commitInstantTime, 
table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
-      if (writeInternalSchema.isEmptySchema()) {
-        throw new HoodieException(String.format("cannot find file schema for 
current commit %s", commitInstantTime));
-      }
-      List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
-      List<String> colNamesFromWriteSchema = 
writeInternalSchema.getAllColsFullName();
-      List<String> sameCols = colNamesFromWriteSchema.stream()
-              .filter(f -> colNamesFromQuerySchema.contains(f)
-                      && writeInternalSchema.findIdByName(f) == 
querySchema.findIdByName(f)
-                      && writeInternalSchema.findIdByName(f) != -1
-                      && 
writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
-      readSchema = AvroInternalSchemaConverter
-          .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, 
true, false, false).mergeSchema(), readSchema.getName());
-      Schema writeSchemaFromFile = 
AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
-      needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
-              || 
SchemaCompatibility.checkReaderWriterCompatibility(readSchema, 
writeSchemaFromFile).getType() == 
org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
-      if (needToReWriteRecord) {
-        renameCols = 
InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
-      }
-    }
+    // In case Advanced Schema Evolution is enabled we might need to rewrite 
currently
+    // persisted records to adhere to an evolved schema
+    Option<Function<GenericRecord, GenericRecord>> 
schemaEvolutionTransformerOpt =
+        composeSchemaEvolutionTransformer(writerSchema, baseFile, writeConfig, 
table.getMetaClient());
+
+    // Check whether the writer schema is simply a projection of the file's 
one, ie
+    //   - Its field-set is a proper subset (of the reader schema)
+    //   - There's no schema evolution transformation necessary
+    boolean isPureProjection = isStrictProjectionOf(readerSchema, writerSchema)
+        && !schemaEvolutionTransformerOpt.isPresent();
+    // Check whether we will need to rewrite target (already merged) records 
into the
+    // writer's schema
+    boolean shouldRewriteInWriterSchema = 
writeConfig.shouldUseExternalSchemaTransformation()
+        || !isPureProjection
+        || baseFile.getBootstrapBaseFile().isPresent();
+
+    HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
 
     try {
-      final Iterator<GenericRecord> readerIterator;
+      Iterator<GenericRecord> recordIterator;
+
+      // In case writer's schema is simply a projection of the reader's one we 
can read
+      // the records in the projected schema directly
+      ClosableIterator<GenericRecord> baseFileRecordIterator =
+          reader.getRecordIterator(isPureProjection ? writerSchema : 
readerSchema);
       if (baseFile.getBootstrapBaseFile().isPresent()) {
-        readerIterator = getMergingIterator(table, mergeHandle, baseFile, 
reader, readSchema, externalSchemaTransformation);
+        Path bootstrapFilePath = new 
Path(baseFile.getBootstrapBaseFile().get().getPath());
+        recordIterator = getMergingIterator(table, mergeHandle, 
bootstrapFilePath, baseFileRecordIterator);
+      } else if (schemaEvolutionTransformerOpt.isPresent()) {
+        recordIterator = new MappingIterator<>(baseFileRecordIterator,
+            schemaEvolutionTransformerOpt.get());
       } else {
-        if (needToReWriteRecord) {
-          readerIterator = 
HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), 
readSchema, renameCols);
-        } else {
-          readerIterator = reader.getRecordIterator(readSchema);
-        }
+        recordIterator = baseFileRecordIterator;
       }
 
-      ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
-      ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
-
-      wrapper = QueueBasedExecutorFactory.create(table.getConfig(), 
readerIterator, new UpdateHandler(mergeHandle), record -> {
-        if (!externalSchemaTransformation) {
+      wrapper = QueueBasedExecutorFactory.create(writeConfig, recordIterator, 
new UpdateHandler(mergeHandle), record -> {
+        if (shouldRewriteInWriterSchema) {
+          return rewriteRecordWithNewSchema(record, writerSchema);
+        } else {
           return record;
         }
-        return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, 
decoderCache, record);
       }, table.getPreExecuteRunnable());
-      
+
       wrapper.execute();
     } catch (Exception e) {
       throw new HoodieException(e);
     } finally {
       // HUDI-2875: mergeHandle is not thread safe, we should totally 
terminate record inputting
       // and executor firstly and then close mergeHandle.
-      if (reader != null) {
-        reader.close();
-      }
+      reader.close();
       if (null != wrapper) {
         wrapper.shutdownNow();
         wrapper.awaitTermination();
       }
       mergeHandle.close();
     }
   }
+
+  private Option<Function<GenericRecord, GenericRecord>> 
composeSchemaEvolutionTransformer(Schema writerSchema,
+                                                                               
            HoodieBaseFile baseFile,

Review Comment:
   There's a comment explaining what this is used for where it's used (L86)
   LMK if you'd want to clarify that



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java:
##########
@@ -0,0 +1,941 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.hudi.common.util.Either;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+/**
+ * Evaluate the compatibility between a reader schema and a writer schema. A
+ * reader and a writer schema are declared compatible if all datum instances of
+ * the writer schema can be successfully decoded using the specified reader
+ * schema.
+ *
+ * NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
+ *
+ *       This code is borrowed from Avro 1.10, with the following 
modifications:
+ *       <ol>
+ *         <li>Compatibility checks ignore schema name, unless schema is held 
inside
+ *         a union</li>
+ *       </ol>
+ *
+ */
+public class AvroSchemaCompatibility {

Review Comment:
   I'm not a big fan of such borrowings myself, but unfortunately there's no 
other way for us to extend it to modify it's behavior: we need to avoid 
namespace matching, b/c we do numerous rounds of conversions Avro > Spark  > 
InternalSchema and back and b/c of that we can't reconstruct the namespaces 
that sometimes used/gen'd by Avro.



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -44,31 +61,65 @@ public static String getAvroRecordQualifiedName(String 
tableName) {
     return "hoodie." + sanitizedTableName + "." + sanitizedTableName + 
"_record";
   }
 
+  // TODO java-doc
+  public static boolean isCompatibleProjectionOf(Schema sourceSchema, Schema 
targetSchema) {
+    return isProjectionOfInternal(sourceSchema, targetSchema, 
AvroSchemaUtils::isSchemaCompatible);
+  }
+
   /**
-   * Validate whether the {@code targetSchema} is a projection of {@code 
sourceSchema}.
+   * Validate whether the {@code targetSchema} is a strict projection of 
{@code sourceSchema}.
    *
-   * Schema B is considered a projection of schema A iff
+   * Schema B is considered a strict projection of schema A iff
    * <ol>
    *   <li>Schemas A and B are equal, or</li>
+   *   <li>Schemas A and B are array schemas and element-type of B is a strict 
projection
+   *   of the element-type of A, or</li>
+   *   <li>Schemas A and B are map schemas and value-type of B is a strict 
projection
+   *   of the value-type of A, or</li>
+   *   <li>Schemas A and B are union schemas (of the same size) and every 
element-type of B
+   *   is a strict projection of the corresponding element-type of A, or</li>
    *   <li>Schemas A and B are record schemas and every field of the record B 
has corresponding
    *   counterpart (w/ the same name) in the schema A, such that the schema of 
the field of the schema
-   *   B is also a projection of the A field's schema</li>
+   *   B is also a strict projection of the A field's schema</li>
    * </ol>
    */
-  public static boolean isProjectionOf(Schema sourceSchema, Schema 
targetSchema) {
-    if (sourceSchema.getType() != Schema.Type.RECORD
-        || targetSchema.getType() != Schema.Type.RECORD) {
-      return Objects.equals(sourceSchema, targetSchema);
-    }
+  public static boolean isStrictProjectionOf(Schema sourceSchema, Schema 
targetSchema) {
+    return isProjectionOfInternal(sourceSchema, targetSchema, Objects::equals);
+  }
 
-    for (Schema.Field targetField : targetSchema.getFields()) {
-      Schema.Field sourceField = sourceSchema.getField(targetField.name());
-      if (sourceField == null || !isProjectionOf(sourceField.schema(), 
targetField.schema())) {
-        return false;
+  private static boolean isProjectionOfInternal(Schema sourceSchema,
+                                                Schema targetSchema,
+                                                BiFunction<Schema, Schema, 
Boolean> atomicTypeEqualityPredicate) {
+    if (sourceSchema.getType() == targetSchema.getType()) {
+      if (sourceSchema.getType() == Schema.Type.RECORD) {
+        for (Schema.Field targetField : targetSchema.getFields()) {
+          Schema.Field sourceField = sourceSchema.getField(targetField.name());
+          if (sourceField == null || 
!isProjectionOfInternal(sourceField.schema(), targetField.schema(), 
atomicTypeEqualityPredicate)) {
+            return false;
+          }
+        }
+        return true;
+      } else if (sourceSchema.getType() == Schema.Type.ARRAY) {
+        return isProjectionOfInternal(sourceSchema.getElementType(), 
targetSchema.getElementType(), atomicTypeEqualityPredicate);
+      } else if (sourceSchema.getType() == Schema.Type.MAP) {
+        return isProjectionOfInternal(sourceSchema.getValueType(), 
targetSchema.getValueType(), atomicTypeEqualityPredicate);
+      } else if (sourceSchema.getType() == Schema.Type.UNION) {
+        List<Schema> sourceNestedSchemas = sourceSchema.getTypes();
+        List<Schema> targetNestedSchemas = targetSchema.getTypes();
+        if (sourceNestedSchemas.size() != targetNestedSchemas.size()) {
+          return false;
+        }
+
+        for (int i = 0; i < sourceNestedSchemas.size(); ++i) {

Review Comment:
   Yes. We can relax it in the future if there would be a use-case for it



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -243,6 +243,18 @@ private HoodieLogBlock readBlock() throws IOException {
     }
   }
 
+  private Option<Schema> getTargetReaderSchemaForBlock() {
+    // we should use write schema to read log file,
+    // since when we have done some DDL operation, the readerSchema maybe 
different from writeSchema, avro reader will throw exception.
+    // eg: origin writeSchema is: "a String, b double" then we add a new 
column now the readerSchema will be: "a string, c int, b double". it's wrong to 
use readerSchema to read old log file.
+    // after we read those record by writeSchema,  we rewrite those record 
with readerSchema in AbstractHoodieLogRecordReader
+    if (internalSchema.isEmptySchema()) {
+      return Option.ofNullable(this.readerSchema);
+    } else {
+      return Option.empty();

Review Comment:
   By minor evolutions i referred to schema evolutions that don't require 
enabling support for Comprehensive Schema Evolution (aka "schema.on.read")



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to