nsivabalan commented on code in PR #13699:
URL: https://github.com/apache/hudi/pull/13699#discussion_r2277620789


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -492,10 +487,13 @@ private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecord(
         // the record was deleted
         return Option.empty();
       }
-      String partitionPath = inferPartitionPath(incoming, existing, 
writeSchemaWithMetaFields, keyGenerator, existingRecordContext, mergeResult);
+      Schema recordSchema = 
incomingRecordContext.getSchemaFromBufferRecord(mergeResult);
+      String partitionPath = inferPartitionPath(incoming, existing, 
recordSchema, keyGenerator, existingRecordContext, mergeResult);
       HoodieRecord<R> result = 
existingRecordContext.constructHoodieRecord(mergeResult, partitionPath);
+      HoodieRecord<R> withMeta = result.prependMetaFields(recordSchema, 
writeSchemaWithMetaFields,

Review Comment:
   prior to this patch, where are we prepending the meta fields? or are we 
fetching it from the hoodie meta fields?   
   
   ```
   HoodieRecordCompatibilityInterface.wrapIntoHoodieRecordPayloadWithParams() 
-> HoodieAvroUtils.createHoodieRecordFromAvro(...) -> 
SpillableMapUtils.convertToHoodieRecordPayload(...)
   ```
   
   was this the path taken? 
   does that mean, the `result` already has meta fields populated before this 
patch? 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -526,14 +524,14 @@ public static <R> HoodieData<HoodieRecord<R>> 
mergeForPartitionUpdatesIfNeeded(
         .distinct(updatedConfig.getGlobalIndexReconcileParallelism());
     // define the buffered record merger.
     ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>) 
hoodieTable.getContext()
-        .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), 
config.getRecordMerger().getRecordType(), config.getProps());
+        .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), 
config.getRecordMerger().getRecordType(), config.getProps(), true);

Review Comment:
   why setting last arg (outputsCustomPayloads) to true in all cases? not all 
cases require payload based HoodieRecord is what I was assuming. may be I am 
missing something. 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -156,40 +215,53 @@ private void init(CompactionOperation operation, String 
partitionPath) {
       fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, 
newFilePath, hoodieTable.getStorage(),
           config, writeSchemaWithMetaFields, taskContextSupplier, recordType);
     } catch (IOException io) {
-      LOG.error("Error in update task at commit {}", instantTime, io);
       writeStatus.setGlobalError(io);
       throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle 
for FileId: " + fileId + " on commit "
           + instantTime + " on path " + 
hoodieTable.getMetaClient().getBasePath(), io);
     }
   }
 
+  @Override
+  protected void populateIncomingRecordsMap(Iterator<HoodieRecord<T>> 
newRecordsItr) {
+    // no op.
+  }
+
+  /**
+   * This is only for spark, the engine context fetched from a serialized 
hoodie table is always local,
+   * overrides it to spark specific reader context.
+   */
+  public void setReaderContext(HoodieReaderContext<T> readerContext) {
+    this.readerContext = readerContext;
+  }
+
   /**
    * Reads the file slice of a compaction operation using a file group reader,
    * by getting an iterator of the records; then writes the records to a new 
base file.
    */
   @Override
   public void doMerge() {
+    // For non-compaction operations, the merger needs to be initialized with 
the writer properties to handle cases like Merge-Into commands
+    if (operation.isEmpty()) {
+      this.readerContext.initRecordMergerForIngestion(config.getProps());
+    }
     boolean usePosition = 
config.getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
-    Option<InternalSchema> internalSchemaOption = 
SerDeHelper.fromJson(config.getInternalSchema());
-    TypedProperties props = TypedProperties.copy(config.getProps());
+    Option<InternalSchema> internalSchemaOption = 
SerDeHelper.fromJson(config.getInternalSchema())
+        .map(internalSchema -> 
AvroSchemaEvolutionUtils.reconcileSchema(writeSchemaWithMetaFields, 
internalSchema,
+            
config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS)));
     long maxMemoryPerCompaction = 
IOUtils.getMaxMemoryPerCompaction(taskContextSupplier, config);
     props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), 
String.valueOf(maxMemoryPerCompaction));
-    Stream<HoodieLogFile> logFiles = 
operation.getDeltaFileNames().stream().map(logFileName ->
+    Option<Stream<HoodieLogFile>> logFilesStreamOpt = operation.map(op -> 
op.getDeltaFileNames().stream().map(logFileName ->
         new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath(
-            config.getBasePath(), operation.getPartitionPath()), 
logFileName)));
+            config.getBasePath(), op.getPartitionPath()), logFileName))));
     // Initializes file group reader
-    try (HoodieFileGroupReader<T> fileGroupReader = 
HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
-        
.withLatestCommitTime(maxInstantTime).withPartitionPath(partitionPath).withBaseFileOption(Option.ofNullable(baseFileToMerge)).withLogFiles(logFiles)
-        
.withDataSchema(writeSchemaWithMetaFields).withRequestedSchema(writeSchemaWithMetaFields).withInternalSchema(internalSchemaOption).withProps(props)
-        
.withShouldUseRecordPosition(usePosition).withSortOutput(hoodieTable.requireSortedRecords())
-        .withFileGroupUpdateCallback(cdcLogger.map(logger -> new 
CDCCallback(logger, 
readerContext))).withEnableOptimizedLogBlockScan(config.enableOptimizedLogBlocksScan()).build())
 {
+    try (HoodieFileGroupReader<T> fileGroupReader = 
getFileGroupReader(usePosition, internalSchemaOption, props, logFilesStreamOpt, 
incomingRecordsItr)) {
       // Reads the records from the file slice
       try (ClosableIterator<HoodieRecord<T>> recordIterator = 
fileGroupReader.getClosableHoodieRecordIterator()) {
         while (recordIterator.hasNext()) {
           HoodieRecord<T> record = recordIterator.next();
+          Option<Map<String, String>> recordMetadata = 
getRecordMetadata(record, writeSchema, props);

Review Comment:
   Note to reviewer: This fixes the event time metadata tracking for FG reader 
based merge handle



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -397,6 +401,9 @@ protected HoodieMergeHandle getUpdateHandle(String 
partitionPath, String fileId,
       mergeHandle.setPartitionFields(partitionFields);
       mergeHandle.setPartitionValues(partitionValues);
     }
+    if (readerContextFactory != null && mergeHandle instanceof 
FileGroupReaderBasedMergeHandle) {

Review Comment:
   anyways, constructor for FileGroupReaderBasedMergeHandle is diff from 
HoodieWriteMergeHandle. 
   so, do you think we can add a new arg to constructor as 
Option<SerializableFunc> which can assist with setting the reader context from 
within FileGroupReaderBasedMergeHandle class only, rather than special casing 
here at L405. 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -80,17 +101,56 @@ public BufferedRecord<T> processUpdate(String recordKey, 
BufferedRecord<T> previ
         }
         return null;
       } else {
-        T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
-        T mergedRow = mergedRecord.getRecord();
-        if (prevRow != null && prevRow != mergedRow) {
-          mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
-          readStats.incrementNumUpdates();
-        } else if (prevRow == null) {
-          mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
-          readStats.incrementNumInserts();
+        return handleNonDeletes(previousRecord, mergedRecord);
+      }
+    }
+
+    protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> 
previousRecord, BufferedRecord<T> mergedRecord) {

Review Comment:
   did we add good UTs against these new code



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java:
##########
@@ -124,4 +159,390 @@ public void 
testMergeHandleRLIAndSIStatsWithUpdatesAndDeletes() throws Exception
     assertEquals(2 * numUpdates + numDeletes, 
writeStatus.getIndexStats().getSecondaryIndexStats().values().stream().findFirst().get().size());
     validateSecondaryIndexStatsContent(writeStatus, numUpdates, numDeletes);
   }
+
+  @ParameterizedTest // TODO add CUSTOM_MERGER once deletes are handled 
properly
+  @ValueSource(strings = {"EVENT_TIME_ORDERING", "COMMIT_TIME_ORDERING", 
"CUSTOM"})

Review Comment:
   where are we testing CUSTOM_MERGER ? I only see 3 being tested here. 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -1280,4 +1281,10 @@ private Set<String> getDropPartitionColNames() {
     }
     return new HashSet<>(Arrays.asList(partitionFields.get()));
   }
+
+  public ReaderContextFactory<T> getReaderContextFactoryForWrite() {
+    // question: should we just return null when context is serialized as 
null? the mismatch reader context would throw anyway.
+    return (ReaderContextFactory<T>) 
getContext().getReaderContextFactoryForWrite(metaClient, 
config.getRecordMerger().getRecordType(),

Review Comment:
   I feel the spark reader context usage in write path as a one off thing. and 
we can leave it as is. apart from this use-case, we don't forsee more use-cases 
in near future. 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -526,14 +524,14 @@ public static <R> HoodieData<HoodieRecord<R>> 
mergeForPartitionUpdatesIfNeeded(
         .distinct(updatedConfig.getGlobalIndexReconcileParallelism());
     // define the buffered record merger.
     ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>) 
hoodieTable.getContext()
-        .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), 
config.getRecordMerger().getRecordType(), config.getProps());
+        .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), 
config.getRecordMerger().getRecordType(), config.getProps(), true);
     HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
     RecordContext<R> incomingRecordContext = readerContext.getRecordContext();
     readerContext.initRecordMergerForIngestion(config.getProps());
     // Create a reader context for the existing records. In the case of 
merge-into commands, the incoming records
     // can be using an expression payload so here we rely on the table's 
configured payload class if it is required.
     ReaderContextFactory<R> readerContextFactoryForExistingRecords = 
(ReaderContextFactory<R>) hoodieTable.getContext()
-        .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), 
config.getRecordMerger().getRecordType(), 
hoodieTable.getMetaClient().getTableConfig().getProps());
+        .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), 
config.getRecordMerger().getRecordType(), 
hoodieTable.getMetaClient().getTableConfig().getProps(), true);

Review Comment:
   same q as above



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java:
##########
@@ -109,6 +109,17 @@ public void markSuccess(HoodieRecord record, 
Option<Map<String, String>> optiona
     updateStatsForSuccess(optionalRecordMetadata);
   }
 
+  /**
+   * Allows the writer to manually add record delegates to the index stats.
+   */
+  public void manuallyTrackSuccess() {

Review Comment:
   I would have probably moved 
   ```
   this.writeStatus = (WriteStatus) 
ReflectionUtils.loadClass(config.getWriteStatusClassName(),
           hoodieTable.shouldTrackSuccessRecords(), 
config.getWriteStatusFailureFraction(), hoodieTable.isMetadataTable());
   ```
   
   to a protected method in HoodieWriteHandle and override it from 
FileGroupReaderBasedMergeHandle and override this behavior just for cow merge 
cases. 
   
   Just that it does not look neat when write handle initiates w/ 
`trackSuccessRecords` as true, and later we disable it. 
   
   not really a blocker comment. will leave it to you to address if possible. 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -251,6 +344,31 @@ public List<WriteStatus> close() {
     }
   }
 
+  private Option<BaseFileUpdateCallback<T>> createCallback() {
+    List<BaseFileUpdateCallback<T>> callbacks = new ArrayList<>();
+    // Handle CDC workflow.
+    if (cdcLogger.isPresent()) {
+      callbacks.add(new CDCCallback<>(cdcLogger.get(), readerContext));
+    }
+    // Indexes are not updated during compaction
+    if (operation.isEmpty()) {
+      // record index callback
+      if (this.writeStatus.isTrackingSuccessfulWrites()) {
+        writeStatus.manuallyTrackSuccess();
+        callbacks.add(new RecordLevelIndexCallback<>(writeStatus, 
newRecordLocation, partitionPath));
+      }
+      // Stream secondary index stats.
+      if (isSecondaryIndexStatsStreamingWritesEnabled) {
+        callbacks.add(new SecondaryIndexCallback<>(

Review Comment:
   I thought we had a discussion to have just 1 callback for RLI and SI. did 
you attempt and had to revert due to some issues? 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -198,8 +270,10 @@ public void doMerge() {
           }
           // Writes the record
           try {
-            writeToFile(record.getKey(), record, writeSchemaWithMetaFields,
-                config.getPayloadConfig().getProps(), preserveMetadata);
+            // if the record is not being updated and is not a new insert for 
the file group, we must preserve the existing record metadata.
+            boolean shouldPreserveRecordMetadata = preserveMetadata || 
record.getOperation() == null;

Review Comment:
   can you help me understand how the record.operation is being set. 
   
   I see that we set it to HoodieOperation.UPDATE_BEFORE where we don't need to 
update the index . otherwise, its the operation that we set in driver when we 
create the HoodieRecords which is likely null right?



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java:
##########
@@ -551,7 +565,7 @@ protected List<HoodieRecord<RawTripTestPayload>> 
dedupForCopyOnWriteStorage(Hood
     int dedupParallelism = records.getNumPartitions() + additionalParallelism;
     BaseHoodieWriteClient writeClient = getHoodieWriteClient(writeConfig);
     HoodieReaderContext readerContext = writeClient.getEngineContext()
-        .getReaderContextFactoryForWrite(metaClient, 
HoodieRecord.HoodieRecordType.AVRO, writeConfig.getProps()).getContext();
+        .getReaderContextFactoryForWrite(metaClient, 
HoodieRecord.HoodieRecordType.AVRO, writeConfig.getProps(), true).getContext();

Review Comment:
   all these places where we are setting to true for `outputsCustomPayloads` 
will be fixed in your next patch where we will be looking to move away from 
payload based HoodieRecord ? 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -39,8 +52,16 @@ public interface UpdateProcessor<T> {
   BufferedRecord<T> processUpdate(String recordKey, BufferedRecord<T> 
previousRecord, BufferedRecord<T> mergedRecord, boolean isDelete);
 
   static <T> UpdateProcessor<T> create(HoodieReadStats readStats, 
HoodieReaderContext<T> readerContext,
-                                       boolean emitDeletes, 
Option<BaseFileUpdateCallback<T>> updateCallback) {
-    UpdateProcessor<T> handler = new StandardUpdateProcessor<>(readStats, 
readerContext, emitDeletes);
+                                       boolean emitDeletes, 
Option<BaseFileUpdateCallback<T>> updateCallback,
+                                       TypedProperties properties) {
+    UpdateProcessor<T> handler;
+    Option<String> payloadClass = 
readerContext.getPayloadClasses(properties).map(Pair::getRight);

Review Comment:
   `HoodeReaderContext.getPayloadClasses()` will not return payload class 
unless the merge strategyId is = PAYLOAD_BASED_MERGE_STRATEGY_UUID right. 
   so, once we land w/ table v9 table creation patch, and rebase this patch, we 
do not need to make any additional fixes in these lines? 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java:
##########
@@ -203,7 +203,7 @@ public void processDeleteBlock(HoodieDeleteBlock 
deleteBlock) throws IOException
           // this delete-vector could be kept in the records cache(see the 
check in #fallbackToKeyBasedBuffer),
           // and these keys would be deleted no matter whether there are 
following-up inserts/updates.
           DeleteRecord deleteRecord = 
deleteRecords[commitTimeBasedRecordIndex++];
-          BufferedRecord<T> record = 
BufferedRecords.fromDeleteRecord(deleteRecord, deleteRecord.getOrderingValue());
+          BufferedRecord<T> record = 
BufferedRecords.fromDeleteRecord(deleteRecord, 
readerContext.getRecordContext());

Review Comment:
   can we update PR description to call out what changes we are doing wrt 
delete records and ordering values



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -80,17 +101,56 @@ public BufferedRecord<T> processUpdate(String recordKey, 
BufferedRecord<T> previ
         }
         return null;
       } else {
-        T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
-        T mergedRow = mergedRecord.getRecord();
-        if (prevRow != null && prevRow != mergedRow) {
-          mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
-          readStats.incrementNumUpdates();
-        } else if (prevRow == null) {
-          mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
-          readStats.incrementNumInserts();
+        return handleNonDeletes(previousRecord, mergedRecord);
+      }
+    }
+
+    protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> 
previousRecord, BufferedRecord<T> mergedRecord) {
+      T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
+      T mergedRow = mergedRecord.getRecord();
+      if (prevRow != null && prevRow != mergedRow) {
+        mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
+        readStats.incrementNumUpdates();
+      } else if (prevRow == null) {
+        mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
+        readStats.incrementNumInserts();
+      }
+      return mergedRecord.seal(readerContext.getRecordContext());
+    }
+  }
+
+  class PayloadUpdateProcessor<T> extends StandardUpdateProcessor<T> {

Review Comment:
   same as above. did we add good UTs against these new code



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SecondaryIndexStreamingTracker.java:
##########
@@ -226,6 +228,81 @@ static <T> void trackSecondaryIndexStats(@Nullable 
HoodieKey hoodieKey, Option<H
     });
   }
 
+  /**
+   * The utility function used by Merge Handle to generate secondary index 
stats for the corresponding record.
+   * It considers the new merged version of the record and compares it with 
the older version of the record to generate
+   * secondary index stats.
+   *
+   * @param hoodieKey                 The hoodie key
+   * @param combinedRecordOpt         New record merged with the old record
+   * @param oldRecord                 The old record
+   * @param isDelete                  Whether the record is a DELETE
+   * @param writeStatus               The Write status
+   * @param secondaryIndexDefns       Definitions for secondary index which 
need to be updated
+   */
+  static <T> void trackSecondaryIndexStats(HoodieKey hoodieKey, 
Option<BufferedRecord<T>> combinedRecordOpt, @Nullable BufferedRecord<T> 
oldRecord, boolean isDelete,

Review Comment:
   cool. have we added UTs for this



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java:
##########
@@ -76,7 +72,7 @@ abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordBuffer<T
   protected final RecordMergeMode recordMergeMode;
   protected final PartialUpdateMode partialUpdateMode;
   protected final Option<HoodieRecordMerger> recordMerger;
-  protected final Option<String> payloadClass;
+  protected final Option<Pair<String, String>> payloadClasses;

Review Comment:
   can we add java docs as to why we need a Pair of payload classes instead of 
just 1 entry



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -426,6 +428,16 @@ protected Pair<HoodieRecord, HoodieRecord> 
getFinalMergeRecords(BufferedRecord<T
       HoodieRecord newHoodieRecord = constructHoodieAvroRecord(recordContext, 
newerRecord, payloadClass);
       return Pair.of(oldHoodieRecord, newHoodieRecord);
     }
+
+    @Override
+    protected Option<Pair<HoodieRecord, Schema>> 
getMergedRecord(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord, 
boolean isFinalMerge) throws IOException {

Review Comment:
   did we add UTs for this at BufferedRecordMerger layer. essentially for the 
ExpressionPayloadRecordMerger



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -198,8 +270,10 @@ public void doMerge() {
           }
           // Writes the record
           try {
-            writeToFile(record.getKey(), record, writeSchemaWithMetaFields,
-                config.getPayloadConfig().getProps(), preserveMetadata);
+            // if the record is not being updated and is not a new insert for 
the file group, we must preserve the existing record metadata.
+            boolean shouldPreserveRecordMetadata = preserveMetadata || 
record.getOperation() == null;

Review Comment:
   I understand the java docs. but trying to tie the code to that 
   for compaction, its straight forward since `preserveMetadata` is true. 
   incase of cow merge case, we might get 3 diff kind of records. 
   A : records copied over from previous base file w/o any new updates. 
   B: records being updated. 
   C: new inserts which was never present in prev base file. 
   
   w/o understanding what the code is doing, I would assume we might wanna 
preserve metadata for case A only. for other two cases, we wanted to override 
the metadata. 
   
   Even within B, we could have a branching, where the updated version has 
lower ordering value and so the previous version of the record is chosen. but 
lets just keep it to A, B and C.
   
   Can you help me understand what the patch does for these 3 cases. 
   
   



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable2.scala:
##########
@@ -838,72 +839,72 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
     }
   }
 
-  test("Test only insert for source table in dup key without preCombineField") 
{
-    spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 
${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.defaultValue()}")
-    Seq("cow", "mor").foreach {
-      tableType => {
-        withTempDir { tmp =>
-          val tableName = generateTableName
-          spark.sql(
-            s"""
-               | create table $tableName (
-               |  id int,
-               |  name string,
-               |  price double,
-               |  ts int,
-               |  dt string
-               | ) using hudi
-               | tblproperties (
-               |  type = '$tableType',
-               |  primaryKey = 'id'
-               | )
-               | partitioned by(dt)
-               | location '${tmp.getCanonicalPath}'
-         """.stripMargin)
-          // append records to small file is use update bucket, set this conf 
use concat handler
-          spark.sql("set hoodie.merge.allow.duplicate.on.inserts = true")
-
-          // Insert data without matched condition
-          spark.sql(
-            s"""
-               | merge into $tableName as t0
-               | using (
-               |  select 1 as id, 'a1' as name, 10.1 as price, 1000 as ts, 
'2021-03-21' as dt
-               |  union all
-               |  select 1 as id, 'a2' as name, 10.2 as price, 1002 as ts, 
'2021-03-21' as dt
-               | ) as s0
-               | on t0.id = s0.id
-               | when not matched then insert *
-         """.stripMargin
-          )
-          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
-            Seq(1, "a1", 10.1, 1000, "2021-03-21"),
-            Seq(1, "a2", 10.2, 1002, "2021-03-21")
-          )
-
-          // Insert data with matched condition
-          spark.sql(
-            s"""
-               | merge into $tableName as t0
-               | using (
-               |  select 3 as id, 'a3' as name, 10.3 as price, 1003 as ts, 
'2021-03-21' as dt
-               |  union all
-               |  select 1 as id, 'a2' as name, 10.4 as price, 1004 as ts, 
'2021-03-21' as dt
-               | ) as s0
-               | on t0.id = s0.id
-               | when matched then update set *
-               | when not matched then insert *
-         """.stripMargin
-          )
-          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
-            Seq(1, "a2", 10.4, 1004, "2021-03-21"),
-            Seq(1, "a2", 10.4, 1004, "2021-03-21"),
-            Seq(3, "a3", 10.3, 1003, "2021-03-21")
-          )
-        }
-      }
-    }
-  }
+//  test("Test only insert for source table in dup key without 
preCombineField") {

Review Comment:
   why disabled? do we plan to revisit this later and fix it. 
   if its not a valid use case anymore, why not remove it fully?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -492,10 +487,13 @@ private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecord(
         // the record was deleted
         return Option.empty();
       }
-      String partitionPath = inferPartitionPath(incoming, existing, 
writeSchemaWithMetaFields, keyGenerator, existingRecordContext, mergeResult);
+      Schema recordSchema = 
incomingRecordContext.getSchemaFromBufferRecord(mergeResult);
+      String partitionPath = inferPartitionPath(incoming, existing, 
recordSchema, keyGenerator, existingRecordContext, mergeResult);
       HoodieRecord<R> result = 
existingRecordContext.constructHoodieRecord(mergeResult, partitionPath);
+      HoodieRecord<R> withMeta = result.prependMetaFields(recordSchema, 
writeSchemaWithMetaFields,

Review Comment:
   or is it that, the `result` had meta fields before this patch (AvroRecord in 
payload format), where as w/ this patch, we are converting it to 
AvroIndexedRecord and it does not contain the meta fields? 



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -397,6 +401,9 @@ protected HoodieMergeHandle getUpdateHandle(String 
partitionPath, String fileId,
       mergeHandle.setPartitionFields(partitionFields);
       mergeHandle.setPartitionValues(partitionValues);
     }
+    if (readerContextFactory != null && mergeHandle instanceof 
FileGroupReaderBasedMergeHandle) {

Review Comment:
   not to strong on the suggestion though. Will let you decide. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to