nsivabalan commented on code in PR #13495:
URL: https://github.com/apache/hudi/pull/13495#discussion_r2216796077


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java:
##########
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
+import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.serialization.DefaultSerializer;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCorruptedDataException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.HoodieMergeHelper;
+
+import org.apache.avro.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Handle to merge incoming records to those in storage row-by-row.
+ * <p>
+ * Simplified Logic:
+ * For every existing record
+ *     Check if there is a new record coming in. If yes, merge two records and 
write to file
+ *     else write the record as is
+ * For all pending records from incoming batch, write to file.
+ *
+ * Illustration with simple data.
+ * Incoming data:
+ *     rec1_2, rec4_2, rec5_1, rec6_1
+ * Existing data:
+ *     rec1_1, rec2_1, rec3_1, rec4_1
+ *
+ * For every existing record, merge w/ incoming if required and write to 
storage.
+ *    => rec1_1 and rec1_2 are merged to write rec1_2 to storage
+ *    => rec2_1 is written as is
+ *    => rec3_1 is written as is
+ *    => rec4_2 and rec4_1 are merged to write rec4_2 to storage
+ * Write all pending records from incoming set to storage
+ *    => rec5_1 and rec6_1
+ *
+ * Final snapshot in storage
+ * rec1_2, rec2_1, rec3_1, rec4_2, rec5_1, rec6_1
+ *
+ * </p>
+ */
+@SuppressWarnings("Duplicates")
+@NotThreadSafe
+public class HoodieWriteMergeHandle<T, I, K, O> extends 
HoodieAbstractMergeHandle<T, I, K, O> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieWriteMergeHandle.class);
+
+  protected Map<String, HoodieRecord<T>> keyToNewRecords;
+  protected Set<String> writtenRecordKeys;
+  protected HoodieFileWriter fileWriter;
+
+  protected long recordsWritten = 0;
+  protected long recordsDeleted = 0;
+  protected long updatedRecordsWritten = 0;
+  protected long insertRecordsWritten = 0;
+
+  public HoodieWriteMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
+                                Iterator<HoodieRecord<T>> recordItr, String 
partitionPath, String fileId,
+                                TaskContextSupplier taskContextSupplier, 
Option<BaseKeyGenerator> keyGeneratorOpt) {
+    this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, 
taskContextSupplier,
+        getLatestBaseFile(hoodieTable, partitionPath, fileId), 
keyGeneratorOpt);
+  }
+
+  public HoodieWriteMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
+                                Iterator<HoodieRecord<T>> recordItr, String 
partitionPath, String fileId,
+                                TaskContextSupplier taskContextSupplier, 
HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    super(config, instantTime, hoodieTable, partitionPath, fileId, 
taskContextSupplier, baseFile, keyGeneratorOpt, false);
+    populateIncomingRecordsMap(recordItr);
+    initMarkerFileAndFileWriter(fileId, partitionPath);
+  }
+
+  /**
+   * Called by compactor code path.
+   */
+  public HoodieWriteMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
+                                Map<String, HoodieRecord<T>> keyToNewRecords, 
String partitionPath, String fileId,
+                                HoodieBaseFile dataFileToBeMerged, 
TaskContextSupplier taskContextSupplier,
+                                Option<BaseKeyGenerator> keyGeneratorOpt) {
+    super(config, instantTime, hoodieTable, partitionPath, fileId, 
taskContextSupplier, dataFileToBeMerged, keyGeneratorOpt,
+        // preserveMetadata is disabled by default for MDT but enabled 
otherwise
+        !HoodieTableMetadata.isMetadataTable(config.getBasePath()));
+    this.keyToNewRecords = keyToNewRecords;
+    initMarkerFileAndFileWriter(fileId, this.partitionPath);
+  }
+
+  /**
+   * Used by `FileGroupReaderBasedMergeHandle`.
+   *
+   * @param config              Hudi write config
+   * @param instantTime         Instant time to use
+   * @param partitionPath       Partition path
+   * @param fileId              File group ID for the merge handle to operate 
on
+   * @param hoodieTable         {@link HoodieTable} instance
+   * @param taskContextSupplier Task context supplier
+   */
+  public HoodieWriteMergeHandle(HoodieWriteConfig config, String instantTime, 
String partitionPath,
+                                String fileId, HoodieTable<T, I, K, O> 
hoodieTable, TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, partitionPath, fileId, hoodieTable, 
taskContextSupplier, true);
+  }
+
+  @Override
+  public void doMerge() throws IOException {
+    HoodieMergeHelper.newInstance().runMerge(hoodieTable, this);
+  }
+
+  /**
+   * Initialize marker file and file writer.
+   */
+  private void initMarkerFileAndFileWriter(String fileId, String 
partitionPath) {
+    this.writtenRecordKeys = new HashSet<>();
+    try {
+      // Create Marker file,
+      // uses name of `newFilePath` instead of `newFileName`
+      // in case the subclass may roll over the file handle name.
+      createMarkerFile(partitionPath, newFilePath.getName());
+      // Create the writer for writing the new version file
+      fileWriter = HoodieFileWriterFactory.getFileWriter(
+          instantTime, newFilePath, hoodieTable.getStorage(),
+          config, writeSchemaWithMetaFields, taskContextSupplier, 
getRecordType());
+    } catch (IOException io) {
+      LOG.error("Error in update task at commit {}", instantTime, io);
+      writeStatus.setGlobalError(io);
+      throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle 
for FileId: " + fileId + " on commit "
+          + instantTime + " on path " + 
hoodieTable.getMetaClient().getBasePath(), io);
+    }
+  }
+
+  protected HoodieRecord.HoodieRecordType getRecordType() {
+    return recordMerger.getRecordType();
+  }
+
+  /**
+   * Initialize a spillable map for incoming records.
+   */
+  protected void initIncomingRecordsMap() {
+    try {
+      // Load the new records in a map
+      long memoryForMerge = 
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config);
+      LOG.info("MaxMemoryPerPartitionMerge => {}", memoryForMerge);
+      this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, 
config.getSpillableMapBasePath(),
+          new DefaultSizeEstimator<>(), new 
HoodieRecordSizeEstimator<>(writeSchema),
+          config.getCommonConfig().getSpillableDiskMapType(),
+          new DefaultSerializer<>(),
+          config.getCommonConfig().isBitCaskDiskMapCompressionEnabled(),
+          getClass().getSimpleName());
+    } catch (IOException io) {
+      throw new HoodieIOException("Cannot instantiate an 
ExternalSpillableMap", io);
+    }
+  }
+
+  /**
+   * Whether there is need to update the record location.
+   */
+  boolean needsUpdateLocation() {
+    return true;
+  }
+
+  /**
+   * Load the new incoming records into the map, keyed by record key.
+   */
+  protected void populateIncomingRecordsMap(Iterator<HoodieRecord<T>> 
newRecordsItr) {
+    initIncomingRecordsMap();
+    while (newRecordsItr.hasNext()) {
+      HoodieRecord<T> record = newRecordsItr.next();
+      // update the new location of the record, so we know where to find it 
next
+      if (needsUpdateLocation()) {
+        record.unseal();
+        record.setNewLocation(newRecordLocation);
+        record.seal();
+      }
+      // NOTE: Once Records are added to map (spillable-map), DO NOT change it 
as they won't persist
+      keyToNewRecords.put(record.getRecordKey(), record);
+    }
+    if (keyToNewRecords instanceof ExternalSpillableMap) {
+      ExternalSpillableMap<String, HoodieRecord<T>> spillableMap = 
(ExternalSpillableMap<String, HoodieRecord<T>>) keyToNewRecords;
+      LOG.info("Number of entries in MemoryBasedMap => {}, Total size in bytes 
of MemoryBasedMap => {}, "
+          + "Number of entries in BitCaskDiskMap => {}, Size of file spilled 
to disk => {}",
+          spillableMap.getInMemoryMapNumEntries(), 
spillableMap.getCurrentInMemoryMapSize(), 
spillableMap.getDiskBasedMapNumEntries(), 
spillableMap.getSizeOfFileOnDiskInBytes());
+    }
+  }
+
+  public boolean isEmptyNewRecords() {
+    return keyToNewRecords.isEmpty();
+  }
+
+  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, 
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema 
writerSchema) throws IOException {
+    boolean isDelete = false;
+    if (combineRecordOpt.isPresent()) {
+      if (oldRecord.getData() != combineRecordOpt.get().getData()) {
+        // the incoming record is chosen
+        isDelete = HoodieOperation.isDelete(newRecord.getOperation());
+      } else {
+        // the incoming record is dropped
+        return false;
+      }
+      updatedRecordsWritten++;
+    }
+    return writeRecord(newRecord, oldRecord, combineRecordOpt, writerSchema, 
config.getPayloadConfig().getProps(), isDelete);
+  }
+
+  protected void writeInsertRecord(HoodieRecord<T> newRecord) throws 
IOException {
+    Schema schema = getNewSchema();
+    // just skip the ignored record
+    if (newRecord.shouldIgnore(schema, config.getProps())) {
+      return;
+    }
+    writeInsertRecord(newRecord, schema, config.getProps());
+  }
+
+  protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema, 
Properties prop)
+      throws IOException {
+    if (writeRecord(newRecord, null, Option.of(newRecord), schema, prop, 
HoodieOperation.isDelete(newRecord.getOperation()))) {
+      insertRecordsWritten++;
+    }
+  }
+
+  protected boolean writeRecord(HoodieRecord<T> newRecord, 
Option<HoodieRecord> combineRecord, Schema schema, Properties prop) throws 
IOException {
+    return writeRecord(newRecord, null, combineRecord, schema, prop, false);
+  }
+
+  /**
+   * The function takes the different versions of the record - old record, new 
incoming record and combined record
+   * created by merging the old record with the new incoming record. It 
decides whether the combined record needs to be
+   * written to the file and writes the record accordingly.
+   *
+   * @param newRecord     The new incoming record
+   * @param oldRecord     The value of old record
+   * @param combineRecord Record created by merging the old record with the 
new incoming record
+   * @param schema        Record schema
+   * @param prop          Properties
+   * @param isDelete      Whether the new record is a delete record
+   *
+   * @return true if the record was written successfully
+   * @throws IOException
+   */
+  private boolean writeRecord(HoodieRecord<T> newRecord,
+                              @Nullable HoodieRecord<T> oldRecord,
+                              Option<HoodieRecord> combineRecord,
+                              Schema schema,
+                              Properties prop,
+                              boolean isDelete) {
+    Option recordMetadata = newRecord.getMetadata();
+    if (!partitionPath.equals(newRecord.getPartitionPath())) {
+      HoodieUpsertException failureEx = new HoodieUpsertException("mismatched 
partition path, record partition: "
+          + newRecord.getPartitionPath() + " but trying to insert into 
partition: " + partitionPath);
+      writeStatus.markFailure(newRecord, failureEx, recordMetadata);
+      return false;
+    }
+    try {
+      if (combineRecord.isPresent() && !combineRecord.get().isDelete(schema, 
config.getProps()) && !isDelete) {
+        // Last-minute check.
+        boolean decision = recordMerger.shouldFlush(combineRecord.get(), 
schema, config.getProps());
+
+        if (decision) {
+          // CASE (1): Flush the merged record.
+          HoodieKey hoodieKey = newRecord.getKey();
+          if (isSecondaryIndexStatsStreamingWritesEnabled) {
+            SecondaryIndexStreamingTracker.trackSecondaryIndexStats(hoodieKey, 
combineRecord, oldRecord, false, writeStatus,
+                writeSchemaWithMetaFields, this::getNewSchema, 
secondaryIndexDefns, keyGeneratorOpt, config);
+          }
+          writeToFile(hoodieKey, combineRecord.get(), schema, prop, 
preserveMetadata);
+          recordsWritten++;
+        } else {
+          // CASE (2): A delete operation.
+          if (isSecondaryIndexStatsStreamingWritesEnabled) {
+            
SecondaryIndexStreamingTracker.trackSecondaryIndexStats(newRecord.getKey(), 
combineRecord, oldRecord, true, writeStatus,
+                writeSchemaWithMetaFields, this::getNewSchema, 
secondaryIndexDefns, keyGeneratorOpt, config);
+          }
+          recordsDeleted++;
+        }
+      } else {
+        if (isSecondaryIndexStatsStreamingWritesEnabled) {
+          
SecondaryIndexStreamingTracker.trackSecondaryIndexStats(newRecord.getKey(), 
combineRecord, oldRecord, true, writeStatus,
+              writeSchemaWithMetaFields, this::getNewSchema, 
secondaryIndexDefns, keyGeneratorOpt, config);
+        }
+        recordsDeleted++;
+        // Clear the new location as the record was deleted
+        newRecord.unseal();
+        newRecord.clearNewLocation();
+        newRecord.seal();
+      }
+      writeStatus.markSuccess(newRecord, recordMetadata);
+      // deflate record payload after recording success. This will help users 
access payload as a
+      // part of marking
+      // record successful.
+      newRecord.deflate();
+      return true;
+    } catch (Exception e) {
+      LOG.error("Error writing record {}", newRecord, e);
+      writeStatus.markFailure(newRecord, e, recordMetadata);
+    }
+    return false;
+  }
+
+  /**
+   * Go through an old record. Here if we detect a newer version shows up, we 
write the new one to the file.
+   */
+  public void write(HoodieRecord<T> oldRecord) {

Review Comment:
   Let's add documentation noting that this public method is used in the case of
COW merges.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to