alexeykudinkin commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1025788061


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java:
##########
@@ -18,91 +18,47 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.utils.MergingIterator;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
 
 /**
  * Helper to read records from previous version of base file and run Merge.
  */
-public abstract class BaseMergeHelper<T extends HoodieRecordPayload, I, K, O> {
+public abstract class BaseMergeHelper {
 
   /**
    * Read records from previous version of base file and merge.
    * @param table Hoodie Table
    * @param upsertHandle Merge Handle
    * @throws IOException in case of error
    */
-  public abstract void runMerge(HoodieTable<T, I, K, O> table, 
HoodieMergeHandle<T, I, K, O> upsertHandle) throws IOException;
-
-  protected GenericRecord 
transformRecordBasedOnNewSchema(GenericDatumReader<GenericRecord> gReader, 
GenericDatumWriter<GenericRecord> gWriter,

Review Comment:
   1. Yes, this method was rewriting records, but it was using ser/de for that. 
Now, records are rewritten using the `rewriteRecord*` utils instead.
   
   2. `composeSchemaEvolutionTransformer` only covers the schema evolution case. 
The bootstrap case is handled as before (it wasn't calling into this method 
previously).



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -72,93 +70,116 @@ public static HoodieMergeHelper newInstance() {
   }
 
   @Override
-  public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, 
HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
-                       HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>, 
HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException 
{
-    final boolean externalSchemaTransformation = 
table.getConfig().shouldUseExternalSchemaTransformation();
-    Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
+  public void runMerge(HoodieTable<?, ?, ?, ?> table,
+                       HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws 
IOException {
+    HoodieWriteConfig writeConfig = table.getConfig();
     HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
 
-    final GenericDatumWriter<GenericRecord> gWriter;
-    final GenericDatumReader<GenericRecord> gReader;
-    Schema readSchema;
-    if (externalSchemaTransformation || 
baseFile.getBootstrapBaseFile().isPresent()) {
-      readSchema = 
HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), 
mergeHandle.getOldFilePath()).getSchema();
-      gWriter = new GenericDatumWriter<>(readSchema);
-      gReader = new GenericDatumReader<>(readSchema, 
mergeHandle.getWriterSchemaWithMetaFields());
-    } else {
-      gReader = null;
-      gWriter = null;
-      readSchema = mergeHandle.getWriterSchemaWithMetaFields();
-    }
+    Configuration hadoopConf = new Configuration(table.getHadoopConf());
+    HoodieFileReader<GenericRecord> reader = 
HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
 
-    HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-    HoodieFileReader<GenericRecord> reader = 
HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, 
mergeHandle.getOldFilePath());
+    Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
+    Schema readerSchema = reader.getSchema();
 
-    Option<InternalSchema> querySchemaOpt = 
SerDeHelper.fromJson(table.getConfig().getInternalSchema());
-    boolean needToReWriteRecord = false;
-    Map<String, String> renameCols = new HashMap<>();
-    // TODO support bootstrap
-    if (querySchemaOpt.isPresent() && 
!baseFile.getBootstrapBaseFile().isPresent()) {
-      // check implicitly add columns, and position reorder(spark sql may 
change cols order)
-      InternalSchema querySchema = 
AvroSchemaEvolutionUtils.reconcileSchema(readSchema, querySchemaOpt.get());
-      long commitInstantTime = 
Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
-      InternalSchema writeInternalSchema = 
InternalSchemaCache.searchSchemaAndCache(commitInstantTime, 
table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
-      if (writeInternalSchema.isEmptySchema()) {
-        throw new HoodieException(String.format("cannot find file schema for 
current commit %s", commitInstantTime));
-      }
-      List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
-      List<String> colNamesFromWriteSchema = 
writeInternalSchema.getAllColsFullName();
-      List<String> sameCols = colNamesFromWriteSchema.stream()
-              .filter(f -> colNamesFromQuerySchema.contains(f)
-                      && writeInternalSchema.findIdByName(f) == 
querySchema.findIdByName(f)
-                      && writeInternalSchema.findIdByName(f) != -1
-                      && 
writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
-      readSchema = AvroInternalSchemaConverter
-          .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, 
true, false, false).mergeSchema(), readSchema.getName());
-      Schema writeSchemaFromFile = 
AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
-      needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
-              || 
SchemaCompatibility.checkReaderWriterCompatibility(readSchema, 
writeSchemaFromFile).getType() == 
org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
-      if (needToReWriteRecord) {
-        renameCols = 
InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
-      }
-    }
+    // In case Advanced Schema Evolution is enabled we might need to rewrite 
currently
+    // persisted records to adhere to an evolved schema
+    Option<Function<GenericRecord, GenericRecord>> 
schemaEvolutionTransformerOpt =
+        composeSchemaEvolutionTransformer(writerSchema, baseFile, writeConfig, 
table.getMetaClient());
+
+    // Check whether the writer schema is simply a projection of the file's 
one, ie
+    //   - Its field-set is a proper subset (of the reader schema)
+    //   - There's no schema evolution transformation necessary
+    boolean isPureProjection = isProjectionOf(readerSchema, writerSchema)
+        && !schemaEvolutionTransformerOpt.isPresent();
+    // Check whether we will need to rewrite target (already merged) records 
into the
+    // writer's schema
+    boolean shouldRewriteInWriterSchema = 
writeConfig.shouldUseExternalSchemaTransformation()
+        || !isPureProjection
+        || baseFile.getBootstrapBaseFile().isPresent();
+
+    HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
 
     try {
-      final Iterator<GenericRecord> readerIterator;
+      Iterator<GenericRecord> recordIterator;
+
+      // In case writer's schema is simply a projection of the reader's one we 
can read
+      // the records in the projected schema directly
+      ClosableIterator<GenericRecord> baseFileRecordIterator =
+          reader.getRecordIterator(isPureProjection ? writerSchema : 
readerSchema);
       if (baseFile.getBootstrapBaseFile().isPresent()) {
-        readerIterator = getMergingIterator(table, mergeHandle, baseFile, 
reader, readSchema, externalSchemaTransformation);
+        Path bootstrapFilePath = new 
Path(baseFile.getBootstrapBaseFile().get().getPath());
+        recordIterator = getMergingIterator(table, mergeHandle, 
bootstrapFilePath, baseFileRecordIterator);
+      } else if (schemaEvolutionTransformerOpt.isPresent()) {
+        recordIterator = new MappingIterator<>(baseFileRecordIterator,
+            schemaEvolutionTransformerOpt.get());
       } else {
-        if (needToReWriteRecord) {
-          readerIterator = 
HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), 
readSchema, renameCols);
-        } else {
-          readerIterator = reader.getRecordIterator(readSchema);
-        }
+        recordIterator = baseFileRecordIterator;
       }
 
-      ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
-      ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
-
-      wrapper = QueueBasedExecutorFactory.create(table.getConfig(), 
readerIterator, new UpdateHandler(mergeHandle), record -> {
-        if (!externalSchemaTransformation) {
+      wrapper = QueueBasedExecutorFactory.create(writeConfig, recordIterator, 
new UpdateHandler(mergeHandle), record -> {
+        if (shouldRewriteInWriterSchema) {
+          return rewriteRecordWithNewSchema(record, writerSchema);
+        } else {

Review Comment:
   Yes, it was removed intentionally. Rewriting of records is now controlled 
by a single flag, `shouldRewriteInWriterSchema`.



##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -867,12 +870,8 @@ private static String createFullName(Deque<String> 
fieldNames) {
   }
 
   private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, 
Schema newSchema) {
-    Schema realOldSchema = oldSchema;
-    if (realOldSchema.getType() == UNION) {
-      realOldSchema = getActualSchemaFromUnion(oldSchema, oldValue);
-    }
-    if (realOldSchema.getType() == newSchema.getType()) {
-      switch (realOldSchema.getType()) {
+    if (oldSchema.getType() == newSchema.getType()) {
+      switch (oldSchema.getType()) {

Review Comment:
   There's no need for this anymore



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -76,6 +101,19 @@ public static Schema resolveUnionSchema(Schema schema, 
String fieldSchemaFullNam
     return nonNullType;
   }
 
+  /**
+   * Returns true in case provided {@link Schema} is nullable (ie accepting 
null values),
+   * returns false otherwise
+   */
+  public static boolean isNullable(Schema schema) {
+    if (schema.getType() != Schema.Type.UNION) {
+      return false;
+    }
+
+    List<Schema> innerTypes = schema.getTypes();
+    return innerTypes.size() > 1 && innerTypes.stream().anyMatch(it -> 
it.getType() == Schema.Type.NULL);
+  }
+

Review Comment:
   Good call. I would prefer to take this as a follow-up, so as not to overload 
this PR even further.



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -31,6 +31,31 @@ public class AvroSchemaUtils {
 
   private AvroSchemaUtils() {}
 
+  /**
+   * Generates fully-qualified name for the Avro's schema based on the Table's 
name
+   *
+   * NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
+   *       This method should not change for compatibility reasons as older 
versions
+   *       of Avro might be comparing fully-qualified names rather than just 
the record
+   *       names
+   */
+  public static String getAvroRecordQualifiedName(String tableName) {
+    String sanitizedTableName = HoodieAvroUtils.sanitizeName(tableName);
+    return "hoodie." + sanitizedTableName + "." + sanitizedTableName + 
"_record";
+  }
+
+  // TODO java-doc, test
+  public static boolean isProjectionOf(Schema sourceSchema, Schema 
targetSchema) {
+    for (Schema.Field targetField : targetSchema.getFields()) {
+      Schema.Field sourceField = sourceSchema.getField(targetField.name());

Review Comment:
   Not strictly required here, but a good catch! Will update to make sure it's 
handled as well



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java:
##########
@@ -87,4 +96,44 @@ public void close() {
   public long getTotalRecords() {
     return parquetUtils.getRowCount(conf, path);
   }
+
+  private static Configuration tryOverrideDefaultConfigs(Configuration conf) {
+    // NOTE: Parquet uses elaborate encoding of the arrays/lists with optional 
types,
+    //       following structure will be representing such list in Parquet:
+    //
+    //       optional group tip_history (LIST) {
+    //         repeated group list {
+    //           optional group element {
+    //             optional double amount;
+    //             optional binary currency (STRING);
+    //           }
+    //         }
+    //       }
+    //
+    //       To designate list, special logical-type annotation (`LIST`) is 
used,
+    //       as well additional [[GroupType]] with the name "list" is wrapping
+    //       the "element" type (representing record stored inside the list 
itself).
+    //
+    //       By default [[AvroSchemaConverter]] would be interpreting any 
{@code REPEATED}
+    //       Parquet [[GroupType]] as list, skipping the checks whether 
additional [[GroupType]]
+    //       (named "list") is actually wrapping the "element" type therefore 
incorrectly
+    //       converting it into an additional record-wrapper (instead of 
simply omitting it).
+    //       To work this around we're
+    //          - Checking whether 
[[AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS]] has been
+    //          explicitly set in the Hadoop Config
+    //          - In case it's not, we override the default value from "true" 
to "false"
+    //
+    if (conf.get(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS) == null) {

Review Comment:
   This is new



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -72,93 +70,116 @@ public static HoodieMergeHelper newInstance() {
   }
 
   @Override
-  public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, 
HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
-                       HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>, 
HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException 
{
-    final boolean externalSchemaTransformation = 
table.getConfig().shouldUseExternalSchemaTransformation();
-    Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
+  public void runMerge(HoodieTable<?, ?, ?, ?> table,
+                       HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws 
IOException {
+    HoodieWriteConfig writeConfig = table.getConfig();
     HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
 
-    final GenericDatumWriter<GenericRecord> gWriter;
-    final GenericDatumReader<GenericRecord> gReader;
-    Schema readSchema;
-    if (externalSchemaTransformation || 
baseFile.getBootstrapBaseFile().isPresent()) {
-      readSchema = 
HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), 
mergeHandle.getOldFilePath()).getSchema();
-      gWriter = new GenericDatumWriter<>(readSchema);
-      gReader = new GenericDatumReader<>(readSchema, 
mergeHandle.getWriterSchemaWithMetaFields());
-    } else {
-      gReader = null;
-      gWriter = null;
-      readSchema = mergeHandle.getWriterSchemaWithMetaFields();
-    }
+    Configuration hadoopConf = new Configuration(table.getHadoopConf());
+    HoodieFileReader<GenericRecord> reader = 
HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
 
-    HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-    HoodieFileReader<GenericRecord> reader = 
HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, 
mergeHandle.getOldFilePath());
+    Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
+    Schema readerSchema = reader.getSchema();
 
-    Option<InternalSchema> querySchemaOpt = 
SerDeHelper.fromJson(table.getConfig().getInternalSchema());
-    boolean needToReWriteRecord = false;
-    Map<String, String> renameCols = new HashMap<>();
-    // TODO support bootstrap
-    if (querySchemaOpt.isPresent() && 
!baseFile.getBootstrapBaseFile().isPresent()) {
-      // check implicitly add columns, and position reorder(spark sql may 
change cols order)
-      InternalSchema querySchema = 
AvroSchemaEvolutionUtils.reconcileSchema(readSchema, querySchemaOpt.get());
-      long commitInstantTime = 
Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
-      InternalSchema writeInternalSchema = 
InternalSchemaCache.searchSchemaAndCache(commitInstantTime, 
table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
-      if (writeInternalSchema.isEmptySchema()) {
-        throw new HoodieException(String.format("cannot find file schema for 
current commit %s", commitInstantTime));
-      }
-      List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
-      List<String> colNamesFromWriteSchema = 
writeInternalSchema.getAllColsFullName();
-      List<String> sameCols = colNamesFromWriteSchema.stream()
-              .filter(f -> colNamesFromQuerySchema.contains(f)
-                      && writeInternalSchema.findIdByName(f) == 
querySchema.findIdByName(f)
-                      && writeInternalSchema.findIdByName(f) != -1
-                      && 
writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
-      readSchema = AvroInternalSchemaConverter
-          .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, 
true, false, false).mergeSchema(), readSchema.getName());
-      Schema writeSchemaFromFile = 
AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
-      needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
-              || 
SchemaCompatibility.checkReaderWriterCompatibility(readSchema, 
writeSchemaFromFile).getType() == 
org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
-      if (needToReWriteRecord) {
-        renameCols = 
InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
-      }
-    }
+    // In case Advanced Schema Evolution is enabled we might need to rewrite 
currently
+    // persisted records to adhere to an evolved schema
+    Option<Function<GenericRecord, GenericRecord>> 
schemaEvolutionTransformerOpt =
+        composeSchemaEvolutionTransformer(writerSchema, baseFile, writeConfig, 
table.getMetaClient());
+
+    // Check whether the writer schema is simply a projection of the file's 
one, ie
+    //   - Its field-set is a proper subset (of the reader schema)
+    //   - There's no schema evolution transformation necessary
+    boolean isPureProjection = isProjectionOf(readerSchema, writerSchema)
+        && !schemaEvolutionTransformerOpt.isPresent();
+    // Check whether we will need to rewrite target (already merged) records 
into the
+    // writer's schema
+    boolean shouldRewriteInWriterSchema = 
writeConfig.shouldUseExternalSchemaTransformation()
+        || !isPureProjection
+        || baseFile.getBootstrapBaseFile().isPresent();
+
+    HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
 
     try {
-      final Iterator<GenericRecord> readerIterator;
+      Iterator<GenericRecord> recordIterator;
+
+      // In case writer's schema is simply a projection of the reader's one we 
can read
+      // the records in the projected schema directly
+      ClosableIterator<GenericRecord> baseFileRecordIterator =
+          reader.getRecordIterator(isPureProjection ? writerSchema : 
readerSchema);

Review Comment:
   I didn't add the docs simply because these vars are now immutable and are 
easily traceable (L81-82).
   
   In this context:
    - Reader schema is the schema with which we're reading the existing file
    - Writer schema is the schema with which we're going to be writing files



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -515,7 +521,16 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
         HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> 
hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"), 
// set the default parallelism to 200 for sql
         HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> 
hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
         HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> 
hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
-        SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
+        SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
+
+        // NOTE: We have to explicitly override following configs to make sure 
no schema validation is performed
+        //       as schema of the incoming dataset might be diverging from the 
table's schema (full schemas'
+        //       compatibility b/w table's schema and incoming one is not 
necessary in this case since we can
+        //       be cherry-picking only selected columns from the incoming 
dataset to be inserted/updated in the
+        //       target table, ie partially updating)
+        AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
+        RECONCILE_SCHEMA.key -> "false",

Review Comment:
   Yeah, we can't support reconciliation due to the nature of the feature 
(the incoming batch is essentially an inner join of the existing table and the 
incoming records).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to