This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1379300  [HUDI-3483] Adding insert override nodes to integ test suite 
and few clean ups (#4895)
1379300 is described below

commit 1379300b5b4357243ab7279f9a39011296731741
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Feb 26 08:00:15 2022 -0500

    [HUDI-3483] Adding insert override nodes to integ test suite and few clean 
ups (#4895)
---
 .../config/test-suite/insert-overwrite-table.yaml  | 104 ++++++++++++++++++++
 .../demo/config/test-suite/insert-overwrite.yaml   | 106 ++++++++++++++++++++
 .../demo/config/test-suite/spark-clustering.yaml   |  73 ++++++++++++++
 .../{test.properties => test-metadata.properties}  |  31 +++---
 docker/demo/config/test-suite/test.properties      |   6 +-
 .../integ/testsuite/configuration/DeltaConfig.java |  10 ++
 .../dag/nodes/BaseValidateDatasetNode.java         | 107 ++++++++++++---------
 .../dag/nodes/DeleteInputDatasetNode.java          |  56 +++++++++++
 .../integ/testsuite/generator/DeltaGenerator.java  |   4 +
 .../testsuite/dag/nodes/SparkInsertNode.scala      |   7 +-
 .../dag/nodes/SparkInsertOverwriteNode.scala       |  31 ++++++
 .../dag/nodes/SparkInsertOverwriteTableNode.scala  |  30 ++++++
 .../testsuite/dag/nodes/SparkUpsertNode.scala      |  40 +-------
 13 files changed, 504 insertions(+), 101 deletions(-)

diff --git a/docker/demo/config/test-suite/insert-overwrite-table.yaml 
b/docker/demo/config/test-suite/insert-overwrite-table.yaml
new file mode 100644
index 0000000..8b5a26e
--- /dev/null
+++ b/docker/demo/config/test-suite/insert-overwrite-table.yaml
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: simple-deltastreamer.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+  first_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10000
+    type: SparkInsertNode
+    deps: none
+  first_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      num_records_insert: 1000
+      repeat_count: 1
+      num_records_upsert: 8000
+      num_partitions_upsert: 10
+    type: SparkUpsertNode
+    deps: first_insert
+  second_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10000
+    type: SparkInsertNode
+    deps: first_upsert
+  second_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      num_records_insert: 1000
+      repeat_count: 1
+      num_records_upsert: 8000
+      num_partitions_upsert: 10
+    type: SparkUpsertNode
+    deps: second_insert
+  first_hive_sync:
+    config:
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: second_upsert
+  first_insert_overwrite_table:
+    config:
+      record_size: 1000
+      repeat_count: 10
+      num_records_insert: 10
+    type: SparkInsertOverwriteTableNode
+    deps: first_hive_sync
+  delete_all_input_except_last:
+    config:
+      delete_input_data_except_latest: true
+    type: DeleteInputDatasetNode
+    deps: first_insert_overwrite_table
+  third_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10000
+    type: SparkInsertNode
+    deps: delete_all_input_except_last
+  third_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      num_records_insert: 1000
+      repeat_count: 1
+      num_records_upsert: 8000
+      num_partitions_upsert: 10
+    type: SparkUpsertNode
+    deps: third_insert
+  second_hive_sync:
+    config:
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: third_upsert
+  second_validate:
+    config:
+      validate_full_data : true
+      validate_hive: false
+      delete_input_data: false
+    type: ValidateDatasetNode
+    deps: second_hive_sync
diff --git a/docker/demo/config/test-suite/insert-overwrite.yaml 
b/docker/demo/config/test-suite/insert-overwrite.yaml
new file mode 100644
index 0000000..f2299c5
--- /dev/null
+++ b/docker/demo/config/test-suite/insert-overwrite.yaml
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: simple-deltastreamer.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+
+  first_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10000
+    type: SparkInsertNode
+    deps: none
+  first_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      num_records_insert: 1000
+      repeat_count: 1
+      num_records_upsert: 8000
+      num_partitions_upsert: 10
+    type: SparkUpsertNode
+    deps: first_insert
+  second_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10000
+    type: SparkInsertNode
+    deps: first_upsert
+  second_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      num_records_insert: 1000
+      repeat_count: 1
+      num_records_upsert: 8000
+      num_partitions_upsert: 10
+    type: SparkUpsertNode
+    deps: second_insert
+  first_hive_sync:
+    config:
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: second_upsert
+  first_insert_overwrite:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10
+    type: SparkInsertOverwriteNode
+    deps: first_hive_sync
+  delete_all_input_except_last:
+    config:
+      delete_input_data_except_latest: true
+    type: DeleteInputDatasetNode
+    deps: first_insert_overwrite
+  third_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10000
+    type: SparkInsertNode
+    deps: delete_all_input_except_last
+  third_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      num_records_insert: 1000
+      repeat_count: 1
+      num_records_upsert: 8000
+      num_partitions_upsert: 10
+    type: SparkUpsertNode
+    deps: third_insert
+  second_hive_sync:
+    config:
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: third_upsert
+  second_validate:
+    config:
+      validate_full_data : true
+      validate_hive: false
+      delete_input_data: false
+    type: ValidateDatasetNode
+    deps: second_hive_sync
diff --git a/docker/demo/config/test-suite/spark-clustering.yaml 
b/docker/demo/config/test-suite/spark-clustering.yaml
new file mode 100644
index 0000000..e8e722c
--- /dev/null
+++ b/docker/demo/config/test-suite/spark-clustering.yaml
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: cow-spark-simple.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+  first_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10000
+    type: SparkInsertNode
+    deps: none
+  first_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      num_records_insert: 1000
+      repeat_count: 1
+      num_records_upsert: 8000
+      num_partitions_upsert: 10
+    type: SparkUpsertNode
+    deps: first_insert
+  second_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10000
+    type: SparkInsertNode
+    deps: first_upsert
+  second_upsert:
+      config:
+        record_size: 1000
+        num_partitions_insert: 10
+        num_records_insert: 1000
+        repeat_count: 1
+        num_records_upsert: 8000
+        num_partitions_upsert: 10
+      type: SparkUpsertNode
+      deps: second_insert
+  first_delete:
+    config:
+      num_partitions_delete: 10
+      num_records_delete: 16000
+    type: SparkDeleteNode
+    deps: second_upsert
+  second_hive_sync:
+    config:
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: first_delete
+  second_validate:
+    config:
+      validate_hive: false
+      delete_input_data: false
+    type: ValidateDatasetNode
+    deps: second_hive_sync
\ No newline at end of file
diff --git a/docker/demo/config/test-suite/test.properties 
b/docker/demo/config/test-suite/test-metadata.properties
similarity index 69%
copy from docker/demo/config/test-suite/test.properties
copy to docker/demo/config/test-suite/test-metadata.properties
index 30cd1c1..48da77c 100644
--- a/docker/demo/config/test-suite/test.properties
+++ b/docker/demo/config/test-suite/test-metadata.properties
@@ -1,24 +1,29 @@
 
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
 #
-#      http://www.apache.org/licenses/LICENSE-2.0
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 #
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
 
 hoodie.insert.shuffle.parallelism=100
 hoodie.upsert.shuffle.parallelism=100
 hoodie.bulkinsert.shuffle.parallelism=100
 
+hoodie.metadata.enable=true
+
 hoodie.deltastreamer.source.test.num_partitions=100
 
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
 hoodie.deltastreamer.source.test.max_unique_records=100000000
diff --git a/docker/demo/config/test-suite/test.properties 
b/docker/demo/config/test-suite/test.properties
index 30cd1c1..509b9f4 100644
--- a/docker/demo/config/test-suite/test.properties
+++ b/docker/demo/config/test-suite/test.properties
@@ -19,6 +19,8 @@ hoodie.insert.shuffle.parallelism=100
 hoodie.upsert.shuffle.parallelism=100
 hoodie.bulkinsert.shuffle.parallelism=100
 
+hoodie.metadata.enable=false
+
 hoodie.deltastreamer.source.test.num_partitions=100
 
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
 hoodie.deltastreamer.source.test.max_unique_records=100000000
@@ -32,10 +34,6 @@ hoodie.datasource.write.recordkey.field=_row_key
 
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
 hoodie.datasource.write.partitionpath.field=timestamp
 
-hoodie.clustering.plan.strategy.sort.columns=_row_key
-hoodie.clustering.plan.strategy.daybased.lookback.partitions=0
-hoodie.clustering.inline.max.commits=1
-
 
hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
 
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
 
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index 56aa390..2c39f5f 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -96,6 +96,8 @@ public class DeltaConfig implements Serializable {
     private static String NUM_ROLLBACKS = "num_rollbacks";
     private static String ENABLE_ROW_WRITING = "enable_row_writing";
     private static String ENABLE_METADATA_VALIDATE = 
"enable_metadata_validate";
+    private static String VALIDATE_FULL_DATA = "validate_full_data";
+    private static String DELETE_INPUT_DATA_EXCEPT_LATEST = 
"delete_input_data_except_latest";
 
     // Spark SQL Create Table
     private static String TABLE_TYPE = "table_type";
@@ -206,10 +208,18 @@ public class DeltaConfig implements Serializable {
       return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA, 
false).toString());
     }
 
+    public boolean isDeleteInputDataExceptLatest() {
+      return 
Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA_EXCEPT_LATEST, 
false).toString());
+    }
+
     public boolean isValidateHive() {
       return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_HIVE, 
false).toString());
     }
 
+    public boolean isValidateFullData() {
+      return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_FULL_DATA, 
false).toString());
+    }
+
     public int getIterationCountToExecute() {
       return Integer.valueOf(configsMap.getOrDefault(EXECUTE_ITR_COUNT, 
-1).toString());
     }
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
index 986e973..09d44d9 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
@@ -19,16 +19,14 @@
 
 package org.apache.hudi.integ.testsuite.dag.nodes;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
 import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
 import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.ReduceFunction;
 import org.apache.spark.sql.Dataset;
@@ -42,13 +40,13 @@ import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 import scala.Tuple2;
 import scala.collection.JavaConversions;
 import scala.collection.JavaConverters;
 
-import java.util.List;
-import java.util.stream.Collectors;
-
 /**
  * This nodes validates contents from input path are in tact with Hudi. By 
default no configs are required for this node. But there is an
  * optional config "delete_input_data" that you can set for this node. If set, 
once validation completes, contents from inputPath are deleted. This will come 
in handy for long running test suites.
@@ -78,6 +76,7 @@ public abstract class BaseValidateDatasetNode extends 
DagNode<Boolean> {
   public void execute(ExecutionContext context, int curItrCount) throws 
Exception {
 
     SparkSession session = 
SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
+
     // todo: Fix partitioning schemes. For now, assumes data based 
partitioning.
     String inputPath = 
context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
     log.warn("Validation using data from input path " + inputPath);
@@ -97,46 +96,60 @@ public abstract class BaseValidateDatasetNode extends 
DagNode<Boolean> {
 
     // read from hudi and remove meta columns.
     Dataset<Row> trimmedHudiDf = getDatasetToValidate(session, context, 
inputSnapshotDf.schema());
-    Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
-    long inputCount = inputSnapshotDf.count();
-    long outputCount = trimmedHudiDf.count();
-    log.debug("Input count: " + inputCount + "; output count: " + outputCount);
-    // the intersected df should be same as inputDf. if not, there is some 
mismatch.
-    if (outputCount == 0 || inputCount == 0 || 
inputSnapshotDf.except(intersectionDf).count() != 0) {
-      log.error("Data set validation failed. Total count in hudi " + 
outputCount + ", input df count " + inputCount);
-      throw new AssertionError("Hudi contents does not match contents input 
data. ");
-    }
-
-    if (config.isValidateHive()) {
-      String database = 
context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
-      String tableName = 
context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
-      log.warn("Validating hive table with db : " + database + " and table : " 
+ tableName);
-      session.sql("REFRESH TABLE " + database + "." + tableName);
-      Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, 
begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
-          "test_suite_source_ordering_field FROM " + database + "." + 
tableName);
-      Dataset<Row> reorderedInputDf = 
inputSnapshotDf.select("_row_key","rider","driver","begin_lat","begin_lon","end_lat","end_lon","fare",
-          "_hoodie_is_deleted","test_suite_source_ordering_field");
-
-      Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
-      outputCount = trimmedHudiDf.count();
-      log.warn("Input count: " + inputCount + "; output count: " + 
outputCount);
+    if (config.isValidateFullData()) {
+      log.debug("Validating full dataset");
+      Dataset<Row> exceptInputDf = inputSnapshotDf.except(trimmedHudiDf);
+      Dataset<Row> exceptHudiDf = trimmedHudiDf.except(inputSnapshotDf);
+      long exceptInputCount = exceptInputDf.count();
+      long exceptHudiCount = exceptHudiDf.count();
+      log.debug("Except input df count " + exceptInputDf + ", except hudi 
count " + exceptHudiCount);
+      if (exceptInputCount != 0 || exceptHudiCount != 0) {
+        log.error("Data set validation failed. Total count in hudi " + 
trimmedHudiDf.count() + ", input df count " + inputSnapshotDf.count()
+            + ". InputDf except hudi df = " + exceptInputCount + ", Hudi df 
except Input df " + exceptHudiCount);
+        throw new AssertionError("Hudi contents does not match contents input 
data. ");
+      }
+    } else {
+      Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
+      long inputCount = inputSnapshotDf.count();
+      long outputCount = trimmedHudiDf.count();
+      log.debug("Input count: " + inputCount + "; output count: " + 
outputCount);
       // the intersected df should be same as inputDf. if not, there is some 
mismatch.
-      if (outputCount == 0 || 
reorderedInputDf.except(intersectedHiveDf).count() != 0) {
-        log.error("Data set validation failed for COW hive table. Total count 
in hudi " + outputCount + ", input df count " + inputCount);
-        throw new AssertionError("Hudi hive table contents does not match 
contents input data. ");
+      if (outputCount == 0 || inputCount == 0 || 
inputSnapshotDf.except(intersectionDf).count() != 0) {
+        log.error("Data set validation failed. Total count in hudi " + 
outputCount + ", input df count " + inputCount);
+        throw new AssertionError("Hudi contents does not match contents input 
data. ");
       }
-    }
 
-    // if delete input data is enabled, erase input data.
-    if (config.isDeleteInputData()) {
-      // clean up input data for current group of writes.
-      inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
-      FileSystem fs = new Path(inputPathStr)
-          
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
-      FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
-      for (FileStatus fileStatus : fileStatuses) {
-        log.debug("Micro batch to be deleted " + 
fileStatus.getPath().toString());
-        fs.delete(fileStatus.getPath(), true);
+      if (config.isValidateHive()) {
+        String database = 
context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
+        String tableName = 
context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
+        log.warn("Validating hive table with db : " + database + " and table : 
" + tableName);
+        session.sql("REFRESH TABLE " + database + "." + tableName);
+        Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, 
begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
+            "test_suite_source_ordering_field FROM " + database + "." + 
tableName);
+        Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key", 
"rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
+            "_hoodie_is_deleted", "test_suite_source_ordering_field");
+
+        Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
+        outputCount = trimmedHudiDf.count();
+        log.warn("Input count: " + inputCount + "; output count: " + 
outputCount);
+        // the intersected df should be same as inputDf. if not, there is some 
mismatch.
+        if (outputCount == 0 || 
reorderedInputDf.except(intersectedHiveDf).count() != 0) {
+          log.error("Data set validation failed for COW hive table. Total 
count in hudi " + outputCount + ", input df count " + inputCount);
+          throw new AssertionError("Hudi hive table contents does not match 
contents input data. ");
+        }
+      }
+
+      // if delete input data is enabled, erase input data.
+      if (config.isDeleteInputData()) {
+        // clean up input data for current group of writes.
+        inputPathStr = 
context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
+        FileSystem fs = new Path(inputPathStr)
+            
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
+        FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
+        for (FileStatus fileStatus : fileStatuses) {
+          log.debug("Micro batch to be deleted " + 
fileStatus.getPath().toString());
+          fs.delete(fileStatus.getPath(), true);
+        }
       }
     }
   }
@@ -149,8 +162,8 @@ public abstract class BaseValidateDatasetNode extends 
DagNode<Boolean> {
     Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
     ExpressionEncoder encoder = getEncoder(inputDf.schema());
     return inputDf.groupByKey(
-        (MapFunction<Row, String>) value ->
-            value.getAs(partitionPathField) + "+" + 
value.getAs(recordKeyField), Encoders.STRING())
+            (MapFunction<Row, String>) value ->
+                value.getAs(partitionPathField) + "+" + 
value.getAs(recordKeyField), Encoders.STRING())
         .reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
           int ts1 = v1.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
           int ts2 = v2.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DeleteInputDatasetNode.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DeleteInputDatasetNode.java
new file mode 100644
index 0000000..2836f24
--- /dev/null
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DeleteInputDatasetNode.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes;
+
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Deletes all input except latest batch. Mostly used in insert_overwrite 
operations.
+ */
+public class DeleteInputDatasetNode extends DagNode<Boolean> {
+
+  public DeleteInputDatasetNode(DeltaConfig.Config config) {
+    this.config = config;
+  }
+
+  @Override
+  public void execute(ExecutionContext context, int curItrCount) throws 
Exception {
+
+    String latestBatch = 
String.valueOf(context.getWriterContext().getDeltaGenerator().getBatchId());
+
+    if (config.isDeleteInputDataExceptLatest()) {
+      String inputPathStr = 
context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
+      FileSystem fs = new Path(inputPathStr)
+          
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
+      FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
+      for (FileStatus fileStatus : fileStatuses) {
+        if (!fileStatus.getPath().getName().equals(latestBatch)) {
+          log.debug("Micro batch to be deleted " + 
fileStatus.getPath().toString());
+          fs.delete(fileStatus.getPath(), true);
+        }
+      }
+    }
+  }
+}
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
index 6d5bc4f..69e32df 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
@@ -110,6 +110,10 @@ public class DeltaGenerator implements Serializable {
     return ws;
   }
 
+  public int getBatchId() {
+    return batchId;
+  }
+
   public JavaRDD<GenericRecord> generateInserts(Config operation) {
     int numPartitions = operation.getNumInsertPartitions();
     long recordsPerPartition = operation.getNumRecordsInsert();
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
index db17a6e..b8c46ca 100644
--- 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
@@ -54,13 +54,18 @@ class SparkInsertNode(dagNodeConfig: Config) extends 
DagNode[RDD[WriteStatus]] {
       context.getWriterContext.getSparkSession)
     inputDF.write.format("hudi")
       
.options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap))
+      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), 
"test_suite_source_ordering_field")
       .option(DataSourceWriteOptions.TABLE_NAME.key, 
context.getHoodieTestSuiteWriter.getCfg.targetTableName)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, 
context.getHoodieTestSuiteWriter.getCfg.tableType)
-      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.OPERATION.key, getOperation())
       .option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key, 
"deltastreamer.checkpoint.key")
       .option("deltastreamer.checkpoint.key", 
context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse(""))
       .option(HoodieWriteConfig.TBL_NAME.key, 
context.getHoodieTestSuiteWriter.getCfg.targetTableName)
       .mode(SaveMode.Append)
       .save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
   }
+
+  def getOperation(): String = {
+    DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL
+  }
 }
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteNode.scala
new file mode 100644
index 0000000..6dd2eac
--- /dev/null
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteNode.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
+
+class SparkInsertOverwriteNode(dagNodeConfig: Config) extends 
SparkInsertNode(dagNodeConfig) {
+
+  override def getOperation(): String = {
+    DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL
+  }
+
+}
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteTableNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteTableNode.scala
new file mode 100644
index 0000000..a6b80b3
--- /dev/null
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteTableNode.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
+
+class SparkInsertOverwriteTableNode(dagNodeConfig: Config) extends 
SparkInsertNode(dagNodeConfig) {
+
+  override def getOperation(): String = {
+    DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+  }
+}
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala
index 858827a..113de93 100644
--- 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala
@@ -18,49 +18,17 @@
 
 package org.apache.hudi.integ.testsuite.dag.nodes
 
-import org.apache.hudi.client.WriteStatus
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
-import org.apache.hudi.integ.testsuite.dag.ExecutionContext
-import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SaveMode
-
-import scala.collection.JavaConverters._
 
 /**
  * Spark datasource based upsert node
  *
  * @param dagNodeConfig DAG node configurations.
  */
-class SparkUpsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] 
{
-
-  config = dagNodeConfig
+class SparkUpsertNode(dagNodeConfig: Config) extends 
SparkInsertNode(dagNodeConfig) {
 
-  /**
-   * Execute the {@link DagNode}.
-   *
-   * @param context     The context needed for an execution of a node.
-   * @param curItrCount iteration count for executing the node.
-   * @throws Exception Thrown if the execution failed.
-   */
-  override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
-    if (!config.isDisableGenerate) {
-      println("Generating input data for node {}", this.getName)
-      
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count()
-    }
-    val inputDF = 
AvroConversionUtils.createDataFrame(context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch,
-      context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
-      context.getWriterContext.getSparkSession)
-    inputDF.write.format("hudi")
-      
.options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap))
-      .option(DataSourceWriteOptions.TABLE_NAME.key, 
context.getHoodieTestSuiteWriter.getCfg.targetTableName)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, 
context.getHoodieTestSuiteWriter.getCfg.tableType)
-      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
-      .option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key, 
"deltastreamer.checkpoint.key")
-      .option("deltastreamer.checkpoint.key", 
context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse(""))
-      .option(HoodieWriteConfig.TBL_NAME.key, 
context.getHoodieTestSuiteWriter.getCfg.targetTableName)
-      .mode(SaveMode.Append)
-      .save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
+  override def getOperation(): String = {
+    DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL
   }
 }

Reply via email to