nsivabalan commented on code in PR #13699:
URL: https://github.com/apache/hudi/pull/13699#discussion_r2279080060


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -198,8 +270,10 @@ public void doMerge() {
           }
           // Writes the record
           try {
-            writeToFile(record.getKey(), record, writeSchemaWithMetaFields,
-                config.getPayloadConfig().getProps(), preserveMetadata);
+            // if the record is not being updated and is not a new insert for the file group, we must preserve the existing record metadata.
+            boolean shouldPreserveRecordMetadata = preserveMetadata || record.getOperation() == null;

Review Comment:
   thanks
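For illustration, the intent of the new `shouldPreserveRecordMetadata` flag can be sketched in isolation (a simplified, hypothetical stand-in: the local `Operation` enum replaces Hudi's `HoodieOperation`, and the check is a free-standing method rather than part of the merge handle):

```java
// Hypothetical, simplified sketch of the flag introduced in the diff above.
public class PreserveMetadataSketch {
  enum Operation { INSERT, UPDATE_AFTER, DELETE }

  // A record whose operation is null was neither updated nor newly inserted
  // into the file group, so its existing metadata must be preserved.
  static boolean shouldPreserveRecordMetadata(boolean preserveMetadata, Operation op) {
    return preserveMetadata || op == null;
  }

  public static void main(String[] args) {
    // Untouched record (no operation set): metadata is always preserved.
    System.out.println(shouldPreserveRecordMetadata(false, null));                   // true
    // Updated record: preservation follows the handle-level flag.
    System.out.println(shouldPreserveRecordMetadata(false, Operation.UPDATE_AFTER)); // false
    System.out.println(shouldPreserveRecordMetadata(true, Operation.INSERT));        // true
  }
}
```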



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -526,14 +524,14 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdatesIfNeeded(
         .distinct(updatedConfig.getGlobalIndexReconcileParallelism());
     // define the buffered record merger.
     ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>) hoodieTable.getContext()
-        .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), config.getProps());
+        .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), config.getProps(), true);

Review Comment:
   Sounds good.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -80,17 +101,56 @@ public BufferedRecord<T> processUpdate(String recordKey, BufferedRecord<T> previ
         }
         return null;
       } else {
-        T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
-        T mergedRow = mergedRecord.getRecord();
-        if (prevRow != null && prevRow != mergedRow) {
-          mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
-          readStats.incrementNumUpdates();
-        } else if (prevRow == null) {
-          mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
-          readStats.incrementNumInserts();
+        return handleNonDeletes(previousRecord, mergedRecord);
+      }
+    }
+
+    protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> previousRecord, BufferedRecord<T> mergedRecord) {
+      T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
+      T mergedRow = mergedRecord.getRecord();
+      if (prevRow != null && prevRow != mergedRow) {
+        mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
+        readStats.incrementNumUpdates();
+      } else if (prevRow == null) {
+        mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
+        readStats.incrementNumInserts();
+      }
+      return mergedRecord.seal(readerContext.getRecordContext());
+    }
+  }
+
+  class PayloadUpdateProcessor<T> extends StandardUpdateProcessor<T> {

Review Comment:
   In the interest of landing the patch quickly, we can go ahead if CI is green, but I would recommend filing one follow-up JIRA covering all these minor items. We can take them up during the bug-fix period as well.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -251,6 +344,31 @@ public List<WriteStatus> close() {
     }
   }
 
+  private Option<BaseFileUpdateCallback<T>> createCallback() {
+    List<BaseFileUpdateCallback<T>> callbacks = new ArrayList<>();
+    // Handle CDC workflow.
+    if (cdcLogger.isPresent()) {
+      callbacks.add(new CDCCallback<>(cdcLogger.get(), readerContext));
+    }
+    // Indexes are not updated during compaction
+    if (operation.isEmpty()) {
+      // record index callback
+      if (this.writeStatus.isTrackingSuccessfulWrites()) {
+        writeStatus.manuallyTrackSuccess();
+        callbacks.add(new RecordLevelIndexCallback<>(writeStatus, newRecordLocation, partitionPath));
+      }
+      // Stream secondary index stats.
+      if (isSecondaryIndexStatsStreamingWritesEnabled) {
+        callbacks.add(new SecondaryIndexCallback<>(

Review Comment:
   I see, but it should be possible to have just one callback for both RLI and SI, right? That said, two separate callbacks are simpler and more maintainable for now, so this is not a blocking comment. We can revisit it in the future if it causes any perf issues.
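To illustrate the alternative being discussed, a single callback could fan notifications out to both index trackers. This is a hypothetical sketch with a stand-in interface, not Hudi's actual `BaseFileUpdateCallback` API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of folding the RLI and SI callbacks into one composite.
public class CompositeCallbackSketch {
  interface UpdateCallback { void onUpdate(String recordKey); }

  // One callback object that fans each notification out to several delegates,
  // so the file group reader still sees exactly one callback.
  static UpdateCallback composite(List<UpdateCallback> delegates) {
    return key -> delegates.forEach(d -> d.onUpdate(key));
  }

  public static void main(String[] args) {
    AtomicInteger rliHits = new AtomicInteger();
    AtomicInteger siHits = new AtomicInteger();
    UpdateCallback combined = composite(Arrays.asList(
        key -> rliHits.incrementAndGet(),   // record-level-index stand-in
        key -> siHits.incrementAndGet()));  // secondary-index stand-in
    combined.onUpdate("key1");
    System.out.println(rliHits.get() + " " + siHits.get()); // 1 1
  }
}
```

Whether this buys anything over two registered callbacks is mostly a question of per-record dispatch overhead, which is why deferring it until a perf issue shows up seems reasonable.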



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -397,6 +401,9 @@ protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId,
       mergeHandle.setPartitionFields(partitionFields);
       mergeHandle.setPartitionValues(partitionValues);
     }
+    if (readerContextFactory != null && mergeHandle instanceof FileGroupReaderBasedMergeHandle) {

Review Comment:
   Gotcha, thanks for the context. The Spark-specific requirement is what makes this tricky.
   I am good then.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java:
##########
@@ -76,7 +72,7 @@ abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T
   protected final RecordMergeMode recordMergeMode;
   protected final PartialUpdateMode partialUpdateMode;
   protected final Option<HoodieRecordMerger> recordMerger;
-  protected final Option<String> payloadClass;
+  protected final Option<Pair<String, String>> payloadClasses;

Review Comment:
   Thanks.
   
   Trying to think through what our path forward will look like as we move completely away from payloads: until we retire expression payloads from Merge Into, we have to assume all records will have some payload associated with them, right? Curious how you are tackling this in your next patch, i.e., what changes we intend to make to ExpressionPayloadRecordMerger.
   
   For example, say someone creates a new table in v9 today without setting any payload class (after your next patch), setting only the merge mode, and then issues MIT writes. What will happen with ExpressionPayloadRecordMerger? Currently, it converts both the base record and the incoming record to HoodieAvroRecord, which is in payload format, before it can actually merge them.
   
   Maybe you have thought through this already or have fixed it in your draft patch; I was just curious to understand. We can jam f2f to align if need be.
   
   Note: this is not a blocking comment for landing this patch.
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -156,40 +215,53 @@ private void init(CompactionOperation operation, String partitionPath) {
       fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, newFilePath, hoodieTable.getStorage(),
           config, writeSchemaWithMetaFields, taskContextSupplier, recordType);
     } catch (IOException io) {
-      LOG.error("Error in update task at commit {}", instantTime, io);
       writeStatus.setGlobalError(io);
       throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit "
           + instantTime + " on path " + hoodieTable.getMetaClient().getBasePath(), io);
     }
   }
 
+  @Override
+  protected void populateIncomingRecordsMap(Iterator<HoodieRecord<T>> newRecordsItr) {
+    // no op.
+  }
+
+  /**
+   * This is only for Spark: the engine context fetched from a serialized hoodie table is always local,
+   * so this overrides it with the Spark-specific reader context.
+   */
+  public void setReaderContext(HoodieReaderContext<T> readerContext) {
+    this.readerContext = readerContext;
+  }
+
   /**
    * Reads the file slice of a compaction operation using a file group reader,
   * by getting an iterator of the records; then writes the records to a new base file.
    */
   @Override
   public void doMerge() {
+    // For non-compaction operations, the merger needs to be initialized with the writer properties to handle cases like Merge-Into commands
+    if (operation.isEmpty()) {
+      this.readerContext.initRecordMergerForIngestion(config.getProps());
+    }
     boolean usePosition = config.getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
-    Option<InternalSchema> internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema());
-    TypedProperties props = TypedProperties.copy(config.getProps());
+    Option<InternalSchema> internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema())
+        .map(internalSchema -> AvroSchemaEvolutionUtils.reconcileSchema(writeSchemaWithMetaFields, internalSchema,
+            config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS)));
     long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(taskContextSupplier, config);
     props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(maxMemoryPerCompaction));
-    Stream<HoodieLogFile> logFiles = operation.getDeltaFileNames().stream().map(logFileName ->
+    Option<Stream<HoodieLogFile>> logFilesStreamOpt = operation.map(op -> op.getDeltaFileNames().stream().map(logFileName ->
         new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath(
-            config.getBasePath(), operation.getPartitionPath()), logFileName)));
+            config.getBasePath(), op.getPartitionPath()), logFileName))));
     // Initializes file group reader
-    try (HoodieFileGroupReader<T> fileGroupReader = HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
-        .withLatestCommitTime(maxInstantTime).withPartitionPath(partitionPath).withBaseFileOption(Option.ofNullable(baseFileToMerge)).withLogFiles(logFiles)
-        .withDataSchema(writeSchemaWithMetaFields).withRequestedSchema(writeSchemaWithMetaFields).withInternalSchema(internalSchemaOption).withProps(props)
-        .withShouldUseRecordPosition(usePosition).withSortOutput(hoodieTable.requireSortedRecords())
-        .withFileGroupUpdateCallback(cdcLogger.map(logger -> new CDCCallback(logger, readerContext))).withEnableOptimizedLogBlockScan(config.enableOptimizedLogBlocksScan()).build()) {
+    try (HoodieFileGroupReader<T> fileGroupReader = getFileGroupReader(usePosition, internalSchemaOption, props, logFilesStreamOpt, incomingRecordsItr)) {
       // Reads the records from the file slice
       try (ClosableIterator<HoodieRecord<T>> recordIterator = fileGroupReader.getClosableHoodieRecordIterator()) {
         while (recordIterator.hasNext()) {
           HoodieRecord<T> record = recordIterator.next();
+          Option<Map<String, String>> recordMetadata = getRecordMetadata(record, writeSchema, props);

Review Comment:
   Oh, we should avoid event-time metadata tracking for compaction commits, right? How are we ensuring that?
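One way this could be ensured (a hypothetical sketch, not necessarily how the patch handles it) is to gate event-time tracking on whether a compaction operation is present, the same way the index callbacks are gated on `operation.isEmpty()` in the handle:

```java
import java.util.Optional;

// Hypothetical sketch: skip event-time metadata when running under compaction.
// `compactionOperation` stands in for the handle's Option<CompactionOperation>.
public class EventTimeGateSketch {
  static Optional<String> eventTimeMetadata(Optional<String> compactionOperation, String eventTime) {
    // Compaction only rewrites existing data, so no new event-time stats are emitted.
    return compactionOperation.isPresent() ? Optional.empty() : Optional.of(eventTime);
  }

  public static void main(String[] args) {
    System.out.println(eventTimeMetadata(Optional.empty(), "t1").isPresent());  // true: ingestion tracks event time
    System.out.println(eventTimeMetadata(Optional.of("c1"), "t1").isPresent()); // false: compaction skips it
  }
}
```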



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
