This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ab4a7b0  [HUDI-1788] Insert overwrite (table) for Flink writer (#2808)
ab4a7b0 is described below

commit ab4a7b0b4afc66d2123c7f63fdab77b925a8a7f1
Author: Danny Chan <[email protected]>
AuthorDate: Wed Apr 14 10:23:37 2021 +0800

    [HUDI-1788] Insert overwrite (table) for Flink writer (#2808)
    
    Supports `INSERT OVERWRITE` and `INSERT OVERWRITE TABLE` for Flink
    writer.
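    
    For reference, a minimal usage sketch of the two statements this change
    enables, mirroring the new HoodieDataSourceITCase#testInsertOverwrite
    below; the table name, schema, and connector options here are
    illustrative only, not part of the patch:
    
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.TableEnvironment;
        
        public class InsertOverwriteExample {
          public static void main(String[] args) throws Exception {
            TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
            // Illustrative Hudi table DDL; adjust the path and options to your setup.
            tableEnv.executeSql("create table t1("
                + " uuid varchar(20), name varchar(10), age int, ts timestamp(3),"
                + " `partition` varchar(20)"
                + ") partitioned by (`partition`)"
                + " with ('connector' = 'hudi', 'path' = '/tmp/t1')");
            // Overwrites one static partition -> WriteOperationType.INSERT_OVERWRITE.
            tableEnv.executeSql("insert overwrite t1 partition(`partition`='par1') values"
                + " ('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01')").await();
            // Overwrites the whole table -> WriteOperationType.INSERT_OVERWRITE_TABLE.
            tableEnv.executeSql("insert overwrite t1 values"
                + " ('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02','par2')").await();
          }
        }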
---
 .../apache/hudi/client/HoodieFlinkWriteClient.java | 122 +++++++++++++++------
 .../hudi/table/ExplicitWriteHandleTable.java       |  32 ++++++
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |  24 +++-
 .../FlinkInsertOverwriteCommitActionExecutor.java  |  69 ++++++++++++
 ...nkInsertOverwriteTableCommitActionExecutor.java |  50 +++++++++
 .../hudi/common/model/WriteOperationType.java      |   4 +
 .../org/apache/hudi/common/util/CommitUtils.java   |  11 ++
 .../apache/hudi/sink/InstantGenerateOperator.java  |   4 +-
 .../org/apache/hudi/sink/StreamWriteFunction.java  |  21 +++-
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  52 ++++++++-
 .../sink/partitioner/BucketAssignFunction.java     |   5 +-
 .../hudi/sink/partitioner/BucketAssigners.java     |  13 ++-
 ...Assigners.java => OverwriteBucketAssigner.java} |  43 +++-----
 .../org/apache/hudi/table/HoodieTableSink.java     |  23 +++-
 .../sink/TestStreamWriteOperatorCoordinator.java   |  10 +-
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java |  27 +++--
 .../org/apache/hudi/sink/TestWriteMergeOnRead.java |   4 +-
 .../hudi/sink/TestWriteMergeOnReadWithCompact.java |   4 +-
 .../apache/hudi/table/HoodieDataSourceITCase.java  |  50 ++++++++-
 .../test/java/org/apache/hudi/utils/TestData.java  |  20 ++++
 .../main/java/org/apache/hudi/DataSourceUtils.java |  11 --
 .../internal/DataSourceInternalWriterHelper.java   |   4 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   5 +-
 23 files changed, 490 insertions(+), 118 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 7eeec4c..a84e116 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -22,6 +22,8 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.utils.TransactionUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -139,10 +141,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
         getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
     table.validateUpsertSchema();
     preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
-    final HoodieRecord<T> record = records.get(0);
-    final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
-    final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(record, isDelta, getConfig(),
-        instantTime, table, record.getPartitionPath(), records.listIterator());
+    final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
+        instantTime, table, records.listIterator());
     HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsert(context, writeHandle, instantTime, records);
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -162,10 +162,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     table.validateUpsertSchema();
     preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
     // create the write handle if not exists
-    final HoodieRecord<T> record = records.get(0);
-    final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
-    final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(record, isDelta, getConfig(),
-        instantTime, table, record.getPartitionPath(), records.listIterator());
+    final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
+        instantTime, table, records.listIterator());
     HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).insert(context, writeHandle, instantTime, records);
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -173,6 +171,45 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     return postWrite(result, instantTime, table);
   }
 
+  /**
+   * Removes all existing records from the affected partitions and inserts the given HoodieRecords into the table.
+   *
+   * @param records HoodieRecords to insert
+   * @param instantTime Instant time of the commit
+   * @return list of WriteStatus to inspect errors and counts
+   */
+  public List<WriteStatus> insertOverwrite(
+      List<HoodieRecord<T>> records, final String instantTime) {
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+        getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime);
+    table.validateInsertSchema();
+    preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient());
+    // create the write handle if not exists
+    final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
+        instantTime, table, records.listIterator());
+    HoodieWriteMetadata result = ((HoodieFlinkTable<T>) table).insertOverwrite(context, writeHandle, instantTime, records);
+    return postWrite(result, instantTime, table);
+  }
+
+  /**
+   * Removes all existing records of the Hoodie table and inserts the given HoodieRecords into the table.
+   *
+   * @param records HoodieRecords to insert
+   * @param instantTime Instant time of the commit
+   * @return list of WriteStatus to inspect errors and counts
+   */
+  public List<WriteStatus> insertOverwriteTable(
+      List<HoodieRecord<T>> records, final String instantTime) {
+    HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime);
+    table.validateInsertSchema();
+    preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient());
+    // create the write handle if not exists
+    final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
+        instantTime, table, records.listIterator());
+    HoodieWriteMetadata result = ((HoodieFlinkTable<T>) table).insertOverwriteTable(context, writeHandle, instantTime, records);
+    return postWrite(result, instantTime, table);
+  }
+
   @Override
   public List<WriteStatus> insertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
     throw new HoodieNotSupportedException("InsertPrepped operation is not supported yet");
@@ -353,27 +390,25 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
    * Get or create a new write handle in order to reuse the file handles.
    *
    * @param record        The first record in the bucket
-   * @param isDelta       Whether the table is in MOR mode
    * @param config        Write config
    * @param instantTime   The instant time
    * @param table         The table
-   * @param partitionPath Partition path
    * @param recordItr     Record iterator
    * @return Existing write handle or create a new one
    */
   private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(
       HoodieRecord<T> record,
-      boolean isDelta,
       HoodieWriteConfig config,
       String instantTime,
       HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-      String partitionPath,
       Iterator<HoodieRecord<T>> recordItr) {
     final HoodieRecordLocation loc = record.getCurrentLocation();
     final String fileID = loc.getFileId();
     if (bucketToHandles.containsKey(fileID)) {
       return bucketToHandles.get(fileID);
     }
+    final String partitionPath = record.getPartitionPath();
+    final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
     final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
     if (isDelta) {
       writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr,
@@ -409,19 +444,23 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
         .collect(Collectors.toList());
   }
 
-  public String getInflightAndRequestedInstant(String tableType) {
-    final String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
+  public String getLastPendingInstant(HoodieTableType tableType) {
+    final String actionType = CommitUtils.getCommitActionType(tableType);
+    return getLastPendingInstant(actionType);
+  }
+
+  public String getLastPendingInstant(String actionType) {
     HoodieTimeline unCompletedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterInflightsAndRequested();
     return unCompletedTimeline.getInstants()
-        .filter(x -> x.getAction().equals(commitType))
+        .filter(x -> x.getAction().equals(actionType))
         .map(HoodieInstant::getTimestamp)
         .collect(Collectors.toList()).stream()
         .max(Comparator.naturalOrder())
         .orElse(null);
   }
 
-  public String getLastCompletedInstant(String tableType) {
-    final String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
+  public String getLastCompletedInstant(HoodieTableType tableType) {
+    final String commitType = CommitUtils.getCommitActionType(tableType);
     HoodieTimeline completedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterCompletedInstants();
     return completedTimeline.getInstants()
         .filter(x -> x.getAction().equals(commitType))
@@ -431,32 +470,49 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
         .orElse(null);
   }
 
-  public void deletePendingInstant(String tableType, String instant) {
-    HoodieFlinkTable<T> table = getHoodieTable();
-    String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
-    HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline();
-    activeTimeline.deletePendingIfExists(HoodieInstant.State.INFLIGHT, commitType, instant);
-    activeTimeline.deletePendingIfExists(HoodieInstant.State.REQUESTED, commitType, instant);
-  }
-
-  public void transitionRequestedToInflight(String tableType, String inFlightInstant) {
+  public void transitionRequestedToInflight(String commitType, String inFlightInstant) {
     HoodieFlinkTable<T> table = getHoodieTable();
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-    String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
     HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant);
     activeTimeline.transitionRequestedToInflight(requested, Option.empty(),
         config.shouldAllowMultiWriteOnSameInstant());
   }
 
-  public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
+  public HoodieFlinkTable<T> getHoodieTable() {
+    return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+  }
+
+  public Map<String, List<String>> getPartitionToReplacedFileIds(
+      WriteOperationType writeOperationType,
+      List<WriteStatus> writeStatuses) {
     HoodieFlinkTable<T> table = getHoodieTable();
-    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
-    if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
-      rollbackInflightCompaction(inflightInstant, table);
+    switch (writeOperationType) {
+      case INSERT_OVERWRITE:
+        return writeStatuses.stream().map(status -> status.getStat().getPartitionPath()).distinct()
+            .collect(
+                Collectors.toMap(
+                    partition -> partition,
+                    partitionPath -> getAllExistingFileIds(table, partitionPath)));
+      case INSERT_OVERWRITE_TABLE:
+        Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
+        List<String> partitionPaths =
+            FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
+        if (partitionPaths != null && partitionPaths.size() > 0) {
+          context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
+          partitionToExistingFileIds = partitionPaths.stream().parallel()
+              .collect(
+                  Collectors.toMap(
+                      partition -> partition,
+                      partition -> getAllExistingFileIds(table, partition)));
+        }
+        return partitionToExistingFileIds;
+      default:
+        throw new AssertionError();
     }
   }
 
-  public HoodieFlinkTable<T> getHoodieTable() {
-    return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+  private List<String> getAllExistingFileIds(HoodieFlinkTable<T> table, String partitionPath) {
+    // because the new commit is not complete yet, it is safe to mark all existing file Ids as old files
+    return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
   }
 }
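
A stand-alone sketch of the replaced-file-id resolution above (the collection
stand-ins are hypothetical, not Hudi APIs): INSERT_OVERWRITE replaces only the
file groups of partitions touched by the incoming write statuses, while
INSERT_OVERWRITE_TABLE lists every partition of the table and replaces all of
their file groups.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class ReplacedFileIdsSketch {
      static Map<String, List<String>> replacedFileIds(
          boolean overwriteWholeTable,
          List<String> writtenPartitions,   // partitions seen in the WriteStatus stats
          List<String> allPartitions,       // every partition path of the table
          Map<String, List<String>> fileIdsByPartition) { // stand-in for the slice view
        List<String> targets = overwriteWholeTable ? allPartitions : writtenPartitions;
        // The new commit is not complete yet, so every file id currently visible
        // in a target partition can safely be marked as replaced.
        return targets.stream().distinct()
            .collect(Collectors.toMap(
                partition -> partition,
                partition -> fileIdsByPartition.getOrDefault(partition, Collections.emptyList())));
      }
    }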
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
index c0699ff..f799919 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
@@ -125,4 +125,36 @@ public interface ExplicitWriteHandleTable<T extends HoodieRecordPayload> {
       HoodieWriteHandle<?, ?, ?, ?> writeHandle,
       String instantTime,
       List<HoodieRecord<T>> preppedRecords);
+
+  /**
+   * Replaces all the existing records and inserts the specified new records into the Hoodie table at the supplied instantTime,
+   * for the partition paths contained in the input records.
+   *
+   * @param context HoodieEngineContext
+   * @param writeHandle The write handle
+   * @param instantTime Instant time for the replace action
+   * @param records input records
+   * @return HoodieWriteMetadata
+   */
+  HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> records);
+
+  /**
+   * Deletes all the existing records of the Hoodie table and inserts the specified new records into the Hoodie table at the supplied instantTime,
+   * for the partition paths contained in the input records.
+   *
+   * @param context HoodieEngineContext
+   * @param writeHandle The write handle
+   * @param instantTime Instant time for the replace action
+   * @param records input records
+   * @return HoodieWriteMetadata
+   */
+  HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> records);
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 167b376..d8bdb9f 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -47,6 +47,8 @@ import org.apache.hudi.table.action.clean.FlinkCleanActionExecutor;
 import org.apache.hudi.table.action.clean.FlinkScheduleCleanActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
+import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;
+import org.apache.hudi.table.action.commit.FlinkInsertOverwriteTableCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkMergeHelper;
 import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
@@ -182,6 +184,24 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   }
 
   @Override
+  public HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> records) {
+    return new FlinkInsertOverwriteCommitActionExecutor(context, writeHandle, config, this, instantTime, records).execute();
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> records) {
+    return new FlinkInsertOverwriteTableCommitActionExecutor(context, writeHandle, config, this, instantTime, records).execute();
+  }
+
+  @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> records) {
     throw new HoodieNotSupportedException("This method should not be invoked");
   }
@@ -229,12 +249,12 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> records) {
-    throw new HoodieNotSupportedException("InsertOverWrite is not supported yet");
+    throw new HoodieNotSupportedException("This method should not be invoked");
   }
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> records) {
-    throw new HoodieNotSupportedException("insertOverwriteTable is not supported yet");
+    throw new HoodieNotSupportedException("This method should not be invoked");
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteCommitActionExecutor.java
new file mode 100644
index 0000000..583e0b6
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteCommitActionExecutor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+public class FlinkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends BaseFlinkCommitActionExecutor<T> {
+
+  protected List<HoodieRecord<T>> inputRecords;
+
+  public FlinkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
+                                                  HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+                                                  HoodieWriteConfig config,
+                                                  HoodieTable table,
+                                                  String instantTime,
+                                                  List<HoodieRecord<T>> inputRecords) {
+    this(context, writeHandle, config, table, instantTime, inputRecords, WriteOperationType.INSERT_OVERWRITE);
+  }
+
+  public FlinkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
+                                                  HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+                                                  HoodieWriteConfig config,
+                                                  HoodieTable table,
+                                                  String instantTime,
+                                                  List<HoodieRecord<T>> inputRecords,
+                                                  WriteOperationType writeOperationType) {
+    super(context, writeHandle, config, table, instantTime, writeOperationType);
+    this.inputRecords = inputRecords;
+  }
+
+  @Override
+  protected String getCommitActionType() {
+    return HoodieTimeline.REPLACE_COMMIT_ACTION;
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    return FlinkWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
+        config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteTableCommitActionExecutor.java
new file mode 100644
index 0000000..a31679b
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertOverwriteTableCommitActionExecutor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+public class FlinkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends FlinkInsertOverwriteCommitActionExecutor<T> {
+
+  public FlinkInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context,
+                                                       HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+                                                       HoodieWriteConfig config,
+                                                       HoodieTable table,
+                                                       String instantTime,
+                                                       List<HoodieRecord<T>> inputRecords) {
+    super(context, writeHandle, config, table, instantTime, inputRecords, WriteOperationType.INSERT_OVERWRITE_TABLE);
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    return FlinkWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
+        config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index 41cfc8a..b5a3cc0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -104,4 +104,8 @@ public enum WriteOperationType {
   public static boolean isChangingRecords(WriteOperationType operationType) {
     return operationType == UPSERT || operationType == UPSERT_PREPPED || operationType == DELETE;
   }
+
+  public static boolean isOverwrite(WriteOperationType operationType) {
+    return operationType == INSERT_OVERWRITE || operationType == INSERT_OVERWRITE_TABLE;
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index ea36f67..4956c14 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -41,6 +41,17 @@ public class CommitUtils {
   private static final Logger LOG = LogManager.getLogger(CommitUtils.class);
 
   /**
+   * Gets the commit action type for the given write operation and table type.
+   */
+  public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) {
+    if (operation == WriteOperationType.INSERT_OVERWRITE || operation == WriteOperationType.INSERT_OVERWRITE_TABLE) {
+      return HoodieTimeline.REPLACE_COMMIT_ACTION;
+    } else {
+      return getCommitActionType(tableType);
+    }
+  }
+
+  /**
    * Gets the commit action type for given table type.
    */
   public static String getCommitActionType(HoodieTableType tableType) {
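
The effect of the new overload in one self-contained sketch (the string
constants mirror HoodieTimeline's action names; everything else here is
illustrative): overwrite operations always map to the replace action, all
other operations keep the table-type default.

    public class CommitActionSketch {
      // Action names as defined on HoodieTimeline.
      static final String COMMIT = "commit";
      static final String DELTA_COMMIT = "deltacommit";
      static final String REPLACE_COMMIT = "replacecommit";

      static String commitActionType(String operation, String tableType) {
        if (operation.equals("insert_overwrite") || operation.equals("insert_overwrite_table")) {
          return REPLACE_COMMIT; // overwrite always produces a replace commit
        }
        return tableType.equals("MERGE_ON_READ") ? DELTA_COMMIT : COMMIT;
      }
    }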
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/InstantGenerateOperator.java
index 9413797..3c4fb1e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/InstantGenerateOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/InstantGenerateOperator.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.streamer.FlinkStreamerConfig;
@@ -188,7 +189,8 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
    */
   private String startNewInstant(long checkpointId) {
     String newTime = writeClient.startCommit();
-    this.writeClient.transitionRequestedToInflight(this.cfg.tableType, newTime);
+    final String actionType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(this.cfg.tableType));
+    this.writeClient.transitionRequestedToInflight(actionType, newTime);
     LOG.info("create instant [{}], at checkpoint [{}]", newTime, checkpointId);
     return newTime;
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index b0321ac..8244226 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -21,7 +21,9 @@ package org.apache.hudi.sink;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.ObjectSizeCalculator;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -129,6 +131,11 @@ public class StreamWriteFunction<K, I, O>
   private transient OperatorEventGateway eventGateway;
 
   /**
+   * Commit action type.
+   */
+  private transient String actionType;
+
+  /**
    * Constructs a StreamingSinkFunction.
    *
    * @param config The config options
@@ -141,6 +148,9 @@ public class StreamWriteFunction<K, I, O>
   public void open(Configuration parameters) throws IOException {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
     this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
+    this.actionType = CommitUtils.getCommitActionType(
+        WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
+        HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
     initBuffer();
     initWriteFunction();
   }
@@ -166,6 +176,7 @@ public class StreamWriteFunction<K, I, O>
   @Override
   public void close() {
     if (this.writeClient != null) {
+      this.writeClient.cleanHandles();
       this.writeClient.close();
     }
   }
@@ -224,6 +235,12 @@ public class StreamWriteFunction<K, I, O>
       case UPSERT:
         this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime);
         break;
+      case INSERT_OVERWRITE:
+        this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwrite(records, instantTime);
+        break;
+      case INSERT_OVERWRITE_TABLE:
+        this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwriteTable(records, instantTime);
+        break;
       default:
         throw new RuntimeException("Unsupported write operation : " + writeOperation);
     }
@@ -315,7 +332,7 @@ public class StreamWriteFunction<K, I, O>
 
   @SuppressWarnings("unchecked, rawtypes")
   private void flushBucket(DataBucket bucket) {
-    this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
+    this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
     if (this.currentInstant == null) {
       // in case there are empty checkpoints that has no input data
       LOG.info("No inflight instant when flushing data, cancel.");
@@ -339,7 +356,7 @@ public class StreamWriteFunction<K, I, O>
 
   @SuppressWarnings("unchecked, rawtypes")
   private void flushRemaining(boolean isEndInput) {
-    this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
+    this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
     if (this.currentInstant == null) {
       // in case there are empty checkpoints that has no input data
       LOG.info("No inflight instant when flushing data, cancel.");
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 54a7603..6244d65 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -20,7 +20,11 @@ package org.apache.hudi.sink;
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -40,10 +44,14 @@ import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -117,6 +125,11 @@ public class StreamWriteOperatorCoordinator
   private HiveSyncContext hiveSyncContext;
 
   /**
+   * The table state.
+   */
+  private transient TableState tableState;
+
+  /**
    * Constructs a StreamingSinkOperatorCoordinator.
    *
    * @param conf    The config options
@@ -135,8 +148,8 @@ public class StreamWriteOperatorCoordinator
   public void start() throws Exception {
     // initialize event buffer
     reset();
-    // writeClient
     this.writeClient = StreamerUtil.createWriteClient(conf, null);
+    this.tableState = TableState.create(conf);
     // init table, create it if not exists.
     initTableIfNotExists(this.conf);
     // start a new instant
@@ -214,14 +227,16 @@ public class StreamWriteOperatorCoordinator
   }
 
   private void startInstant() {
-    this.instant = this.writeClient.startCommit();
-    this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.instant);
+    final String instant = HoodieActiveTimeline.createNewInstantTime();
+    this.writeClient.startCommitWithTime(instant, tableState.commitAction);
+    this.instant = instant;
+    this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant);
     LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
             this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
   }
 
   @Override
-  public void resetToCheckpoint(long checkpointID, @Nullable byte[] checkpointData) throws Exception {
+  public void resetToCheckpoint(long checkpointID, @Nullable byte[] checkpointData) {
     // no operation
   }
 
@@ -310,6 +325,7 @@ public class StreamWriteOperatorCoordinator
   }
 
   /** Performs the actual commit action. */
+  @SuppressWarnings("unchecked")
   private void doCommit(List<WriteStatus> writeResults) {
     // commit or rollback
     long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
@@ -323,7 +339,11 @@ public class StreamWriteOperatorCoordinator
             + totalErrorRecords + "/" + totalRecords);
       }
 
-      boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata));
+      final Map<String, List<String>> partitionToReplacedFileIds = tableState.isOverwrite
+          ? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults)
+          : Collections.emptyMap();
+      boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata),
+          tableState.commitAction, partitionToReplacedFileIds);
       if (success) {
         reset();
         LOG.info("Commit instant [{}] success!", this.instant);
@@ -401,4 +421,26 @@ public class StreamWriteOperatorCoordinator
       return new StreamWriteOperatorCoordinator(this.conf, context);
     }
   }
+
+  /**
+   * Remember some table state variables.
+   */
+  private static class TableState implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final WriteOperationType operationType;
+    private final String commitAction;
+    private final boolean isOverwrite;
+
+    private TableState(Configuration conf) {
+      this.operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+      this.commitAction = CommitUtils.getCommitActionType(this.operationType,
+          HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT)));
+      this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
+    }
+
+    public static TableState create(Configuration conf) {
+      return new TableState(conf);
+    }
+  }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 7e017cc..f765e9d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -127,6 +127,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
     this.bucketAssigner = BucketAssigners.create(
         getRuntimeContext().getIndexOfThisSubtask(),
         getRuntimeContext().getNumberOfParallelSubtasks(),
+        WriteOperationType.isOverwrite(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))),
         HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
         context,
         writeConfig);
@@ -190,7 +191,9 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
         default:
           throw new AssertionError();
       }
-      this.indexState.put(hoodieKey, location);
+      if (isChangingRecords) {
+        this.indexState.put(hoodieKey, location);
+      }
     }
     record.unseal();
     record.setCurrentLocation(location);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
index 237ec27..1c28e6d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
@@ -33,19 +33,24 @@ public abstract class BucketAssigners {
   /**
    * Creates a {@code BucketAssigner}.
    *
-   * @param taskID    The task ID
-   * @param numTasks  The number of tasks
+   * @param taskID The task ID
+   * @param numTasks The number of tasks
+   * @param isOverwrite Whether the write operation is OVERWRITE
    * @param tableType The table type
-   * @param context   The engine context
-   * @param config    The configuration
+   * @param context The engine context
+   * @param config The configuration
    * @return the bucket assigner instance
    */
   public static BucketAssigner create(
       int taskID,
       int numTasks,
+      boolean isOverwrite,
       HoodieTableType tableType,
       HoodieFlinkEngineContext context,
       HoodieWriteConfig config) {
+    if (isOverwrite) {
+      return new OverwriteBucketAssigner(taskID, numTasks, context, config);
+    }
     switch (tableType) {
       case COPY_ON_WRITE:
         return new BucketAssigner(taskID, numTasks, context, config);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/OverwriteBucketAssigner.java
similarity index 55%
copy from hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
copy to hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/OverwriteBucketAssigner.java
index 237ec27..7e2320e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/OverwriteBucketAssigner.java
@@ -19,40 +19,29 @@
 package org.apache.hudi.sink.partitioner;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.sink.partitioner.delta.DeltaBucketAssigner;
+import org.apache.hudi.table.action.commit.SmallFile;
+
+import java.util.Collections;
+import java.util.List;
 
 /**
- * Utilities for {@code BucketAssigner}.
+ * BucketAssigner for INSERT OVERWRITE and INSERT OVERWRITE TABLE operations;
+ * this assigner always skips the existing small files because of the 'OVERWRITE' semantics.
+ *
+ * <p>Note: assumes the index can always index log files for Flink write.
  */
-public abstract class BucketAssigners {
-
-  private BucketAssigners() {}
-
-  /**
-   * Creates a {@code BucketAssigner}.
-   *
-   * @param taskID    The task ID
-   * @param numTasks  The number of tasks
-   * @param tableType The table type
-   * @param context   The engine context
-   * @param config    The configuration
-   * @return the bucket assigner instance
-   */
-  public static BucketAssigner create(
+public class OverwriteBucketAssigner extends BucketAssigner {
+  public OverwriteBucketAssigner(
       int taskID,
       int numTasks,
-      HoodieTableType tableType,
       HoodieFlinkEngineContext context,
       HoodieWriteConfig config) {
-    switch (tableType) {
-      case COPY_ON_WRITE:
-        return new BucketAssigner(taskID, numTasks, context, config);
-      case MERGE_ON_READ:
-        return new DeltaBucketAssigner(taskID, numTasks, context, config);
-      default:
-        throw new AssertionError();
-    }
+    super(taskID, numTasks, context, config);
+  }
+
+  @Override
+  protected List<SmallFile> getSmallFiles(String partitionPath) {
+    return Collections.emptyList();
   }
 }
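
To see why getSmallFiles() must return an empty list here: the base assigner
packs incoming records into existing small file groups, which would carry old
data into buckets that the overwrite is supposed to replace. A toy sketch of
that difference (names hypothetical, not Hudi APIs):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class SmallFileSkipSketch {
      // Hypothetical stand-in for the assigner's small-file probing.
      static String targetFileId(List<String> smallFileIds, String freshFileId) {
        return smallFileIds.isEmpty() ? freshFileId : smallFileIds.get(0);
      }

      public static void main(String[] args) {
        List<String> smallFiles = Arrays.asList("fg-0001");
        // Regular insert: reuses the existing small file group.
        System.out.println(targetFileId(smallFiles, "fg-new"));              // fg-0001
        // Overwrite: getSmallFiles() returns an empty list, so a fresh file
        // group is always opened and the old ones are later marked replaced.
        System.out.println(targetFileId(Collections.emptyList(), "fg-new")); // fg-new
      }
    }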
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index b52e0ca..c3427f3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.sink.StreamWriteOperatorFactory;
@@ -40,6 +41,7 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
@@ -49,10 +51,11 @@ import java.util.Map;
 /**
  * Hoodie table sink.
  */
-public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning {
+public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
 
   private final Configuration conf;
   private final TableSchema schema;
+  private boolean overwrite = false;
 
   public HoodieTableSink(Configuration conf, TableSchema schema) {
     this.conf = conf;
@@ -129,7 +132,21 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning {
   }
 
   @Override
-  public void applyStaticPartition(Map<String, String> map) {
-    // no operation
+  public void applyStaticPartition(Map<String, String> partition) {
+    // #applyOverwrite should have been invoked.
+    if (this.overwrite) {
+      final String operationType;
+      if (partition.size() > 0) {
+        operationType = WriteOperationType.INSERT_OVERWRITE.value();
+      } else {
+        operationType = WriteOperationType.INSERT_OVERWRITE_TABLE.value();
+      }
+      this.conf.setString(FlinkOptions.OPERATION, operationType);
+    }
+  }
+
+  @Override
+  public void applyOverwrite(boolean b) {
+    this.overwrite = b;
   }
 }
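
For context on the call order: for an INSERT OVERWRITE statement the Flink
planner invokes applyOverwrite(true) before applyStaticPartition(...), and the
partition map is empty for a whole-table overwrite. A condensed sketch of the
resulting operation selection ('upsert' is assumed to be the FlinkOptions.OPERATION
default; the helper itself is hypothetical):

    import java.util.Map;

    public class OverwriteOperationSketch {
      static String resolveOperation(boolean overwrite, Map<String, String> staticPartition) {
        if (!overwrite) {
          return "upsert"; // default write operation, untouched by this patch
        }
        // Values of WriteOperationType.INSERT_OVERWRITE / INSERT_OVERWRITE_TABLE.
        return staticPartition.isEmpty() ? "insert_overwrite_table" : "insert_overwrite";
      }
    }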
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 0afd414..a2fdf22 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.sink;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -104,9 +105,8 @@ public class TestStreamWriteOperatorCoordinator {
     coordinator.handleEventFromOperator(1, event1);
 
     coordinator.notifyCheckpointComplete(1);
-    String inflight = coordinator.getWriteClient()
-        .getInflightAndRequestedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
-    String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+    String inflight = coordinator.getWriteClient().getLastPendingInstant(HoodieTableType.COPY_ON_WRITE);
+    String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(HoodieTableType.COPY_ON_WRITE);
     assertThat("Instant should be complete", lastCompleted, is(instant));
     assertNotEquals("", inflight, "Should start a new instant");
     assertNotEquals(instant, inflight, "Should start a new instant");
@@ -156,7 +156,7 @@ public class TestStreamWriteOperatorCoordinator {
 
     assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1),
         "Returns early for empty write results");
-    String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+    String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(HoodieTableType.COPY_ON_WRITE);
     assertNull(lastCompleted, "Returns early for empty write results");
     assertNull(coordinator.getEventBuffer()[0]);
 
@@ -172,7 +172,7 @@ public class TestStreamWriteOperatorCoordinator {
     coordinator.handleEventFromOperator(1, event1);
     assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2),
         "Commits the instant with partial events anyway");
-    lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+    lastCompleted = coordinator.getWriteClient().getLastCompletedInstant(HoodieTableType.COPY_ON_WRITE);
     assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant));
   }
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index f373ab8..9e417e3 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -92,7 +92,7 @@ public class TestWriteCopyOnWrite {
   public void before() throws Exception {
     final String basePath = tempFile.getAbsolutePath();
     conf = TestConfigurations.getDefaultConf(basePath);
-    conf.setString(FlinkOptions.TABLE_TYPE, getTableType());
+    conf.setString(FlinkOptions.TABLE_TYPE, getTableType().name());
     setUp(conf);
     this.funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
   }
@@ -125,8 +125,7 @@ public class TestWriteCopyOnWrite {
     // this triggers the data write and event send
     funcWrapper.checkpointFunction(1);
 
-    String instant = funcWrapper.getWriteClient()
-        .getInflightAndRequestedInstant(getTableType());
+    String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
 
     final OperatorEvent nextEvent = funcWrapper.getNextEvent();
     MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
@@ -152,7 +151,7 @@ public class TestWriteCopyOnWrite {
     funcWrapper.checkpointFunction(2);
 
     String instant2 = funcWrapper.getWriteClient()
-        .getInflightAndRequestedInstant(getTableType());
+        .getLastPendingInstant(getTableType());
     assertNotEquals(instant, instant2);
 
     final OperatorEvent nextEvent2 = funcWrapper.getNextEvent();
@@ -181,7 +180,7 @@ public class TestWriteCopyOnWrite {
     funcWrapper.checkpointFunction(1);
 
     String instant = funcWrapper.getWriteClient()
-        .getInflightAndRequestedInstant(getTableType());
+        .getLastPendingInstant(getTableType());
     assertNotNull(instant);
 
     final OperatorEvent nextEvent = funcWrapper.getNextEvent();
@@ -223,7 +222,7 @@ public class TestWriteCopyOnWrite {
     funcWrapper.checkpointFunction(1);
 
     String instant = funcWrapper.getWriteClient()
-        .getInflightAndRequestedInstant(getTableType());
+        .getLastPendingInstant(getTableType());
 
     final OperatorEvent nextEvent = funcWrapper.getNextEvent();
     assertThat("The operator expect to send an event", nextEvent, 
instanceOf(BatchWriteSuccessEvent.class));
@@ -309,7 +308,7 @@ public class TestWriteCopyOnWrite {
     funcWrapper.checkpointFunction(2);
 
     String instant = funcWrapper.getWriteClient()
-        .getInflightAndRequestedInstant(getTableType());
+        .getLastPendingInstant(getTableType());
 
     nextEvent = funcWrapper.getNextEvent();
     assertThat("The operator expect to send an event", nextEvent, 
instanceOf(BatchWriteSuccessEvent.class));
@@ -354,7 +353,7 @@ public class TestWriteCopyOnWrite {
     funcWrapper.checkpointFunction(2);
 
     String instant = funcWrapper.getWriteClient()
-        .getInflightAndRequestedInstant(getTableType());
+        .getLastPendingInstant(getTableType());
 
     nextEvent = funcWrapper.getNextEvent();
     assertThat("The operator expect to send an event", nextEvent, 
instanceOf(BatchWriteSuccessEvent.class));
@@ -409,7 +408,7 @@ public class TestWriteCopyOnWrite {
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
 
     String instant = funcWrapper.getWriteClient()
-        .getInflightAndRequestedInstant(getTableType());
+        .getLastPendingInstant(getTableType());
 
     funcWrapper.checkpointComplete(1);
 
@@ -493,7 +492,7 @@ public class TestWriteCopyOnWrite {
     funcWrapper.checkpointFunction(2);
 
     String instant = funcWrapper.getWriteClient()
-        .getInflightAndRequestedInstant(getTableType());
+        .getLastPendingInstant(getTableType());
 
     nextEvent = funcWrapper.getNextEvent();
     assertThat("The operator expect to send an event", nextEvent, 
instanceOf(BatchWriteSuccessEvent.class));
@@ -516,7 +515,7 @@ public class TestWriteCopyOnWrite {
 
   @SuppressWarnings("rawtypes")
   private void checkInflightInstant(HoodieFlinkWriteClient writeClient) {
-    final String instant = writeClient.getInflightAndRequestedInstant(getTableType());
+    final String instant = writeClient.getLastPendingInstant(getTableType());
     assertNotNull(instant);
   }
 
@@ -528,7 +527,7 @@ public class TestWriteCopyOnWrite {
     final String instant;
     switch (state) {
       case REQUESTED:
-        instant = writeClient.getInflightAndRequestedInstant(getTableType());
+        instant = writeClient.getLastPendingInstant(getTableType());
         break;
       case COMPLETED:
         instant = writeClient.getLastCompletedInstant(getTableType());
@@ -539,8 +538,8 @@ public class TestWriteCopyOnWrite {
     assertThat(instant, is(instantStr));
   }
 
-  protected String getTableType() {
-    return HoodieTableType.COPY_ON_WRITE.name();
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.COPY_ON_WRITE;
   }
 
   protected void checkWrittenData(File baseFile, Map<String, String> expected) throws Exception {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 5ce8ae2..b24881a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -90,7 +90,7 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
   }
 
   @Override
-  protected String getTableType() {
-    return HoodieTableType.MERGE_ON_READ.name();
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.MERGE_ON_READ;
   }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 2821cae..98d1211 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -53,7 +53,7 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
   }
 
   @Override
-  protected String getTableType() {
-    return HoodieTableType.MERGE_ON_READ.name();
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.MERGE_ON_READ;
   }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 56cbb55..d2ba3e6 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.util.StreamerUtil;
@@ -223,7 +224,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
 
     String latestCommit = StreamerUtil.createWriteClient(conf, null)
-        .getLastCompletedInstant(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+        .getLastCompletedInstant(HoodieTableType.MERGE_ON_READ);
 
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
@@ -276,6 +277,53 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
         + "id8,Han,56,1970-01-01T00:00:08,par4]");
   }
 
+  @ParameterizedTest
+  @EnumSource(value = ExecMode.class)
+  void testInsertOverwrite(ExecMode execMode) {
+    TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+
+    final String insertInto1 = "insert into t1 values\n"
+        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+        + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+        + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+        + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+        + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
+        + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+        + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+        + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
+
+    execInsertSql(tableEnv, insertInto1);
+
+    // overwrite partition 'par1' and increase each age by 1
+    final String insertInto2 = "insert overwrite t1 partition(`partition`='par1') values\n"
+        + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01'),\n"
+        + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02')\n";
+
+    execInsertSql(tableEnv, insertInto2);
+
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT_OVERWRITE);
+
+    // overwrite the whole table
+    final String insertInto3 = "insert overwrite t1 values\n"
+        + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 'par1'),\n"
+        + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02', 'par2')\n";
+
+    execInsertSql(tableEnv, insertInto3);
+
+    List<Row> result2 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    final String expected = "["
+        + "id1,Danny,24,1970-01-01T00:00:01,par1, "
+        + "id2,Stephen,34,1970-01-01T00:00:02,par2]";
+    assertRowsEquals(result2, expected);
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
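
The new ITCase above drives both overwrite flavors end to end. For reference,
the user-facing surface it validates looks roughly like the sketch below; the
table name, path, and schema are assumptions mirroring the test fixtures, and
`TableResult#await()` stands in for the test's blocking `execInsertSql` helper:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class InsertOverwriteExample {
      public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inBatchMode().build());

        // Hypothetical Hudi table, mirroring the test schema; the record key
        // defaults to the 'uuid' field.
        tableEnv.executeSql("create table t1(\n"
            + "  uuid varchar(20), name varchar(10), age int, ts timestamp(3),\n"
            + "  `partition` varchar(20)\n"
            + ") partitioned by (`partition`)\n"
            + "with ('connector' = 'hudi', 'path' = '/tmp/hudi_t1')");

        // Partition overwrite: only file groups under 'par1' are replaced.
        tableEnv.executeSql("insert overwrite t1 partition(`partition`='par1') values\n"
            + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01')").await();

        // Whole-table overwrite: every partition is replaced.
        tableEnv.executeSql("insert overwrite t1 values\n"
            + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02','par2')").await();
      }
    }
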
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 2201aeb..4a2466c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -167,6 +167,26 @@ public class TestData {
           TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
   );
 
+  // data set of test_source.data with partition 'par1' overwritten
+  public static List<RowData> DATA_SET_SOURCE_INSERT_OVERWRITE = Arrays.asList(
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
+          TimestampData.fromEpochMillis(1000), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
+          TimestampData.fromEpochMillis(2000), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+          TimestampData.fromEpochMillis(3000), StringData.fromString("par2")),
+      insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31,
+          TimestampData.fromEpochMillis(4000), StringData.fromString("par2")),
+      insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
+          TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
+      insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
+          TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
+      insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+          TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
+      insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
+          TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
+  );
+
   public static List<RowData> DATA_SET_UPDATE_DELETE = Arrays.asList(
       // this is update
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 632a155..00b7116 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -29,10 +29,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -193,14 +190,6 @@ public class DataSourceUtils {
     return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), createHoodieConfig(schemaStr, basePath, tblName, parameters));
   }
 
-  public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) {
-    if (operation == WriteOperationType.INSERT_OVERWRITE || operation == WriteOperationType.INSERT_OVERWRITE_TABLE) {
-      return HoodieTimeline.REPLACE_COMMIT_ACTION;
-    } else {
-      return CommitUtils.getCommitActionType(tableType);
-    }
-  }
-
   public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
                                                    String instantTime, WriteOperationType operation) throws HoodieException {
     switch (operation) {
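
With this removal, the Spark path stops keeping its own copy of the
commit-action mapping; both the Flink and Spark writers now resolve it through
the shared helper in hudi-common. A hedged sketch of the consolidated logic,
reconstructed from the method deleted above and the CommitUtils call sites in
the hunks that follow (its exact shape inside CommitUtils is an assumption):

    // Assumed to live in org.apache.hudi.common.util.CommitUtils, next to the
    // existing single-argument overload it delegates to.
    public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) {
      if (operation == WriteOperationType.INSERT_OVERWRITE
          || operation == WriteOperationType.INSERT_OVERWRITE_TABLE) {
        // Overwrites replace whole file groups, so they commit as a 'replacecommit'.
        return HoodieTimeline.REPLACE_COMMIT_ACTION;
      }
      // COPY_ON_WRITE -> 'commit', MERGE_ON_READ -> 'deltacommit'.
      return getCommitActionType(tableType);
    }
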
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
index e804163..b2e7134 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.internal;
 
-import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -27,6 +26,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -79,7 +79,7 @@ public class DataSourceInternalWriterHelper {
   public void commit(List<HoodieWriteStat> writeStatList) {
     try {
       writeClient.commitStats(instantTime, writeStatList, Option.of(new HashMap<>()),
-          DataSourceUtils.getCommitActionType(operationType, metaClient.getTableType()));
+          CommitUtils.getCommitActionType(operationType, metaClient.getTableType()));
     } catch (Exception ioe) {
       throw new HoodieException(ioe.getMessage(), ioe);
     } finally {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 5b87278..829690f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -19,7 +19,6 @@ package org.apache.hudi
 
 import java.util
 import java.util.Properties
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.conf.Configuration
@@ -33,7 +32,7 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
-import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP, DEFAULT_BOOTSTRAP_INDEX_CLASS}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
@@ -126,7 +125,7 @@ private[hudi] object HoodieSparkSqlWriter {
         tableConfig = tableMetaClient.getTableConfig
       }
 
-      val commitActionType = DataSourceUtils.getCommitActionType(operation, tableConfig.getTableType)
+      val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)
 
       // short-circuit if bulk_insert via row is enabled.
       // scalastyle:off
