This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 1f0d5c0  [HUDI-1349] spark sql support overwrite via insert_overwrite_table (#2196)
1f0d5c0 is described below
commit 1f0d5c077ea34ea91c6577cf2391291d06917378
Author: lw0090 <[email protected]>
AuthorDate: Fri Dec 4 04:26:21 2020 +0800
    [HUDI-1349] spark sql support overwrite via insert_overwrite_table (#2196)
---
.../java/org/apache/hudi/table/HoodieTable.java | 11 +++
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 5 ++
.../apache/hudi/client/SparkRDDWriteClient.java | 17 +++++
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 6 ++
.../SparkInsertOverwriteCommitActionExecutor.java | 9 ++-
...rkInsertOverwriteTableCommitActionExecutor.java | 74 ++++++++++++++++++++
.../hudi/common/model/WriteOperationType.java | 8 ++-
.../common/testutils/HoodieTestDataGenerator.java | 4 ++
.../utils/HoodieRealtimeInputFormatUtils.java | 2 +-
.../main/java/org/apache/hudi/DataSourceUtils.java | 4 +-
.../org/apache/hudi/HoodieDataSourceHelpers.java | 3 +-
.../scala/org/apache/hudi/DataSourceOptions.scala | 1 +
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 11 +--
.../apache/hudi/functional/TestCOWDataSource.scala | 79 ++++++++++++++++++++++
14 files changed, 224 insertions(+), 10 deletions(-)
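At a high level, this change makes SaveMode.Overwrite on the Spark datasource issue an insert_overwrite_table replace action instead of deleting the table path. A minimal usage sketch (not part of the commit; `df`, `commonOpts`, and `basePath` are assumed to be a DataFrame, the usual Hudi write options, and a table path, as in the tests at the end of this diff):

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.spark.sql.SaveMode

    // Explicitly request the new operation; SaveMode.Overwrite alone now
    // triggers it as well (see the HoodieSparkSqlWriter hunk below).
    df.write.format("org.apache.hudi")
      .options(commonOpts)
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
        DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)
      .mode(SaveMode.Overwrite)
      .save(basePath)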
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 0413f75..a33ad99 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -205,6 +205,17 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
    */
   public abstract HoodieWriteMetadata<O> insertOverwrite(HoodieEngineContext context, String instantTime, I records);
+  /**
+   * Deletes all the existing records of the Hoodie table and inserts the specified new records into the table
+   * at the supplied instantTime. Unlike insertOverwrite, this replaces every partition of the table, not only
+   * the partition paths contained in the input records.
+   *
+   * @param context HoodieEngineContext
+   * @param instantTime Instant time for the replace action
+   * @param records input records
+   * @return HoodieWriteMetadata
+   */
+  public abstract HoodieWriteMetadata<O> insertOverwriteTable(HoodieEngineContext context, String instantTime, I records);
+
public HoodieWriteConfig getConfig() {
return config;
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index acb010c..b8ae370 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -107,6 +107,11 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   }

   @Override
+  public HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> records) {
+    throw new HoodieNotSupportedException("insertOverwriteTable is not supported yet");
+  }
+
+  @Override
   public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
     throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 56f0689..10a55df 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -191,6 +191,23 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
     return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
   }
+
+  /**
+   * Removes all existing records of the Hoodie table and inserts the given HoodieRecords into the table.
+   *
+   * @param records HoodieRecords to insert
+   * @param instantTime Instant time of the commit
+   * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
+   */
+  public HoodieWriteResult insertOverwriteTable(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
+    HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime);
+    table.validateInsertSchema();
+    setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    HoodieWriteMetadata result = table.insertOverwriteTable(context, instantTime, records);
+    return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
+  }
+
   @Override
   public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
     return bulkInsert(records, instantTime, Option.empty());
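For orientation, a hedged sketch of driving this new write-client API directly; `client` (a SparkRDDWriteClient) and `records` (a JavaRDD of HoodieRecord) are assumed to be set up elsewhere:

    // startCommit returns the instant time used for the replace action.
    val instantTime: String = client.startCommit()
    val result = client.insertOverwriteTable(records, instantTime)
    // WriteStatus RDD for error/count inspection, plus the file ids
    // that were replaced, keyed by partition path.
    val statuses = result.getWriteStatuses
    val replaced = result.getPartitionToReplaceFileIds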
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index f2b3364..99a8f1f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -45,6 +45,7 @@ import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.SparkBootstrapCommitActionExecutor;
 import org.apache.hudi.table.action.clean.SparkCleanActionExecutor;
 import org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor;
+import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkBulkInsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor;
@@ -130,6 +131,11 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   }

   @Override
+  public HoodieWriteMetadata<JavaRDD<WriteStatus>> insertOverwriteTable(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+    return new SparkInsertOverwriteTableCommitActionExecutor(context, config, this, instantTime, records).execute();
+  }
+
+  @Override
   public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
     throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index 2771a22..1e38220 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -44,7 +44,14 @@ public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayl
   public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
                                                   HoodieWriteConfig config, HoodieTable table,
                                                   String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
-    super(context, config, table, instantTime, WriteOperationType.INSERT_OVERWRITE);
+    this(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE);
+  }
+
+  public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
+                                                  HoodieWriteConfig config, HoodieTable table,
+                                                  String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
+                                                  WriteOperationType writeOperationType) {
+    super(context, config, table, instantTime, writeOperationType);
     this.inputRecordsRDD = inputRecordsRDD;
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
new file mode 100644
index 0000000..e349657
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends SparkInsertOverwriteCommitActionExecutor<T> {
+
+  public SparkInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context,
+                                                       HoodieWriteConfig config, HoodieTable table,
+                                                       String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    super(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE_TABLE);
+  }
+
+  protected List<String> getAllExistingFileIds(String partitionPath) {
+    return table.getSliceView().getLatestFileSlices(partitionPath)
+        .map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
+  }
+
+  @Override
+  protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
+    Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
+    try {
+      // List every partition of the table: insert_overwrite_table replaces all of them,
+      // not just the partitions touched by the input records.
+      List<String> partitionPaths = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
+          table.getMetaClient().getBasePath(), false);
+      JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+      if (partitionPaths != null && partitionPaths.size() > 0) {
+        context.setJobStatus(this.getClass().getSimpleName(), "Getting existing file ids of all partitions");
+        JavaRDD<String> partitionPathRdd = jsc.parallelize(partitionPaths, partitionPaths.size());
+        partitionToExistingFileIds = partitionPathRdd.mapToPair(
+            partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
+      }
+    } catch (IOException e) {
+      throw new HoodieCommitException("Failed to get existing fileIds of all partitions of "
+          + config.getBasePath() + " for insert_overwrite_table at time " + instantTime, e);
+    }
+    return partitionToExistingFileIds;
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index f6386b9..5f328a9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -38,10 +38,12 @@ public enum WriteOperationType {
   // delete
   DELETE("delete"),
   BOOTSTRAP("bootstrap"),
-  // insert overwrite
+  // insert overwrite: replaces only the partitions touched by the input records
   INSERT_OVERWRITE("insert_overwrite"),
   // cluster
   CLUSTER("cluster"),
+  // insert overwrite table: replaces every partition of the table
+  INSERT_OVERWRITE_TABLE("insert_overwrite_table"),
   // used for old version
   UNKNOWN("unknown");

@@ -72,6 +74,8 @@ public enum WriteOperationType {
         return DELETE;
       case "insert_overwrite":
         return INSERT_OVERWRITE;
+      case "insert_overwrite_table":
+        return INSERT_OVERWRITE_TABLE;
       default:
         throw new HoodieException("Invalid value of Type.");
     }
@@ -88,4 +92,4 @@ public enum WriteOperationType {
   public static boolean isChangingRecords(WriteOperationType operationType) {
     return operationType == UPSERT || operationType == UPSERT_PREPPED || operationType == DELETE;
   }
-}
\ No newline at end of file
+}
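The enum round-trips through its lowercase string value, which is what the datasource option carries; a small sketch:

    import org.apache.hudi.common.model.WriteOperationType

    val op = WriteOperationType.fromValue("insert_overwrite_table")
    assert(op == WriteOperationType.INSERT_OVERWRITE_TABLE)
    assert(op.value == "insert_overwrite_table")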
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index bf97280..17e93fe 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -451,6 +451,10 @@ public class HoodieTestDataGenerator {
     return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA, true).collect(Collectors.toList());
   }

+  public List<HoodieRecord> generateInsertsForPartition(String instantTime, Integer n, String partition) {
+    return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA, false, () -> partition, () -> UUID.randomUUID().toString()).collect(Collectors.toList());
+  }
+
   public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer n, boolean isFlattened, String schemaStr, boolean containsAllPartitions) {
     return generateInsertsStream(commitTime, n, isFlattened, schemaStr, containsAllPartitions,
         () -> partitionPaths[RAND.nextInt(partitionPaths.length)],
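The new test helper pins every generated insert to a single partition, which the disjoint-partition test below relies on; usage sketch mirroring that test:

    val dataGen = new HoodieTestDataGenerator()
    // 5 records, all in DEFAULT_FIRST_PARTITION_PATH, for instant "001"
    val recs = dataGen.generateInsertsForPartition("001", 5,
      HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)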
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index 760dd96..c8a0d7f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -86,7 +86,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
           .collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName())));
       // Get the maxCommit from the last delta or compaction or commit - when bootstrapped from COW table
       String maxCommitTime = metaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
-          HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
+          HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION))
          .filterCompletedInstants().lastInstant().get().getTimestamp();
       latestFileSlices.forEach(fileSlice -> {
         List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 37572c3..1cb63c9 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -191,7 +191,7 @@ public class DataSourceUtils {
   }

   public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) {
-    if (operation == WriteOperationType.INSERT_OVERWRITE) {
+    if (operation == WriteOperationType.INSERT_OVERWRITE || operation == WriteOperationType.INSERT_OVERWRITE_TABLE) {
       return HoodieTimeline.REPLACE_COMMIT_ACTION;
     } else {
       return CommitUtils.getCommitActionType(tableType);
@@ -211,6 +211,8 @@ public class DataSourceUtils {
         return new HoodieWriteResult(client.upsert(hoodieRecords, instantTime));
       case INSERT_OVERWRITE:
         return client.insertOverwrite(hoodieRecords, instantTime);
+      case INSERT_OVERWRITE_TABLE:
+        return client.insertOverwriteTable(hoodieRecords, instantTime);
       default:
         throw new HoodieException("Not a valid operation type for doWriteOperation: " + operation.toString());
     }
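Both overwrite variants now map to the replace action on the timeline; a sketch of the mapping, assuming a copy-on-write table:

    import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
    import org.apache.hudi.common.table.timeline.HoodieTimeline

    val actionType = DataSourceUtils.getCommitActionType(
      WriteOperationType.INSERT_OVERWRITE_TABLE, HoodieTableType.COPY_ON_WRITE)
    assert(actionType == HoodieTimeline.REPLACE_COMMIT_ACTION) // "replacecommit"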
diff --git a/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java b/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
index 1467cf6..734e0c0 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
@@ -74,7 +74,8 @@ public class HoodieDataSourceHelpers {
     if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
       return metaClient.getActiveTimeline().getTimelineOfActions(
           CollectionUtils.createSet(HoodieActiveTimeline.COMMIT_ACTION,
-              HoodieActiveTimeline.DELTA_COMMIT_ACTION)).filterCompletedInstants();
+              HoodieActiveTimeline.DELTA_COMMIT_ACTION,
+              HoodieActiveTimeline.REPLACE_COMMIT_ACTION)).filterCompletedInstants();
     } else {
       return metaClient.getCommitTimeline().filterCompletedInstants();
     }
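With this hunk, replace commits also surface when listing completed instants for MERGE_ON_READ tables; a hedged sketch, assuming the modified helper is allCompletedCommitsCompactions and that `fs` and `basePath` point at an existing table:

    val timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath)
    // replacecommit instants are now included for MOR tables as well
    timeline.getInstants.toArray.foreach(println)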
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 73f70e7..9ff6b2f 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -157,6 +157,7 @@ object DataSourceWriteOptions {
   val DELETE_OPERATION_OPT_VAL = WriteOperationType.DELETE.value
   val BOOTSTRAP_OPERATION_OPT_VAL = WriteOperationType.BOOTSTRAP.value
   val INSERT_OVERWRITE_OPERATION_OPT_VAL = WriteOperationType.INSERT_OVERWRITE.value
+  val INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL = WriteOperationType.INSERT_OVERWRITE_TABLE.value
   val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL

   /**
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index fe0f3f5..e109501 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -93,6 +93,13 @@ private[hudi] object HoodieSparkSqlWriter {
       operation = WriteOperationType.INSERT
     }

+    // If the mode is Overwrite, force the operation to INSERT_OVERWRITE_TABLE so that
+    // DataSourceUtils.doWriteOperation routes the write through client.insertOverwriteTable.
+    // This replaces the old behavior of deleting the table path (fs.delete(tablePath)) outright.
+    if (mode == SaveMode.Overwrite && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) {
+      operation = WriteOperationType.INSERT_OVERWRITE_TABLE
+    }
+
     val jsc = new JavaSparkContext(sparkContext)
     val basePath = new Path(path.get)
     val instantTime = HoodieActiveTimeline.createNewInstantTime()
@@ -319,10 +326,6 @@ private[hudi] object HoodieSparkSqlWriter {
     if (operation != WriteOperationType.DELETE) {
       if (mode == SaveMode.ErrorIfExists && tableExists) {
         throw new HoodieException(s"hoodie table at $tablePath already exists.")
-      } else if (mode == SaveMode.Overwrite && tableExists) {
-        log.warn(s"hoodie table at $tablePath already exists. Deleting existing data & overwriting with new data.")
-        fs.delete(tablePath, true)
-        tableExists = false
       }
     } else {
       // Delete Operation only supports Append mode
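Since SaveMode.Overwrite no longer deletes the base path, the table's history survives an overwrite; a sketch of verifying this on the timeline, condensed from the tests below:

    val metaClient = new HoodieTableMetaClient(
      spark.sparkContext.hadoopConfiguration, basePath, true)
    val actions = metaClient.getActiveTimeline.filterCompletedInstants()
      .getInstants.toArray.map(_.asInstanceOf[HoodieInstant].getAction)
    // the overwrite shows up as a replace action after the earlier commit
    assert(actions.last == "replacecommit")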
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 5b746de..bf71e68 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -18,7 +18,13 @@
package org.apache.hudi.functional
import java.sql.{Date, Timestamp}
+import java.util.function.Supplier
+import java.util.stream.Stream
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.HoodieClientTestBase
@@ -156,6 +162,79 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled
   }

+  @Test def testOverWriteModeUseReplaceAction(): Unit = {
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val records2 = recordsToStrings(dataGen.generateInserts("002", 5)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
+    val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
+      .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
+    assertEquals(2, commits.size)
+    assertEquals("commit", commits(0))
+    assertEquals("replacecommit", commits(1))
+  }
+
+  @Test def testOverWriteModeUseReplaceActionOnDisJointPartitions(): Unit = {
+    // step1: Write 5 records to the hoodie table in partition1 DEFAULT_FIRST_PARTITION_PATH
+    val records1 = recordsToStrings(dataGen.generateInsertsForPartition("001", 5, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    // step2: Write 7 more records using SaveMode.Overwrite into partition2 DEFAULT_SECOND_PARTITION_PATH
+    val records2 = recordsToStrings(dataGen.generateInsertsForPartition("002", 7, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val allRecords = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*")
+    allRecords.registerTempTable("tmpTable")
+
+    spark.sql("select count(*) from tmpTable").show()
+
+    // step3: Query the row count for partition1 DEFAULT_FIRST_PARTITION_PATH; the overwrite replaced it
+    val recordCountForPartition1 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect()
+    assertEquals("0", recordCountForPartition1(0).get(0).toString)
+
+    // step4: Query the row count for partition2 DEFAULT_SECOND_PARTITION_PATH
+    val recordCountForPartition2 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).collect()
+    assertEquals("7", recordCountForPartition2(0).get(0).toString)
+
+    // step5: Query the row count for the whole table
+    val recordCount = spark.sql("select count(*) from tmpTable").collect()
+    assertEquals("7", recordCount(0).get(0).toString)
+
+    // step6: Count the rows in partition2 DEFAULT_SECOND_PARTITION_PATH again, via spark.collect and then filter
+    val recordsForPartitionColumn = spark.sql("select partition from tmpTable").collect()
+    val filterSecondPartitionCount = recordsForPartitionColumn.filter(row => row.get(0).equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).size
+    assertEquals(7, filterSecondPartitionCount)
+
+    val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
+    val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
+      .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
+    assertEquals(2, commits.size)
+    assertEquals("commit", commits(0))
+    assertEquals("replacecommit", commits(1))
+  }
+
+
@Test def testDropInsertDup(): Unit = {
val insert1Cnt = 10
val insert2DupKeyCnt = 9