This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d5f2028  Adding fixes to test suite framework. Adding clustering node and validate async operations node. (#2400)
d5f2028 is described below

commit d5f202821b11659693a2e934d4b8f4c99f19755e
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Feb 12 12:29:21 2021 -0500

    Adding fixes to test suite framework. Adding clustering node and validate async operations node. (#2400)
---
 docker/demo/config/test-suite/complex-dag-cow.yaml |  30 +++--
 docker/demo/config/test-suite/complex-dag-mor.yaml |  91 +++++----------
 ...ex-dag-cow.yaml => cow-clustering-example.yaml} |  52 +++++----
 .../test-suite/cow-long-running-example.yaml       |  39 +++++--
 ...yaml => cow-long-running-multi-partitions.yaml} |  49 +++++---
 docker/demo/config/test-suite/test.properties      |   4 +
 .../hudi/client/AbstractHoodieWriteClient.java     |   3 +-
 hudi-integ-test/pom.xml                            |  11 ++
 .../hudi/integ/testsuite/HoodieTestSuiteJob.java   |   1 +
 .../integ/testsuite/HoodieTestSuiteWriter.java     |  39 +++++--
 .../integ/testsuite/configuration/DeltaConfig.java |  20 ++++
 .../hudi/integ/testsuite/dag/ExecutionContext.java |   2 +-
 .../hudi/integ/testsuite/dag/nodes/CleanNode.java  |   2 +-
 .../nodes/{CleanNode.java => ClusteringNode.java}  |  22 ++--
 .../integ/testsuite/dag/nodes/CompactNode.java     |   3 +-
 .../hudi/integ/testsuite/dag/nodes/DagNode.java    |   3 +-
 .../hudi/integ/testsuite/dag/nodes/DelayNode.java  |   2 +-
 .../integ/testsuite/dag/nodes/HiveQueryNode.java   |   2 +-
 .../integ/testsuite/dag/nodes/HiveSyncNode.java    |   2 +-
 .../hudi/integ/testsuite/dag/nodes/InsertNode.java |   2 +-
 .../integ/testsuite/dag/nodes/RollbackNode.java    |   4 +-
 .../testsuite/dag/nodes/ScheduleCompactNode.java   |   4 +-
 .../testsuite/dag/nodes/SparkSQLQueryNode.java     |   3 +-
 .../dag/nodes/ValidateAsyncOperations.java         | 124 +++++++++++++++++++++
 .../testsuite/dag/nodes/ValidateDatasetNode.java   |  26 +++--
 .../integ/testsuite/dag/nodes/ValidateNode.java    |   3 +-
 .../testsuite/dag/scheduler/DagScheduler.java      |   9 +-
 .../FlexibleSchemaRecordGenerationIterator.java    |   2 +-
 .../GenericRecordFullPayloadGenerator.java         |   2 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   7 +-
 packaging/hudi-integ-test-bundle/pom.xml           |   1 +
 31 files changed, 398 insertions(+), 166 deletions(-)

diff --git a/docker/demo/config/test-suite/complex-dag-cow.yaml b/docker/demo/config/test-suite/complex-dag-cow.yaml
index 5fa8596..acbe287 100644
--- a/docker/demo/config/test-suite/complex-dag-cow.yaml
+++ b/docker/demo/config/test-suite/complex-dag-cow.yaml
@@ -13,13 +13,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-dag_name: cow-long-running-example.yaml
-dag_rounds: 2
+dag_name: complex-dag-cow.yaml
+dag_rounds: 1
 dag_intermittent_delay_mins: 1
 dag_content:
   first_insert:
     config:
-      record_size: 100
+      record_size: 1000
       num_partitions_insert: 1
       repeat_count: 1
       num_records_insert: 1000
@@ -27,7 +27,7 @@ dag_content:
     deps: none
   second_insert:
     config:
-      record_size: 100
+      record_size: 1000
       num_partitions_insert: 1
       repeat_count: 1
       num_records_insert: 10000
@@ -35,19 +35,26 @@ dag_content:
     type: InsertNode
   third_insert:
     config:
-      record_size: 100
+      record_size: 1000
       num_partitions_insert: 1
       repeat_count: 1
       num_records_insert: 300
     deps: second_insert
     type: InsertNode
+  first_hive_sync:
+    config:
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: third_insert
   first_validate:
     config:
+      validate_hive: true
     type: ValidateDatasetNode
-    deps: third_insert
+    deps: first_hive_sync
   first_upsert:
     config:
-      record_size: 100
+      record_size: 1000
       num_partitions_insert: 1
       num_records_insert: 300
       repeat_count: 1
@@ -61,8 +68,15 @@ dag_content:
       num_records_delete: 2000
     type: DeleteNode
     deps: first_upsert
+  second_hive_sync:
+    config:
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: first_delete
   second_validate:
     config:
+      validate_hive: true
       delete_input_data: true
     type: ValidateDatasetNode
-    deps: first_delete
\ No newline at end of file
+    deps: second_hive_sync
diff --git a/docker/demo/config/test-suite/complex-dag-mor.yaml b/docker/demo/config/test-suite/complex-dag-mor.yaml
index 505e5e2..24f3a9c 100644
--- a/docker/demo/config/test-suite/complex-dag-mor.yaml
+++ b/docker/demo/config/test-suite/complex-dag-mor.yaml
@@ -15,105 +15,70 @@
 # limitations under the License.
 dag_name: complex-dag-mor.yaml
 dag_rounds: 1
-dag_intermittent_delay_mins: 10
+dag_intermittent_delay_mins: 1
 dag_content:
   first_insert:
     config:
-      record_size: 70000
+      record_size: 1000
       num_partitions_insert: 1
-      repeat_count: 5
+      repeat_count: 1
       num_records_insert: 100
     type: InsertNode
     deps: none
   second_insert:
     config:
-      record_size: 70000
+      record_size: 1000
       num_partitions_insert: 1
-      repeat_count: 5
-      num_records_insert: 100
+      repeat_count: 1
+      num_records_insert: 1000
     deps: first_insert
     type: InsertNode
   third_insert:
     config:
-      record_size: 70000
+      record_size: 1000
       num_partitions_insert: 1
-      repeat_count: 2
+      repeat_count: 1
       num_records_insert: 300
     deps: second_insert
     type: InsertNode
-  first_rollback:
-    config:
-    deps: third_insert
-    type: RollbackNode
-  first_upsert:
-    config:
-      record_size: 70000
-      num_partitions_insert: 1
-      num_records_insert: 300
-      repeat_count: 1
-      num_records_upsert: 100
-      num_partitions_upsert: 10
-    type: UpsertNode
-    deps: first_rollback
   first_hive_sync:
     config:
       queue_name: "adhoc"
       engine: "mr"
     type: HiveSyncNode
-    deps: first_upsert
-  first_hive_query:
+    deps: third_insert
+  first_validate:
     config:
-      queue_name: "adhoc"
-      engine: "mr"
-    type: HiveQueryNode
+    type: ValidateDatasetNode
     deps: first_hive_sync
-  second_upsert:
+  first_upsert:
     config:
-      record_size: 70000
+      record_size: 1000
       num_partitions_insert: 1
       num_records_insert: 300
       repeat_count: 1
       num_records_upsert: 100
-      num_partitions_upsert: 10
+      num_partitions_upsert: 1
     type: UpsertNode
-    deps: first_hive_query
-  second_hive_query:
-    config:
-      queue_name: "adhoc"
-      engine: "mr"
-      hive_queries:
-        query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
-        result1: 0
-        query2: "select count(*) from testdb.table1"
-        result2: 1100
-    type: HiveQueryNode
-    deps: second_upsert
+    deps: first_validate
   first_schedule_compact:
     config:
     type: ScheduleCompactNode
-    deps: second_hive_query
-  third_upsert:
-    config:
-      record_size: 70000
-      num_partitions_insert: 1
-      num_records_insert: 300
-      repeat_count: 1
-      num_records_upsert: 100
-      num_partitions_upsert: 10
-    type: UpsertNode
-    deps: first_schedule_compact
-  first_compact:
+    deps: first_upsert
+  first_delete:
     config:
-    type: CompactNode
+      num_partitions_delete: 1
+      num_records_delete: 500
+    type: DeleteNode
     deps: first_schedule_compact
-  third_hive_query:
+  second_hive_sync:
     config:
       queue_name: "adhoc"
       engine: "mr"
-      hive_queries:
-        query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
-        result1: 0
-        query2: "select count(*) from testdb.table1"
-        result2: 1400
-    type: HiveQueryNode
-    deps: first_compact
+    type: HiveSyncNode
+    deps: first_delete
+  second_validate:
+    config:
+      delete_input_data: true
+    type: ValidateDatasetNode
+    deps: second_hive_sync
diff --git a/docker/demo/config/test-suite/complex-dag-cow.yaml b/docker/demo/config/test-suite/cow-clustering-example.yaml
similarity index 71%
copy from docker/demo/config/test-suite/complex-dag-cow.yaml
copy to docker/demo/config/test-suite/cow-clustering-example.yaml
index 5fa8596..939e16f 100644
--- a/docker/demo/config/test-suite/complex-dag-cow.yaml
+++ b/docker/demo/config/test-suite/cow-clustering-example.yaml
@@ -13,13 +13,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-dag_name: cow-long-running-example.yaml
-dag_rounds: 2
-dag_intermittent_delay_mins: 1
+dag_name: cow-clustering-example.yaml
+dag_rounds: 3
+dag_intermittent_delay_mins: 0
 dag_content:
   first_insert:
     config:
-      record_size: 100
+      record_size: 1000
       num_partitions_insert: 1
       repeat_count: 1
       num_records_insert: 1000
@@ -27,7 +27,7 @@ dag_content:
     deps: none
   second_insert:
     config:
-      record_size: 100
+      record_size: 1000
       num_partitions_insert: 1
       repeat_count: 1
       num_records_insert: 10000
@@ -35,34 +35,42 @@ dag_content:
     type: InsertNode
   third_insert:
     config:
-      record_size: 100
+      record_size: 1000
       num_partitions_insert: 1
       repeat_count: 1
       num_records_insert: 300
     deps: second_insert
     type: InsertNode
+  first_delete:
+    config:
+      num_partitions_delete: 1
+      num_records_delete: 9000
+    type: DeleteNode
+    deps: third_insert
+  first_hive_sync:
+    config:
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: first_delete
   first_validate:
     config:
+      validate_hive: true
     type: ValidateDatasetNode
-    deps: third_insert
-  first_upsert:
+    deps: first_hive_sync
+  first_cluster:
     config:
-      record_size: 100
-      num_partitions_insert: 1
-      num_records_insert: 300
-      repeat_count: 1
-      num_records_upsert: 100
-      num_partitions_upsert: 1
-    type: UpsertNode
+      execute_itr_count: 2
+    type: ClusteringNode
     deps: first_validate
-  first_delete:
+  second_hive_sync:
     config:
-      num_partitions_delete: 1
-      num_records_delete: 2000
-    type: DeleteNode
-    deps: first_upsert
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: first_cluster
   second_validate:
     config:
-      delete_input_data: true
+      validate_hive: true
     type: ValidateDatasetNode
-    deps: first_delete
\ No newline at end of file
+    deps: second_hive_sync
diff --git a/docker/demo/config/test-suite/cow-long-running-example.yaml b/docker/demo/config/test-suite/cow-long-running-example.yaml
index b7026f2..71a34f8 100644
--- a/docker/demo/config/test-suite/cow-long-running-example.yaml
+++ b/docker/demo/config/test-suite/cow-long-running-example.yaml
@@ -14,12 +14,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 dag_name: cow-long-running-example.yaml
-dag_rounds: 20
-dag_intermittent_delay_mins: 10
+dag_rounds: 50
+dag_intermittent_delay_mins: 1
 dag_content:
   first_insert:
     config:
-      record_size: 100
+      record_size: 1000
       num_partitions_insert: 1
       repeat_count: 1
       num_records_insert: 1000
@@ -27,7 +27,7 @@ dag_content:
     deps: none
   second_insert:
     config:
-      record_size: 100
+      record_size: 1000
       num_partitions_insert: 1
       repeat_count: 1
       num_records_insert: 10000
@@ -35,19 +35,26 @@ dag_content:
     type: InsertNode
   third_insert:
     config:
-      record_size: 100
+      record_size: 1000
       num_partitions_insert: 1
       repeat_count: 1
       num_records_insert: 300
     deps: second_insert
     type: InsertNode
+  first_hive_sync:
+    config:
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: third_insert
   first_validate:
     config:
+      validate_hive: true
     type: ValidateDatasetNode
-    deps: third_insert
+    deps: first_hive_sync
   first_upsert:
     config:
-      record_size: 100
+      record_size: 1000
       num_partitions_insert: 1
       num_records_insert: 300
       repeat_count: 1
@@ -58,11 +65,25 @@ dag_content:
   first_delete:
     config:
       num_partitions_delete: 1
-      num_records_delete: 2000
+      num_records_delete: 8000
     type: DeleteNode
     deps: first_upsert
+  second_hive_sync:
+    config:
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: first_delete
   second_validate:
     config:
+      validate_hive: true
       delete_input_data: true
     type: ValidateDatasetNode
-    deps: first_delete
\ No newline at end of file
+    deps: second_hive_sync
+  last_validate:
+    config:
+      execute_itr_count: 50
+      validate_clean: true
+      validate_archival: true
+    type: ValidateAsyncOperations
+    deps: second_validate
diff --git a/docker/demo/config/test-suite/complex-dag-cow.yaml b/docker/demo/config/test-suite/cow-long-running-multi-partitions.yaml
similarity index 65%
copy from docker/demo/config/test-suite/complex-dag-cow.yaml
copy to docker/demo/config/test-suite/cow-long-running-multi-partitions.yaml
index 5fa8596..b071c46 100644
--- a/docker/demo/config/test-suite/complex-dag-cow.yaml
+++ b/docker/demo/config/test-suite/cow-long-running-multi-partitions.yaml
@@ -13,42 +13,49 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-dag_name: cow-long-running-example.yaml
-dag_rounds: 2
+dag_name: cow-long-running-multi-partitions.yaml
+dag_rounds: 50
 dag_intermittent_delay_mins: 1
 dag_content:
   first_insert:
     config:
-      record_size: 100
-      num_partitions_insert: 1
+      record_size: 1000
+      num_partitions_insert: 5
       repeat_count: 1
       num_records_insert: 1000
     type: InsertNode
     deps: none
   second_insert:
     config:
-      record_size: 100
-      num_partitions_insert: 1
+      record_size: 1000
+      num_partitions_insert: 50
       repeat_count: 1
       num_records_insert: 10000
     deps: first_insert
     type: InsertNode
   third_insert:
     config:
-      record_size: 100
-      num_partitions_insert: 1
+      record_size: 1000
+      num_partitions_insert: 2
       repeat_count: 1
       num_records_insert: 300
     deps: second_insert
     type: InsertNode
+  first_hive_sync:
+    config:
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: third_insert
   first_validate:
     config:
+      validate_hive: true
     type: ValidateDatasetNode
-    deps: third_insert
+    deps: first_hive_sync
   first_upsert:
     config:
-      record_size: 100
-      num_partitions_insert: 1
+      record_size: 1000
+      num_partitions_insert: 2
       num_records_insert: 300
       repeat_count: 1
       num_records_upsert: 100
@@ -57,12 +64,26 @@ dag_content:
     deps: first_validate
   first_delete:
     config:
-      num_partitions_delete: 1
-      num_records_delete: 2000
+      num_partitions_delete: 50
+      num_records_delete: 8000
     type: DeleteNode
     deps: first_upsert
+  second_hive_sync:
+    config:
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: first_delete
   second_validate:
     config:
+      validate_hive: true
       delete_input_data: true
     type: ValidateDatasetNode
-    deps: first_delete
\ No newline at end of file
+    deps: second_hive_sync
+  last_validate:
+    config:
+      execute_itr_count: 50
+      validate_clean: true
+      validate_archival: true
+    type: ValidateAsyncOperations
+    deps: second_validate
diff --git a/docker/demo/config/test-suite/test.properties b/docker/demo/config/test-suite/test.properties
index 0aa0f45..9dfb465 100644
--- a/docker/demo/config/test-suite/test.properties
+++ b/docker/demo/config/test-suite/test.properties
@@ -20,6 +20,10 @@ hoodie.datasource.write.recordkey.field=_row_key
 hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
 hoodie.datasource.write.partitionpath.field=timestamp
 
+hoodie.clustering.plan.strategy.sort.columns=_row_key
+hoodie.clustering.plan.strategy.daybased.lookback.partitions=0
+hoodie.clustering.inline.max.commits=1
+
 hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
 hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
 hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 63d0bff..25ce039 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -371,7 +371,6 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
    * Common method containing steps to be performed before write (upsert/insert/...
    *
    * @param instantTime Instant Time
-   * @param hoodieTable Hoodie Table
    * @return Write Status
    */
  protected void preWrite(String instantTime, WriteOperationType writeOperationType) {
@@ -719,7 +718,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
    */
  protected abstract void completeCompaction(HoodieCommitMetadata metadata, O writeStatuses,
                                             HoodieTable<T, I, K, O> table, String compactionCommitTime);
-  
+
   /**
   * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
    *
diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml
index b48dd81..d93c663 100644
--- a/hudi-integ-test/pom.xml
+++ b/hudi-integ-test/pom.xml
@@ -92,6 +92,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-utilities_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>
@@ -201,6 +206,12 @@
       <version>${project.version}</version>
       <type>test-jar</type>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+    </dependency>
 
     <!-- Fasterxml -->
     <dependency>
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index b5037e9..aeb8748 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -82,6 +82,7 @@ public class HoodieTestSuiteJob {
   private BuiltinKeyGenerator keyGenerator;
 
  public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throws IOException {
+    log.warn("Running spark job w/ app id " + jsc.sc().applicationId());
     this.cfg = cfg;
     this.jsc = jsc;
    cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString();
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
index a06c281..3c61291 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.integ.testsuite;
 
-import org.apache.hadoop.conf.Configuration;
-
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
@@ -30,6 +28,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
@@ -41,9 +40,14 @@ import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
 import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
 import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode;
 import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -53,13 +57,15 @@ import java.util.Properties;
 import java.util.Set;
 
 /**
- * A writer abstraction for the Hudi test suite. This class wraps different implementations of writers used to perform
- * write operations into the target hudi dataset. Current supported writers are {@link HoodieDeltaStreamerWrapper}
- * and {@link SparkRDDWriteClient}.
+ * A writer abstraction for the Hudi test suite. This class wraps different implementations of writers used to perform write operations into the target hudi dataset. Current supported writers are
+ * {@link HoodieDeltaStreamerWrapper} and {@link SparkRDDWriteClient}.
  */
 public class HoodieTestSuiteWriter {
 
+  private static Logger log = LoggerFactory.getLogger(HoodieTestSuiteWriter.class);
+
   private HoodieDeltaStreamerWrapper deltaStreamerWrapper;
+  private HoodieWriteConfig writeConfig;
   private SparkRDDWriteClient writeClient;
   protected HoodieTestSuiteConfig cfg;
   private Option<String> lastCheckpoint;
@@ -85,8 +91,9 @@ public class HoodieTestSuiteWriter {
     HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
     this.deltaStreamerWrapper = new HoodieDeltaStreamerWrapper(cfg, jsc);
     this.hoodieReadClient = new HoodieReadClient(context, cfg.targetBasePath);
+    this.writeConfig = getHoodieClientConfig(cfg, props, schema);
     if (!cfg.useDeltaStreamer) {
-      this.writeClient = new SparkRDDWriteClient(context, getHoodieClientConfig(cfg, props, schema), rollbackInflight);
+      this.writeClient = new SparkRDDWriteClient(context, writeConfig, rollbackInflight);
     }
     this.cfg = cfg;
     this.configuration = jsc.hadoopConfiguration();
@@ -95,6 +102,10 @@ public class HoodieTestSuiteWriter {
     this.schema = schema;
   }
 
+  public HoodieWriteConfig getWriteConfig() {
+    return this.writeConfig;
+  }
+
  private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteConfig cfg, Properties props, String schema) {
     HoodieWriteConfig.Builder builder =
        HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
@@ -178,6 +189,20 @@ public class HoodieTestSuiteWriter {
     }
   }
 
+  public void inlineClustering() {
+    if (!cfg.useDeltaStreamer) {
+      Option<String> clusteringInstantOpt = writeClient.scheduleClustering(Option.empty());
+      clusteringInstantOpt.ifPresent(clusteringInstant -> {
+        // inline cluster should auto commit as the user is never given control
+        log.warn("Clustering instant :: " + clusteringInstant);
+        writeClient.cluster(clusteringInstant, true);
+      });
+    } else {
+      // TODO: fix clustering to be done async https://issues.apache.org/jira/browse/HUDI-1590
+      throw new IllegalArgumentException("Clustering cannot be triggered with deltastreamer");
+    }
+  }
+
  public Option<String> scheduleCompaction(Option<Map<String, String>> previousCommitExtraMetadata) throws
       Exception {
     if (!cfg.useDeltaStreamer) {
@@ -189,7 +214,7 @@ public class HoodieTestSuiteWriter {
   }
 
  public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
-                     Option<String> instantTime) {
+      Option<String> instantTime) {
     if (!cfg.useDeltaStreamer) {
       Map<String, String> extraMetadata = new HashMap<>();
       /** Store the checkpoint in the commit metadata just like
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index 329ef16..193bf2c 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -88,6 +88,10 @@ public class DeltaConfig implements Serializable {
     private static String REINIT_CONTEXT = "reinitialize_context";
     private static String START_PARTITION = "start_partition";
     private static String DELETE_INPUT_DATA = "delete_input_data";
+    private static String VALIDATE_HIVE = "validate_hive";
+    private static String EXECUTE_ITR_COUNT = "execute_itr_count";
+    private static String VALIDATE_ARCHIVAL = "validate_archival";
+    private static String VALIDATE_CLEAN = "validate_clean";
 
     private Map<String, Object> configsMap;
 
@@ -159,6 +163,22 @@ public class DeltaConfig implements Serializable {
      return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA, false).toString());
     }
 
+    public boolean isValidateHive() {
+      return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_HIVE, false).toString());
+    }
+
+    public int getIterationCountToExecute() {
+      return Integer.valueOf(configsMap.getOrDefault(EXECUTE_ITR_COUNT, -1).toString());
+    }
+
+    public boolean validateArchival() {
+      return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_ARCHIVAL, false).toString());
+    }
+
+    public boolean validateClean() {
+      return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_CLEAN, false).toString());
+    }
+
     public Map<String, Object> getOtherConfigs() {
       if (configsMap == null) {
         return new HashMap<>();
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java
index 17148f5..e4cf84a 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java
@@ -26,7 +26,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 
 /**
  * This wraps the context needed for an execution of
- * a {@link DagNode#execute(ExecutionContext)}.
+ * a {@link DagNode#execute(ExecutionContext, int)}.
  */
 public class ExecutionContext implements Serializable {
 
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java
index 83a8d5e..0f449a8 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java
@@ -32,7 +32,7 @@ public class CleanNode extends DagNode<Boolean> {
   }
 
   @Override
-  public void execute(ExecutionContext executionContext) throws Exception {
+  public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
     log.info("Executing clean node {}", this.getName());
     executionContext.getHoodieTestSuiteWriter().getWriteClient(this).clean();
   }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ClusteringNode.java
similarity index 55%
copy from hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java
copy to hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ClusteringNode.java
index 83a8d5e..9ee5ca2 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ClusteringNode.java
@@ -18,23 +18,31 @@
 
 package org.apache.hudi.integ.testsuite.dag.nodes;
 
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
 import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
 
 /**
- * Represents a clean node in the DAG of operations for a workflow. Clean up any stale/old files/data lying around
- * (either on file storage or index storage) based on configurations and CleaningPolicy used.
+ * Triggers inline clustering. Works only with writeClient. Also, add this as last node and end with validation if possible. As of now, after clustering, further inserts/upserts may not work since we
+ * call deltaStreamer.
  */
-public class CleanNode extends DagNode<Boolean> {
+public class ClusteringNode extends DagNode<Option<String>> {
 
-  public CleanNode(Config config) {
+  public ClusteringNode(Config config) {
     this.config = config;
   }
 
   @Override
-  public void execute(ExecutionContext executionContext) throws Exception {
-    log.info("Executing clean node {}", this.getName());
-    executionContext.getHoodieTestSuiteWriter().getWriteClient(this).clean();
+  public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
+    if (config.getIterationCountToExecute() == curItrCount) {
+      try {
+        log.warn("Executing ClusteringNode node {}", this.getName());
+        executionContext.getHoodieTestSuiteWriter().inlineClustering();
+      } catch (Exception e) {
+        log.warn("Exception thrown in ClusteringNode Node :: " + e.getCause() 
+ ", msg :: " + e.getMessage());
+        throw e;
+      }
+    }
   }
 
 }
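
For reference, a ClusteringNode is wired into a DAG like any other node; the
sketch below is lifted from cow-clustering-example.yaml in this commit and
triggers inline clustering on the second iteration before re-syncing to hive
(node names are arbitrary labels):

  first_cluster:
    config:
      execute_itr_count: 2
    type: ClusteringNode
    deps: first_validate
  second_hive_sync:
    config:
      queue_name: "adhoc"
      engine: "mr"
    type: HiveSyncNode
    deps: first_cluster
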
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java
index 4c3ad61..7c9090d 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java
@@ -40,10 +40,11 @@ public class CompactNode extends DagNode<JavaRDD<WriteStatus>> {
    * if it has one.
    *
    * @param executionContext Execution context to run this compaction
+   * @param curItrCount current iteration count.
    * @throws Exception  will be thrown if any error occurred.
    */
   @Override
-  public void execute(ExecutionContext executionContext) throws Exception {
+  public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(executionContext.getHoodieTestSuiteWriter().getConfiguration(),
         executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath);
     Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline()
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java
index 05ac242..8630243 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java
@@ -91,9 +91,10 @@ public abstract class DagNode<O> implements Comparable<DagNode<O>> {
    * Execute the {@link DagNode}.
    *
    * @param context The context needed for an execution of a node.
+   * @param curItrCount iteration count for executing the node.
    * @throws Exception Thrown if the execution failed.
    */
-  public abstract void execute(ExecutionContext context) throws Exception;
+  public abstract void execute(ExecutionContext context, int curItrCount) throws Exception;
 
   public boolean isCompleted() {
     return isCompleted;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java
index c0671e8..01b8d4c 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java
@@ -36,7 +36,7 @@ public class DelayNode extends DagNode<Boolean> {
   }
 
   @Override
-  public void execute(ExecutionContext context) throws Exception {
+  public void execute(ExecutionContext context, int curItrCount) throws Exception {
     log.warn("Waiting for "+ delayMins+" mins before going for next test run");
     Thread.sleep(delayMins * 60 * 1000);
   }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
index f36b7d4..bdde58a 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
@@ -43,7 +43,7 @@ public class HiveQueryNode extends DagNode<Boolean> {
   }
 
   @Override
-  public void execute(ExecutionContext executionContext) throws Exception {
+  public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
     log.info("Executing hive query node {}", this.getName());
     this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration());
     HiveSyncConfig hiveSyncConfig = DataSourceUtils
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java
index a2b4ee5..97a1bee 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java
@@ -35,7 +35,7 @@ public class HiveSyncNode extends DagNode<Boolean> {
   }
 
   @Override
-  public void execute(ExecutionContext executionContext) throws Exception {
+  public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
     log.info("Executing hive sync node");
     this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration());
     this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
index 1571349..5ca98cc 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
@@ -39,7 +39,7 @@ public class InsertNode extends DagNode<JavaRDD<WriteStatus>> {
   }
 
   @Override
-  public void execute(ExecutionContext executionContext) throws Exception {
+  public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
     // if the insert node has schema override set, reinitialize the table with new schema.
     if (this.config.getReinitContext()) {
       log.info(String.format("Reinitializing table with %s", this.config.getOtherConfigs().toString()));
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
index 12588ac..1824cb8 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
 import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
 import org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector;
@@ -42,10 +41,11 @@ public class RollbackNode extends DagNode<Option<HoodieInstant>> {
    * Method helps to rollback the last commit instant in the timeline, if it has one.
    *
    * @param executionContext Execution context to perform this rollback
+   * @param curItrCount current iteration count.
    * @throws Exception will be thrown if any error occurred
    */
   @Override
-  public void execute(ExecutionContext executionContext) throws Exception {
+  public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
     log.info("Executing rollback node {}", this.getName());
     // Can only be done with an instantiation of a new WriteClient hence cannot be done during DeltaStreamer
     // testing for now
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java
index 0aa67f4..c54b25a 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java
@@ -35,7 +35,7 @@ public class ScheduleCompactNode extends DagNode<Option<String>> {
   }
 
   @Override
-  public void execute(ExecutionContext executionContext) throws Exception {
+  public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
     log.info("Executing schedule compact node {}", this.getName());
     // Can only be done with an instantiation of a new WriteClient hence cannot be done during DeltaStreamer
     // testing for now
@@ -48,7 +48,7 @@ public class ScheduleCompactNode extends DagNode<Option<String>> {
       HoodieCommitMetadata metadata = org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(metaClient
           .getActiveTimeline().getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
       Option<String> scheduledInstant = executionContext.getHoodieTestSuiteWriter().scheduleCompaction(Option.of(metadata
-              .getExtraMetadata()));
+          .getExtraMetadata()));
       if (scheduledInstant.isPresent()) {
         log.info("Scheduling compaction instant {}", scheduledInstant.get());
       }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/SparkSQLQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/SparkSQLQueryNode.java
index e06d6de..8efd96c 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/SparkSQLQueryNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/SparkSQLQueryNode.java
@@ -42,10 +42,11 @@ public class SparkSQLQueryNode extends DagNode<Boolean> {
    * Method helps to execute a sparkSql query from a hive table.
    *
    * @param executionContext Execution context to perform this query.
+   * @param curItrCount current iteration count.
   * @throws Exception will be thrown if any error occurred
    */
   @Override
-  public void execute(ExecutionContext executionContext) throws Exception {
+  public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
     log.info("Executing spark sql query node");
     this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration());
     this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java
new file mode 100644
index 0000000..de8855f
--- /dev/null
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Node to validate data set sanity like total file versions retained, has cleaning happened, has archival happened, etc.
+ */
+public class ValidateAsyncOperations extends DagNode<Option<String>> {
+
+  private static Logger log = LoggerFactory.getLogger(ValidateAsyncOperations.class);
+
+  public ValidateAsyncOperations(Config config) {
+    this.config = config;
+  }
+
+  @Override
+  public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
+    if (config.getIterationCountToExecute() == curItrCount) {
+      try {
+        log.warn("Executing ValidateHoodieAsyncOperations node {} with target 
base path {} ", this.getName(),
+            
executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath);
+        String basePath = 
executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath;
+
+        int maxCommitsRetained = 
executionContext.getHoodieTestSuiteWriter().getWriteConfig().getCleanerCommitsRetained()
 + 1;
+        FileSystem fs = FSUtils.getFs(basePath, 
executionContext.getHoodieTestSuiteWriter().getConfiguration());
+        Map<String, Integer> fileIdCount = new HashMap<>();
+
+        AtomicInteger maxVal = new AtomicInteger();
+        List<String> partitionPaths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, basePath);
+        for (String partitionPath : partitionPaths) {
+          List<FileStatus> fileStatuses = Arrays.stream(FSUtils.getAllDataFilesInPartition(fs, new Path(basePath + "/" + partitionPath))).collect(Collectors.toList());
+          fileStatuses.forEach(entry -> {
+            String fileId = FSUtils.getFileId(entry.getPath().getName());
+            fileIdCount.computeIfAbsent(fileId, k -> 0);
+            fileIdCount.put(fileId, fileIdCount.get(fileId) + 1);
+            maxVal.set(Math.max(maxVal.get(), fileIdCount.get(fileId)));
+          });
+        }
+        if (maxVal.get() > maxCommitsRetained) {
+          throw new AssertionError("Total commits (" + maxVal + ") retained 
exceeds max value of " + maxCommitsRetained + ", total commits : ");
+        }
+
+        if (config.validateArchival() || config.validateClean()) {
+          Pattern ARCHIVE_FILE_PATTERN =
+              Pattern.compile("\\.commits_\\.archive\\..*");
+          Pattern CLEAN_FILE_PATTERN =
+              Pattern.compile(".*\\.clean\\..*");
+
+          String metadataPath = executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath + "/.hoodie";
+          FileStatus[] metaFileStatuses = fs.listStatus(new Path(metadataPath));
+          boolean archFound = false;
+          boolean cleanFound = false;
+          for (FileStatus fileStatus : metaFileStatuses) {
+            Matcher archFileMatcher = ARCHIVE_FILE_PATTERN.matcher(fileStatus.getPath().getName());
+            if (archFileMatcher.matches()) {
+              archFound = true;
+              if (config.validateArchival() && !config.validateClean()) {
+                break;
+              }
+            }
+            Matcher cleanFileMatcher = CLEAN_FILE_PATTERN.matcher(fileStatus.getPath().getName());
+            if (cleanFileMatcher.matches()) {
+              cleanFound = true;
+              if (!config.validateArchival() && config.validateClean()) {
+                break;
+              }
+            }
+            if (config.validateClean() && config.validateArchival()) {
+              if (archFound && cleanFound) {
+                break;
+              }
+            }
+          }
+          if (config.validateArchival() && !archFound) {
+            throw new AssertionError("Archival NotFound in " + metadataPath);
+          }
+
+          if (config.validateClean() && !cleanFound) {
+            throw new AssertionError("Clean commits NotFound in " + 
metadataPath);
+          }
+        }
+      } catch (Exception e) {
+        log.warn("Exception thrown in ValidateHoodieAsyncOperations Node :: " 
+ e.getCause() + ", msg :: " + e.getMessage());
+        throw e;
+      }
+    }
+  }
+}
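
As a usage sketch, the long-running DAGs in this commit gate this node on the
final iteration via execute_itr_count; the snippet below is taken from
cow-long-running-example.yaml above and validates clean and archival once,
after round 50:

  last_validate:
    config:
      execute_itr_count: 50
      validate_clean: true
      validate_archival: true
    type: ValidateAsyncOperations
    deps: second_validate
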
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
index 12fc525..22fea92 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
@@ -62,7 +62,7 @@ public class ValidateDatasetNode extends DagNode<Boolean> {
   }
 
   @Override
-  public void execute(ExecutionContext context) throws Exception {
+  public void execute(ExecutionContext context, int curItrCount) throws Exception {
 
     SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
 
@@ -111,17 +111,19 @@ public class ValidateDatasetNode extends DagNode<Boolean> {
       throw new AssertionError("Hudi contents does not match contents input data. ");
     }
 
-    String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY());
-    String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY());
-    log.warn("Validating hive table with db : " + database + " and table : " + tableName);
-    Dataset<Row> cowDf = session.sql("SELECT * FROM " + database + "." + tableName);
-    Dataset<Row> trimmedCowDf = cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
-        .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
-    intersectionDf = inputSnapshotDf.intersect(trimmedDf);
-    // the intersected df should be same as inputDf. if not, there is some mismatch.
-    if (inputSnapshotDf.except(intersectionDf).count() != 0) {
-      log.error("Data set validation failed for COW hive table. Total count in hudi " + trimmedCowDf.count() + ", input df count " + inputSnapshotDf.count());
-      throw new AssertionError("Hudi hive table contents does not match contents input data. ");
+    if (config.isValidateHive()) {
+      String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY());
+      String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY());
+      log.warn("Validating hive table with db : " + database + " and table : " + tableName);
+      Dataset<Row> cowDf = session.sql("SELECT * FROM " + database + "." + tableName);
+      Dataset<Row> trimmedCowDf = cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+          .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
+      intersectionDf = inputSnapshotDf.intersect(trimmedDf);
+      // the intersected df should be same as inputDf. if not, there is some mismatch.
+      if (inputSnapshotDf.except(intersectionDf).count() != 0) {
+        log.error("Data set validation failed for COW hive table. Total count in hudi " + trimmedCowDf.count() + ", input df count " + inputSnapshotDf.count());
+        throw new AssertionError("Hudi hive table contents does not match contents input data. ");
+      }
     }
 
     // if delete input data is enabled, erase input data.
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateNode.java
index 37244c0..e4c4adb 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateNode.java
@@ -40,9 +40,10 @@ public class ValidateNode<R> extends DagNode {
    * was set to true or default, but the parent nodes have not completed yet.
    *
    * @param executionContext Context to execute this node
+   * @param curItrCount current iteration count.
    */
   @Override
-  public void execute(ExecutionContext executionContext) {
+  public void execute(ExecutionContext executionContext, int curItrCount) {
    if (this.getParentNodes().size() > 0 && (Boolean) this.config.getOtherConfigs().getOrDefault("WAIT_FOR_PARENTS",
         true)) {
       for (DagNode node : (List<DagNode>) this.getParentNodes()) {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
index d4074bc..34cb9bc 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
@@ -101,7 +101,8 @@ public class DagScheduler {
         while (queue.size() > 0) {
           DagNode nodeToExecute = queue.poll();
           log.warn("Executing node \"" + 
nodeToExecute.getConfig().getOtherConfigs().get(CONFIG_NAME) + "\" :: " + 
nodeToExecute.getConfig());
-          futures.add(service.submit(() -> executeNode(nodeToExecute)));
+          int finalCurRound = curRound;
+          futures.add(service.submit(() -> executeNode(nodeToExecute, 
finalCurRound)));
           if (nodeToExecute.getChildNodes().size() > 0) {
             childNodes.addAll(nodeToExecute.getChildNodes());
           }
@@ -114,7 +115,7 @@ public class DagScheduler {
       } while (queue.size() > 0);
       log.info("Finished workloads for round num " + curRound);
       if (curRound < workflowDag.getRounds()) {
-        new DelayNode(workflowDag.getIntermittentDelayMins()).execute(executionContext);
+        new DelayNode(workflowDag.getIntermittentDelayMins()).execute(executionContext, curRound);
       }
 
       // After each level, report and flush the metrics
@@ -128,14 +129,14 @@ public class DagScheduler {
    *
    * @param node The node to be executed
    */
-  private void executeNode(DagNode node) {
+  private void executeNode(DagNode node, int curRound) {
     if (node.isCompleted()) {
       throw new RuntimeException("DagNode already completed! Cannot 
re-execute");
     }
     try {
       int repeatCount = node.getConfig().getRepeatCount();
       while (repeatCount > 0) {
-        node.execute(executionContext);
+        node.execute(executionContext, curRound);
         log.info("Finished executing {}", node.getName());
         repeatCount--;
       }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
index 787ec84..00928f3 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
@@ -70,7 +70,7 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator<GenericR
     if (lastRecord == null) {
       GenericRecord record = partitionPathsNonEmpty
          ? this.generator.getNewPayloadWithTimestamp(this.firstPartitionPathField)
-          : this.generator.getNewPayload();
+          : this.generator.getNewPayload(partitionPathFieldNames);
       lastRecord = record;
       return record;
     } else {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
index 510fc49..c900a8e 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
@@ -331,7 +331,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
    */
  public GenericRecord updateTimestamp(GenericRecord record, String fieldName) {
    long delta = TimeUnit.MILLISECONDS.convert(++partitionIndex % numDatePartitions, TimeUnit.DAYS);
-    record.put(fieldName, System.currentTimeMillis() - delta);
+    record.put(fieldName, (System.currentTimeMillis() - delta)/1000);
     return record;
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 0ce9aca..9eb5c9d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -309,7 +309,10 @@ public class DeltaSync implements Serializable {
           if (!commitMetadata.getMetadata(CHECKPOINT_KEY).isEmpty()) {
            resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
           }
-        }  else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
+        } else if (commitMetadata.getOperationType() == WriteOperationType.CLUSTER) {
+          // in case of CLUSTER commit, no checkpoint will be available in metadata.
+          resumeCheckpointStr = Option.empty();
+        } else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
             HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
           throw new HoodieDeltaStreamerException(
               "Unable to find previous checkpoint. Please double check if this 
table "
@@ -373,7 +376,7 @@ public class DeltaSync implements Serializable {
 
     if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
       LOG.info("No new data, source checkpoint has not changed. Nothing to 
commit. Old checkpoint=("
-          + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
+           + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + 
")");
       return null;
     }
 
diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml
index f11e95d..db2635a 100644
--- a/packaging/hudi-integ-test-bundle/pom.xml
+++ b/packaging/hudi-integ-test-bundle/pom.xml
@@ -70,6 +70,7 @@
                   <include>org.apache.hudi:hudi-common</include>
                   <include>org.apache.hudi:hudi-client-common</include>
                   <include>org.apache.hudi:hudi-spark-client</include>
+                  <include>org.apache.hudi:hudi-spark-common</include>
                  <include>org.apache.hudi:hudi-utilities_${scala.binary.version}</include>
                  <include>org.apache.hudi:hudi-spark_${scala.binary.version}</include>
                  <include>org.apache.hudi:hudi-spark2_${scala.binary.version}</include>