This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f0d5c0  [HUDI-1349] spark sql support overwrite use insert_overwrite_table (#2196)
1f0d5c0 is described below

commit 1f0d5c077ea34ea91c6577cf2391291d06917378
Author: lw0090 <[email protected]>
AuthorDate: Fri Dec 4 04:26:21 2020 +0800

    [HUDI-1349] spark sql support overwrite use insert_overwrite_table (#2196)
---
 .../java/org/apache/hudi/table/HoodieTable.java    | 11 +++
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |  5 ++
 .../apache/hudi/client/SparkRDDWriteClient.java    | 17 +++++
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |  6 ++
 .../SparkInsertOverwriteCommitActionExecutor.java  |  9 ++-
 ...rkInsertOverwriteTableCommitActionExecutor.java | 74 ++++++++++++++++++++
 .../hudi/common/model/WriteOperationType.java      |  8 ++-
 .../common/testutils/HoodieTestDataGenerator.java  |  4 ++
 .../utils/HoodieRealtimeInputFormatUtils.java      |  2 +-
 .../main/java/org/apache/hudi/DataSourceUtils.java |  4 +-
 .../org/apache/hudi/HoodieDataSourceHelpers.java   |  3 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  1 +
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 11 +--
 .../apache/hudi/functional/TestCOWDataSource.scala | 79 ++++++++++++++++++++++
 14 files changed, 224 insertions(+), 10 deletions(-)
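
For context, a minimal sketch of exercising the new behavior through the Spark datasource (the input DataFrame and base path are hypothetical; the option key/value come from DataSourceOptions.scala below). With this change, SaveMode.Overwrite is recorded as a replacecommit on the timeline instead of deleting the base path up front:

    // Explicitly request the new operation (SaveMode.Overwrite alone now implies it):
    inputDF.write.format("org.apache.hudi")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
        DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)
      .mode(SaveMode.Overwrite)
      .save("/tmp/hudi_trips")  // hypothetical base path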

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 0413f75..a33ad99 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -205,6 +205,17 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
    */
   public abstract HoodieWriteMetadata<O> insertOverwrite(HoodieEngineContext context, String instantTime, I records);
 
+  /**
+   * Deletes all the existing records of the Hoodie table and inserts the specified new records into the table
+   * at the supplied instantTime, replacing file groups across all partitions of the table.
+   *
+   * @param context HoodieEngineContext
+   * @param instantTime Instant time for the replace action
+   * @param records input records
+   * @return HoodieWriteMetadata
+   */
+  public abstract HoodieWriteMetadata<O> insertOverwriteTable(HoodieEngineContext context, String instantTime, I records);
+
   public HoodieWriteConfig getConfig() {
     return config;
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index acb010c..b8ae370 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -107,6 +107,11 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   }
 
   @Override
+  public HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> records) {
+    throw new HoodieNotSupportedException("insertOverwriteTable is not supported yet");
+  }
+
+  @Override
   public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
     throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 56f0689..10a55df 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -191,6 +191,23 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
     return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
   }
 
+
+  /**
+   * Removes all existing records of the Hoodie table and inserts the given HoodieRecords into the table.
+   *
+   * @param records HoodieRecords to insert
+   * @param instantTime Instant time of the commit
+   * @return HoodieWriteResult wrapping the RDD of WriteStatus (to inspect errors and counts) and the replaced file ids
+   */
+  public HoodieWriteResult insertOverwriteTable(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
+    HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime);
+    table.validateInsertSchema();
+    setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    HoodieWriteMetadata result = table.insertOverwriteTable(context, instantTime, records);
+    return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
+  }
+
   @Override
   public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
     return bulkInsert(records, instantTime, Option.empty());
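
For reference, a minimal sketch of driving this new client API directly (the SparkRDDWriteClient instance, the records RDD, and the HoodieWriteResult accessor names are assumed, following the wrapping shown above):

    // Open a new instant, then replace the entire table's contents.
    val instantTime = client.startCommit()  // assumed: the usual way to begin a commit
    val result = client.insertOverwriteTable(recordsRDD, instantTime)
    // Per-file write statuses, for inspecting errors and record counts:
    val statuses = result.getWriteStatuses
    // File groups this replacecommit superseded, keyed by partition:
    val replaced = result.getPartitionToReplaceFileIds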
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index f2b3364..99a8f1f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -45,6 +45,7 @@ import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.SparkBootstrapCommitActionExecutor;
 import org.apache.hudi.table.action.clean.SparkCleanActionExecutor;
 import org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor;
+import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkBulkInsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor;
@@ -130,6 +131,11 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   }
 
   @Override
+  public HoodieWriteMetadata<JavaRDD<WriteStatus>> insertOverwriteTable(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+    return new SparkInsertOverwriteTableCommitActionExecutor(context, config, this, instantTime, records).execute();
+  }
+
+  @Override
   public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
     throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index 2771a22..1e38220 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -44,7 +44,14 @@ public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayl
   public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
                                                   HoodieWriteConfig config, HoodieTable table,
                                                   String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
-    super(context, config, table, instantTime, WriteOperationType.INSERT_OVERWRITE);
+    this(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE);
+  }
+
+  public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
+                                                  HoodieWriteConfig config, HoodieTable table,
+                                                  String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
+                                                  WriteOperationType writeOperationType) {
+    super(context, config, table, instantTime, writeOperationType);
     this.inputRecordsRDD = inputRecordsRDD;
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
new file mode 100644
index 0000000..e349657
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends SparkInsertOverwriteCommitActionExecutor<T> {
+
+  public SparkInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context,
+                                                       HoodieWriteConfig config, HoodieTable table,
+                                                       String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    super(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE_TABLE);
+  }
+
+  protected List<String> getAllExistingFileIds(String partitionPath) {
+    return table.getSliceView().getLatestFileSlices(partitionPath)
+        .map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
+  }
+
+  @Override
+  protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
+    Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
+    try {
+      List<String> partitionPaths = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
+          table.getMetaClient().getBasePath(), false);
+      JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+      if (partitionPaths != null && partitionPaths.size() > 0) {
+        context.setJobStatus(this.getClass().getSimpleName(), "Getting existing fileIds of all partitions");
+        JavaRDD<String> partitionPathRdd = jsc.parallelize(partitionPaths, partitionPaths.size());
+        partitionToExistingFileIds = partitionPathRdd.mapToPair(
+            partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
+      }
+    } catch (IOException e) {
+      throw new HoodieCommitException("insert_overwrite_table action failed to get existing fileIds of all partitions in "
+          + config.getBasePath() + " at time " + instantTime, e);
+    }
+    return partitionToExistingFileIds;
+  }
+}
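
The difference from the parent executor is getPartitionToReplacedFileIds: the parent (judging by the writeStatuses parameter it receives) marks only the partitions touched by incoming records, while this subclass enumerates every partition path and marks all of its current file ids as replaced. A sketch of the resulting shape, with made-up partition paths and file ids:

    // Hypothetical replace metadata for a two-partition table:
    val replaced: Map[String, List[String]] = Map(
      "2020/03/01" -> List("fileId-a", "fileId-b"),  // all current file ids in the partition
      "2020/03/02" -> List("fileId-c"))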
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index f6386b9..5f328a9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -38,10 +38,12 @@ public enum WriteOperationType {
   // delete
   DELETE("delete"),
   BOOTSTRAP("bootstrap"),
-  // insert overwrite
+  // insert overwrite with static partitioning
   INSERT_OVERWRITE("insert_overwrite"),
   // cluster
   CLUSTER("cluster"),
+  // insert overwrite with dynamic partitioning
+  INSERT_OVERWRITE_TABLE("insert_overwrite_table"),
   // used for old version
   UNKNOWN("unknown");
 
@@ -72,6 +74,8 @@ public enum WriteOperationType {
         return DELETE;
       case "insert_overwrite":
         return INSERT_OVERWRITE;
+      case "insert_overwrite_table":
+        return INSERT_OVERWRITE_TABLE;
       default:
         throw new HoodieException("Invalid value of Type.");
     }
@@ -88,4 +92,4 @@ public enum WriteOperationType {
   public static boolean isChangingRecords(WriteOperationType operationType) {
     return operationType == UPSERT || operationType == UPSERT_PREPPED || operationType == DELETE;
   }
-}
\ No newline at end of file
+}
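
A small sketch of the new value's round-trip (assumed usage; the String-to-enum helper extended above is taken to be fromValue):

    val op = WriteOperationType.fromValue("insert_overwrite_table")
    assert(op == WriteOperationType.INSERT_OVERWRITE_TABLE)
    // Overwrites replace whole file groups rather than mutating individual records,
    // so they are not classified as record-changing operations:
    assert(!WriteOperationType.isChangingRecords(op))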
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index bf97280..17e93fe 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -451,6 +451,10 @@ public class HoodieTestDataGenerator {
     return generateInsertsStream(instantTime,  n, false, TRIP_EXAMPLE_SCHEMA, true).collect(Collectors.toList());
   }
 
+  public List<HoodieRecord> generateInsertsForPartition(String instantTime, Integer n, String partition) {
+    return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA, false, () -> partition, () -> UUID.randomUUID().toString()).collect(Collectors.toList());
+  }
+
   public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer n, boolean isFlattened, String schemaStr, boolean containsAllPartitions) {
     return generateInsertsStream(commitTime, n, isFlattened, schemaStr, containsAllPartitions,
         () -> partitionPaths[RAND.nextInt(partitionPaths.length)],
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index 760dd96..c8a0d7f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -86,7 +86,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
             .collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName())));
         // Get the maxCommit from the last delta or compaction or commit - when bootstrapped from COW table
         String maxCommitTime = metaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
-                HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
+                HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION))
             .filterCompletedInstants().lastInstant().get().getTimestamp();
         latestFileSlices.forEach(fileSlice -> {
           List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 37572c3..1cb63c9 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -191,7 +191,7 @@ public class DataSourceUtils {
   }
 
   public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) {
-    if (operation == WriteOperationType.INSERT_OVERWRITE) {
+    if (operation == WriteOperationType.INSERT_OVERWRITE || operation == WriteOperationType.INSERT_OVERWRITE_TABLE) {
       return HoodieTimeline.REPLACE_COMMIT_ACTION;
     } else {
       return CommitUtils.getCommitActionType(tableType);
@@ -211,6 +211,8 @@ public class DataSourceUtils {
         return new HoodieWriteResult(client.upsert(hoodieRecords, instantTime));
       case INSERT_OVERWRITE:
         return client.insertOverwrite(hoodieRecords, instantTime);
+      case INSERT_OVERWRITE_TABLE:
+        return client.insertOverwriteTable(hoodieRecords, instantTime);
       default:
         throw new HoodieException("Not a valid operation type for 
doWriteOperation: " + operation.toString());
     }
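
To make the mapping concrete, a quick sketch (not from the patch; HoodieTableType.COPY_ON_WRITE is just one example input): both overwrite variants land on the timeline as replace commits, while other operations keep the table's default commit action.

    val cow = HoodieTableType.COPY_ON_WRITE
    assert(DataSourceUtils.getCommitActionType(WriteOperationType.INSERT_OVERWRITE, cow)
      == HoodieTimeline.REPLACE_COMMIT_ACTION)
    assert(DataSourceUtils.getCommitActionType(WriteOperationType.INSERT_OVERWRITE_TABLE, cow)
      == HoodieTimeline.REPLACE_COMMIT_ACTION)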
diff --git a/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java b/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
index 1467cf6..734e0c0 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
@@ -74,7 +74,8 @@ public class HoodieDataSourceHelpers {
     if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
       return metaClient.getActiveTimeline().getTimelineOfActions(
           CollectionUtils.createSet(HoodieActiveTimeline.COMMIT_ACTION,
-              HoodieActiveTimeline.DELTA_COMMIT_ACTION)).filterCompletedInstants();
+              HoodieActiveTimeline.DELTA_COMMIT_ACTION,
+              HoodieActiveTimeline.REPLACE_COMMIT_ACTION)).filterCompletedInstants();
     } else {
       return metaClient.getCommitTimeline().filterCompletedInstants();
     }
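
Since the MERGE_ON_READ timeline filter above now includes replace commits, callers of this helper can observe an overwrite on the timeline. A sketch, assuming the hunk sits in HoodieDataSourceHelpers.allCompletedCommitsCompactions and that fs and basePath already exist:

    val timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
    // After a SaveMode.Overwrite write, the latest completed instant is a replacecommit.
    println(timeline.lastInstant().get().getAction)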
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 73f70e7..9ff6b2f 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -157,6 +157,7 @@ object DataSourceWriteOptions {
   val DELETE_OPERATION_OPT_VAL = WriteOperationType.DELETE.value
   val BOOTSTRAP_OPERATION_OPT_VAL = WriteOperationType.BOOTSTRAP.value
   val INSERT_OVERWRITE_OPERATION_OPT_VAL = WriteOperationType.INSERT_OVERWRITE.value
+  val INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL = WriteOperationType.INSERT_OVERWRITE_TABLE.value
   val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL
 
   /**
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index fe0f3f5..e109501 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -93,6 +93,13 @@ private[hudi] object HoodieSparkSqlWriter {
       operation = WriteOperationType.INSERT
     }
 
+    // If the mode is Overwrite, set the operation to INSERT_OVERWRITE_TABLE.
+    // DataSourceUtils.doWriteOperation will then use client.insertOverwriteTable to overwrite
+    // the table, replacing the old fs.delete(tablePath) behavior.
+    if (mode == SaveMode.Overwrite && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) {
+      operation = WriteOperationType.INSERT_OVERWRITE_TABLE
+    }
+
     val jsc = new JavaSparkContext(sparkContext)
     val basePath = new Path(path.get)
     val instantTime = HoodieActiveTimeline.createNewInstantTime()
@@ -319,10 +326,6 @@ private[hudi] object HoodieSparkSqlWriter {
     if (operation != WriteOperationType.DELETE) {
       if (mode == SaveMode.ErrorIfExists && tableExists) {
         throw new HoodieException(s"hoodie table at $tablePath already 
exists.")
-      } else if (mode == SaveMode.Overwrite && tableExists) {
-        log.warn(s"hoodie table at $tablePath already exists. Deleting 
existing data & overwriting with new data.")
-        fs.delete(tablePath, true)
-        tableExists = false
       }
     } else {
       // Delete Operation only supports Append mode
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 5b746de..bf71e68 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -18,7 +18,13 @@
 package org.apache.hudi.functional
 
 import java.sql.{Date, Timestamp}
+import java.util.function.Supplier
+import java.util.stream.Stream
 
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.HoodieClientTestBase
@@ -156,6 +162,79 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled
   }
 
+  @Test def testOverWriteModeUseReplaceAction(): Unit = {
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val records2 = recordsToStrings(dataGen.generateInserts("002", 5)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
+    val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
+      .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
+    assertEquals(2, commits.size)
+    assertEquals("commit", commits(0))
+    assertEquals("replacecommit", commits(1))
+  }
+
+  @Test def testOverWriteModeUseReplaceActionOnDisJointPartitions(): Unit = {
+    // step1: Write 5 records to the hoodie table for partition1 DEFAULT_FIRST_PARTITION_PATH
+    val records1 = recordsToStrings(dataGen.generateInsertsForPartition("001", 5, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    // step2: Write 7 more records using SaveMode.Overwrite for partition2 DEFAULT_SECOND_PARTITION_PATH
+    val records2 = recordsToStrings(dataGen.generateInsertsForPartition("002", 7, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val allRecords = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*")
+    allRecords.registerTempTable("tmpTable")
+
+    spark.sql("select count(*) from tmpTable").show()
+
+    // step3: Query the row count for partition1 DEFAULT_FIRST_PARTITION_PATH; the overwrite must have removed it
+    val recordCountForPartition1 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect()
+    assertEquals("0", recordCountForPartition1(0).get(0).toString)
+
+    // step4: Query the row count for partition2 DEFAULT_SECOND_PARTITION_PATH
+    val recordCountForPartition2 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).collect()
+    assertEquals("7", recordCountForPartition2(0).get(0).toString)
+
+    // step5: Query the total row count of the hoodie table; only the overwritten records remain
+    val recordCount = spark.sql("select count(*) from tmpTable").collect()
+    assertEquals("7", recordCount(0).get(0).toString)
+
+    // step6: Count partition2 DEFAULT_SECOND_PARTITION_PATH rows again by collecting and filtering on the driver
+    val recordsForPartitionColumn = spark.sql("select partition from tmpTable").collect()
+    val filterSecondPartitionCount = recordsForPartitionColumn.filter(row => row.get(0).equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).size
+    assertEquals(7, filterSecondPartitionCount)
+
+    val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
+    val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
+      .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
+    assertEquals(2, commits.size)
+    assertEquals("commit", commits(0))
+    assertEquals("replacecommit", commits(1))
+  }
+
   @Test def testDropInsertDup(): Unit = {
     val insert1Cnt = 10
     val insert2DupKeyCnt = 9
