This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new af1128acf9 [HUDI-4084] Add support to test async table services with integ test suite framework (#5557)
af1128acf9 is described below

commit af1128acf95ade0e52920638c2a7a0badaf53869
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon May 23 23:05:56 2022 -0400

    [HUDI-4084] Add support to test async table services with integ test suite framework (#5557)
    
    * Add support to test async table services with integ test suite framework
    
    * Make await time for validation configurable
---
 ...treamer-long-running-multi-partitions-hive.yaml |   2 +
 ...mer-long-running-multi-partitions-metadata.yaml |   2 +
 ...eltastreamer-long-running-multi-partitions.yaml |   2 +
 .../deltastreamer-medium-clustering.yaml           |   2 +
 ...ltastreamer-medium-full-dataset-validation.yaml |   2 +
 .../detlastreamer-long-running-example.yaml        |   2 +
 .../spark-long-running-non-partitioned.yaml        |   2 +
 .../demo/config/test-suite/spark-long-running.yaml |   2 +
 hudi-integ-test/README.md                          |  50 +++++
 .../testsuite/HoodieContinousTestSuiteWriter.java  | 157 ++++++++++++++++
 ...riter.java => HoodieInlineTestSuiteWriter.java} | 118 +-----------
 .../hudi/integ/testsuite/HoodieTestSuiteJob.java   |   9 +-
 .../integ/testsuite/HoodieTestSuiteWriter.java     | 206 ++++-----------------
 .../integ/testsuite/configuration/DeltaConfig.java |   5 +
 .../hudi/integ/testsuite/dag/WriterContext.java    |  43 ++++-
 .../dag/nodes/BaseValidateDatasetNode.java         |  66 ++++++-
 .../integ/testsuite/generator/DeltaGenerator.java  |  23 ++-
 17 files changed, 395 insertions(+), 298 deletions(-)

diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
index 7617220386..8b82415982 100644
--- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
@@ -74,10 +74,12 @@ dag_content:
       validate_once_every_itr : 5
       validate_hive: true
       delete_input_data: true
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateDatasetNode
     deps: second_hive_sync
   last_validate:
     config:
       execute_itr_count: 50
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateAsyncOperations
     deps: second_validate
diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
index dc1e99a431..031664cd15 100644
--- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
@@ -62,10 +62,12 @@ dag_content:
       validate_once_every_itr : 5
       validate_hive: false
       delete_input_data: true
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateDatasetNode
     deps: first_delete
   last_validate:
     config:
       execute_itr_count: 30
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateAsyncOperations
     deps: second_validate
diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
index eca4eac1c7..c23775b2ce 100644
--- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
@@ -62,10 +62,12 @@ dag_content:
       validate_once_every_itr : 5
       validate_hive: false
       delete_input_data: true
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateDatasetNode
     deps: first_delete
   last_validate:
     config:
       execute_itr_count: 50
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateAsyncOperations
     deps: second_validate
diff --git a/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml b/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml
index 81c21a7be6..2fc68596d8 100644
--- a/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml
@@ -64,10 +64,12 @@ dag_content:
     config:
       validate_hive: false
       delete_input_data: true
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateDatasetNode
     deps: first_delete
   last_validate:
     config:
       execute_itr_count: 20
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateAsyncOperations
     deps: second_validate
diff --git a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
index a2d85a7a4d..db7edb8f8f 100644
--- a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
@@ -65,10 +65,12 @@ dag_content:
       validate_once_every_itr : 5
       validate_hive: false
       delete_input_data: false
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateDatasetNode
     deps: first_delete
   last_validate:
     config:
       execute_itr_count: 20
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateAsyncOperations
     deps: second_validate
diff --git a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
index 1c2f44b060..102807ec43 100644
--- a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
+++ b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
@@ -62,10 +62,12 @@ dag_content:
       validate_once_every_itr : 5
       validate_hive: false
       delete_input_data: true
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateDatasetNode
     deps: first_delete
   last_validate:
     config:
       execute_itr_count: 50
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateAsyncOperations
     deps: second_validate
diff --git a/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml b/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml
index dfbfba0a15..947bbdab86 100644
--- a/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml
+++ b/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml
@@ -45,10 +45,12 @@ dag_content:
     config:
       validate_hive: false
       delete_input_data: true
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateDatasetNode
     deps: first_delete
   last_validate:
     config:
       execute_itr_count: 6
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateAsyncOperations
     deps: second_validate
diff --git a/docker/demo/config/test-suite/spark-long-running.yaml b/docker/demo/config/test-suite/spark-long-running.yaml
index 00fea43f45..2ffef55781 100644
--- a/docker/demo/config/test-suite/spark-long-running.yaml
+++ b/docker/demo/config/test-suite/spark-long-running.yaml
@@ -46,10 +46,12 @@ dag_content:
       validate_once_every_itr : 5
       validate_hive: false
       delete_input_data: true
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateDatasetNode
     deps: first_delete
   last_validate:
     config:
       execute_itr_count: 30
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateAsyncOperations
     deps: second_validate
diff --git a/hudi-integ-test/README.md b/hudi-integ-test/README.md
index 5d26d03a20..687ad9a2a9 100644
--- a/hudi-integ-test/README.md
+++ b/hudi-integ-test/README.md
@@ -593,6 +593,56 @@ Sample spark-submit command to test one delta streamer and a spark data source w
 --use-hudi-data-to-generate-updates
 ```
 
+### Testing async table services
+We can test async table services with deltastreamer using the command below. Three additional arguments are required
+to test async table services compared to the previous command.
+
+```shell
+--continuous \
+--test-continuous-mode \
+--min-sync-interval-seconds 20
+```
+
+Here is the full command: 
+```shell
+./bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 \
+ --conf spark.task.cpus=1 --conf spark.executor.cores=1 \
+--conf spark.task.maxFailures=100 \
+--conf spark.memory.fraction=0.4 \
+--conf spark.rdd.compress=true \
+--conf spark.kryoserializer.buffer.max=2000m \
+--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+--conf spark.memory.storageFraction=0.1 \
+--conf spark.shuffle.service.enabled=true \
+--conf spark.sql.hive.convertMetastoreParquet=false \
+--conf spark.driver.maxResultSize=12g \
+--conf spark.executor.heartbeatInterval=120s \
+--conf spark.network.timeout=600s \
+--conf spark.yarn.max.executor.failures=10 \
+--conf spark.sql.catalogImplementation=hive \
+--class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob <PATH_TO_BUNDLE>/hudi-integ-test-bundle-0.12.0-SNAPSHOT.jar \
+--source-ordering-field test_suite_source_ordering_field \
+--use-deltastreamer \
+--target-base-path /tmp/hudi/output \
+--input-base-path /tmp/hudi/input \
+--target-table table1 \
+-props file:/tmp/test.properties \
+--schemaprovider-class org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider \
+--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
+--input-file-size 125829120 \
+--workload-yaml-path file:/tmp/simple-deltastreamer.yaml \
+--workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \
+--table-type COPY_ON_WRITE \
+--compact-scheduling-minshare 1 \
+--clean-input \
+--clean-output \
+--continuous \
+--test-continuous-mode \
+--min-sync-interval-seconds 20
+```
+
+We can use any yaml and properties file with the above spark-submit command to test deltastreamer with async table services.
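+
+In continuous mode, validation nodes wait for the deltastreamer to catch up with the ingested data before validating.
+The wait is bounded by the optional `max_wait_time_for_deltastreamer_catch_up_ms` config (default: 5 minutes). For
+example, a validate node in the workload yaml can be configured as below (snippet adapted from the test-suite yamls
+updated in this change):
+
+```yaml
+  last_validate:
+    config:
+      execute_itr_count: 50
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
+    type: ValidateAsyncOperations
+    deps: second_validate
+```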
 
 ## Automated tests for N no of yamls in Local Docker environment
 
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieContinousTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieContinousTestSuiteWriter.java
new file mode 100644
index 0000000000..1bf69aaf83
--- /dev/null
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieContinousTestSuiteWriter.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
+import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.rdd.RDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Test suite writer that assists in testing async table operations with Deltastreamer continuous mode.
+ *
+ * Sample command
+ * ./bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 \
+ *  --conf spark.task.cpus=1 --conf spark.executor.cores=1 \
+ * --conf spark.task.maxFailures=100 \
+ * --conf spark.memory.fraction=0.4 \
+ * --conf spark.rdd.compress=true \
+ * --conf spark.kryoserializer.buffer.max=2000m \
+ * --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+ * --conf spark.memory.storageFraction=0.1 \
+ * --conf spark.shuffle.service.enabled=true \
+ * --conf spark.sql.hive.convertMetastoreParquet=false \
+ * --conf spark.driver.maxResultSize=12g \
+ * --conf spark.executor.heartbeatInterval=120s \
+ * --conf spark.network.timeout=600s \
+ * --conf spark.yarn.max.executor.failures=10 \
+ * --conf spark.sql.catalogImplementation=hive \
+ * --class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob <PATH_TO_BUNDLE>/hudi-integ-test-bundle-0.12.0-SNAPSHOT.jar \
+ * --source-ordering-field test_suite_source_ordering_field \
+ * --use-deltastreamer \
+ * --target-base-path /tmp/hudi/output \
+ * --input-base-path /tmp/hudi/input \
+ * --target-table table1 \
+ * -props file:/tmp/test.properties \
+ * --schemaprovider-class org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider \
+ * --source-class org.apache.hudi.utilities.sources.AvroDFSSource \
+ * --input-file-size 125829120 \
+ * --workload-yaml-path file:/tmp/simple-deltastreamer.yaml \
+ * --workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \
+ * --table-type COPY_ON_WRITE \
+ * --compact-scheduling-minshare 1 \
+ * --clean-input \
+ * --clean-output \
+ * --continuous \
+ * --test-continuous-mode \
+ * --min-sync-interval-seconds 20
+ */
+public class HoodieContinousTestSuiteWriter extends HoodieTestSuiteWriter {
+
+  private static Logger log = LoggerFactory.getLogger(HoodieContinousTestSuiteWriter.class);
+
+  public HoodieContinousTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, String schema) throws Exception {
+    super(jsc, props, cfg, schema);
+  }
+
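+  // In continuous mode the wrapped deltastreamer drives ingestion on its own thread,
+  // so the batch-level write operations below are intentionally no-ops.
+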
+  @Override
+  public void shutdownResources() {
+    log.info("Shutting down deltastreamer gracefully ");
+    this.deltaStreamerWrapper.shutdownGracefully();
+  }
+
+  @Override
+  public RDD<GenericRecord> getNextBatch() throws Exception {
+    return null;
+  }
+
+  @Override
+  public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception {
+    return null;
+  }
+
+  @Override
+  public Option<String> startCommit() {
+    return null;
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> upsert(Option<String> instantTime) throws Exception {
+    return null;
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> insert(Option<String> instantTime) throws Exception {
+    return null;
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> insertOverwrite(Option<String> instantTime) throws Exception {
+    return null;
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> insertOverwriteTable(Option<String> instantTime) throws Exception {
+    return null;
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> bulkInsert(Option<String> instantTime) throws Exception {
+    return null;
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> compact(Option<String> instantTime) throws Exception {
+    return null;
+  }
+
+  @Override
+  public void inlineClustering() {
+  }
+
+  @Override
+  public Option<String> scheduleCompaction(Option<Map<String, String>> previousCommitExtraMetadata) throws Exception {
+    return Option.empty();
+  }
+
+  @Override
+  public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
+                     Option<String> instantTime) {
+  }
+
+  @Override
+  public void commitCompaction(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
+                               Option<String> instantTime) throws IOException {
+  }
+}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java
similarity index 67%
copy from hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
copy to hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java
index a98c7f2aec..63805e71a5 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java
@@ -38,10 +38,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
-import org.apache.hudi.integ.testsuite.dag.nodes.CleanNode;
-import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
-import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
-import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode;
 import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -58,77 +54,26 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.Serializable;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 /**
  * A writer abstraction for the Hudi test suite. This class wraps different implementations of writers used to perform write operations into the target hudi dataset. Current supported writers are
  * {@link HoodieDeltaStreamerWrapper} and {@link SparkRDDWriteClient}.
  */
-public class HoodieTestSuiteWriter implements Serializable {
+public class HoodieInlineTestSuiteWriter extends HoodieTestSuiteWriter {
 
-  private static Logger log = LoggerFactory.getLogger(HoodieTestSuiteWriter.class);
+  private static Logger log = LoggerFactory.getLogger(HoodieInlineTestSuiteWriter.class);
 
-  private HoodieDeltaStreamerWrapper deltaStreamerWrapper;
-  private HoodieWriteConfig writeConfig;
-  private SparkRDDWriteClient writeClient;
-  protected HoodieTestSuiteConfig cfg;
-  private Option<String> lastCheckpoint;
-  private HoodieReadClient hoodieReadClient;
-  private Properties props;
-  private String schema;
-  private transient Configuration configuration;
-  private transient JavaSparkContext sparkContext;
-  private static Set<String> VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE = new HashSet<>(
-      Arrays.asList(RollbackNode.class.getName(), CleanNode.class.getName(), ScheduleCompactNode.class.getName()));
   private static final String GENERATED_DATA_PATH = "generated.data.path";
 
-  public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteConfig cfg, String schema) throws Exception {
-    // We ensure that only 1 instance of HoodieWriteClient is instantiated for a HoodieTestSuiteWriter
-    // This does not instantiate a HoodieWriteClient until a
-    // {@link HoodieDeltaStreamer#commit(HoodieWriteClient, JavaRDD, Option)} is invoked.
-    HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
-    this.deltaStreamerWrapper = new HoodieDeltaStreamerWrapper(cfg, jsc);
-    this.hoodieReadClient = new HoodieReadClient(context, cfg.targetBasePath);
-    this.writeConfig = getHoodieClientConfig(cfg, props, schema);
-    if (!cfg.useDeltaStreamer) {
-      this.writeClient = new SparkRDDWriteClient(context, writeConfig);
-    }
-    this.cfg = cfg;
-    this.configuration = jsc.hadoopConfiguration();
-    this.sparkContext = jsc;
-    this.props = props;
-    this.schema = schema;
-  }
-
-  public HoodieWriteConfig getWriteConfig() {
-    return this.writeConfig;
+  public HoodieInlineTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteConfig cfg, String schema) throws Exception {
+    super(jsc, props, cfg, schema);
   }
 
-  private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteConfig cfg, Properties props, String schema) {
-    HoodieWriteConfig.Builder builder =
-        HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
-            .withAutoCommit(false)
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
-            .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField)
-                .build())
-            .forTable(cfg.targetTableName)
-            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
-            .withProps(props);
-    builder = builder.withSchema(schema);
-    return builder.build();
-  }
-
-  private boolean allowWriteClientAccess(DagNode dagNode) {
-    if (VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE.contains(dagNode.getClass().getName())) {
-      return true;
-    }
-    return false;
+  public void shutdownResources() {
+    // no-op for the non-continuous (inline) test suite writer.
   }
 
   public RDD<GenericRecord> getNextBatch() throws Exception {
@@ -139,13 +84,6 @@ public class HoodieTestSuiteWriter implements Serializable {
         .getInsertValue(new Schema.Parser().parse(schema)).get()).rdd();
   }
 
-  public void getNextBatchForDeletes() throws Exception {
-    Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
-    lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
-    JavaRDD<HoodieRecord> inputRDD = nextBatch.getRight().getRight();
-    inputRDD.collect();
-  }
-
   public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception {
     return this.deltaStreamerWrapper.fetchSource();
   }
@@ -254,7 +192,7 @@ public class HoodieTestSuiteWriter implements Serializable {
   }
 
   public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
-      Option<String> instantTime) {
+                     Option<String> instantTime) {
     if (!cfg.useDeltaStreamer) {
       Map<String, String> extraMetadata = new HashMap<>();
       /** Store the checkpoint in the commit metadata just like
@@ -269,7 +207,7 @@ public class HoodieTestSuiteWriter implements Serializable {
   }
 
   public void commitCompaction(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
-                     Option<String> instantTime) throws IOException {
+                               Option<String> instantTime) throws IOException {
     if (!cfg.useDeltaStreamer) {
       Map<String, String> extraMetadata = new HashMap<>();
       /** Store the checkpoint in the commit metadata just like
@@ -284,44 +222,4 @@ public class HoodieTestSuiteWriter implements Serializable {
       writeClient.commitCompaction(instantTime.get(), metadata, Option.of(extraMetadata));
     }
   }
-
-  public SparkRDDWriteClient getWriteClient(DagNode dagNode) throws IllegalAccessException {
-    if (cfg.useDeltaStreamer & !allowWriteClientAccess(dagNode)) {
-      throw new IllegalAccessException("cannot access write client when testing in deltastreamer mode");
-    }
-    synchronized (this) {
-      if (writeClient == null) {
-        this.writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(this.sparkContext), getHoodieClientConfig(cfg, props, schema));
-      }
-    }
-    return writeClient;
-  }
-
-  public HoodieDeltaStreamerWrapper getDeltaStreamerWrapper() {
-    return deltaStreamerWrapper;
-  }
-
-  public HoodieTestSuiteConfig getCfg() {
-    return cfg;
-  }
-
-  public Configuration getConfiguration() {
-    return configuration;
-  }
-
-  public JavaSparkContext getSparkContext() {
-    return sparkContext;
-  }
-
-  public Option<String> getLastCheckpoint() {
-    return lastCheckpoint;
-  }
-
-  public Properties getProps() {
-    return props;
-  }
-
-  public String getSchema() {
-    return schema;
-  }
 }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index 2d9f841ae3..5e2f9812ba 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -190,11 +190,12 @@ public class HoodieTestSuiteJob {
   }
 
   public void runTestSuite() {
+    WriterContext writerContext = null;
     try {
       WorkflowDag workflowDag = createWorkflowDag();
       log.info("Workflow Dag => " + DagUtils.convertDagToYaml(workflowDag));
       long startTime = System.currentTimeMillis();
-      WriterContext writerContext = new WriterContext(jsc, props, cfg, keyGenerator, sparkSession);
+      writerContext = new WriterContext(jsc, props, cfg, keyGenerator, sparkSession);
       writerContext.initContext(jsc);
       startOtherServicesIfNeeded(writerContext);
       if (this.cfg.saferSchemaEvolution) {
@@ -217,6 +218,9 @@ public class HoodieTestSuiteJob {
       log.error("Failed to run Test Suite ", e);
       throw new HoodieException("Failed to run Test Suite ", e);
     } finally {
+      if (writerContext != null) {
+        writerContext.shutdownResources();
+      }
       if (stopJsc) {
         stopQuietly();
       }
@@ -310,5 +314,8 @@ public class HoodieTestSuiteJob {
 
     @Parameter(names = {"--use-hudi-data-to-generate-updates"}, description = "Use data from hudi to generate updates for new batches ")
     public Boolean useHudiToGenerateUpdates = false;
+
+    @Parameter(names = {"--test-continuous-mode"}, description = "Tests continuous mode in deltastreamer.")
+    public Boolean testContinousMode = false;
   }
 }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
index a98c7f2aec..7a9e122e86 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
@@ -18,37 +18,25 @@
 
 package org.apache.hudi.integ.testsuite;
 
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
 import org.apache.hudi.integ.testsuite.dag.nodes.CleanNode;
 import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
 import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
 import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode;
 import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
-import org.apache.hudi.table.HoodieSparkTable;
-import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;
@@ -57,38 +45,31 @@ import org.apache.spark.rdd.RDD;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
-/**
- * A writer abstraction for the Hudi test suite. This class wraps different implementations of writers used to perform write operations into the target hudi dataset. Current supported writers are
- * {@link HoodieDeltaStreamerWrapper} and {@link SparkRDDWriteClient}.
- */
-public class HoodieTestSuiteWriter implements Serializable {
+public abstract class HoodieTestSuiteWriter implements Serializable {
 
   private static Logger log = LoggerFactory.getLogger(HoodieTestSuiteWriter.class);
 
-  private HoodieDeltaStreamerWrapper deltaStreamerWrapper;
-  private HoodieWriteConfig writeConfig;
-  private SparkRDDWriteClient writeClient;
-  protected HoodieTestSuiteConfig cfg;
-  private Option<String> lastCheckpoint;
-  private HoodieReadClient hoodieReadClient;
-  private Properties props;
-  private String schema;
-  private transient Configuration configuration;
-  private transient JavaSparkContext sparkContext;
-  private static Set<String> VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE = new HashSet<>(
+  protected HoodieDeltaStreamerWrapper deltaStreamerWrapper;
+  protected HoodieWriteConfig writeConfig;
+  protected SparkRDDWriteClient writeClient;
+  protected HoodieTestSuiteJob.HoodieTestSuiteConfig cfg;
+  protected Option<String> lastCheckpoint;
+  protected HoodieReadClient hoodieReadClient;
+  protected Properties props;
+  protected String schema;
+  protected transient Configuration configuration;
+  protected transient JavaSparkContext sparkContext;
+  protected static Set<String> VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE = new HashSet<>(
       Arrays.asList(RollbackNode.class.getName(), CleanNode.class.getName(), ScheduleCompactNode.class.getName()));
-  private static final String GENERATED_DATA_PATH = "generated.data.path";
 
-  public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteConfig cfg, String schema) throws Exception {
+  public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, String schema) throws Exception {
     // We ensure that only 1 instance of HoodieWriteClient is instantiated for a HoodieTestSuiteWriter
     // This does not instantiate a HoodieWriteClient until a
     // {@link HoodieDeltaStreamer#commit(HoodieWriteClient, JavaRDD, Option)} is invoked.
@@ -110,7 +91,7 @@ public class HoodieTestSuiteWriter implements Serializable {
     return this.writeConfig;
   }
 
-  private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteConfig cfg, Properties props, String schema) {
+  private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, Properties props, String schema) {
     HoodieWriteConfig.Builder builder =
         HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
             .withAutoCommit(false)
@@ -131,159 +112,35 @@ public class HoodieTestSuiteWriter implements Serializable {
     return false;
   }
 
-  public RDD<GenericRecord> getNextBatch() throws Exception {
-    Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
-    lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
-    JavaRDD<HoodieRecord> inputRDD = nextBatch.getRight().getRight();
-    return inputRDD.map(r -> (GenericRecord) ((HoodieAvroRecord) r).getData()
-        .getInsertValue(new Schema.Parser().parse(schema)).get()).rdd();
-  }
+  public abstract void shutdownResources();
 
-  public void getNextBatchForDeletes() throws Exception {
-    Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
-    lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
-    JavaRDD<HoodieRecord> inputRDD = nextBatch.getRight().getRight();
-    inputRDD.collect();
-  }
+  public abstract RDD<GenericRecord> getNextBatch() throws Exception;
 
-  public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception {
-    return this.deltaStreamerWrapper.fetchSource();
-  }
+  public abstract Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception;
 
-  public Option<String> startCommit() {
-    if (cfg.useDeltaStreamer) {
-      return Option.of(HoodieActiveTimeline.createNewInstantTime());
-    } else {
-      return Option.of(writeClient.startCommit());
-    }
-  }
+  public abstract Option<String> startCommit();
 
-  public JavaRDD<WriteStatus> upsert(Option<String> instantTime) throws Exception {
-    if (cfg.useDeltaStreamer) {
-      return deltaStreamerWrapper.upsert(WriteOperationType.UPSERT);
-    } else {
-      Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
-      lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
-      return writeClient.upsert(nextBatch.getRight().getRight(), instantTime.get());
-    }
-  }
+  public abstract JavaRDD<WriteStatus> upsert(Option<String> instantTime) throws Exception;
 
-  public JavaRDD<WriteStatus> insert(Option<String> instantTime) throws Exception {
-    if (cfg.useDeltaStreamer) {
-      return deltaStreamerWrapper.insert();
-    } else {
-      Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
-      lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
-      return writeClient.insert(nextBatch.getRight().getRight(), instantTime.get());
-    }
-  }
+  public abstract JavaRDD<WriteStatus> insert(Option<String> instantTime) throws Exception;
 
-  public JavaRDD<WriteStatus> insertOverwrite(Option<String> instantTime) throws Exception {
-    if (cfg.useDeltaStreamer) {
-      return deltaStreamerWrapper.insertOverwrite();
-    } else {
-      Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
-      lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
-      return writeClient.insertOverwrite(nextBatch.getRight().getRight(), instantTime.get()).getWriteStatuses();
-    }
-  }
+  public abstract JavaRDD<WriteStatus> insertOverwrite(Option<String> instantTime) throws Exception;
 
-  public JavaRDD<WriteStatus> insertOverwriteTable(Option<String> instantTime) throws Exception {
-    if (cfg.useDeltaStreamer) {
-      return deltaStreamerWrapper.insertOverwriteTable();
-    } else {
-      Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
-      lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
-      return writeClient.insertOverwriteTable(nextBatch.getRight().getRight(), instantTime.get()).getWriteStatuses();
-    }
-  }
+  public abstract JavaRDD<WriteStatus> insertOverwriteTable(Option<String> instantTime) throws Exception;
 
-  public JavaRDD<WriteStatus> bulkInsert(Option<String> instantTime) throws Exception {
-    if (cfg.useDeltaStreamer) {
-      return deltaStreamerWrapper.bulkInsert();
-    } else {
-      Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
-      lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
-      return writeClient.bulkInsert(nextBatch.getRight().getRight(), instantTime.get());
-    }
-  }
+  public abstract JavaRDD<WriteStatus> bulkInsert(Option<String> instantTime) throws Exception;
 
-  public JavaRDD<WriteStatus> compact(Option<String> instantTime) throws Exception {
-    if (cfg.useDeltaStreamer) {
-      return deltaStreamerWrapper.compact();
-    } else {
-      if (!instantTime.isPresent()) {
-        Option<Pair<String, HoodieCompactionPlan>> compactionPlanPair = Option
-            .fromJavaOptional(hoodieReadClient.getPendingCompactions()
-                .stream().findFirst());
-        if (compactionPlanPair.isPresent()) {
-          instantTime = Option.of(compactionPlanPair.get().getLeft());
-        }
-      }
-      if (instantTime.isPresent()) {
-        HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeClient.compact(instantTime.get());
-        return compactionMetadata.getWriteStatuses();
-      } else {
-        return null;
-      }
-    }
-  }
+  public abstract JavaRDD<WriteStatus> compact(Option<String> instantTime) throws Exception;
 
-  public void inlineClustering() {
-    if (!cfg.useDeltaStreamer) {
-      Option<String> clusteringInstantOpt = writeClient.scheduleClustering(Option.empty());
-      clusteringInstantOpt.ifPresent(clusteringInstant -> {
-        // inline cluster should auto commit as the user is never given control
-        log.warn("Clustering instant :: " + clusteringInstant);
-        writeClient.cluster(clusteringInstant, true);
-      });
-    } else {
-      // TODO: fix clustering to be done async https://issues.apache.org/jira/browse/HUDI-1590
-      throw new IllegalArgumentException("Clustering cannot be triggered with deltastreamer");
-    }
-  }
+  public abstract void inlineClustering() throws Exception;
 
-  public Option<String> scheduleCompaction(Option<Map<String, String>> previousCommitExtraMetadata) throws
-      Exception {
-    if (cfg.useDeltaStreamer) {
-      deltaStreamerWrapper.scheduleCompact();
-      return Option.empty();
-    } else {
-      return writeClient.scheduleCompaction(previousCommitExtraMetadata);
-    }
-  }
+  public abstract Option<String> scheduleCompaction(Option<Map<String, String>> previousCommitExtraMetadata) throws Exception;
 
-  public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
-      Option<String> instantTime) {
-    if (!cfg.useDeltaStreamer) {
-      Map<String, String> extraMetadata = new HashMap<>();
-      /** Store the checkpoint in the commit metadata just like
-       * {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/
-      extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get());
-      if (generatedDataStats != null && generatedDataStats.count() > 1) {
-        // Just stores the path where this batch of data is generated to
-        extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0));
-      }
-      writeClient.commit(instantTime.get(), records, Option.of(extraMetadata));
-    }
-  }
+  public abstract void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
+                              Option<String> instantTime);
 
-  public void commitCompaction(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
-                     Option<String> instantTime) throws IOException {
-    if (!cfg.useDeltaStreamer) {
-      Map<String, String> extraMetadata = new HashMap<>();
-      /** Store the checkpoint in the commit metadata just like
-       * {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/
-      extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get());
-      if (generatedDataStats != null && generatedDataStats.count() > 1) {
-        // Just stores the path where this batch of data is generated to
-        extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0));
-      }
-      HoodieSparkTable<HoodieRecordPayload> table = HoodieSparkTable.create(writeClient.getConfig(), writeClient.getEngineContext());
-      HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(table, instantTime.get(), HoodieJavaRDD.of(records), writeClient.getConfig().getSchema());
-      writeClient.commitCompaction(instantTime.get(), metadata, Option.of(extraMetadata));
-    }
-  }
+  public abstract void commitCompaction(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
+                                        Option<String> instantTime) throws Exception;
 
   public SparkRDDWriteClient getWriteClient(DagNode dagNode) throws IllegalAccessException {
     if (cfg.useDeltaStreamer & !allowWriteClientAccess(dagNode)) {
@@ -301,7 +158,7 @@ public class HoodieTestSuiteWriter implements Serializable {
     return deltaStreamerWrapper;
   }
 
-  public HoodieTestSuiteConfig getCfg() {
+  public HoodieTestSuiteJob.HoodieTestSuiteConfig getCfg() {
     return cfg;
   }
 
@@ -325,3 +182,4 @@ public class HoodieTestSuiteWriter implements Serializable {
     return schema;
   }
 }
+
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index 1578e86be4..a781d19cb7 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -103,6 +103,7 @@ public class DeltaConfig implements Serializable {
     private static String DELETE_INPUT_DATA_EXCEPT_LATEST = "delete_input_data_except_latest";
     private static String PARTITIONS_TO_DELETE = "partitions_to_delete";
     private static String INPUT_PARTITIONS_TO_SKIP_VALIDATE = "input_partitions_to_skip_validate";
+    private static String MAX_WAIT_TIME_FOR_DELTASTREAMER_TO_CATCH_UP_MS = "max_wait_time_for_deltastreamer_catch_up_ms";
 
     // Spark SQL Create Table
     private static String TABLE_TYPE = "table_type";
@@ -253,6 +254,10 @@ public class DeltaConfig implements Serializable {
       return Boolean.valueOf(configsMap.getOrDefault(ENABLE_ROW_WRITING, false).toString());
     }
 
+    /**
+     * Max time (in ms) a validation node waits for the deltastreamer to catch up; defaults to 5 minutes.
+     */
+    public long maxWaitTimeForDeltastreamerToCatchupMs() {
+      return Long.valueOf(configsMap.getOrDefault(MAX_WAIT_TIME_FOR_DELTASTREAMER_TO_CATCH_UP_MS, 5 * 60 * 1000).toString());
+    }
+
     public Option<String> getTableType() {
       return !configsMap.containsKey(TABLE_TYPE) ? Option.empty()
           : Option.of(configsMap.get(TABLE_TYPE).toString());
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
index d31ef195ec..83b5751c86 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
@@ -21,7 +21,9 @@ package org.apache.hudi.integ.testsuite.dag;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.integ.testsuite.HoodieContinousTestSuiteWriter;
 import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
+import org.apache.hudi.integ.testsuite.HoodieInlineTestSuiteWriter;
 import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
 import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
 import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
@@ -37,6 +39,8 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * WriterContext wraps the delta writer/data generator related configuration needed to init/reinit.
@@ -53,6 +57,7 @@ public class WriterContext {
   private BuiltinKeyGenerator keyGenerator;
   private transient SparkSession sparkSession;
   private transient JavaSparkContext jsc;
+  private ExecutorService executorService;
 
   public WriterContext(JavaSparkContext jsc, TypedProperties props, HoodieTestSuiteConfig cfg,
       BuiltinKeyGenerator keyGenerator, SparkSession sparkSession) {
@@ -67,7 +72,8 @@ public class WriterContext {
     try {
       this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jsc);
       String schemaStr = schemaProvider.getSourceSchema().toString();
-      this.hoodieTestSuiteWriter = new HoodieTestSuiteWriter(jsc, props, cfg, schemaStr);
+      this.hoodieTestSuiteWriter = (cfg.testContinousMode && cfg.useDeltaStreamer) ? new HoodieContinousTestSuiteWriter(jsc, props, cfg, schemaStr)
+          : new HoodieInlineTestSuiteWriter(jsc, props, cfg, schemaStr);
       int inputParallelism = cfg.inputParallelism > 0 ? cfg.inputParallelism : jsc.defaultParallelism();
       this.deltaGenerator = new DeltaGenerator(
           new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), DeltaInputType.valueOf(cfg.inputFormatName),
@@ -75,6 +81,10 @@ public class WriterContext {
               schemaStr, cfg.limitFileSize, inputParallelism, cfg.deleteOldInput, cfg.useHudiToGenerateUpdates),
           jsc, sparkSession, schemaStr, keyGenerator);
       log.info(String.format("Initialized writerContext with: %s", schemaStr));
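+      // In continuous mode, run the deltastreamer sync loop on a separate thread so DAG execution can proceed concurrently.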
+      if (cfg.testContinousMode) {
+        executorService = Executors.newFixedThreadPool(1);
+        executorService.execute(new TestSuiteWriterRunnable(hoodieTestSuiteWriter));
+      }
     } catch (Exception e) {
       throw new HoodieException("Failed to reinitialize writerContext", e);
     }
@@ -113,4 +123,35 @@ public class WriterContext {
   public SparkSession getSparkSession() {
     return sparkSession;
   }
+
+  public void shutdownResources() {
+    this.hoodieTestSuiteWriter.shutdownResources();
+    if (executorService != null) {
+      executorService.shutdownNow();
+    }
+  }
+
+  /**
+   * TestSuiteWriterRunnable to spin up a thread to execute deltastreamer with async table services.
+   */
+  class TestSuiteWriterRunnable implements Runnable {
+    private HoodieTestSuiteWriter hoodieTestSuiteWriter;
+
+    TestSuiteWriterRunnable(HoodieTestSuiteWriter hoodieTestSuiteWriter) {
+      this.hoodieTestSuiteWriter = hoodieTestSuiteWriter;
+    }
+
+    @Override
+    public void run() {
+      try {
+        Thread.sleep(20000);
+        log.info("Starting continuous sync with deltastreamer ");
+        hoodieTestSuiteWriter.getDeltaStreamerWrapper().sync();
+        log.info("Completed continuous sync with deltastreamer ");
+      } catch (Exception e) {
+        log.error("Deltastreamer failed in continuous mode " + e.getMessage());
+        throw new HoodieException("Shutting down deltastreamer in continuous 
mode failed ", e);
+      }
+    }
+  }
 }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
index a0ebdc5754..15c209e475 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
@@ -20,10 +20,17 @@
 package org.apache.hudi.integ.testsuite.dag.nodes;
 
 import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
 import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
 import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -40,6 +47,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -47,6 +57,8 @@ import scala.Tuple2;
 import scala.collection.JavaConversions;
 import scala.collection.JavaConverters;
 
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
+
 /**
  * This node validates that contents from the input path are intact with Hudi. By default no configs are required for this node. But there is an
  * optional config "delete_input_data" that you can set for this node. If set, once validation completes, contents from inputPath are deleted. This will come in handy for long running test suites.
@@ -78,6 +90,12 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
     int itrCountToExecute = config.getIterationCountToExecute();
     if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount) ||
         (itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) {
+      FileSystem fs = new Path(context.getHoodieTestSuiteWriter().getCfg().inputBasePath)
+          .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
+      if (context.getHoodieTestSuiteWriter().getCfg().testContinousMode) {
+        awaitUntilDeltaStreamerCaughtUp(context, context.getHoodieTestSuiteWriter().getCfg().targetBasePath, fs,
+            context.getHoodieTestSuiteWriter().getCfg().inputBasePath);
+      }
       SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
       // todo: Fix partitioning schemes. For now, assumes data based partitioning.
       String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
@@ -85,8 +103,6 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
       // listing batches to be validated
       String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
       if (log.isDebugEnabled()) {
-        FileSystem fs = new Path(inputPathStr)
-            .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
         FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
         log.info("fileStatuses length: " + fileStatuses.length);
         for (FileStatus fileStatus : fileStatuses) {
@@ -145,8 +161,6 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
         if (config.isDeleteInputData()) {
           // clean up input data for current group of writes.
           inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
-          FileSystem fs = new Path(inputPathStr)
-              .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
           FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
           for (FileStatus fileStatus : fileStatuses) {
             log.debug("Micro batch to be deleted " + 
fileStatus.getPath().toString());
@@ -157,6 +171,50 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
     }
   }
 
+  /**
+   * Polls the commit timeline until the deltastreamer's latest committed checkpoint matches the newest
+   * batch directory under the input path, failing once the configured max wait time elapses.
+   */
+  private void awaitUntilDeltaStreamerCaughtUp(ExecutionContext context, String hudiTablePath, FileSystem fs, String inputPath) throws IOException, InterruptedException {
+    HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(hudiTablePath).build();
+    HoodieTimeline commitTimeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    Option<String> latestCheckpoint = getLatestCheckpoint(commitTimeline);
+    FileStatus[] subDirs = fs.listStatus(new Path(inputPath));
+    List<FileStatus> subDirList = Arrays.asList(subDirs);
+    subDirList.sort(Comparator.comparingLong(entry -> Long.parseLong(entry.getPath().getName())));
+    String latestSubDir = subDirList.get(subDirList.size() - 1).getPath().getName();
+    log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer " +
+        (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
+    long maxWaitTime = config.maxWaitTimeForDeltastreamerToCatchupMs();
+    long waitedSoFar = 0;
+    while (!(latestCheckpoint.isPresent() && latestCheckpoint.get().equals(latestSubDir))) {
+      log.warn("Sleeping for 20 secs waiting for deltastreamer to catch up with ingested data");
+      Thread.sleep(20000);
+      meta.reloadActiveTimeline();
+      commitTimeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      latestCheckpoint = getLatestCheckpoint(commitTimeline);
+      waitedSoFar += 20000;
+      if (waitedSoFar >= maxWaitTime) {
+        throw new AssertionError("DeltaStreamer has not caught up after 5 mins 
of wait time. Last known checkpoint " +
+            (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none") + 
", expected checkpoint to have caugth up " + latestSubDir);
+      }
+      log.info("Latest sub directory in input path " + latestSubDir + ", 
latest checkpoint from deltastreamer " +
+          (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
+    }
+  }
+
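+  /**
+   * Walks completed instants in reverse order and returns the first checkpoint recorded in commit metadata, if any.
+   */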
+  private Option<String> getLatestCheckpoint(HoodieTimeline timeline) {
+    return (Option<String>) timeline.getReverseOrderedInstants().map(instant -> {
+      try {
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+            .fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+        if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
+          return Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
+        } else {
+          return Option.empty();
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
+      }
+    }).filter(Option::isPresent).findFirst().orElse(Option.empty());
+  }
+
   private Dataset<Row> getInputDf(ExecutionContext context, SparkSession session, String inputPath) {
     String recordKeyField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key());
     String partitionPathField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key());
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
index c30be2a2a5..20e12e9030 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
@@ -216,15 +216,22 @@ public class DeltaGenerator implements Serializable {
         adjustedRDD = deltaInputReader.read(config.getNumRecordsDelete());
         adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsDelete());
       } else {
-        deltaInputReader =
-            new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDatasetOutputPath(),
-                schemaStr);
-        if (config.getFractionUpsertPerFile() > 0) {
-          adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(), config.getNumUpsertFiles(),
-              config.getFractionUpsertPerFile());
+        if (((DFSDeltaConfig) deltaOutputConfig).shouldUseHudiToGenerateUpdates()) {
+          deltaInputReader =
+              new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDatasetOutputPath(),
+                  schemaStr);
+          if (config.getFractionUpsertPerFile() > 0) {
+            adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(), config.getNumUpsertFiles(),
+                config.getFractionUpsertPerFile());
+          } else {
+            adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(), config.getNumUpsertFiles(), config
+                .getNumRecordsDelete());
+          }
         } else {
-          adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(), config.getNumUpsertFiles(), config
-              .getNumRecordsDelete());
+          deltaInputReader = new DFSAvroDeltaInputReader(sparkSession, schemaStr,
+              ((DFSDeltaConfig) deltaOutputConfig).getDeltaBasePath(), Option.empty(), Option.empty());
+          adjustedRDD = deltaInputReader.read(config.getNumRecordsDelete());
+          adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsDelete());
         }
       }
 
