nsivabalan commented on code in PR #13495:
URL: https://github.com/apache/hudi/pull/13495#discussion_r2216330487


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -7,584 +7,63 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.hudi.io;
 
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieOperation;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
-import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.model.MetadataValues;
-import org.apache.hudi.common.serialization.DefaultSerializer;
-import org.apache.hudi.common.util.DefaultSizeEstimator;
-import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieCorruptedDataException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileWriter;
-import org.apache.hudi.io.storage.HoodieFileWriterFactory;
-import org.apache.hudi.io.storage.HoodieIOFactory;
-import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.NotThreadSafe;
-
-import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * Handle to merge incoming records to those in storage.
- * <p>
- * Simplified Logic:
- * For every existing record
- *     Check if there is a new record coming in. If yes, merge two records and write to file
- *     else write the record as is
- * For all pending records from incoming batch, write to file.
- *
- * <p>
- * Illustration with simple data.
- * Incoming data:
- *     rec1_2, rec4_2, rec5_1, rec6_1
- * Existing data:
- *     rec1_1, rec2_1, rec3_1, rec4_1
- * <p>
- * For every existing record, merge w/ incoming if required and write to storage.
- *    => rec1_1 and rec1_2 is merged to write rec1_2 to storage
- *    => rec2_1 is written as is
- *    => rec3_1 is written as is
- *    => rec4_2 and rec4_1 is merged to write rec4_2 to storage
- * Write all pending records from incoming set to storage
- *    => rec5_1 and rec6_1
- * <p>
- * Final snapshot in storage
- * rec1_2, rec2_1, rec3_1, rec4_2, rec5_1, rec6_1
- */
-@SuppressWarnings("Duplicates")
-@NotThreadSafe
-public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeHandle.class);
-
-  protected Map<String, HoodieRecord<T>> keyToNewRecords;
-  protected Set<String> writtenRecordKeys;
-  protected HoodieFileWriter fileWriter;
-
-  protected StoragePath newFilePath;
-  protected StoragePath oldFilePath;
-  protected long recordsWritten = 0;
-  protected long recordsDeleted = 0;
-  protected long updatedRecordsWritten = 0;
-  protected long insertRecordsWritten = 0;
-  protected Option<BaseKeyGenerator> keyGeneratorOpt;
-  protected HoodieBaseFile baseFileToMerge;
-
-  protected Option<String[]> partitionFields = Option.empty();
-  protected Object[] partitionValues = new Object[0];
 
-  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
-                           Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
-                           TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier,
-        getLatestBaseFile(hoodieTable, partitionPath, fileId), keyGeneratorOpt);
-  }
-
-  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
-                           Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
-                           TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier, false);
-    init(recordItr);
-    init(fileId, partitionPath, baseFile);
-    validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
-  }
-
-  /**
-   * Called by compactor code path.
-   */
-  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
-                           Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
-                           HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier, true);
-    this.keyToNewRecords = keyToNewRecords;
-    init(fileId, this.partitionPath, dataFileToBeMerged);
-    validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
-  }
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface HoodieMergeHandle<T, I, K, O> {
 
   /**
-   * Used by `HoodieSparkFileGroupReaderBasedMergeHandle`.
-   *
-   * @param config              Hudi write config
-   * @param instantTime         Instant time to use
-   * @param partitionPath       Partition path
-   * @param fileId              File group ID for the merge handle to operate on
-   * @param hoodieTable         {@link HoodieTable} instance
-   * @param taskContextSupplier Task context supplier
-   */
-  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
-                           String fileId, HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier, true);
-  }
-
-  private void validateAndSetAndKeyGenProps(Option<BaseKeyGenerator> keyGeneratorOpt, boolean populateMetaFields) {
-    ValidationUtils.checkArgument(populateMetaFields == !keyGeneratorOpt.isPresent());
-    this.keyGeneratorOpt = keyGeneratorOpt;
-  }
-
-  public static HoodieBaseFile getLatestBaseFile(HoodieTable<?, ?, ?, ?> hoodieTable, String partitionPath, String fileId) {
-    Option<HoodieBaseFile> baseFileOp = hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
-    if (!baseFileOp.isPresent()) {
-      throw new NoSuchElementException(String.format("FileID %s of partition path %s does not exist.", fileId, partitionPath));
-    }
-    return baseFileOp.get();
-  }
-
-  /**
-   * Extract old file path, initialize StorageWriter and WriteStatus.
-   */
-  private void init(String fileId, String partitionPath, HoodieBaseFile baseFileToMerge) {
-    LOG.info("partitionPath:{}, fileId to be merged:{}", partitionPath, fileId);
-    this.baseFileToMerge = baseFileToMerge;
-    this.writtenRecordKeys = new HashSet<>();
-    writeStatus.setStat(new HoodieWriteStat());
-    try {
-      String latestValidFilePath = baseFileToMerge.getFileName();
-      writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
-      // At the moment, we only support SI for overwrite with latest payload. So, we don't need to embed entire file slice here.
-      // HUDI-8518 will be taken up to fix it for any payload during which we might require entire file slice to be set here.
-      // Already AppendHandle adds all logs file from current file slice to HoodieDeltaWriteStat.
-      writeStatus.getStat().setPrevBaseFile(latestValidFilePath);
-
-      HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(storage, instantTime,
-          new StoragePath(config.getBasePath()),
-          FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath),
-          hoodieTable.getPartitionMetafileFormat());
-      partitionMetadata.trySave();
-
-      String newFileName = FSUtils.makeBaseFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension());
-      makeOldAndNewFilePaths(partitionPath, latestValidFilePath, newFileName);
-
-      LOG.info("Merging new data into oldPath {}, as newPath {}", oldFilePath, newFilePath);
-      // file name is same for all records, in this bunch
-      writeStatus.setFileId(fileId);
-      writeStatus.setPartitionPath(partitionPath);
-      writeStatus.getStat().setPartitionPath(partitionPath);
-      writeStatus.getStat().setFileId(fileId);
-      setWriteStatusPath();
-
-      // Create Marker file,
-      // uses name of `newFilePath` instead of `newFileName`
-      // in case the sub-class may roll over the file handle name.
-      createMarkerFile(partitionPath, newFilePath.getName());
-
-      // Create the writer for writing the new version file
-      fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, newFilePath, hoodieTable.getStorage(),
-          config, writeSchemaWithMetaFields, taskContextSupplier, recordMerger.getRecordType());
-    } catch (IOException io) {
-      LOG.error("Error in update task at commit {}", instantTime, io);
-      writeStatus.setGlobalError(io);
-      throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit "
-          + instantTime + " on path " + hoodieTable.getMetaClient().getBasePath(), io);
-    }
-  }
-
-  protected void setWriteStatusPath() {
-    writeStatus.getStat().setPath(new StoragePath(config.getBasePath()), newFilePath);
-  }
-
-  protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) {
-    oldFilePath = makeNewFilePath(partitionPath, oldFileName);
-    newFilePath = makeNewFilePath(partitionPath, newFileName);
-  }
-
-  /**
-   * Initialize a spillable map for incoming records.
-   */
-  protected void initializeIncomingRecordsMap() {
-    try {
-      // Load the new records in a map
-      long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config);
-      LOG.info("MaxMemoryPerPartitionMerge => {}", memoryForMerge);
-      this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
-          new DefaultSizeEstimator<>(), new HoodieRecordSizeEstimator<>(writeSchema),
-          config.getCommonConfig().getSpillableDiskMapType(),
-          new DefaultSerializer<>(),
-          config.getCommonConfig().isBitCaskDiskMapCompressionEnabled(),
-          getClass().getSimpleName());
-    } catch (IOException io) {
-      throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
-    }
-  }
-
-  /**
-   * Whether there is need to update the record location.
-   */
-  boolean needsUpdateLocation() {
-    return true;
-  }
-
-  /**
-   * Load the new incoming records in a map and return partitionPath.
-   */
-  protected void init(Iterator<HoodieRecord<T>> newRecordsItr) {
-    initializeIncomingRecordsMap();
-    while (newRecordsItr.hasNext()) {
-      HoodieRecord<T> record = newRecordsItr.next();
-      // update the new location of the record, so we know where to find it next
-      if (needsUpdateLocation()) {
-        record.unseal();
-        record.setNewLocation(newRecordLocation);
-        record.seal();
-      }
-      // NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist
-      keyToNewRecords.put(record.getRecordKey(), record);
-    }
-    if (keyToNewRecords instanceof ExternalSpillableMap) {
-      ExternalSpillableMap<String, HoodieRecord<T>> spillableMap = (ExternalSpillableMap<String, HoodieRecord<T>>) keyToNewRecords;
-      LOG.info("Number of entries in MemoryBasedMap => {}, Total size in bytes of MemoryBasedMap => {}, "
-          + "Number of entries in BitCaskDiskMap => {}, Size of file spilled to disk => {}",
-          spillableMap.getInMemoryMapNumEntries(), spillableMap.getCurrentInMemoryMapSize(), spillableMap.getDiskBasedMapNumEntries(), spillableMap.getSizeOfFileOnDiskInBytes());
-    }
-  }
-
-  public boolean isEmptyNewRecords() {
-    return keyToNewRecords.isEmpty();
-  }
-
-  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema) throws IOException {
-    boolean isDelete = false;
-    if (combineRecordOpt.isPresent()) {
-      if (oldRecord.getData() != combineRecordOpt.get().getData()) {
-        // the incoming record is chosen
-        isDelete = HoodieOperation.isDelete(newRecord.getOperation());
-      } else {
-        // the incoming record is dropped
-        return false;
-      }
-      updatedRecordsWritten++;
-    }
-    return writeRecord(newRecord, oldRecord, combineRecordOpt, writerSchema, config.getPayloadConfig().getProps(), isDelete);
-  }
-
-  protected void writeInsertRecord(HoodieRecord<T> newRecord) throws IOException {
-    Schema schema = getNewSchema();
-    // just skip the ignored record
-    if (newRecord.shouldIgnore(schema, config.getProps())) {
-      return;
-    }
-    writeInsertRecord(newRecord, schema, config.getProps());
-  }
-
-  protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema, Properties prop) {
-    if (writeRecord(newRecord, null, Option.of(newRecord), schema, prop, HoodieOperation.isDelete(newRecord.getOperation()))) {
-      insertRecordsWritten++;
-    }
-  }
-
-  protected boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> combineRecord, Schema schema, Properties prop) throws IOException {
-    return writeRecord(newRecord, null, combineRecord, schema, prop, false);
-  }
-
-  /**
-   * The function takes the different versions of the record - old record, new incoming record and combined record
-   * created by merging the old record with the new incoming record. It decides whether the combined record needs to be
-   * written to the file and writes the record accordingly.
-   *
-   * @param newRecord     The new incoming record
-   * @param oldRecord     The value of old record
-   * @param combineRecord Record created by merging the old record with the new incoming record
-   * @param schema        Record schema
-   * @param prop          Properties
-   * @param isDelete      Whether the new record is a delete record
-   *
-   * @return true if the record was written successfully
+   * Called to read the base file, the incoming records, merge the records and write the final base file.
    * @throws IOException
    */
-  private boolean writeRecord(HoodieRecord<T> newRecord,
-                              @Nullable HoodieRecord<T> oldRecord,
-                              Option<HoodieRecord> combineRecord,
-                              Schema schema,
-                              Properties prop,
-                              boolean isDelete) {
-    Option recordMetadata = newRecord.getMetadata();
-    if (!partitionPath.equals(newRecord.getPartitionPath())) {
-      HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
-          + newRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
-      writeStatus.markFailure(newRecord, failureEx, recordMetadata);
-      return false;
-    }
-    try {
-      if (combineRecord.isPresent() && !combineRecord.get().isDelete(schema, config.getProps()) && !isDelete) {
-        // Last-minute check.
-        boolean decision = recordMerger.shouldFlush(combineRecord.get(), schema, config.getProps());
-
-        if (decision) {
-          // CASE (1): Flush the merged record.
-          HoodieKey hoodieKey = newRecord.getKey();
-          if (isSecondaryIndexStatsStreamingWritesEnabled) {
-            SecondaryIndexStreamingTracker.trackSecondaryIndexStats(hoodieKey, combineRecord, oldRecord, false, writeStatus,
-                writeSchemaWithMetaFields, this::getNewSchema, secondaryIndexDefns, keyGeneratorOpt, config);
-          }
-          writeToFile(hoodieKey, combineRecord.get(), schema, prop, preserveMetadata);
-          recordsWritten++;
-        } else {
-          // CASE (2): A delete operation.
-          if (isSecondaryIndexStatsStreamingWritesEnabled) {
-            SecondaryIndexStreamingTracker.trackSecondaryIndexStats(newRecord.getKey(), combineRecord, oldRecord, true, writeStatus,
-                writeSchemaWithMetaFields, this::getNewSchema, secondaryIndexDefns, keyGeneratorOpt, config);
-          }
-          recordsDeleted++;
-        }
-      } else {
-        if (isSecondaryIndexStatsStreamingWritesEnabled) {
-          SecondaryIndexStreamingTracker.trackSecondaryIndexStats(newRecord.getKey(), combineRecord, oldRecord, true, writeStatus,
-              writeSchemaWithMetaFields, this::getNewSchema, secondaryIndexDefns, keyGeneratorOpt, config);
-        }
-        recordsDeleted++;
-        // Clear the new location as the record was deleted
-        newRecord.unseal();
-        newRecord.clearNewLocation();
-        newRecord.seal();
-      }
-      writeStatus.markSuccess(newRecord, recordMetadata);
-      // deflate record payload after recording success. This will help users access payload as a
-      // part of marking
-      // record successful.
-      newRecord.deflate();
-      return true;
-    } catch (Exception e) {
-      LOG.error("Error writing record {}", newRecord, e);
-      writeStatus.markFailure(newRecord, e, recordMetadata);
-    }
-    return false;
-  }
-
-  /**
-   * Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
-   */
-  public void write(HoodieRecord<T> oldRecord) {
-    // Use schema with metadata files no matter whether 'hoodie.populate.meta.fields' is enabled
-    // to avoid unnecessary rewrite. Even with metadata table(whereas the option 'hoodie.populate.meta.fields' is configured as false),
-    // the record is deserialized with schema including metadata fields,
-    // see HoodieMergeHelper#runMerge for more details.
-    Schema oldSchema = writeSchemaWithMetaFields;
-    Schema newSchema = getNewSchema();
-    boolean copyOldRecord = true;
-    String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
-    TypedProperties props = config.getPayloadConfig().getProps();
-    if (keyToNewRecords.containsKey(key)) {
-      // If we have duplicate records that we are updating, then the hoodie record will be deflated after
-      // writing the first record. So make a copy of the record to be merged
-      HoodieRecord<T> newRecord = keyToNewRecords.get(key).newInstance();
-      try {
-        Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);
-        Schema combineRecordSchema = mergeResult.map(Pair::getRight).orElse(null);
-        Option<HoodieRecord> combinedRecord = mergeResult.map(Pair::getLeft);
-        if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(combineRecordSchema, props)) {
-          // If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
-          copyOldRecord = true;
-        } else if (writeUpdateRecord(newRecord, oldRecord, combinedRecord, combineRecordSchema)) {
-          /*
-           * ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully
-           * write the combined new value
-           *
-           * We no longer need to copy the old record over.
-           */
-          copyOldRecord = false;
-        }
-        writtenRecordKeys.add(key);
-      } catch (Exception e) {
-        throw new HoodieUpsertException("Failed to combine/merge new record with old value in storage, for new record {"
-            + keyToNewRecords.get(key) + "}, old value {" + oldRecord + "}", e);
-      }
-    }
-
-    if (copyOldRecord) {
-      try {
-        // NOTE: We're enforcing preservation of the record metadata to keep existing semantic
-        writeToFile(new HoodieKey(key, partitionPath), oldRecord, oldSchema, props, true);
-      } catch (IOException | RuntimeException e) {
-        String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
-            key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
-        LOG.debug("Old record is {}", oldRecord);
-        throw new HoodieUpsertException(errMsg, e);
-      }
-      recordsWritten++;
-    }
-  }
-
-  protected void writeToFile(HoodieKey key, HoodieRecord<T> record, Schema schema, Properties prop, boolean shouldPreserveRecordMetadata) throws IOException {
-    if (shouldPreserveRecordMetadata) {
-      // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
-      //       file holding this record even in cases when overall metadata is preserved
-      HoodieRecord populatedRecord = record.updateMetaField(schema, HoodieRecord.FILENAME_META_FIELD_ORD, newFilePath.getName());
-      fileWriter.write(key.getRecordKey(), populatedRecord, writeSchemaWithMetaFields);
-    } else {
-      // rewrite the record to include metadata fields in schema, and the values will be set later.
-      record = record.prependMetaFields(schema, writeSchemaWithMetaFields, new MetadataValues(), config.getProps());
-      fileWriter.writeWithMetadata(key, record, writeSchemaWithMetaFields);
-    }
-  }
-
-  protected void writeIncomingRecords() throws IOException {
-    // write out any pending records (this can happen when inserts are turned into updates)
-    Iterator<HoodieRecord<T>> newRecordsItr;
-    if (keyToNewRecords instanceof ExternalSpillableMap) {
-      newRecordsItr = ((ExternalSpillableMap) keyToNewRecords).iterator(key -> !writtenRecordKeys.contains(key));
-    } else {
-      newRecordsItr = keyToNewRecords.entrySet().stream()
-          .filter(e -> !writtenRecordKeys.contains(e.getKey()))
-          .map(Map.Entry::getValue)
-          .iterator();
-    }
-    while (newRecordsItr.hasNext()) {
-      HoodieRecord<T> hoodieRecord = newRecordsItr.next();
-      writeInsertRecord(hoodieRecord);
-    }
-  }
-
-  private Schema getNewSchema() {
-    return preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
-  }
-
-  @Override
-  public List<WriteStatus> close() {
-    try {
-      if (isClosed()) {
-        // Handle has already been closed
-        return Collections.emptyList();
-      }
-
-      markClosed();
-      writeIncomingRecords();
-
-      if (keyToNewRecords instanceof Closeable) {
-        ((Closeable) keyToNewRecords).close();
-      }
-
-      keyToNewRecords = null;
-      writtenRecordKeys = null;
-
-      fileWriter.close();
-      fileWriter = null;
-
-      long fileSizeInBytes = storage.getPathInfo(newFilePath).getLength();
-      HoodieWriteStat stat = writeStatus.getStat();
-
-      stat.setTotalWriteBytes(fileSizeInBytes);
-      stat.setFileSizeInBytes(fileSizeInBytes);
-      stat.setNumWrites(recordsWritten);
-      stat.setNumDeletes(recordsDeleted);
-      stat.setNumUpdateWrites(updatedRecordsWritten);
-      stat.setNumInserts(insertRecordsWritten);
-      stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
-      RuntimeStats runtimeStats = new RuntimeStats();
-      runtimeStats.setTotalUpsertTime(timer.endTimer());
-      stat.setRuntimeStats(runtimeStats);
-
-      performMergeDataValidationCheck(writeStatus);
-
-      LOG.info("MergeHandle for partitionPath {} fileID {}, took {} ms.", stat.getPartitionPath(),
-          stat.getFileId(), runtimeStats.getTotalUpsertTime());
-
-      return Collections.singletonList(writeStatus);
-    } catch (IOException e) {
-      throw new HoodieUpsertException("Failed to close UpdateHandle", e);
-    }
-  }
-
-  public void performMergeDataValidationCheck(WriteStatus writeStatus) {
-    if (!config.isMergeDataValidationCheckEnabled() || baseFileToMerge == null) {
-      return;
-    }
-
-    long oldNumWrites = 0;
-    try (HoodieFileReader reader = HoodieIOFactory.getIOFactory(hoodieTable.getStorage())
-        .getReaderFactory(this.recordMerger.getRecordType())
-        .getFileReader(config, oldFilePath)) {
-      oldNumWrites = reader.getTotalRecords();
-    } catch (IOException e) {
-      throw new HoodieUpsertException("Failed to check for merge data validation", e);
-    }
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  void doMerge() throws IOException;

Review Comment:
   Based on
   https://github.com/apache/hudi/blob/caa7c6191982c5ae667fa3f28278522c0e4c4897/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java#L133
   and
   https://github.com/apache/hudi/blob/caa7c6191982c5ae667fa3f28278522c0e4c4897/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java#L54,
   `write(oldRecord)` also needs to be part of the interface. We could give it a default empty implementation if need be, but it has to be part of the interface for any public API exposed by merge handles.
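
   A minimal sketch of how that could look on the new interface (the imports mirror the ones in this PR's diff; the default no-op body and the `HoodieRecord<T>` parameter are only illustrative, not a final signature):

   ```java
   import org.apache.hudi.ApiMaturityLevel;
   import org.apache.hudi.PublicAPIClass;
   import org.apache.hudi.PublicAPIMethod;
   import org.apache.hudi.common.model.HoodieRecord;

   import java.io.IOException;

   @PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
   public interface HoodieMergeHandle<T, I, K, O> {

     /**
      * Reads the base file and the incoming records, merges them, and writes the new base file.
      */
     @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
     void doMerge() throws IOException;

     /**
      * Per-record merge path driven by the merge helpers: merge one record from the existing
      * base file with any matching incoming record. The no-op default is only to illustrate
      * the "default empty impl" suggestion above.
      */
     @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
     default void write(HoodieRecord<T> oldRecord) {
       // intentionally empty in this sketch
     }
   }
   ```

   That way the merge helpers can keep invoking write(oldRecord) against the public interface instead of a concrete handle class.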
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
