nsivabalan commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1025339063


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -72,93 +70,116 @@ public static HoodieMergeHelper newInstance() {
   }
 
   @Override
-  public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, 
HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
-                       HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>, 
HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException 
{
-    final boolean externalSchemaTransformation = 
table.getConfig().shouldUseExternalSchemaTransformation();
-    Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
+  public void runMerge(HoodieTable<?, ?, ?, ?> table,
+                       HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws 
IOException {
+    HoodieWriteConfig writeConfig = table.getConfig();
     HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
 
-    final GenericDatumWriter<GenericRecord> gWriter;
-    final GenericDatumReader<GenericRecord> gReader;
-    Schema readSchema;
-    if (externalSchemaTransformation || 
baseFile.getBootstrapBaseFile().isPresent()) {
-      readSchema = 
HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), 
mergeHandle.getOldFilePath()).getSchema();
-      gWriter = new GenericDatumWriter<>(readSchema);
-      gReader = new GenericDatumReader<>(readSchema, 
mergeHandle.getWriterSchemaWithMetaFields());
-    } else {
-      gReader = null;
-      gWriter = null;
-      readSchema = mergeHandle.getWriterSchemaWithMetaFields();
-    }
+    Configuration hadoopConf = new Configuration(table.getHadoopConf());
+    HoodieFileReader<GenericRecord> reader = 
HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
 
-    HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-    HoodieFileReader<GenericRecord> reader = 
HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, 
mergeHandle.getOldFilePath());
+    Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
+    Schema readerSchema = reader.getSchema();
 
-    Option<InternalSchema> querySchemaOpt = 
SerDeHelper.fromJson(table.getConfig().getInternalSchema());
-    boolean needToReWriteRecord = false;
-    Map<String, String> renameCols = new HashMap<>();
-    // TODO support bootstrap
-    if (querySchemaOpt.isPresent() && 
!baseFile.getBootstrapBaseFile().isPresent()) {
-      // check implicitly add columns, and position reorder(spark sql may 
change cols order)
-      InternalSchema querySchema = 
AvroSchemaEvolutionUtils.reconcileSchema(readSchema, querySchemaOpt.get());
-      long commitInstantTime = 
Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
-      InternalSchema writeInternalSchema = 
InternalSchemaCache.searchSchemaAndCache(commitInstantTime, 
table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
-      if (writeInternalSchema.isEmptySchema()) {
-        throw new HoodieException(String.format("cannot find file schema for 
current commit %s", commitInstantTime));
-      }
-      List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
-      List<String> colNamesFromWriteSchema = 
writeInternalSchema.getAllColsFullName();
-      List<String> sameCols = colNamesFromWriteSchema.stream()
-              .filter(f -> colNamesFromQuerySchema.contains(f)
-                      && writeInternalSchema.findIdByName(f) == 
querySchema.findIdByName(f)
-                      && writeInternalSchema.findIdByName(f) != -1
-                      && 
writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
-      readSchema = AvroInternalSchemaConverter
-          .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, 
true, false, false).mergeSchema(), readSchema.getName());
-      Schema writeSchemaFromFile = 
AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
-      needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
-              || 
SchemaCompatibility.checkReaderWriterCompatibility(readSchema, 
writeSchemaFromFile).getType() == 
org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
-      if (needToReWriteRecord) {
-        renameCols = 
InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
-      }
-    }
+    // In case Advanced Schema Evolution is enabled we might need to rewrite 
currently
+    // persisted records to adhere to an evolved schema
+    Option<Function<GenericRecord, GenericRecord>> 
schemaEvolutionTransformerOpt =
+        composeSchemaEvolutionTransformer(writerSchema, baseFile, writeConfig, 
table.getMetaClient());
+
+    // Check whether the writer schema is simply a projection of the file's 
one, ie
+    //   - Its field-set is a proper subset (of the reader schema)
+    //   - There's no schema evolution transformation necessary
+    boolean isPureProjection = isProjectionOf(readerSchema, writerSchema)
+        && !schemaEvolutionTransformerOpt.isPresent();
+    // Check whether we will need to rewrite target (already merged) records 
into the
+    // writer's schema
+    boolean shouldRewriteInWriterSchema = 
writeConfig.shouldUseExternalSchemaTransformation()
+        || !isPureProjection
+        || baseFile.getBootstrapBaseFile().isPresent();
+
+    HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
 
     try {
-      final Iterator<GenericRecord> readerIterator;
+      Iterator<GenericRecord> recordIterator;
+
+      // In case writer's schema is simply a projection of the reader's one we 
can read
+      // the records in the projected schema directly
+      ClosableIterator<GenericRecord> baseFileRecordIterator =
+          reader.getRecordIterator(isPureProjection ? writerSchema : 
readerSchema);

Review Comment:
   Can we add docs on what the reader schema refers to and what the writer
schema refers to?
   When reading a single Avro file, for example, the usual nomenclature is
that the writer schema is the schema with which the file was written, and the
reader schema is the new schema with which we are trying to read the file back.
   
   Here we have three things in play:
   we are reading an existing file (which could have been written using
schema X) using schema Y, and we are passing it to UpsertPartitioner with
possible schema evolution.
   
   So, is schema X the writer schema, and is schema Y the reader schema?
   Maybe we should add docs wherever we deal with both of these schemas and
maintain uniformity across the code.
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -72,93 +70,116 @@ public static HoodieMergeHelper newInstance() {
   }
 
   @Override
-  public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, 
HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
-                       HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>, 
HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException 
{
-    final boolean externalSchemaTransformation = 
table.getConfig().shouldUseExternalSchemaTransformation();
-    Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
+  public void runMerge(HoodieTable<?, ?, ?, ?> table,
+                       HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws 
IOException {
+    HoodieWriteConfig writeConfig = table.getConfig();
     HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
 
-    final GenericDatumWriter<GenericRecord> gWriter;
-    final GenericDatumReader<GenericRecord> gReader;
-    Schema readSchema;
-    if (externalSchemaTransformation || 
baseFile.getBootstrapBaseFile().isPresent()) {
-      readSchema = 
HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), 
mergeHandle.getOldFilePath()).getSchema();
-      gWriter = new GenericDatumWriter<>(readSchema);
-      gReader = new GenericDatumReader<>(readSchema, 
mergeHandle.getWriterSchemaWithMetaFields());
-    } else {
-      gReader = null;
-      gWriter = null;
-      readSchema = mergeHandle.getWriterSchemaWithMetaFields();
-    }
+    Configuration hadoopConf = new Configuration(table.getHadoopConf());
+    HoodieFileReader<GenericRecord> reader = 
HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
 
-    HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-    HoodieFileReader<GenericRecord> reader = 
HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, 
mergeHandle.getOldFilePath());
+    Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
+    Schema readerSchema = reader.getSchema();
 
-    Option<InternalSchema> querySchemaOpt = 
SerDeHelper.fromJson(table.getConfig().getInternalSchema());
-    boolean needToReWriteRecord = false;
-    Map<String, String> renameCols = new HashMap<>();
-    // TODO support bootstrap
-    if (querySchemaOpt.isPresent() && 
!baseFile.getBootstrapBaseFile().isPresent()) {
-      // check implicitly add columns, and position reorder(spark sql may 
change cols order)
-      InternalSchema querySchema = 
AvroSchemaEvolutionUtils.reconcileSchema(readSchema, querySchemaOpt.get());
-      long commitInstantTime = 
Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
-      InternalSchema writeInternalSchema = 
InternalSchemaCache.searchSchemaAndCache(commitInstantTime, 
table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
-      if (writeInternalSchema.isEmptySchema()) {
-        throw new HoodieException(String.format("cannot find file schema for 
current commit %s", commitInstantTime));
-      }
-      List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
-      List<String> colNamesFromWriteSchema = 
writeInternalSchema.getAllColsFullName();
-      List<String> sameCols = colNamesFromWriteSchema.stream()
-              .filter(f -> colNamesFromQuerySchema.contains(f)
-                      && writeInternalSchema.findIdByName(f) == 
querySchema.findIdByName(f)
-                      && writeInternalSchema.findIdByName(f) != -1
-                      && 
writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
-      readSchema = AvroInternalSchemaConverter
-          .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, 
true, false, false).mergeSchema(), readSchema.getName());
-      Schema writeSchemaFromFile = 
AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
-      needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
-              || 
SchemaCompatibility.checkReaderWriterCompatibility(readSchema, 
writeSchemaFromFile).getType() == 
org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
-      if (needToReWriteRecord) {
-        renameCols = 
InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
-      }
-    }
+    // In case Advanced Schema Evolution is enabled we might need to rewrite 
currently
+    // persisted records to adhere to an evolved schema
+    Option<Function<GenericRecord, GenericRecord>> 
schemaEvolutionTransformerOpt =
+        composeSchemaEvolutionTransformer(writerSchema, baseFile, writeConfig, 
table.getMetaClient());
+
+    // Check whether the writer schema is simply a projection of the file's 
one, ie
+    //   - Its field-set is a proper subset (of the reader schema)
+    //   - There's no schema evolution transformation necessary
+    boolean isPureProjection = isProjectionOf(readerSchema, writerSchema)
+        && !schemaEvolutionTransformerOpt.isPresent();
+    // Check whether we will need to rewrite target (already merged) records 
into the
+    // writer's schema
+    boolean shouldRewriteInWriterSchema = 
writeConfig.shouldUseExternalSchemaTransformation()
+        || !isPureProjection
+        || baseFile.getBootstrapBaseFile().isPresent();
+
+    HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
 
     try {
-      final Iterator<GenericRecord> readerIterator;
+      Iterator<GenericRecord> recordIterator;
+
+      // In case writer's schema is simply a projection of the reader's one we 
can read
+      // the records in the projected schema directly
+      ClosableIterator<GenericRecord> baseFileRecordIterator =
+          reader.getRecordIterator(isPureProjection ? writerSchema : 
readerSchema);
       if (baseFile.getBootstrapBaseFile().isPresent()) {
-        readerIterator = getMergingIterator(table, mergeHandle, baseFile, 
reader, readSchema, externalSchemaTransformation);
+        Path bootstrapFilePath = new 
Path(baseFile.getBootstrapBaseFile().get().getPath());
+        recordIterator = getMergingIterator(table, mergeHandle, 
bootstrapFilePath, baseFileRecordIterator);
+      } else if (schemaEvolutionTransformerOpt.isPresent()) {
+        recordIterator = new MappingIterator<>(baseFileRecordIterator,
+            schemaEvolutionTransformerOpt.get());
       } else {
-        if (needToReWriteRecord) {
-          readerIterator = 
HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), 
readSchema, renameCols);
-        } else {
-          readerIterator = reader.getRecordIterator(readSchema);
-        }
+        recordIterator = baseFileRecordIterator;
       }
 
-      ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
-      ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
-
-      wrapper = QueueBasedExecutorFactory.create(table.getConfig(), 
readerIterator, new UpdateHandler(mergeHandle), record -> {
-        if (!externalSchemaTransformation) {
+      wrapper = QueueBasedExecutorFactory.create(writeConfig, recordIterator, 
new UpdateHandler(mergeHandle), record -> {
+        if (shouldRewriteInWriterSchema) {
+          return rewriteRecordWithNewSchema(record, writerSchema);
+        } else {

Review Comment:
   What happened to the transformRecordBasedOnNewSchema call that was made
when external schema transformation was enabled (as per master)? I don't see
it in this patch. Was it removed intentionally? Can you help clarify, please?
   
   ```
         wrapper = new 
BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), 
readerIterator,
             new UpdateHandler(mergeHandle), record -> {
           if (!externalSchemaTransformation) {
             return record;
           }
           return transformRecordBasedOnNewSchema(gReader, gWriter, 
encoderCache, decoderCache, (GenericRecord) record);
         }, table.getPreExecuteRunnable());
   ```
   
   



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -515,7 +521,16 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
         HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> 
hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"), 
// set the default parallelism to 200 for sql
         HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> 
hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
         HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> 
hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
-        SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
+        SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
+
+        // NOTE: We have to explicitly override following configs to make sure 
no schema validation is performed
+        //       as schema of the incoming dataset might be diverging from the 
table's schema (full schemas'
+        //       compatibility b/w table's schema and incoming one is not 
necessary in this case since we can
+        //       be cherry-picking only selected columns from the incoming 
dataset to be inserted/updated in the
+        //       target table, ie partially updating)
+        AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
+        RECONCILE_SCHEMA.key -> "false",

Review Comment:
   Does this mean that, with MERGE INTO, we don't support reconciling the
schema?
   I am with you on the intent; I am just trying to gauge what is not
supported with MERGE INTO with respect to schema evolution. Do we expect users
to always go through ALTER TABLE then?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -74,90 +75,103 @@ public static HoodieMergeHelper newInstance() {
   @Override
   public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, 
HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
                        HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>, 
HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException 
{
-    final boolean externalSchemaTransformation = 
table.getConfig().shouldUseExternalSchemaTransformation();
-    Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
+    HoodieWriteConfig writeConfig = table.getConfig();

Review Comment:
   +1 def looks better now. 



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java:
##########
@@ -87,4 +96,44 @@ public void close() {
   public long getTotalRecords() {
     return parquetUtils.getRowCount(conf, path);
   }
+
+  private static Configuration tryOverrideDefaultConfigs(Configuration conf) {
+    // NOTE: Parquet uses elaborate encoding of the arrays/lists with optional 
types,
+    //       following structure will be representing such list in Parquet:
+    //
+    //       optional group tip_history (LIST) {
+    //         repeated group list {
+    //           optional group element {
+    //             optional double amount;
+    //             optional binary currency (STRING);
+    //           }
+    //         }
+    //       }
+    //
+    //       To designate list, special logical-type annotation (`LIST`) is 
used,
+    //       as well additional [[GroupType]] with the name "list" is wrapping
+    //       the "element" type (representing record stored inside the list 
itself).
+    //
+    //       By default [[AvroSchemaConverter]] would be interpreting any 
{@code REPEATED}
+    //       Parquet [[GroupType]] as list, skipping the checks whether 
additional [[GroupType]]
+    //       (named "list") is actually wrapping the "element" type therefore 
incorrectly
+    //       converting it into an additional record-wrapper (instead of 
simply omitting it).
+    //       To work this around we're
+    //          - Checking whether 
[[AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS]] has been
+    //          explicitly set in the Hadoop Config
+    //          - In case it's not, we override the default value from "true" 
to "false"
+    //
+    if (conf.get(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS) == null) {

Review Comment:
   Is this something new we are adding in this patch, or was it moved from
elsewhere?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java:
##########
@@ -18,91 +18,47 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.utils.MergingIterator;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
 
 /**
  * Helper to read records from previous version of base file and run Merge.
  */
-public abstract class BaseMergeHelper<T extends HoodieRecordPayload, I, K, O> {
+public abstract class BaseMergeHelper {
 
   /**
    * Read records from previous version of base file and merge.
    * @param table Hoodie Table
    * @param upsertHandle Merge Handle
    * @throws IOException in case of error
    */
-  public abstract void runMerge(HoodieTable<T, I, K, O> table, 
HoodieMergeHandle<T, I, K, O> upsertHandle) throws IOException;
-
-  protected GenericRecord 
transformRecordBasedOnNewSchema(GenericDatumReader<GenericRecord> gReader, 
GenericDatumWriter<GenericRecord> gWriter,

Review Comment:
   What was the purpose of this method?
   My understanding is that, in the case of the bootstrapping feature, we need
to read records from the old file using the schema with which it was written
and then read them back using the newer schema that is of interest to us.
   
   Also, within composeSchemaEvolutionTransformer() as per this patch, we
could return Option.empty() when there is no InternalSchema, even for
bootstrap scenarios. So, in that case, no rewrite of records happens. Can you
help shed some light on this, please?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to