vinothchandar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r487541865



##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -95,6 +93,13 @@ public HoodieWriteMetadata execute(JavaRDD<HoodieRecord<T>> 
inputRecordsRDD) {
       saveWorkloadProfileMetadataToInflight(profile, instantTime);
     }
 
+    JavaRDD<WriteStatus> writeStatusRDD = processInputRecords(inputRecordsRDD, 
profile);
+    HoodieWriteMetadata result = new HoodieWriteMetadata();
+    updateIndexAndCommitIfNeeded(writeStatusRDD, result);
+    return result;
+  }
+
+  protected JavaRDD<WriteStatus> processInputRecords(JavaRDD<HoodieRecord<T>> 
inputRecordsRDD, WorkloadProfile profile) {

Review comment:
       rename: writeInputRecords() 

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -213,6 +213,12 @@ public abstract HoodieWriteMetadata 
insertPrepped(JavaSparkContext jsc, String i
   public abstract HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, 
String instantTime,
       JavaRDD<HoodieRecord<T>> preppedRecords,  Option<BulkInsertPartitioner> 
bulkInsertPartitioner);
 
+  /**
+   * Logically delete all existing records and Insert a batch of new records 
into Hoodie table at the supplied instantTime.
+   */
+  public abstract HoodieWriteMetadata insertOverwrite(JavaSparkContext jsc, 
String instantTime,

Review comment:
       I think at the HoodieTable level, the API has to be about replacing file 
groups and not insertOverwrite (which can be limited to the WriteClient level). 
This way clustering can also build on top of the same method. 
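
   A rough sketch of that layering, under loud assumptions: `FileGroupReplacer`, `SketchClient`, and every signature below are hypothetical names made up for illustration, and file groups are reduced to plain string ids. It only shows how insert overwrite and clustering could share one table-level primitive.

```java
import java.util.List;

// Hypothetical table-level primitive (not a real Hudi interface):
// swap a set of existing file groups for newly written ones.
interface FileGroupReplacer {
  List<String> replaceFileGroups(String instantTime, List<String> replacedIds, List<String> newIds);
}

// Hypothetical client-level operations layered on the same primitive.
final class SketchClient {
  private final FileGroupReplacer table;

  SketchClient(FileGroupReplacer table) {
    this.table = table;
  }

  // Insert overwrite: all existing file groups in the touched partitions are replaced.
  List<String> insertOverwrite(String instantTime, List<String> existingIds, List<String> writtenIds) {
    return table.replaceFileGroups(instantTime, existingIds, writtenIds);
  }

  // Clustering: only the chosen file groups are replaced by their rewritten form.
  List<String> cluster(String instantTime, List<String> chosenIds, List<String> rewrittenIds) {
    return table.replaceFileGroups(instantTime, chosenIds, rewrittenIds);
  }
}
```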

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##########
@@ -378,9 +398,9 @@ public static void createCompactionAuxiliaryMetadata(String 
basePath, HoodieInst
         new Path(basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + 
"/" + instant.getFileName());
     FileSystem fs = FSUtils.getFs(basePath, configuration);
     try (FSDataOutputStream os = fs.create(commitFile, true)) {
-      HoodieCompactionPlan workload = new HoodieCompactionPlan();
+      HoodieCompactionPlan workload = 
HoodieCompactionPlan.newBuilder().setVersion(1).build();

Review comment:
       Why is this change necessary? 

##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
##########
@@ -102,7 +104,9 @@ public void commit(WriterCommitMessage[] messages) {
             .flatMap(m -> m.getWriteStatuses().stream().map(m2 -> 
m2.getStat())).collect(Collectors.toList());
 
     try {
-      writeClient.commitStats(instantTime, writeStatList, Option.empty());
+      writeClient.commitStats(instantTime, writeStatList, Option.empty(),

Review comment:
       A better approach for these situations generally is to introduce a 
`commitStats(.)` overload that does not take the last argument, and to deal 
with it internally inside WriteClient. 
   
   This way the code remains more readable, without bulk_insert having to 
reference replace stats, which are unrelated to it. Hope that makes sense.
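
   To illustrate the overload idea, here is a minimal sketch. `Stat` and `SketchWriteClient` are hypothetical stand-ins rather than the real Hudi classes, and the hard-coded "commit" default is an assumption (the PR derives the action type from the meta client).

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for HoodieWriteStat; not the real Hudi class.
final class Stat {
  final String fileId;

  Stat(String fileId) {
    this.fileId = fileId;
  }
}

// Hypothetical stand-in for the write client, showing only the overload pattern.
class SketchWriteClient {
  int lastReplaceCount = -1;

  // Short form kept for callers such as bulk insert: no replace stats in sight.
  boolean commitStats(String instantTime, List<Stat> stats, Optional<Map<String, String>> extraMetadata) {
    // Assumption: "commit" stands in for whatever default action type applies.
    return commitStats(instantTime, stats, extraMetadata, "commit", Collections.emptyList());
  }

  // Full form, needed only by callers that actually replace file groups.
  boolean commitStats(String instantTime, List<Stat> stats, Optional<Map<String, String>> extraMetadata,
                      String commitActionType, List<Stat> replaceStats) {
    lastReplaceCount = replaceStats.size();
    return true;
  }
}
```

   Existing call sites then stay as `commitStats(instantTime, writeStatList, Option.empty())` style calls, and only the replace path passes the extra arguments.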

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -87,44 +88,57 @@ protected AbstractHoodieWriteClient(JavaSparkContext jsc, 
HoodieIndex index, Hoo
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses) {
-    return commit(instantTime, writeStatuses, Option.empty());
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, Option.empty(), actionType);
+  }
+
+  /**
+   * Complete changes performed at the given instantTime marker with specified 
action.
+   */
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, String commitActionType) {
+    return commit(instantTime, writeStatuses, Option.empty(), 
commitActionType);
   }
 
   /**
+   *
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
-      Option<Map<String, String>> extraMetadata) {
-    List<HoodieWriteStat> stats = 
writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, stats, extraMetadata);
+                        Option<Map<String, String>> extraMetadata) {
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, extraMetadata, actionType);
   }
 
-  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, 
Option<Map<String, String>> extraMetadata) {
-    LOG.info("Committing " + instantTime);
+  /**
+   * Complete changes performed at the given instantTime marker with specified 
action.
+   */
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,

Review comment:
       note to self: let's see if we can simplify the overloaded commit() methods. 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -87,44 +88,57 @@ protected AbstractHoodieWriteClient(JavaSparkContext jsc, 
HoodieIndex index, Hoo
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses) {
-    return commit(instantTime, writeStatuses, Option.empty());
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, Option.empty(), actionType);
+  }
+
+  /**
+   * Complete changes performed at the given instantTime marker with specified 
action.
+   */
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, String commitActionType) {
+    return commit(instantTime, writeStatuses, Option.empty(), 
commitActionType);
   }
 
   /**
+   *
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
-      Option<Map<String, String>> extraMetadata) {
-    List<HoodieWriteStat> stats = 
writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, stats, extraMetadata);
+                        Option<Map<String, String>> extraMetadata) {
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, extraMetadata, actionType);
   }
 
-  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, 
Option<Map<String, String>> extraMetadata) {
-    LOG.info("Committing " + instantTime);
+  /**
+   * Complete changes performed at the given instantTime marker with specified 
action.
+   */
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
+      Option<Map<String, String>> extraMetadata, String commitActionType) {
+    List<HoodieWriteStat> writeStats = writeStatuses.filter(w -> 
!w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+    List<HoodieWriteStat> replaceStats = writeStatuses.filter(w -> 
w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+
+    return commitStats(instantTime, writeStats, extraMetadata, 
commitActionType, replaceStats);
+  }
+
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, 
Option<Map<String, String>> extraMetadata,
+                             String commitActionType, List<HoodieWriteStat> 
replaceStats) {
+    LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieTableMetaClient metaClient = createMetaClient(false);
-    String actionType = metaClient.getCommitActionType();
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
 
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
-    stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), 
stat));
-
+    HoodieCommitMetadata metadata = 
CommitUtils.buildWriteActionMetadata(stats, replaceStats, extraMetadata, 
operationType, config.getSchema(), commitActionType);

Review comment:
       rename: to just buildMetadata(); Commit is already implicit from 
context. 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -204,41 +212,44 @@ protected void commitOnAutoCommit(HoodieWriteMetadata 
result) {
   }
 
   protected void commit(Option<Map<String, String>> extraMetadata, 
HoodieWriteMetadata result) {
-    commit(extraMetadata, result, 
result.getWriteStatuses().map(WriteStatus::getStat).collect());
+    List<HoodieWriteStat> writeStats = result.getWriteStatuses().filter(w -> 
!w.isReplacedFileId()).map(WriteStatus::getStat).collect();

Review comment:
       note to self: see if there is a way to avoid repeating this filtering 
here again 
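
   One possible direction, sketched with plain Java collections: split the statuses in a single pass in one shared helper instead of filtering twice at each call site. `Status` and `StatSplitter` are hypothetical names, and the sketch assumes the statuses have already been collected to a list; it is not how the PR does it.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for WriteStatus, carrying only what the sketch needs.
final class Status {
  final String fileId;
  final boolean replaced;

  Status(String fileId, boolean replaced) {
    this.fileId = fileId;
    this.replaced = replaced;
  }
}

final class StatSplitter {
  // One pass over the statuses: key true = replaced file ids, key false = newly written ones.
  static Map<Boolean, List<String>> split(List<Status> statuses) {
    return statuses.stream().collect(Collectors.partitioningBy(
        s -> s.replaced,
        Collectors.mapping(s -> s.fileId, Collectors.toList())));
  }
}
```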

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -204,41 +212,44 @@ protected void commitOnAutoCommit(HoodieWriteMetadata 
result) {
   }
 
   protected void commit(Option<Map<String, String>> extraMetadata, 
HoodieWriteMetadata result) {
-    commit(extraMetadata, result, 
result.getWriteStatuses().map(WriteStatus::getStat).collect());
+    List<HoodieWriteStat> writeStats = result.getWriteStatuses().filter(w -> 
!w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+    List<HoodieWriteStat> replacedStats = result.getWriteStatuses().filter(w 
-> w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+    commit(extraMetadata, result, writeStats, replacedStats);
   }
 
-  protected void commit(Option<Map<String, String>> extraMetadata, 
HoodieWriteMetadata result, List<HoodieWriteStat> stats) {
-    String actionType = table.getMetaClient().getCommitActionType();
+  protected void commit(Option<Map<String, String>> extraMetadata, 
HoodieWriteMetadata result, List<HoodieWriteStat> writeStats, 
List<HoodieWriteStat> replaceStats) {
+    String actionType = getCommitActionType();
     LOG.info("Committing " + instantTime + ", action Type " + actionType);
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
 
-    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
 
     result.setCommitted(true);
-    stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), 
stat));
-    result.setWriteStats(stats);
+    result.setWriteStats(writeStats);
+    result.setReplaceStats(replaceStats);
 
     // Finalize write
-    finalizeWrite(instantTime, stats, result);
-
-    // add in extra metadata
-    if (extraMetadata.isPresent()) {
-      extraMetadata.get().forEach(metadata::addMetadata);
-    }
-    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, 
getSchemaToStoreInCommit());
-    metadata.setOperationType(operationType);
+    finalizeWrite(instantTime, writeStats, result);
 
     try {
-      activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, 
instantTime),
-          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-      LOG.info("Committed " + instantTime);
+      HoodieCommitMetadata metadata = writeInstant(writeStats, replaceStats, 
extraMetadata);
+      result.setCommitMetadata(Option.of(metadata));
     } catch (IOException e) {
       throw new HoodieCommitException("Failed to complete commit " + 
config.getBasePath() + " at time " + instantTime,
           e);
     }
-    result.setCommitMetadata(Option.of(metadata));
+  }
+
+  private HoodieCommitMetadata writeInstant(List<HoodieWriteStat>  writeStats, 
List<HoodieWriteStat> replaceStats, Option<Map<String, String>> extraMetadata) 
throws IOException {

Review comment:
       rename: completeInstant(); it's better to stay close to what the method 
is doing and to use just one terminology.

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {

Review comment:
       rename: deleteReplacedFileGroups(), to be consistent with our 
terminology 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -204,41 +212,44 @@ protected void commitOnAutoCommit(HoodieWriteMetadata 
result) {
   }
 
   protected void commit(Option<Map<String, String>> extraMetadata, 
HoodieWriteMetadata result) {
-    commit(extraMetadata, result, 
result.getWriteStatuses().map(WriteStatus::getStat).collect());
+    List<HoodieWriteStat> writeStats = result.getWriteStatuses().filter(w -> 
!w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+    List<HoodieWriteStat> replacedStats = result.getWriteStatuses().filter(w 
-> w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+    commit(extraMetadata, result, writeStats, replacedStats);
   }
 
-  protected void commit(Option<Map<String, String>> extraMetadata, 
HoodieWriteMetadata result, List<HoodieWriteStat> stats) {
-    String actionType = table.getMetaClient().getCommitActionType();
+  protected void commit(Option<Map<String, String>> extraMetadata, 
HoodieWriteMetadata result, List<HoodieWriteStat> writeStats, 
List<HoodieWriteStat> replaceStats) {
+    String actionType = getCommitActionType();
     LOG.info("Committing " + instantTime + ", action Type " + actionType);
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
 
-    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
 
     result.setCommitted(true);
-    stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), 
stat));
-    result.setWriteStats(stats);
+    result.setWriteStats(writeStats);
+    result.setReplaceStats(replaceStats);
 
     // Finalize write
-    finalizeWrite(instantTime, stats, result);
-
-    // add in extra metadata
-    if (extraMetadata.isPresent()) {
-      extraMetadata.get().forEach(metadata::addMetadata);
-    }
-    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, 
getSchemaToStoreInCommit());
-    metadata.setOperationType(operationType);

Review comment:
       note to self: need to ensure the operation type is properly set 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+        .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+    fileGroupsToDelete.forEach(fg -> {

Review comment:
       we need to do the deletion in parallel. 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+        .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+    fileGroupsToDelete.forEach(fg -> {
+      fg.getAllRawFileSlices().forEach(fileSlice -> {
+        fileSlice.getBaseFile().map(baseFile -> 
deletePath(baseFile.getFileStatus().getPath(), instant));
+        fileSlice.getLogFiles().forEach(logFile -> 
deletePath(logFile.getPath(), instant));
+      });
+    });
+  }
+
+  /**
+   * Because we are creating new 'HoodieTable' and FileSystemView objects in 
this class constructor,
+   * partition view may not be loaded correctly.
+   * Reload all partitions modified by REPLACE action
+   *
+   * TODO find a better way to pass the FileSystemView to this class.
+   */
+  private void ensureReplacedPartitionsLoadedCorrectly(HoodieInstant instant, 
TableFileSystemView fileSystemView) {
+    Option<HoodieInstant> replaceInstantOption = 
metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+        .filter(replaceInstant -> 
replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+    replaceInstantOption.ifPresent(replaceInstant -> {

Review comment:
       This seems like a check for whether the instant is a replacecommit or 
not. If the instant is completed and of replacecommit type, then we must find 
the instant here, right? 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+        .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+    fileGroupsToDelete.forEach(fg -> {
+      fg.getAllRawFileSlices().forEach(fileSlice -> {
+        fileSlice.getBaseFile().map(baseFile -> 
deletePath(baseFile.getFileStatus().getPath(), instant));
+        fileSlice.getLogFiles().forEach(logFile -> 
deletePath(logFile.getPath(), instant));
+      });
+    });
+  }
+
+  /**
+   * Because we are creating new 'HoodieTable' and FileSystemView objects in 
this class constructor,
+   * partition view may not be loaded correctly.
+   * Reload all partitions modified by REPLACE action
+   *
+   * TODO find a better way to pass the FileSystemView to this class.
+   */
+  private void ensureReplacedPartitionsLoadedCorrectly(HoodieInstant instant, 
TableFileSystemView fileSystemView) {
+    Option<HoodieInstant> replaceInstantOption = 
metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+        .filter(replaceInstant -> 
replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+    replaceInstantOption.ifPresent(replaceInstant -> {
+      try {
+        HoodieReplaceCommitMetadata metadata = 
HoodieReplaceCommitMetadata.fromBytes(
+            
metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get(),
+            HoodieReplaceCommitMetadata.class);
+
+        metadata.getPartitionToReplaceStats().keySet().forEach(partition -> 
fileSystemView.getAllFileGroups(partition));
+      } catch (IOException e) {
+        throw new HoodieCommitException("Failed to archive because cannot 
delete replace files", e);
+      }
+    });
+  }
+
+  private boolean deletePath(Path path, HoodieInstant instant) {
+    try {
+      LOG.info("Deleting " + path + " before archiving " + instant);
+      metaClient.getFs().delete(path);

Review comment:
       You probably don't want to fetch the fs object each time? Also, let's 
delete in parallel from the get-go? We will invariably need to do this, much 
like parallelizing cleaning.  
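
   As a minimal illustration of deleting in parallel, the sketch below uses `java.nio.file` and a parallel stream as stand-ins. That is an assumption made to keep the example self-contained: the real change would go through Hadoop's `FileSystem` (obtained once, outside the loop) and most likely the Spark context. `ParallelDeleter` is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

final class ParallelDeleter {
  // Deletes the given paths concurrently and returns how many files were actually removed.
  static int deleteAll(List<Path> paths) {
    return paths.parallelStream().mapToInt(p -> {
      try {
        // deleteIfExists returns false (instead of throwing) when the file is already gone.
        return Files.deleteIfExists(p) ? 1 : 0;
      } catch (IOException e) {
        throw new UncheckedIOException("Failed to delete " + p, e);
      }
    }).sum();
  }
}
```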

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {

Review comment:
       probably a check that this is a replace instant as well? 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -576,7 +592,8 @@ public String startCommit() {
       rollbackPendingCommits();
     }
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
-    startCommit(instantTime);
+    HoodieTableMetaClient metaClient = createMetaClient(true);

Review comment:
       you can just call startCommitWithTime(instantTime) from here? 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView

Review comment:
       Do we need to ask the file system view for all the replaced file groups? 
This must be in the metadata already, right? As long as we can get the 
HoodieFileGroup objects corresponding to the file group ids in the metadata, we 
can go ahead? What I am suggesting is an alternative and subjectively cleaner 
replacement for `ensureReplaced...` above, which seems to make a dummy read to 
warm up the data structures. I prefer to let that happen naturally on its own, 
as opposed to having this "special" call.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -110,14 +116,16 @@ protected void init(HoodieTableMetaClient metaClient, 
HoodieTimeline visibleActi
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
-    this.visibleCommitsAndCompactionTimeline = 
visibleActiveTimeline.getCommitsAndCompactionTimeline();
+    this.visibleCommitsAndCompactionTimeline = 
visibleActiveTimeline.getWriteActionTimeline();
+    resetFileGroupsReplaced(visibleCommitsAndCompactionTimeline);
   }
 
   /**
    * Adds the provided statuses into the file system view, and also caches it 
inside this object.
    */
   protected List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
     HoodieTimer timer = new HoodieTimer().startTimer();
+

Review comment:
       nit: remove extra line?

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertOverwriteCommitActionExecutor.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.stream.Stream;
+
+public class InsertOverwriteCommitActionExecutor<T extends 
HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
+
+  private static final Logger LOG = 
LogManager.getLogger(InsertOverwriteCommitActionExecutor.class);
+
+  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+
+  public InsertOverwriteCommitActionExecutor(JavaSparkContext jsc,
+                                             HoodieWriteConfig config, 
HoodieTable table,
+                                             String instantTime, 
JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    super(jsc, config, table, instantTime, 
WriteOperationType.INSERT_OVERWRITE);
+    this.inputRecordsRDD = inputRecordsRDD;
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+    return WriteHelper.write(instantTime, inputRecordsRDD, jsc, 
(HoodieTable<T>) table,
+        config.shouldCombineBeforeInsert(), 
config.getInsertShuffleParallelism(), this, false);
+  }
+
+  @Override
+  protected Partitioner getPartitioner(WorkloadProfile profile) {
+    return new InsertOverwritePartitioner<>(profile, jsc, table, config);
+  }
+
+  @Override
+  protected String getCommitActionType() {
+    return HoodieTimeline.REPLACE_COMMIT_ACTION;
+  }
+
+  @Override
+  protected JavaRDD<WriteStatus> processInputRecords(JavaRDD<HoodieRecord<T>> 
inputRecordsRDD, WorkloadProfile profile) {
+    // get all existing fileIds to mark them as replaced
+    JavaRDD<WriteStatus> replaceStatuses = getAllReplaceWriteStatus(profile);
+    // do necessary inserts into new file groups
+    JavaRDD<WriteStatus> writeStatuses = 
super.processInputRecords(inputRecordsRDD, profile);
+    return writeStatuses.union(replaceStatuses);
+  }
+
+  private JavaRDD<WriteStatus> getAllReplaceWriteStatus(WorkloadProfile 
profile) {
+    JavaRDD<String> partitions = jsc.parallelize(new 
ArrayList<>(profile.getPartitionPaths()));
+    JavaRDD<WriteStatus> replaceStatuses = partitions.flatMap(partition ->
+        getAllExistingFileIds(partition).map(fileId -> 
getReplaceWriteStatus(partition, fileId)).iterator());
+
+    return replaceStatuses;
+  }
+
+  private Stream<String> getAllExistingFileIds(String partitionPath) {
+    // because new commit is not complete. it is safe to mark all base files 
as old files
+    return 
table.getBaseFileOnlyView().getAllBaseFiles(partitionPath).map(baseFile -> 
baseFile.getFileId());
+  }
+
+  private WriteStatus getReplaceWriteStatus(String partitionPath, String 
fileId) {
+    // mark file as 'replaced' in metadata. the actual file will be removed 
later by cleaner to provide snapshot isolation
+    WriteStatus status = new WriteStatus(false, 0.0);
+    status.setReplacedFileId(true);
+    status.setFileId(fileId);
+    status.setTotalErrorRecords(0);
+    status.setPartitionPath(partitionPath);
+    HoodieWriteStat replaceStat = new HoodieWriteStat();
+    status.setStat(replaceStat);
+    replaceStat.setPartitionPath(partitionPath);
+    replaceStat.setFileId(fileId);
+    
replaceStat.setPath(table.getBaseFileOnlyView().getLatestBaseFile(partitionPath,
 fileId).get().getPath());
+    status.getStat().setNumDeletes(Integer.MAX_VALUE);//token to indicate all 
rows are deleted

Review comment:
       Seeing such large values in the metadata can be a bit confusing. Can we 
set it to -1 instead for now?

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertOverwriteCommitActionExecutor.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.stream.Stream;
+
+public class InsertOverwriteCommitActionExecutor<T extends 
HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
+
+  private static final Logger LOG = 
LogManager.getLogger(InsertOverwriteCommitActionExecutor.class);
+
+  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+
+  public InsertOverwriteCommitActionExecutor(JavaSparkContext jsc,
+                                             HoodieWriteConfig config, 
HoodieTable table,
+                                             String instantTime, 
JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    super(jsc, config, table, instantTime, 
WriteOperationType.INSERT_OVERWRITE);
+    this.inputRecordsRDD = inputRecordsRDD;
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+    return WriteHelper.write(instantTime, inputRecordsRDD, jsc, 
(HoodieTable<T>) table,
+        config.shouldCombineBeforeInsert(), 
config.getInsertShuffleParallelism(), this, false);
+  }
+
+  @Override
+  protected Partitioner getPartitioner(WorkloadProfile profile) {
+    return new InsertOverwritePartitioner<>(profile, jsc, table, config);
+  }
+
+  @Override
+  protected String getCommitActionType() {
+    return HoodieTimeline.REPLACE_COMMIT_ACTION;
+  }
+
+  @Override
+  protected JavaRDD<WriteStatus> processInputRecords(JavaRDD<HoodieRecord<T>> 
inputRecordsRDD, WorkloadProfile profile) {
+    // get all existing fileIds to mark them as replaced
+    JavaRDD<WriteStatus> replaceStatuses = getAllReplaceWriteStatus(profile);

Review comment:
       So, this creates a dependency on the WorkloadProfile for doing insert
overwrite. While we always provide a WorkloadProfile for now, in the future we
would like to remove the need for caching data in memory and building the
profile.
   Can we try to reimplement this such that

    - processInputRecords(..) just writes the new records and returns
WriteStatus for the new file groups alone.
    - At commit time, after we collect the WriteStatus, we obtain the
`replaceStatuses` based on the partitions that were actually written to during
the step above.

    This also gives us a cleaner way to avoid the boolean flag we
discussed. The API is also consistent now, in that writeClient.insertOverwrite()
only returns the WriteStatus for the new file group IDs.
   
   

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
##########
@@ -94,6 +94,14 @@ public void setWriteStats(List<HoodieWriteStat> writeStats) {
     this.writeStats = Option.of(writeStats);
   }
 
+  public Option<List<HoodieWriteStat>> getReplacetats() {

Review comment:
       typo: getReplaceStats

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -586,15 +603,23 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    startCommitWithTime(instantTime, metaClient.getCommitActionType());
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) 
with specified action.
+   */
+  public void startCommitWithTime(String instantTime, String actionType) {
     // NOTE : Need to ensure that rollback is done before a new commit is 
started
     if (rollbackPending) {
       // Only rollback inflight commit/delta-commits. Do not touch compaction 
commits
       rollbackPendingCommits();
     }
-    startCommit(instantTime);
+    startCommit(instantTime, actionType);
   }
 
-  private void startCommit(String instantTime) {
+  private void startCommit(String instantTime, String actionType) {
     LOG.info("Generate a new instant time " + instantTime);
     HoodieTableMetaClient metaClient = createMetaClient(true);

Review comment:
       Can we pass in the `metaClient` from the caller? This seems to introduce
additional metaClient creations, each of which lists .hoodie again.
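
A minimal sketch of what passing the client down could look like. `MetaClient` is a hypothetical stand-in for `HoodieTableMetaClient`, and its static counter only illustrates how many times the (assumed expensive) construction happens. The method shapes are illustrative, not the PR's actual signatures.

```java
// Hypothetical sketch: MetaClient stands in for HoodieTableMetaClient.
class MetaClientReuseSketch {

  /** Counts constructions; in the real client each one would list .hoodie. */
  static final class MetaClient {
    static int creations = 0;
    final String basePath;
    final String commitActionType;

    MetaClient(String basePath, String commitActionType) {
      creations++;
      this.basePath = basePath;
      this.commitActionType = commitActionType;
    }
  }

  /** Entry point: creates the meta client once and hands it to the helper. */
  static String startCommitWithTime(String basePath, String instantTime) {
    MetaClient metaClient = new MetaClient(basePath, "commit");
    return startCommit(instantTime, metaClient.commitActionType, metaClient);
  }

  /** Helper: reuses the caller's meta client instead of creating another one. */
  private static String startCommit(String instantTime, String actionType,
                                    MetaClient metaClient) {
    return metaClient.basePath + ":" + instantTime + ":" + actionType;
  }
}
```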

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -110,14 +116,16 @@ protected void init(HoodieTableMetaClient metaClient, 
HoodieTimeline visibleActi
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
-    this.visibleCommitsAndCompactionTimeline = 
visibleActiveTimeline.getCommitsAndCompactionTimeline();
+    this.visibleCommitsAndCompactionTimeline = 
visibleActiveTimeline.getWriteActionTimeline();
+    resetFileGroupsReplaced(visibleCommitsAndCompactionTimeline);

Review comment:
       I think it's sufficient to do this reset in the init() method above, much
like the pendingCompaction and bootstrap handling. This method is simply used to
refresh the timeline, i.e. the instants that are visible.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
##########
@@ -46,11 +46,12 @@
   public static final String SCHEMA_KEY = "schema";
   private static final Logger LOG = 
LogManager.getLogger(HoodieCommitMetadata.class);
   protected Map<String, List<HoodieWriteStat>> partitionToWriteStats;
+

Review comment:
       nit: extra line

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertOverwriteCommitActionExecutor.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.stream.Stream;
+
+public class InsertOverwriteCommitActionExecutor<T extends 
HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
+
+  private static final Logger LOG = 
LogManager.getLogger(InsertOverwriteCommitActionExecutor.class);
+
+  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+
+  public InsertOverwriteCommitActionExecutor(JavaSparkContext jsc,
+                                             HoodieWriteConfig config, 
HoodieTable table,
+                                             String instantTime, 
JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    super(jsc, config, table, instantTime, 
WriteOperationType.INSERT_OVERWRITE);
+    this.inputRecordsRDD = inputRecordsRDD;
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+    return WriteHelper.write(instantTime, inputRecordsRDD, jsc, 
(HoodieTable<T>) table,
+        config.shouldCombineBeforeInsert(), 
config.getInsertShuffleParallelism(), this, false);
+  }
+
+  @Override
+  protected Partitioner getPartitioner(WorkloadProfile profile) {
+    return new InsertOverwritePartitioner<>(profile, jsc, table, config);
+  }
+
+  @Override
+  protected String getCommitActionType() {
+    return HoodieTimeline.REPLACE_COMMIT_ACTION;
+  }
+
+  @Override
+  protected JavaRDD<WriteStatus> processInputRecords(JavaRDD<HoodieRecord<T>> 
inputRecordsRDD, WorkloadProfile profile) {
+    // get all existing fileIds to mark them as replaced
+    JavaRDD<WriteStatus> replaceStatuses = getAllReplaceWriteStatus(profile);
+    // do necessary inserts into new file groups
+    JavaRDD<WriteStatus> writeStatuses = 
super.processInputRecords(inputRecordsRDD, profile);
+    return writeStatuses.union(replaceStatuses);
+  }
+
+  private JavaRDD<WriteStatus> getAllReplaceWriteStatus(WorkloadProfile 
profile) {
+    JavaRDD<String> partitions = jsc.parallelize(new 
ArrayList<>(profile.getPartitionPaths()));
+    JavaRDD<WriteStatus> replaceStatuses = partitions.flatMap(partition ->
+        getAllExistingFileIds(partition).map(fileId -> 
getReplaceWriteStatus(partition, fileId)).iterator());
+
+    return replaceStatuses;
+  }
+
+  private Stream<String> getAllExistingFileIds(String partitionPath) {
+    // because new commit is not complete. it is safe to mark all base files 
as old files
+    return 
table.getBaseFileOnlyView().getAllBaseFiles(partitionPath).map(baseFile -> 
baseFile.getFileId());

Review comment:
       Why limit this to just base files? We may have log files without base
files, i.e. the insert-to-log-files code path.
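
To illustrate the concern, here is a minimal sketch under simplified assumptions. `Slice` is a hypothetical stand-in for a file slice (it is not Hudi's `FileSlice`), with a nullable base file and a list of log files. A file group that so far only has log files is missed by the base-file-only lookup.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: Slice is a simplified stand-in for a file slice.
class LogOnlyFileGroupSketch {

  static final class Slice {
    final String fileId;
    final String baseFile;          // null when the slice has no base file yet
    final List<String> logFiles;

    Slice(String fileId, String baseFile, List<String> logFiles) {
      this.fileId = fileId;
      this.baseFile = baseFile;
      this.logFiles = logFiles;
    }
  }

  /** Mirrors the diff: only slices that have a base file contribute a file ID. */
  static Set<String> fileIdsFromBaseFiles(List<Slice> slices) {
    Set<String> ids = new TreeSet<>();
    for (Slice slice : slices) {
      if (slice.baseFile != null) {
        ids.add(slice.fileId);
      }
    }
    return ids;
  }

  /** Alternative: every slice contributes, including log-only ones. */
  static Set<String> fileIdsFromAllSlices(List<Slice> slices) {
    Set<String> ids = new TreeSet<>();
    for (Slice slice : slices) {
      ids.add(slice.fileId);
    }
    return ids;
  }
}
```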

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -110,14 +116,16 @@ protected void init(HoodieTableMetaClient metaClient, 
HoodieTimeline visibleActi
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
-    this.visibleCommitsAndCompactionTimeline = 
visibleActiveTimeline.getCommitsAndCompactionTimeline();
+    this.visibleCommitsAndCompactionTimeline = 
visibleActiveTimeline.getWriteActionTimeline();

Review comment:
       Now that replace is also `replacecommit`, should we just leave
`getCommitsAndCompactionTimeline()` be?

##########
File path: hudi-common/src/main/avro/HoodieReplaceCommitMetadata.avsc
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+   "namespace":"org.apache.hudi.avro.model",
+   "type":"record",
+   "name":"HoodieReplaceCommitMetadata",
+   "fields":[
+      {
+         "name":"partitionToWriteStats",

Review comment:
       For my own understanding, copying the fields from CommitMetadata is the
only way to "inherit" the avro schema, I guess?
   Also, to be consistent, should we place the base fields from commit
metadata first, and add `partitionToReplaceStats` at the end?

##########
File path: hudi-client/src/main/java/org/apache/hudi/client/WriteStatus.java
##########
@@ -52,6 +52,9 @@
 
   private HoodieWriteStat stat = null;
 
+  // if true, indicates the fileId in this WriteStatus is being replaced
+  private boolean isReplacedFileId;

Review comment:
       rename: isReplaced

##########
File path: 
hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -880,6 +880,89 @@ public void testDeletesWithDeleteApi() throws Exception {
     testDeletes(client, updateBatch3.getRight(), 10, file1, "007", 140, 
keysSoFar);
   }
 
+  /**
+   * Test scenario of writing more file groups than existing number of file 
groups in partition.
+   */
+  @Test
+  public void testInsertOverwritePartitionHandlingWithMoreRecords() throws 
Exception {
+    verifyInsertOverwritePartitionHandling(1000, 3000);
+  }
+
+  /**
+   * Test scenario of writing fewer file groups than existing number of file 
groups in partition.
+   */
+  @Test
+  public void testInsertOverwritePartitionHandlingWithFewerRecords() throws 
Exception {
+    verifyInsertOverwritePartitionHandling(3000, 1000);
+  }
+
+  /**
+   * Test scenario of writing similar number file groups in partition.
+   */
+  @Test
+  public void testInsertOverwritePartitionHandlinWithSimilarNumberOfRecords() 
throws Exception {
+    verifyInsertOverwritePartitionHandling(3000, 3000);
+  }
+
+  /**
+   *  1) Do write1 (upsert) with 'batch1RecordsCount' number of records.
+   *  2) Do write2 (insert overwrite) with 'batch2RecordsCount' number of 
records.
+   *
+   *  Verify that all records in step1 are overwritten
+   */
+  private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, 
int batch2RecordsCount) throws Exception {
+    final String testPartitionPath = "americas";
+    HoodieWriteConfig config = getSmallInsertWriteConfig(2000);
+    HoodieWriteClient client = getHoodieWriteClient(config, false);
+    dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+
+    // Do Inserts
+    String commitTime1 = "001";
+    client.startCommitWithTime(commitTime1);
+    List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, 
batch1RecordsCount);
+    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 2);
+    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, 
commitTime1).collect();
+    assertNoWriteErrors(statuses);
+    Set<String> batch1Buckets = statuses.stream().map(s -> 
s.getFileId()).collect(Collectors.toSet());
+    verifyParquetFileData(commitTime1, inserts1, statuses);
+
+    // Do Insert Overwrite
+    String commitTime2 = "002";
+    client.startCommitWithTime(commitTime2, 
HoodieTimeline.REPLACE_COMMIT_ACTION);
+    List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 
batch2RecordsCount);
+    List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>();
+    insertsAndUpdates2.addAll(inserts2);
+    JavaRDD<HoodieRecord> insertAndUpdatesRDD2 = 
jsc.parallelize(insertsAndUpdates2, 2);
+    statuses = client.insertOverwrite(insertAndUpdatesRDD2, 
commitTime2).collect();
+    assertNoWriteErrors(statuses);
+    Set<String> replacedBuckets = statuses.stream().filter(s -> 
s.isReplacedFileId())
+        .map(s -> s.getFileId()).collect(Collectors.toSet());
+    assertEquals(batch1Buckets, replacedBuckets);
+    List<WriteStatus> newBuckets = statuses.stream().filter(s -> 
!(s.isReplacedFileId()))
+        .collect(Collectors.toList());
+    verifyParquetFileData(commitTime2, inserts2, newBuckets);
+  }
+
+  /**
+   * Verify data in parquet files matches expected records and commit time.
+   */
+  private void verifyParquetFileData(String commitTime, List<HoodieRecord> 
expectedRecords, List<WriteStatus> allStatus) {

Review comment:
       Please keep the test naming to base and log files, and do not leak
parquet into the test. Also, can you please see if this test can be authored by
reusing existing helpers? It's often a bit hard to read and reuse the existing
helpers, but the more one-offs we introduce, the worse this situation becomes.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class to generate compaction plan from FileGroup/FileSlice 
abstraction.
+ */
+public class CommitUtils {
+
+  private static final Logger LOG = LogManager.getLogger(CommitUtils.class);
+
+  public static HoodieCommitMetadata 
buildWriteActionMetadata(List<HoodieWriteStat> writeStats,

Review comment:
       Please add simple unit tests for this, testing e.g. that the schema is
set, the op type is set, etc.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -196,6 +205,32 @@ protected void refreshTimeline(HoodieTimeline 
visibleActiveTimeline) {
     return fileGroups;
   }
 
+  /**
+   * Get replaced instant for each file group by looking at all commit 
instants.
+   */
+  private void resetFileGroupsReplaced(HoodieTimeline timeline) {
+    Instant indexStartTime = Instant.now();

Review comment:
       please use HoodieTimer to time code segments.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/ActionType.java
##########
@@ -22,5 +22,5 @@
  * The supported action types.
  */
 public enum ActionType {
-  commit, savepoint, compaction, clean, rollback
+  commit, savepoint, compaction, clean, rollback, replacecommit

Review comment:
       deltacommit is not here. huh. file a "code cleanup" JIRA for later? 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -880,6 +957,30 @@ private FileSlice fetchMergedFileSlice(HoodieFileGroup 
fileGroup, FileSlice file
         .fromJavaOptional(fetchLatestFileSlices(partitionPath).filter(fs -> 
fs.getFileId().equals(fileId)).findFirst());
   }
 
+  private boolean isFileGroupReplaced(HoodieFileGroup fileGroup) {
+    Option<HoodieInstant> hoodieInstantOption = 
getReplacedInstant(fileGroup.getFileGroupId());
+    return hoodieInstantOption.isPresent();
+  }
+
+  private boolean isFileGroupReplacedBeforeAny(HoodieFileGroup fileGroup, 
List<String> instants) {
+    Option<HoodieInstant> hoodieInstantOption = 
getReplacedInstant(fileGroup.getFileGroupId());
+    if (!hoodieInstantOption.isPresent()) {
+      return false;
+    }
+
+    return 
HoodieTimeline.compareTimestamps(instants.stream().max(Comparator.naturalOrder()).get(),

Review comment:
       Can you just call isFileGroupReplacedBeforeOrOn(fileGroup,
max(instants)), without having to implement this again?
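
A minimal sketch of the delegation, assuming instant times are fixed-width strings that compare correctly in lexicographic order. The map-backed class and its String file group IDs are hypothetical simplifications, not the view's actual storage.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: file group IDs and instants are plain strings here.
class ReplacedCheckSketch {

  private final Map<String, String> replaceInstantByFileGroupId = new HashMap<>();

  void markReplaced(String fileGroupId, String instantTime) {
    replaceInstantByFileGroupId.put(fileGroupId, instantTime);
  }

  /** True if the file group was replaced at or before the given instant. */
  boolean isFileGroupReplacedBeforeOrOn(String fileGroupId, String instantTime) {
    String replacedAt = replaceInstantByFileGroupId.get(fileGroupId);
    return replacedAt != null && replacedAt.compareTo(instantTime) <= 0;
  }

  /** Delegates to the check above using the latest of the given instants. */
  boolean isFileGroupReplacedBeforeAny(String fileGroupId, List<String> instants) {
    if (instants.isEmpty()) {
      return false;
    }
    return isFileGroupReplacedBeforeOrOn(fileGroupId, Collections.max(instants));
  }
}
```

Keeping the comparison in one method means any change to how replace instants are compared only has to be made once.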

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * All the metadata that gets stored along with a commit.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieReplaceCommitMetadata extends HoodieCommitMetadata {
+  private static final Logger LOG = 
LogManager.getLogger(HoodieReplaceCommitMetadata.class);
+  protected Map<String, List<HoodieWriteStat>> partitionToReplaceStats;

Review comment:
       these are the file groups being replaced? I thought we were going to 
just track the file ids? 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -727,6 +775,21 @@ private String formatPartitionKey(String partitionStr) {
    */
   abstract Stream<HoodieFileGroup> fetchAllStoredFileGroups();
 
+  /**
+   * Track instant time for file groups replaced.
+   */
+  protected abstract void resetReplacedFileGroups(final Map<HoodieFileGroupId, 
HoodieInstant> replacedFileGroups);
+
+  /**
+   * Track instant time for new file groups replaced.
+   */
+  protected abstract void addReplacedFileGroups(final Map<HoodieFileGroupId, 
HoodieInstant> replacedFileGroups);
+
+  /**
+   * Track instant time for file groups replaced.
+   */
+  protected abstract Option<HoodieInstant> getReplacedInstant(final 
HoodieFileGroupId fileGroupId);

Review comment:
       rename: getReplaceInstant() 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
##########
@@ -113,6 +112,18 @@ public HoodieDefaultTimeline 
getCommitsAndCompactionTimeline() {
     return new HoodieDefaultTimeline(instants.stream().filter(s -> 
validActions.contains(s.getAction())), details);
   }
 
+  @Override
+  public HoodieDefaultTimeline getWriteActionTimeline() {
+    Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, 
DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
+    return new HoodieDefaultTimeline(instants.stream().filter(s -> 
validActions.contains(s.getAction())), details);
+  }
+
+  @Override
+  public HoodieTimeline getCompletedAndReplaceTimeline() {

Review comment:
       rename: getCompletedReplaceTimeline(). The current naming gives the
impression that it's either completed or replacecommit.

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
##########
@@ -200,6 +199,14 @@ public static void createInflightCommitFiles(String 
basePath, String... instantT
     }
   }
 
+  public static HoodieWriteStat createReplaceStat(final String partitionPath, 
final String fileId1) {

Review comment:
       There is nothing specific about replace in this method. Should we move
this to the test class itself, inline?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
##########
@@ -64,6 +64,11 @@
    */
   protected Map<HoodieFileGroupId, BootstrapBaseFileMapping> 
fgIdToBootstrapBaseFile;
 
+  /**
+   * Track replace time for replaced file groups.
+   */
+  protected Map<HoodieFileGroupId, HoodieInstant> fgIdToReplaceInstant;

Review comment:
       rename: fgIdToReplaceInstants
   

##########
File path: hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc
##########
@@ -36,6 +36,14 @@
          ],
          "default": null
       },
+      {

Review comment:
       should we just add at the end? not sure if it will be backwards 
compatible nicely otherwise.

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##########
@@ -366,6 +378,14 @@ public static void createCompactionRequestedFile(String 
basePath, String instant
     createEmptyFile(basePath, commitFile, configuration);
   }
 
+  public static void createDataFile(String basePath, String partitionPath, 
String instantTime, String fileID, Configuration configuration)

Review comment:
       for tests, I suggest using the `HoodieWritableTestTable` etc instead of 
introducing new methods. Also please check other utilities to avoid writing a 
new method here

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -376,31 +379,35 @@ private[hudi] object HoodieSparkSqlWriter {
     metaSyncSuccess
   }
 
+  /**
+   * Scala says method cannot have more than 7 arguments. So group all 
table/action specific information into a case class.

Review comment:
       Please remove the Scala-specific comment. It's good to encapsulate like
this anyway.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
##########
@@ -371,6 +371,47 @@ void 
removeBootstrapBaseFileMapping(Stream<BootstrapBaseFileMapping> bootstrapBa
         schemaHelper.getPrefixForSliceViewByPartitionFile(partitionPath, 
fileId)).map(Pair::getValue)).findFirst());
   }
 
+  @Override
+  protected void resetReplacedFileGroups(final Map<HoodieFileGroupId, 
HoodieInstant> replacedFileGroups) {

Review comment:
       @bvaradar if you can take a pass at these, that would be great 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView

Review comment:
       One more consideration, as I went through the remainder of the PR: if
there was a pending compaction for the replaced file group, then the file
group metadata we encode may miss new base files produced as a result of the
compaction. This scenario needs to be thought through.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
##########
@@ -133,6 +137,21 @@
    */
   HoodieTimeline getCommitsAndCompactionTimeline();
 
+  /**
+   * Timeline to just include commits (commit/deltacommit), replace and 
compaction actions.
+   *
+   * @return
+   */
+  HoodieDefaultTimeline getWriteActionTimeline();

Review comment:
       see earlier comment on whether we need this method. 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext 
jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView

Review comment:
       As an afterthought, I also realize that if we just encoded the entire
file group being replaced into the metadata (as opposed to just encoding the
file ids), we can simply delete the file groups without any interaction with
TableFileSystemView at all. Probably a simpler solution even?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
##########
@@ -133,6 +137,21 @@
    */
   HoodieTimeline getCommitsAndCompactionTimeline();
 
+  /**
+   * Timeline to just include commits (commit/deltacommit), replace and 
compaction actions.
+   *
+   * @return
+   */
+  HoodieDefaultTimeline getWriteActionTimeline();

Review comment:
       To expand, I am wondering if we should just include replacecommit within 
`getCommitsAndCompactionTimeline()`. Most of its callers are around 
compaction/savepoint/restore etc. So we may not be seeing some cases here. 
   
   Things work for now, since filterCompletedInstants() etc are including 
replace commit in the timeline when filtering for queries. Semantically, if 
replace is a commit level action that can add new data to the timeline, then we 
should just treat it like delta commit IMO. 
   
   Would anything break if we did do that? 
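
A minimal sketch of the idea, treating the timeline as a list of (timestamp, action) pairs filtered by an action set. `TimelineInstant` and the method name are hypothetical, and the action strings are assumed from the `ActionType` values and `replacecommit` naming used elsewhere in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: a timeline reduced to (timestamp, action) pairs.
class TimelineFilterSketch {

  static final class TimelineInstant {
    final String timestamp;
    final String action;

    TimelineInstant(String timestamp, String action) {
      this.timestamp = timestamp;
      this.action = action;
    }
  }

  // replacecommit is treated like any other commit-level action.
  static final Set<String> COMMITS_AND_COMPACTION = new HashSet<>(
      Arrays.asList("commit", "deltacommit", "compaction", "replacecommit"));

  /** Returns the timestamps of instants whose action is in the set above. */
  static List<String> commitsAndCompactionTimestamps(List<TimelineInstant> instants) {
    List<String> result = new ArrayList<>();
    for (TimelineInstant instant : instants) {
      if (COMMITS_AND_COMPACTION.contains(instant.action)) {
        result.add(instant.timestamp);
      }
    }
    return result;
  }
}
```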
   
   

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * All the metadata that gets stored along with a commit.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieReplaceCommitMetadata extends HoodieCommitMetadata {
+  private static final Logger LOG = 
LogManager.getLogger(HoodieReplaceCommitMetadata.class);
+  protected Map<String, List<HoodieWriteStat>> partitionToReplaceStats;

Review comment:
       If these are the file groups being replaced, then does this contain all
the file slices? (See my comment around deleting the replaced file groups in
the timeline archive log.)

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -213,6 +213,12 @@ public abstract HoodieWriteMetadata 
insertPrepped(JavaSparkContext jsc, String i
   public abstract HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, 
String instantTime,
       JavaRDD<HoodieRecord<T>> preppedRecords,  Option<BulkInsertPartitioner> 
bulkInsertPartitioner);
 
+  /**
+   * Logically delete all existing records and Insert a batch of new records 
into Hoodie table at the supplied instantTime.
+   */
+  public abstract HoodieWriteMetadata insertOverwrite(JavaSparkContext jsc, 
String instantTime,

Review comment:
       On second thought, I am okay leaving this as-is for now as well, and
re-evaluating when actually implementing clustering.

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -111,6 +111,9 @@ private[hudi] object HoodieSparkSqlWriter {
         tableConfig = tableMetaClient.getTableConfig
       }
 
+      val metaClient = new 
HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get)
+      val commitActionType = DataSourceUtils.getCommitActionType(operation, 
metaClient)

Review comment:
       Should we have a better way of getting the commit action type? I am a bit
concerned about creating a new metaClient just for this.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
##########
@@ -371,6 +371,47 @@ void 
removeBootstrapBaseFileMapping(Stream<BootstrapBaseFileMapping> bootstrapBa
         schemaHelper.getPrefixForSliceViewByPartitionFile(partitionPath, 
fileId)).map(Pair::getValue)).findFirst());
   }
 
+  @Override
+  protected void resetReplacedFileGroups(final Map<HoodieFileGroupId, 
HoodieInstant> replacedFileGroups) {

Review comment:
       A general question I have around the incremental file system view and
RocksDB-like persistent file system view storage is whether we will keep this
list updated, i.e. when archival/cleaning runs, how do we ensure the deleted
replaced file groups are no longer tracked inside RocksDB?

   I guess the lines below are doing a bulk delete and insert to achieve the
same?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

