This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d60491  [HUDI-2388] Add DAG nodes for Spark SQL in integration test 
suite (#3583)
5d60491 is described below

commit 5d60491f5b76ef0f77174d71567d0673d9315bcd
Author: Y Ethan Guo <[email protected]>
AuthorDate: Mon Sep 13 08:53:13 2021 -0700

    [HUDI-2388] Add DAG nodes for Spark SQL in integration test suite (#3583)
    
    - Fixed validation in integ test suite for both deltastreamer and write 
client path.
    
    Co-authored-by: Sivabalan Narayanan <[email protected]>
---
 docker/demo/config/log4j.properties                |   5 +-
 ...spark-sql-nonpartitioned-external-cow-ctas.yaml |  45 ++
 .../spark-sql-nonpartitioned-external-mor.yaml     |  44 ++
 .../spark-sql-nonpartitioned-managed-cow-ctas.yaml |  44 ++
 .../spark-sql-nonpartitioned-managed-cow.yaml      |  63 +++
 .../spark-sql-partition-cow-updates.yaml           |  61 +++
 .../spark-sql-partitioned-managed-cow-ctas.yaml    |  45 ++
 .../spark-sql-partitioned-managed-cow.yaml         |  64 +++
 .../hudi/integ/testsuite/HoodieTestSuiteJob.java   |   2 +-
 .../integ/testsuite/configuration/DeltaConfig.java |  87 +++-
 ...tasetNode.java => BaseValidateDatasetNode.java} | 138 +++---
 .../hudi/integ/testsuite/dag/nodes/InsertNode.java |   1 +
 .../testsuite/dag/nodes/ValidateDatasetNode.java   | 118 +----
 .../integ/testsuite/generator/DeltaGenerator.java  |  22 +-
 .../testsuite/dag/nodes/SparkBulkInsertNode.scala  |   7 +-
 .../testsuite/dag/nodes/SparkDeleteNode.scala      |  41 +-
 .../testsuite/dag/nodes/SparkInsertNode.scala      |   7 +-
 .../testsuite/dag/nodes/SparkUpsertNode.scala      |   7 +-
 .../dag/nodes/spark/sql/BaseSparkSqlNode.scala     |  93 ++++
 .../nodes/spark/sql/SparkSqlCreateTableNode.scala  |  92 ++++
 .../dag/nodes/spark/sql/SparkSqlDeleteNode.scala   |  66 +++
 .../dag/nodes/spark/sql/SparkSqlInsertNode.scala   |  50 ++
 .../spark/sql/SparkSqlInsertOverwriteNode.scala    |  50 ++
 .../dag/nodes/spark/sql/SparkSqlMergeNode.scala    |  64 +++
 .../dag/nodes/spark/sql/SparkSqlUpdateNode.scala   |  66 +++
 .../spark/sql/SparkSqlValidateDatasetNode.scala    |  68 +++
 .../hudi/integ/testsuite/utils/SparkSqlUtils.scala | 526 +++++++++++++++++++++
 .../testsuite/job/TestHoodieTestSuiteJob.java      |  42 +-
 .../test/resources/unit-test-spark-sql-dag.yaml    |  64 +++
 29 files changed, 1772 insertions(+), 210 deletions(-)

diff --git a/docker/demo/config/log4j.properties 
b/docker/demo/config/log4j.properties
index 1618bff..225e62e 100644
--- a/docker/demo/config/log4j.properties
+++ b/docker/demo/config/log4j.properties
@@ -21,12 +21,12 @@ log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.target=System.err
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
%c{1}: %m%n
-
 # Set the default spark-shell log level to WARN. When running the spark-shell, 
the
 # log level for this class is used to overwrite the root logger's log level, 
so that
 # the user can have different defaults for the shell and regular Spark apps.
 log4j.logger.org.apache.spark.repl.Main=WARN
-
+# Set logging of integration testsuite to INFO level
+log4j.logger.org.apache.hudi.integ.testsuite=INFO
 # Settings to quiet third party logs that are too verbose
 log4j.logger.org.spark_project.jetty=WARN
 log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
@@ -35,7 +35,6 @@ 
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
 log4j.logger.org.apache.parquet=ERROR
 log4j.logger.parquet=ERROR
 log4j.logger.org.apache.spark=WARN
-
 # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent 
UDFs in SparkSQL with Hive support
 log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
 log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
\ No newline at end of file
diff --git 
a/docker/demo/config/test-suite/spark-sql-nonpartitioned-external-cow-ctas.yaml 
b/docker/demo/config/test-suite/spark-sql-nonpartitioned-external-cow-ctas.yaml
new file mode 100644
index 0000000..376d2a5
--- /dev/null
+++ 
b/docker/demo/config/test-suite/spark-sql-nonpartitioned-external-cow-ctas.yaml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: spark-sql-nonpartitioned-managed-cow-ctas.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+  create_table:
+    config:
+      table_type: cow
+      is_external: true
+      primary_key: _row_key
+      pre_combine_field: test_suite_source_ordering_field
+      use_ctas: true
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: spark.sql.SparkSqlCreateTableNode
+    deps: none
+  insert_records:
+    config:
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: spark.sql.SparkSqlInsertNode
+    deps: create_table
+  validate:
+    config:
+      delete_input_data: true
+    type: spark.sql.SparkSqlValidateDatasetNode
+    deps: insert_records
\ No newline at end of file
diff --git 
a/docker/demo/config/test-suite/spark-sql-nonpartitioned-external-mor.yaml 
b/docker/demo/config/test-suite/spark-sql-nonpartitioned-external-mor.yaml
new file mode 100644
index 0000000..1899830
--- /dev/null
+++ b/docker/demo/config/test-suite/spark-sql-nonpartitioned-external-mor.yaml
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: sspark-sql-nonpartitioned-external-mor.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+  create_table:
+    config:
+      table_type: mor
+      is_external: true
+      primary_key: _row_key
+      pre_combine_field: test_suite_source_ordering_field
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: spark.sql.SparkSqlCreateTableNode
+    deps: none
+  insert_records:
+    config:
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: spark.sql.SparkSqlInsertNode
+    deps: create_table
+  validate:
+    config:
+      delete_input_data: true
+    type: spark.sql.SparkSqlValidateDatasetNode
+    deps: insert_records
\ No newline at end of file
diff --git 
a/docker/demo/config/test-suite/spark-sql-nonpartitioned-managed-cow-ctas.yaml 
b/docker/demo/config/test-suite/spark-sql-nonpartitioned-managed-cow-ctas.yaml
new file mode 100644
index 0000000..8659a90
--- /dev/null
+++ 
b/docker/demo/config/test-suite/spark-sql-nonpartitioned-managed-cow-ctas.yaml
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: spark-sql-nonpartitioned-managed-cow-ctas.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+  create_table:
+    config:
+      table_type: cow
+      primary_key: _row_key
+      pre_combine_field: test_suite_source_ordering_field
+      use_ctas: true
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: spark.sql.SparkSqlCreateTableNode
+    deps: none
+  insert_records:
+    config:
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: spark.sql.SparkSqlInsertNode
+    deps: create_table
+  validate:
+    config:
+      delete_input_data: true
+    type: spark.sql.SparkSqlValidateDatasetNode
+    deps: insert_records
\ No newline at end of file
diff --git 
a/docker/demo/config/test-suite/spark-sql-nonpartitioned-managed-cow.yaml 
b/docker/demo/config/test-suite/spark-sql-nonpartitioned-managed-cow.yaml
new file mode 100644
index 0000000..79ea448
--- /dev/null
+++ b/docker/demo/config/test-suite/spark-sql-nonpartitioned-managed-cow.yaml
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: spark-sql-nonpartitioned-managed-cow.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+  create_table:
+    config:
+      table_type: cow
+      primary_key: _row_key
+      pre_combine_field: test_suite_source_ordering_field
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: spark.sql.SparkSqlCreateTableNode
+    deps: none
+  insert_records:
+    config:
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: spark.sql.SparkSqlInsertNode
+    deps: create_table
+  #merge_records:
+  #  config:
+  #    merge_condition: target._row_key = source._row_key
+  #    matched_action: update set *
+  #    not_matched_action: insert *
+  #    record_size: 1000
+  #    num_partitions_insert: 10
+  #    repeat_count: 1
+  #    num_records_upsert: 100
+  #    num_records_insert: 1000
+  #  type: spark.sql.SparkSqlMergeNode
+  #  deps: insert_records
+  delete_records:
+    config:
+      condition_column: begin_lat
+      record_size: 1000
+      repeat_count: 1
+      ratio_records_change: 0.2
+    type: spark.sql.SparkSqlDeleteNode
+    deps: insert_records
+  validate:
+    config:
+      delete_input_data: true
+    type: spark.sql.SparkSqlValidateDatasetNode
+    deps: delete_records
\ No newline at end of file
diff --git a/docker/demo/config/test-suite/spark-sql-partition-cow-updates.yaml 
b/docker/demo/config/test-suite/spark-sql-partition-cow-updates.yaml
new file mode 100644
index 0000000..a4b5255
--- /dev/null
+++ b/docker/demo/config/test-suite/spark-sql-partition-cow-updates.yaml
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: spark-sql-partitioned-managed-cow.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+  create_table:
+    config:
+      table_type: cow
+      primary_key: _row_key
+      pre_combine_field: test_suite_source_ordering_field
+      partition_field: rider
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: spark.sql.SparkSqlCreateTableNode
+    deps: none
+  insert_records:
+    config:
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: spark.sql.SparkSqlInsertNode
+    deps: create_table
+  first_validate:
+    config:
+      delete_input_data: false
+    type: spark.sql.SparkSqlValidateDatasetNode
+    deps: insert_records
+  update_records:
+    config:
+    type: spark.sql.SparkSqlUpdateNode
+    deps: first_validate
+  delete_records:
+    config:
+      condition_column: begin_lat
+      record_size: 1000
+      repeat_count: 1
+      ratio_records_change: 0.2
+    type: spark.sql.SparkSqlDeleteNode
+    deps: update_records
+  second_validate:
+    config:
+      delete_input_data: true
+    type: spark.sql.SparkSqlValidateDatasetNode
+    deps: delete_records
diff --git 
a/docker/demo/config/test-suite/spark-sql-partitioned-managed-cow-ctas.yaml 
b/docker/demo/config/test-suite/spark-sql-partitioned-managed-cow-ctas.yaml
new file mode 100644
index 0000000..da0f512
--- /dev/null
+++ b/docker/demo/config/test-suite/spark-sql-partitioned-managed-cow-ctas.yaml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: spark-sql-partitioned-managed-cow-ctas.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+  create_table:
+    config:
+      table_type: cow
+      primary_key: _row_key
+      pre_combine_field: test_suite_source_ordering_field
+      partition_field: rider
+      use_ctas: true
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: spark.sql.SparkSqlCreateTableNode
+    deps: none
+  insert_records:
+    config:
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: spark.sql.SparkSqlInsertNode
+    deps: create_table
+  validate:
+    config:
+      delete_input_data: true
+    type: spark.sql.SparkSqlValidateDatasetNode
+    deps: insert_records
\ No newline at end of file
diff --git 
a/docker/demo/config/test-suite/spark-sql-partitioned-managed-cow.yaml 
b/docker/demo/config/test-suite/spark-sql-partitioned-managed-cow.yaml
new file mode 100644
index 0000000..cb75949
--- /dev/null
+++ b/docker/demo/config/test-suite/spark-sql-partitioned-managed-cow.yaml
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: spark-sql-partitioned-managed-cow.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+  create_table:
+    config:
+      table_type: cow
+      primary_key: _row_key
+      pre_combine_field: test_suite_source_ordering_field
+      partition_field: rider
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: spark.sql.SparkSqlCreateTableNode
+    deps: none
+  insert_records:
+    config:
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: spark.sql.SparkSqlInsertNode
+    deps: create_table
+  #merge_records:
+  #  config:
+  #    merge_condition: target._row_key = source._row_key
+  #    matched_action: update set *
+  #    not_matched_action: insert *
+  #    record_size: 1000
+  #    num_partitions_insert: 10
+  #    repeat_count: 1
+  #    num_records_upsert: 100
+  #    num_records_insert: 1000
+  #  type: spark.sql.SparkSqlMergeNode
+  #  deps: insert_records
+  delete_records:
+    config:
+      condition_column: begin_lat
+      record_size: 1000
+      repeat_count: 1
+      ratio_records_change: 0.2
+    type: spark.sql.SparkSqlDeleteNode
+    deps: insert_records
+  validate:
+    config:
+      delete_input_data: true
+    type: spark.sql.SparkSqlValidateDatasetNode
+    deps: delete_records
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index 0bcbaf8..d8ed649 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -101,7 +101,7 @@ public class HoodieTestSuiteJob {
     this.cfg = cfg;
     this.jsc = jsc;
     cfg.propsFilePath = 
FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString();
-    this.sparkSession = 
SparkSession.builder().config(jsc.getConf()).getOrCreate();
+    this.sparkSession = 
SparkSession.builder().config(jsc.getConf()).enableHiveSupport().getOrCreate();
     this.fs = FSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration());
     this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), 
cfg.configs).getConfig();
     log.info("Creating workload generator with configs : {}", 
props.toString());
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index d8ea946..b0ae06b 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -18,14 +18,14 @@
 
 package org.apache.hudi.integ.testsuite.configuration;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
 import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.hadoop.conf.Configuration;
-
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -42,7 +42,7 @@ public class DeltaConfig implements Serializable {
   private final SerializableConfiguration configuration;
 
   public DeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType 
deltaInputType,
-      SerializableConfiguration configuration) {
+                     SerializableConfiguration configuration) {
     this.deltaOutputMode = deltaOutputMode;
     this.deltaInputType = deltaInputType;
     this.configuration = configuration;
@@ -96,6 +96,33 @@ public class DeltaConfig implements Serializable {
     private static String NUM_ROLLBACKS = "num_rollbacks";
     private static String ENABLE_ROW_WRITING = "enable_row_writing";
 
+    // Spark SQL Create Table
+    private static String TABLE_TYPE = "table_type";
+    private static String IS_EXTERNAL = "is_external";
+    private static String USE_CTAS = "use_ctas";
+    private static String PRIMARY_KEY = "primary_key";
+    private static String PRE_COMBINE_FIELD = "pre_combine_field";
+    private static String PARTITION_FIELD = "partition_field";
+    // Spark SQL Merge
+    private static String MERGE_CONDITION = "merge_condition";
+    private static String DEFAULT_MERGE_CONDITION = "target._row_key = 
source._row_key";
+    private static String MERGE_MATCHED_ACTION = "matched_action";
+    private static String DEFAULT_MERGE_MATCHED_ACTION = "update set *";
+    private static String MERGE_NOT_MATCHED_ACTION = "not_matched_action";
+    private static String DEFAULT_MERGE_NOT_MATCHED_ACTION = "insert *";
+    // Spark SQL Update
+    // column to update.  The logic is fixed, i.e., to do "fare = fare * 1.6". 
to be fixed.
+    private static String UPDATE_COLUMN = "update_column";
+    private static String DEFAULT_UPDATE_COLUMN = "fare";
+    private static String WHERE_CONDITION_COLUMN = "condition_column";
+    // the where condition expression is like "begin_lon between 0.1 and 0.2"
+    // the value range is determined by the ratio of records to update or 
delete
+    // only support numeric type column for now
+    private static String DEFAULT_WHERE_CONDITION_COLUMN = "begin_lon";
+    // the ratio range is between 0.01 and 1.0. The ratio is approximate to 
the actual ratio achieved
+    private static String RATIO_RECORDS_CHANGE = "ratio_records_change";
+    private static double DEFAULT_RATIO_RECORDS_CHANGE = 0.5;
+
     private Map<String, Object> configsMap;
 
     public Config(Map<String, Object> configsMap) {
@@ -194,6 +221,58 @@ public class DeltaConfig implements Serializable {
       return Boolean.valueOf(configsMap.getOrDefault(ENABLE_ROW_WRITING, 
false).toString());
     }
 
+    public Option<String> getTableType() {
+      return !configsMap.containsKey(TABLE_TYPE) ? Option.empty()
+          : Option.of(configsMap.get(TABLE_TYPE).toString());
+    }
+
+    public boolean shouldUseCtas() {
+      return Boolean.valueOf(configsMap.getOrDefault(USE_CTAS, 
false).toString());
+    }
+
+    public boolean isTableExternal() {
+      return Boolean.valueOf(configsMap.getOrDefault(IS_EXTERNAL, 
false).toString());
+    }
+
+    public Option<String> getPrimaryKey() {
+      return !configsMap.containsKey(PRIMARY_KEY) ? Option.empty()
+          : Option.of(configsMap.get(PRIMARY_KEY).toString());
+    }
+
+    public Option<String> getPreCombineField() {
+      return !configsMap.containsKey(PRE_COMBINE_FIELD) ? Option.empty()
+          : Option.of(configsMap.get(PRE_COMBINE_FIELD).toString());
+    }
+
+    public Option<String> getPartitionField() {
+      return !configsMap.containsKey(PARTITION_FIELD) ? Option.empty()
+              : Option.of(configsMap.get(PARTITION_FIELD).toString());
+    }
+
+    public String getMergeCondition() {
+      return configsMap.getOrDefault(MERGE_CONDITION, 
DEFAULT_MERGE_CONDITION).toString();
+    }
+
+    public String getMatchedAction() {
+      return configsMap.getOrDefault(MERGE_MATCHED_ACTION, 
DEFAULT_MERGE_MATCHED_ACTION).toString();
+    }
+
+    public String getNotMatchedAction() {
+      return configsMap.getOrDefault(MERGE_NOT_MATCHED_ACTION, 
DEFAULT_MERGE_NOT_MATCHED_ACTION).toString();
+    }
+
+    public String getUpdateColumn() {
+      return configsMap.getOrDefault(UPDATE_COLUMN, 
DEFAULT_UPDATE_COLUMN).toString();
+    }
+
+    public String getWhereConditionColumn() {
+      return configsMap.getOrDefault(WHERE_CONDITION_COLUMN, 
DEFAULT_WHERE_CONDITION_COLUMN).toString();
+    }
+
+    public double getRatioRecordsChange() {
+      return Double.valueOf(configsMap.getOrDefault(RATIO_RECORDS_CHANGE, 
DEFAULT_RATIO_RECORDS_CHANGE).toString());
+    }
+
     public Map<String, Object> getOtherConfigs() {
       if (configsMap == null) {
         return new HashMap<>();
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
similarity index 65%
copy from 
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
copy to 
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
index 09e27c2..9a369bc 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
@@ -7,26 +7,28 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.hudi.integ.testsuite.dag.nodes;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
 import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
 import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.ReduceFunction;
 import org.apache.spark.sql.Dataset;
@@ -39,75 +41,69 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.stream.Collectors;
 
 import scala.Tuple2;
 import scala.collection.JavaConversions;
 import scala.collection.JavaConverters;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 /**
- * This nodes validates contents from input path are in tact with Hudi. This 
nodes uses spark datasource for comparison purposes. By default no configs are 
required for this node. But there is an
+ * This nodes validates contents from input path are in tact with Hudi. By 
default no configs are required for this node. But there is an
  * optional config "delete_input_data" that you can set for this node. If set, 
once validation completes, contents from inputPath are deleted. This will come 
in handy for long running test suites.
  * README has more details under docker set up for usages of this node.
  */
-public class ValidateDatasetNode extends DagNode<Boolean> {
-
-  private static Logger log = 
LoggerFactory.getLogger(ValidateDatasetNode.class);
+public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
 
-  public ValidateDatasetNode(Config config) {
+  public BaseValidateDatasetNode(DeltaConfig.Config config) {
     this.config = config;
   }
 
+  /**
+   * @return {@link Logger} instance to use.
+   */
+  public abstract Logger getLogger();
+
+  /**
+   * @param session     {@link SparkSession} instance to use.
+   * @param context     {@link ExecutionContext} instance to use.
+   * @param inputSchema input schema in {@link StructType}
+   * @return data in {@link Dataset<Row>} to validate.
+   */
+  public abstract Dataset<Row> getDatasetToValidate(SparkSession session, 
ExecutionContext context,
+                                                    StructType inputSchema);
+
   @Override
   public void execute(ExecutionContext context, int curItrCount) throws 
Exception {
 
     SparkSession session = 
SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
-
     // todo: Fix partitioning schemes. For now, assumes data based 
partitioning.
     String inputPath = 
context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
-    String hudiPath = 
context.getHoodieTestSuiteWriter().getCfg().targetBasePath + "/*/*/*";
-    log.warn("ValidateDataset Node: Input path " + inputPath + ", hudi path " 
+ hudiPath);
+    log.warn("Validation using data from input path " + inputPath);
     // listing batches to be validated
     String inputPathStr = 
context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
-    FileSystem fs = new Path(inputPathStr)
-        .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
-    FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
-    for (FileStatus fileStatus : fileStatuses) {
-      log.debug("Listing all Micro batches to be validated :: " + 
fileStatus.getPath().toString());
+    if (log.isDebugEnabled()) {
+      FileSystem fs = new Path(inputPathStr)
+          
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
+      FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
+      log.info("fileStatuses length: " + fileStatuses.length);
+      for (FileStatus fileStatus : fileStatuses) {
+        log.debug("Listing all Micro batches to be validated :: " + 
fileStatus.getPath().toString());
+      }
     }
 
-    String recordKeyField = 
context.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key());
-    String partitionPathField = 
context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key());
-    // todo: fix hard coded fields from configs.
-    // read input and resolve insert, updates, etc.
-    Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
-    ExpressionEncoder encoder = getEncoder(inputDf.schema());
-    Dataset<Row> inputSnapshotDf = inputDf.groupByKey(
-        (MapFunction<Row, String>) value -> partitionPathField + "+" + 
recordKeyField, Encoders.STRING())
-        .reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
-          int ts1 = v1.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
-          int ts2 = v2.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
-          if (ts1 > ts2) {
-            return v1;
-          } else {
-            return v2;
-          }
-        })
-        .map((MapFunction<Tuple2<String, Row>, Row>) value -> value._2, 
encoder)
-        .filter("_hoodie_is_deleted is NULL");
+    Dataset<Row> inputSnapshotDf = getInputDf(context, session, inputPath);
 
     // read from hudi and remove meta columns.
-    Dataset<Row> hudiDf = session.read().format("hudi").load(hudiPath);
-    Dataset<Row> trimmedDf = 
hudiDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
-        
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
-
-    Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedDf);
+    Dataset<Row> trimmedHudiDf = getDatasetToValidate(session, context, 
inputSnapshotDf.schema());
+    Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
+    long inputCount = inputSnapshotDf.count();
+    long outputCount = trimmedHudiDf.count();
+    log.debug("Input count: " + inputCount + "; output count: " + outputCount);
     // the intersected df should be same as inputDf. if not, there is some 
mismatch.
-    if (inputSnapshotDf.except(intersectionDf).count() != 0) {
-      log.error("Data set validation failed. Total count in hudi " + 
trimmedDf.count() + ", input df count " + inputSnapshotDf.count());
+    if (outputCount == 0 || inputCount == 0 || 
inputSnapshotDf.except(intersectionDf).count() != 0) {
+      log.error("Data set validation failed. Total count in hudi " + 
outputCount + ", input df count " + inputCount);
       throw new AssertionError("Hudi contents does not match contents input 
data. ");
     }
 
@@ -118,10 +114,12 @@ public class ValidateDatasetNode extends DagNode<Boolean> 
{
       Dataset<Row> cowDf = session.sql("SELECT * FROM " + database + "." + 
tableName);
       Dataset<Row> trimmedCowDf = 
cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
           
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
-      intersectionDf = inputSnapshotDf.intersect(trimmedDf);
+      intersectionDf = inputSnapshotDf.intersect(trimmedCowDf);
+      outputCount = trimmedHudiDf.count();
+      log.warn("Input count: " + inputCount + "; output count: " + 
outputCount);
       // the intersected df should be same as inputDf. if not, there is some 
mismatch.
-      if (inputSnapshotDf.except(intersectionDf).count() != 0) {
-        log.error("Data set validation failed for COW hive table. Total count 
in hudi " + trimmedCowDf.count() + ", input df count " + 
inputSnapshotDf.count());
+      if (outputCount == 0 || inputSnapshotDf.except(intersectionDf).count() 
!= 0) {
+        log.error("Data set validation failed for COW hive table. Total count 
in hudi " + outputCount + ", input df count " + inputCount);
         throw new AssertionError("Hudi hive table contents does not match 
contents input data. ");
       }
     }
@@ -130,9 +128,9 @@ public class ValidateDatasetNode extends DagNode<Boolean> {
     if (config.isDeleteInputData()) {
       // clean up input data for current group of writes.
       inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
-      fs = new Path(inputPathStr)
+      FileSystem fs = new Path(inputPathStr)
           
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
-      fileStatuses = fs.listStatus(new Path(inputPathStr));
+      FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
       for (FileStatus fileStatus : fileStatuses) {
         log.debug("Micro batch to be deleted " + 
fileStatus.getPath().toString());
         fs.delete(fileStatus.getPath(), true);
@@ -140,6 +138,30 @@ public class ValidateDatasetNode extends DagNode<Boolean> {
     }
   }
 
+  private Dataset<Row> getInputDf(ExecutionContext context, SparkSession 
session, String inputPath) {
+    String recordKeyField = 
context.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key());
+    String partitionPathField = 
context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key());
+    // todo: fix hard coded fields from configs.
+    // read input and resolve insert, updates, etc.
+    Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
+    ExpressionEncoder encoder = getEncoder(inputDf.schema());
+    return inputDf.groupByKey(
+        (MapFunction<Row, String>) value ->
+            value.getAs(partitionPathField) + "+" + 
value.getAs(recordKeyField), Encoders.STRING())
+        .reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
+          int ts1 = v1.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
+          int ts2 = v2.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
+          if (ts1 > ts2) {
+            return v1;
+          } else {
+            return v2;
+          }
+        })
+        .map((MapFunction<Tuple2<String, Row>, Row>) value -> value._2, 
encoder)
+        .filter("_hoodie_is_deleted != true");
+  }
+
+
   private ExpressionEncoder getEncoder(StructType schema) {
     List<Attribute> attributes = 
JavaConversions.asJavaCollection(schema.toAttributes()).stream()
         .map(Attribute::toAttribute).collect(Collectors.toList());
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
index 5ca98cc..f5cf56b 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
@@ -60,6 +60,7 @@ public class InsertNode extends DagNode<JavaRDD<WriteStatus>> 
{
     if (!config.isDisableGenerate()) {
       log.info("Generating input data for node {}", this.getName());
       this.deltaWriteStatsRDD = 
deltaGenerator.writeRecords(deltaGenerator.generateInserts(config));
+      this.deltaWriteStatsRDD.cache();
       this.deltaWriteStatsRDD.count();
     }
   }
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
index 09e27c2..03b37a9 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
@@ -18,133 +18,39 @@
 
 package org.apache.hudi.integ.testsuite.dag.nodes;
 
-import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
 import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
-import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.ReduceFunction;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
-import org.apache.spark.sql.catalyst.encoders.RowEncoder;
-import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.stream.Collectors;
-
-import scala.Tuple2;
-import scala.collection.JavaConversions;
-import scala.collection.JavaConverters;
-
 /**
- * This nodes validates contents from input path are in tact with Hudi. This 
nodes uses spark datasource for comparison purposes. By default no configs are 
required for this node. But there is an
- * optional config "delete_input_data" that you can set for this node. If set, 
once validation completes, contents from inputPath are deleted. This will come 
in handy for long running test suites.
- * README has more details under docker set up for usages of this node.
+ * This validation node uses spark datasource for comparison purposes.
  */
-public class ValidateDatasetNode extends DagNode<Boolean> {
+public class ValidateDatasetNode extends BaseValidateDatasetNode {
 
   private static Logger log = 
LoggerFactory.getLogger(ValidateDatasetNode.class);
 
   public ValidateDatasetNode(Config config) {
-    this.config = config;
+    super(config);
   }
 
   @Override
-  public void execute(ExecutionContext context, int curItrCount) throws 
Exception {
-
-    SparkSession session = 
SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
+  public Logger getLogger() {
+    return log;
+  }
 
-    // todo: Fix partitioning schemes. For now, assumes data based 
partitioning.
-    String inputPath = 
context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
+  @Override
+  public Dataset<Row> getDatasetToValidate(SparkSession session, 
ExecutionContext context,
+                                           StructType inputSchema) {
     String hudiPath = 
context.getHoodieTestSuiteWriter().getCfg().targetBasePath + "/*/*/*";
-    log.warn("ValidateDataset Node: Input path " + inputPath + ", hudi path " 
+ hudiPath);
-    // listing batches to be validated
-    String inputPathStr = 
context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
-    FileSystem fs = new Path(inputPathStr)
-        .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
-    FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
-    for (FileStatus fileStatus : fileStatuses) {
-      log.debug("Listing all Micro batches to be validated :: " + 
fileStatus.getPath().toString());
-    }
-
-    String recordKeyField = 
context.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key());
-    String partitionPathField = 
context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key());
-    // todo: fix hard coded fields from configs.
-    // read input and resolve insert, updates, etc.
-    Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
-    ExpressionEncoder encoder = getEncoder(inputDf.schema());
-    Dataset<Row> inputSnapshotDf = inputDf.groupByKey(
-        (MapFunction<Row, String>) value -> partitionPathField + "+" + 
recordKeyField, Encoders.STRING())
-        .reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
-          int ts1 = v1.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
-          int ts2 = v2.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
-          if (ts1 > ts2) {
-            return v1;
-          } else {
-            return v2;
-          }
-        })
-        .map((MapFunction<Tuple2<String, Row>, Row>) value -> value._2, 
encoder)
-        .filter("_hoodie_is_deleted is NULL");
-
-    // read from hudi and remove meta columns.
+    log.info("Validate data in target hudi path " + hudiPath);
     Dataset<Row> hudiDf = session.read().format("hudi").load(hudiPath);
-    Dataset<Row> trimmedDf = 
hudiDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
-        
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
-
-    Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedDf);
-    // the intersected df should be same as inputDf. if not, there is some 
mismatch.
-    if (inputSnapshotDf.except(intersectionDf).count() != 0) {
-      log.error("Data set validation failed. Total count in hudi " + 
trimmedDf.count() + ", input df count " + inputSnapshotDf.count());
-      throw new AssertionError("Hudi contents does not match contents input 
data. ");
-    }
-
-    if (config.isValidateHive()) {
-      String database = 
context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
-      String tableName = 
context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
-      log.warn("Validating hive table with db : " + database + " and table : " 
+ tableName);
-      Dataset<Row> cowDf = session.sql("SELECT * FROM " + database + "." + 
tableName);
-      Dataset<Row> trimmedCowDf = 
cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
-          
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
-      intersectionDf = inputSnapshotDf.intersect(trimmedDf);
-      // the intersected df should be same as inputDf. if not, there is some 
mismatch.
-      if (inputSnapshotDf.except(intersectionDf).count() != 0) {
-        log.error("Data set validation failed for COW hive table. Total count 
in hudi " + trimmedCowDf.count() + ", input df count " + 
inputSnapshotDf.count());
-        throw new AssertionError("Hudi hive table contents does not match 
contents input data. ");
-      }
-    }
-
-    // if delete input data is enabled, erase input data.
-    if (config.isDeleteInputData()) {
-      // clean up input data for current group of writes.
-      inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
-      fs = new Path(inputPathStr)
-          
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
-      fileStatuses = fs.listStatus(new Path(inputPathStr));
-      for (FileStatus fileStatus : fileStatuses) {
-        log.debug("Micro batch to be deleted " + 
fileStatus.getPath().toString());
-        fs.delete(fileStatus.getPath(), true);
-      }
-    }
-  }
-
-  private ExpressionEncoder getEncoder(StructType schema) {
-    List<Attribute> attributes = 
JavaConversions.asJavaCollection(schema.toAttributes()).stream()
-        .map(Attribute::toAttribute).collect(Collectors.toList());
-    return RowEncoder.apply(schema)
-        
.resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(),
-            SimpleAnalyzer$.MODULE$);
+    return 
hudiDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+            
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
   }
 }
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
index e004b3b..6d5bc4f 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
@@ -44,6 +44,7 @@ import 
org.apache.hudi.integ.testsuite.converter.UpdateConverter;
 import org.apache.hudi.integ.testsuite.reader.DFSAvroDeltaInputReader;
 import org.apache.hudi.integ.testsuite.reader.DFSHoodieDatasetInputReader;
 import org.apache.hudi.integ.testsuite.reader.DeltaInputReader;
+import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
 import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
 import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
 import org.apache.hudi.integ.testsuite.writer.DeltaWriterAdapter;
@@ -75,7 +76,7 @@ public class DeltaGenerator implements Serializable {
   private int batchId;
 
   public DeltaGenerator(DFSDeltaConfig deltaOutputConfig, JavaSparkContext 
jsc, SparkSession sparkSession,
-      String schemaStr, BuiltinKeyGenerator keyGenerator) {
+                        String schemaStr, BuiltinKeyGenerator keyGenerator) {
     this.deltaOutputConfig = deltaOutputConfig;
     this.jsc = jsc;
     this.sparkSession = sparkSession;
@@ -123,7 +124,11 @@ public class DeltaGenerator implements Serializable {
         .mapPartitionsWithIndex((index, p) -> {
           return new LazyRecordGeneratorIterator(new 
FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
               minPayloadSize, schemaStr, partitionPathFieldNames, 
numPartitions, startPartition));
-        }, true);
+        }, true)
+        .map(record -> {
+          record.put(SchemaUtils.SOURCE_ORDERING_FIELD, batchId);
+          return record;
+        });
 
     if (deltaOutputConfig.getInputParallelism() < numPartitions) {
       inputBatch = 
inputBatch.coalesce(deltaOutputConfig.getInputParallelism());
@@ -167,7 +172,11 @@ public class DeltaGenerator implements Serializable {
         log.info("Repartitioning records done for updates");
         UpdateConverter converter = new UpdateConverter(schemaStr, 
config.getRecordSize(),
             partitionPathFieldNames, recordRowKeyFieldNames);
-        JavaRDD<GenericRecord> updates = converter.convert(adjustedRDD);
+        JavaRDD<GenericRecord> convertedRecords = 
converter.convert(adjustedRDD);
+        JavaRDD<GenericRecord> updates = convertedRecords.map(record -> {
+          record.put(SchemaUtils.SOURCE_ORDERING_FIELD, batchId);
+          return record;
+        });
         updates.persist(StorageLevel.DISK_ONLY());
         if (inserts == null) {
           inserts = updates;
@@ -205,11 +214,16 @@ public class DeltaGenerator implements Serializable {
               .getNumRecordsDelete());
         }
       }
+
       log.info("Repartitioning records for delete");
       // persist this since we will make multiple passes over this
       adjustedRDD = adjustedRDD.repartition(jsc.defaultParallelism());
       Converter converter = new DeleteConverter(schemaStr, 
config.getRecordSize());
-      JavaRDD<GenericRecord> deletes = converter.convert(adjustedRDD);
+      JavaRDD<GenericRecord> convertedRecords = converter.convert(adjustedRDD);
+      JavaRDD<GenericRecord> deletes = convertedRecords.map(record -> {
+        record.put(SchemaUtils.SOURCE_ORDERING_FIELD, batchId);
+        return record;
+      });
       deletes.persist(StorageLevel.DISK_ONLY());
       return deletes;
     } else {
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala
index 4d17570..6654264 100644
--- 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala
@@ -30,11 +30,12 @@ import scala.collection.JavaConverters._
 
 /**
  * Spark datasource based bulk insert node
- * @param config1
+ *
+ * @param dagNodeConfig DAG node configurations.
  */
-class SparkBulkInsertNode(config1: Config) extends DagNode[RDD[WriteStatus]] {
+class SparkBulkInsertNode(dagNodeConfig: Config) extends 
DagNode[RDD[WriteStatus]] {
 
-  config = config1
+  config = dagNodeConfig
 
   /**
    * Execute the {@link DagNode}.
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala
index 4ebd59d..645787a 100644
--- 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala
@@ -19,6 +19,7 @@
 package org.apache.hudi.integ.testsuite.dag.nodes
 
 import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
 import org.apache.hudi.client.WriteStatus
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
@@ -32,12 +33,13 @@ import scala.collection.JavaConverters._
 
 /**
  * Spark datasource based upsert node
- * @param config1
+ *
+ * @param dagNodeConfig DAG node configurations.
  */
-class SparkDeleteNode(config1: Config) extends DagNode[RDD[WriteStatus]] {
+class SparkDeleteNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] 
{
 
   private val log = LogManager.getLogger(getClass)
-  config = config1
+  config = dagNodeConfig
 
   /**
    * Execute the {@link DagNode}.
@@ -47,20 +49,9 @@ class SparkDeleteNode(config1: Config) extends 
DagNode[RDD[WriteStatus]] {
    * @throws Exception Thrown if the execution failed.
    */
   override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
-    if (!config.isDisableGenerate) {
-      println("Generating input data for node {}", this.getName)
-      
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateDeletes(config)).count()
-    }
-
     // Deletes can't be fetched using getNextBatch() bcoz, getInsert(schema) 
from payload will return empty for delete
     // records
-    context.getWriterContext.getHoodieTestSuiteWriter.getNextBatchForDeletes()
-    val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + 
context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse("")
-
-    val avroDf = 
context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead)
-    val genRecsRDD = HoodieSparkUtils.createRdd(avroDf, 
"testStructName","testNamespace", false,
-      org.apache.hudi.common.util.Option.of(new 
Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema)))
-
+    val genRecsRDD = generateRecordsForDelete(config, context)
     val inputDF = AvroConversionUtils.createDataFrame(genRecsRDD,
       context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
       context.getWriterContext.getSparkSession)
@@ -75,4 +66,24 @@ class SparkDeleteNode(config1: Config) extends 
DagNode[RDD[WriteStatus]] {
       .mode(SaveMode.Append)
       .save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
   }
+
+  /**
+   * Generates records for delete operations in Spark.
+   *
+   * @param config  Node configs.
+   * @param context The context needed for an execution of a node.
+   * @return Records in {@link RDD}.
+   */
+  private def generateRecordsForDelete(config: Config, context: 
ExecutionContext): RDD[GenericRecord] = {
+    if (!config.isDisableGenerate) {
+      
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateDeletes(config)).count()
+    }
+
+    context.getWriterContext.getHoodieTestSuiteWriter.getNextBatchForDeletes()
+    val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + 
context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse("")
+
+    val avroDf = 
context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead)
+    HoodieSparkUtils.createRdd(avroDf, "testStructName", "testNamespace", 
false,
+      org.apache.hudi.common.util.Option.of(new 
Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema)))
+  }
 }
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
index f962e41..1b69cf8 100644
--- 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
@@ -30,11 +30,12 @@ import scala.collection.JavaConverters._
 
 /**
  * Spark datasource based insert node
- * @param config1
+ *
+ * @param dagNodeConfig DAG node configurations.
  */
-class SparkInsertNode(config1: Config) extends DagNode[RDD[WriteStatus]] {
+class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] 
{
 
-  config = config1
+  config = dagNodeConfig
 
   /**
    * Execute the {@link DagNode}.
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala
index 6486fed..858827a 100644
--- 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala
@@ -30,11 +30,12 @@ import scala.collection.JavaConverters._
 
 /**
  * Spark datasource based upsert node
- * @param config1
+ *
+ * @param dagNodeConfig DAG node configurations.
  */
-class SparkUpsertNode(config1: Config) extends DagNode[RDD[WriteStatus]] {
+class SparkUpsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] 
{
 
-  config = config1
+  config = dagNodeConfig
 
   /**
    * Execute the {@link DagNode}.
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/BaseSparkSqlNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/BaseSparkSqlNode.scala
new file mode 100644
index 0000000..ce6a40e
--- /dev/null
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/BaseSparkSqlNode.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes.spark.sql
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.client.WriteStatus
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext
+import org.apache.hudi.integ.testsuite.dag.nodes.DagNode
+import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils
+import org.apache.spark.rdd.RDD
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * Abstract class for DAG node of running Spark SQL.
+ *
+ * @param dagNodeConfig DAG node configurations.
+ */
+abstract class BaseSparkSqlNode(dagNodeConfig: Config) extends 
DagNode[RDD[WriteStatus]] {
+
+  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+  val TEMP_TABLE_NAME = "_spark_sql_temp_table"
+  config = dagNodeConfig
+
+  /**
+   * Returns the Spark SQL query to execute for this {@link DagNode}.
+   *
+   * @param config  DAG node configurations.
+   * @param context The context needed for an execution of a node.
+   * @return the query String.
+   */
+  def queryToRun(config: Config, context: ExecutionContext): String
+
+  /**
+   * Prepares the data for the Spark write operation.
+   *
+   * @param context The context needed for an execution of a node.
+   * @return Records in {@link RDD}.
+   */
+  def prepareData(context: ExecutionContext): RDD[GenericRecord] = {
+    if (!config.isDisableGenerate) {
+      
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count()
+    }
+    context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch
+  }
+
+  /**
+   * @return Name of the temp table containing the input data.
+   */
+  def getTempTableName(): String = {
+    TEMP_TABLE_NAME
+  }
+
+  /**
+   * Execute the {@link DagNode}.
+   *
+   * @param context     The context needed for an execution of a node.
+   * @param curItrCount iteration count for executing the node.
+   * @throws Exception Thrown if the execution failed.
+   */
+  override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
+    LOG.info("Run query in Spark SQL ...")
+    val nextBatch = prepareData(context)
+    val sparkSession = context.getWriterContext.getSparkSession
+    val inputDF = AvroConversionUtils.createDataFrame(nextBatch,
+      context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
+      sparkSession)
+    inputDF.createOrReplaceTempView(TEMP_TABLE_NAME)
+
+    val query = queryToRun(config, context)
+    SparkSqlUtils.logQuery(LOG, query)
+    sparkSession.sql(query)
+    LOG.info("Finish run query in Spark SQL.")
+  }
+}
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlCreateTableNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlCreateTableNode.scala
new file mode 100644
index 0000000..3db6aa2
--- /dev/null
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlCreateTableNode.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes.spark.sql
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.client.WriteStatus
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext
+import org.apache.hudi.integ.testsuite.dag.nodes.DagNode
+import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils
+import org.apache.spark.rdd.RDD
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * DAG node of create table using Spark SQL.
+ *
+ * @param dagNodeConfig DAG node configurations.
+ */
+class SparkSqlCreateTableNode(dagNodeConfig: Config) extends 
DagNode[RDD[WriteStatus]] {
+
+  val LOG: Logger = LoggerFactory.getLogger(classOf[SparkSqlCreateTableNode])
+  val TEMP_TABLE_NAME: String = "_spark_sql_temp_table"
+
+  config = dagNodeConfig
+
+  /**
+   * Execute the {@link DagNode}.
+   *
+   * @param context     The context needed for an execution of a node.
+   * @param curItrCount iteration count for executing the node.
+   * @throws Exception Thrown if the execution failed.
+   */
+  override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
+    LOG.info("Creating table in Spark SQL ...")
+    val sparkSession = context.getWriterContext.getSparkSession
+    val targetTableName = context.getWriterContext.getCfg.targetTableName
+    val targetBasePath = context.getWriterContext.getCfg.targetBasePath + 
"_sql"
+
+    if (config.shouldUseCtas) {
+      // Prepares data for CTAS query
+      if (!config.isDisableGenerate) {
+        
context.getDeltaGenerator.writeRecords(context.getDeltaGenerator.generateInserts(config)).count()
+      }
+      val nextBatch = 
context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch
+      val sparkSession = context.getWriterContext.getSparkSession
+      val inputDF = AvroConversionUtils.createDataFrame(nextBatch,
+        context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
+        sparkSession)
+      inputDF.createOrReplaceTempView(TEMP_TABLE_NAME)
+    }
+
+    // Cleans up the table
+    sparkSession.sql("drop table if exists " + targetTableName)
+    if (config.isTableExternal) {
+      LOG.info("Clean up " + targetBasePath)
+      val fs = FSUtils.getFs(targetBasePath, 
context.getJsc.hadoopConfiguration())
+      val targetPath = new Path(targetBasePath)
+      if (fs.exists(targetPath)) {
+        fs.delete(targetPath, true)
+      }
+    }
+
+    // Executes the create table query
+    val createTableQuery = SparkSqlUtils.constructCreateTableQuery(
+      config, targetTableName, targetBasePath,
+      context.getWriterContext.getHoodieTestSuiteWriter.getSchema, 
TEMP_TABLE_NAME)
+    SparkSqlUtils.logQuery(LOG, createTableQuery)
+    sparkSession.sql(createTableQuery)
+    val targetTableCount = sparkSession.sql("select * from " + targetTableName)
+    LOG.info("Target table count: " + targetTableCount.count())
+    LOG.info("Finish create table in Spark SQL.")
+  }
+}
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlDeleteNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlDeleteNode.scala
new file mode 100644
index 0000000..847381f
--- /dev/null
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlDeleteNode.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes.spark.sql
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext
+import org.apache.hudi.integ.testsuite.dag.nodes.DagNode
+import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils
+import org.apache.spark.rdd.RDD
+
+/**
+ * DAG node of delete using Spark SQL.
+ *
+ * @param dagNodeConfig DAG node configurations.
+ */
+class SparkSqlDeleteNode(dagNodeConfig: Config) extends 
BaseSparkSqlNode(dagNodeConfig) {
+
+  config = dagNodeConfig
+
+  /**
+   * Prepares the data for the write operation.
+   *
+   * @param context The context needed for an execution of a node.
+   * @return Records in {@link RDD}.
+   */
+  override def prepareData(context: ExecutionContext): RDD[GenericRecord] = {
+    val sparkSession = context.getWriterContext.getSparkSession
+    val recordsToDelete = SparkSqlUtils.generateDeleteRecords(
+      config, sparkSession, 
context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
+      context.getWriterContext.getCfg.targetTableName, 
sparkSession.sparkContext.defaultParallelism)
+    LOG.info("Number of records to delete: " + recordsToDelete.count())
+    // The update records corresponding to the SQL are only used for data 
validation
+    context.getDeltaGenerator().writeRecords(recordsToDelete).count()
+    recordsToDelete
+  }
+
+  /**
+   * Returns the Spark SQL query to execute for this {@link DagNode}.
+   *
+   * @param config  DAG node configurations.
+   * @param context The context needed for an execution of a node.
+   * @return the query String.
+   */
+  override def queryToRun(config: Config, context: ExecutionContext): String = 
{
+    SparkSqlUtils.constructDeleteQuery(config, 
context.getWriterContext.getSparkSession,
+      context.getWriterContext.getCfg.targetTableName)
+  }
+}
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlInsertNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlInsertNode.scala
new file mode 100644
index 0000000..6fc79f4
--- /dev/null
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlInsertNode.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes.spark.sql
+
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext
+import org.apache.hudi.integ.testsuite.dag.nodes.DagNode
+import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils
+
+/**
+ * DAG node of insert using Spark SQL.
+ *
+ * @param dagNodeConfig DAG node configurations.
+ */
+class SparkSqlInsertNode(dagNodeConfig: Config) extends 
BaseSparkSqlNode(dagNodeConfig) {
+
+  config = dagNodeConfig
+
+  /**
+   * Returns the Spark SQL query to execute for this {@link DagNode}.
+   *
+   * @param config  DAG node configurations.
+   * @param context The context needed for an execution of a node.
+   * @return the query String.
+   */
+  override def queryToRun(config: Config, context: ExecutionContext): String = 
{
+    val targetTableName = context.getWriterContext.getCfg.targetTableName
+    SparkSqlUtils.constructInsertQuery(
+      "into", targetTableName,
+      SparkSqlUtils.getTableSchema(context.getWriterContext.getSparkSession, 
targetTableName),
+      getTempTableName())
+  }
+}
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlInsertOverwriteNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlInsertOverwriteNode.scala
new file mode 100644
index 0000000..248b70d
--- /dev/null
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlInsertOverwriteNode.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes.spark.sql
+
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext
+import org.apache.hudi.integ.testsuite.dag.nodes.DagNode
+import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils
+
+/**
+ * DAG node of insert overwrite using Spark SQL.
+ *
+ * @param dagNodeConfig DAG node configurations.
+ */
+class SparkSqlInsertOverwriteNode(dagNodeConfig: Config) extends 
BaseSparkSqlNode(dagNodeConfig) {
+
+  config = dagNodeConfig
+
+  /**
+   * Returns the Spark SQL query to execute for this {@link DagNode}.
+   *
+   * @param config  DAG node configurations.
+   * @param context The context needed for an execution of a node.
+   * @return the query String.
+   */
+  override def queryToRun(config: Config, context: ExecutionContext): String = 
{
+    val targetTableName = context.getWriterContext.getCfg.targetTableName
+    SparkSqlUtils.constructInsertQuery(
+      "overwrite", targetTableName,
+      SparkSqlUtils.getTableSchema(context.getWriterContext.getSparkSession, 
targetTableName),
+      getTempTableName())
+  }
+}
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlMergeNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlMergeNode.scala
new file mode 100644
index 0000000..b03230b
--- /dev/null
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlMergeNode.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes.spark.sql
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext
+import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils
+import org.apache.spark.rdd.RDD
+
+/**
+ * DAG node of merge using Spark SQL.
+ *
+ * @param dagNodeConfig DAG node configurations.
+ */
+class SparkSqlMergeNode(dagNodeConfig: Config) extends 
BaseSparkSqlNode(dagNodeConfig) {
+
+  config = dagNodeConfig
+
+  /**
+   * Prepares the data for the Spark write operation.
+   *
+   * @param context The context needed for an execution of a node.
+   * @return Records in {@link RDD}.
+   */
+  override def prepareData(context: ExecutionContext): RDD[GenericRecord] = {
+    if (!config.isDisableGenerate) {
+      
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateUpdates(config)).count()
+    }
+    context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch
+  }
+
+  /**
+   * Returns the Spark SQL query to execute for this {@link DagNode}.
+   *
+   * @param config  DAG node configurations.
+   * @param context The context needed for an execution of a node.
+   * @return the query String.
+   */
+  override def queryToRun(config: Config, context: ExecutionContext): String = 
{
+    val targetTableName = context.getWriterContext.getCfg.targetTableName
+    SparkSqlUtils.constructMergeQuery(
+      config, targetTableName,
+      SparkSqlUtils.getTableSchema(context.getWriterContext.getSparkSession, 
targetTableName),
+      getTempTableName())
+  }
+}
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlUpdateNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlUpdateNode.scala
new file mode 100644
index 0000000..fdc799f
--- /dev/null
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlUpdateNode.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes.spark.sql
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext
+import org.apache.hudi.integ.testsuite.dag.nodes.DagNode
+import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils
+import org.apache.spark.rdd.RDD
+
+/**
+ * DAG node of update using Spark SQL.
+ *
+ * @param dagNodeConfig DAG node configurations.
+ */
+class SparkSqlUpdateNode(dagNodeConfig: Config) extends 
BaseSparkSqlNode(dagNodeConfig) {
+
+  config = dagNodeConfig
+
+  /**
+   * Prepares the data for the Spark write operation.
+   *
+   * @param context The context needed for an execution of a node.
+   * @return Records in {@link RDD}.
+   */
+  override def prepareData(context: ExecutionContext): RDD[GenericRecord] = {
+    val sparkSession = context.getWriterContext.getSparkSession
+    val recordsToUpdate = SparkSqlUtils.generateUpdateRecords(
+      config, sparkSession, 
context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
+      context.getWriterContext.getCfg.targetTableName, 
sparkSession.sparkContext.defaultParallelism)
+    LOG.info("Number of records to update: " + recordsToUpdate.count())
+    // The update records corresponding to the SQL are only used for data 
validation
+    context.getDeltaGenerator().writeRecords(recordsToUpdate).count()
+    recordsToUpdate
+  }
+
+  /**
+   * Returns the Spark SQL query to execute for this {@link DagNode}.
+   *
+   * @param config  DAG node configurations.
+   * @param context The context needed for an execution of a node.
+   * @return the query String.
+   */
+  override def queryToRun(config: Config, context: ExecutionContext): String = 
{
+    SparkSqlUtils.constructUpdateQuery(config, 
context.getWriterContext.getSparkSession,
+      context.getWriterContext.getCfg.targetTableName)
+  }
+}
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlValidateDatasetNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlValidateDatasetNode.scala
new file mode 100644
index 0000000..01804ba
--- /dev/null
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlValidateDatasetNode.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes.spark.sql
+
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext
+import org.apache.hudi.integ.testsuite.dag.nodes.BaseValidateDatasetNode
+import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * This validation node uses Spark SQL to get data for comparison purposes.
+ */
+class SparkSqlValidateDatasetNode(dagNodeConfig: Config) extends 
BaseValidateDatasetNode(dagNodeConfig) {
+
+  val LOG: Logger = 
LoggerFactory.getLogger(classOf[SparkSqlValidateDatasetNode])
+
+  config = dagNodeConfig
+
+  /**
+   * @return {@link Logger} instance to use.
+   */
+  override def getLogger: Logger = LOG
+
+  /**
+   * @param session     {@link SparkSession} instance to use.
+   * @param context     {@link ExecutionContext} instance to use.
+   * @param inputSchema input schema in {@link StructType}
+   * @return data in {@link Dataset< Row >} to validate.
+   */
+  override def getDatasetToValidate(session: SparkSession, context: 
ExecutionContext,
+                                    inputSchema: StructType): Dataset[Row] = {
+    val tableName = context.getWriterContext.getCfg.targetTableName
+    LOG.info("Validate data in table " + tableName)
+    val sortedInputFieldNames = inputSchema.fieldNames.sorted
+    val tableSchema = session.table(tableName).schema
+    val sortedTableFieldNames = tableSchema.fieldNames
+      .filter(field => 
!HoodieRecord.HOODIE_META_COLUMNS.contains(field)).sorted
+    if (!(sortedInputFieldNames sameElements sortedTableFieldNames)) {
+      LOG.error("Input schema: ")
+      inputSchema.printTreeString()
+      LOG.error("Table schema: ")
+      tableSchema.printTreeString()
+      throw new AssertionError("Data set validation failed.  The schema does 
not match.")
+    }
+    session.sql(SparkSqlUtils.constructSelectQuery(inputSchema, tableName))
+  }
+}
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala
new file mode 100644
index 0000000..fa16eae
--- /dev/null
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.utils
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.util.Option
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
+import 
org.apache.hudi.integ.testsuite.generator.GenericRecordFullPayloadGenerator
+import 
org.apache.hudi.integ.testsuite.utils.SparkSqlUtils.getFieldNamesAndTypes
+import org.apache.hudi.utilities.schema.RowBasedSchemaProvider
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.storage.StorageLevel
+import org.slf4j.Logger
+
+import scala.math.BigDecimal.RoundingMode.RoundingMode
+
+/**
+ * Utils for test nodes in Spark SQL
+ */
+object SparkSqlUtils {
+
+  /**
+   * @param sparkSession spark session to use
+   * @param tableName    table name
+   * @return table schema excluding meta columns in `StructType`
+   */
+  def getTableSchema(sparkSession: SparkSession, tableName: String): 
StructType = {
+    new StructType(sparkSession.table(tableName).schema.fields
+      .filter(field => !HoodieRecord.HOODIE_META_COLUMNS.contains(field.name)))
+  }
+
+  /**
+   * Converts Avro schema in String to the SQL schema expression, with 
partition fields at the end
+   *
+   * For example, given the Avro schema below:
+   * """
+   * 
{"type":"record","name":"triprec","fields":[{"name":"timestamp","type":"long"},
+   * 
{"name":"_row_key","type":"string"},{"name":"rider","type":"string"},{"name":"driver","type":"string"},
+   * 
{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},
+   * {"name":"end_lon","type":"double"},{"name":"fare","type":"double"},
+   * {"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
+   * """
+   * and the partition columns Set("rider"),
+   * the SQL schema expression is:
+   * """
+   * timestamp bigint,
+   * _row_key string,
+   * driver string,
+   * begin_lat double,
+   * begin_lon double,
+   * end_lat double,
+   * end_lon double,
+   * fare double,
+   * _hoodie_is_deleted boolean,
+   * rider string
+   * """
+   *
+   * @param avroSchemaString Avro schema String
+   * @param partitionColumns partition columns
+   * @return corresponding SQL schema expression
+   */
+  def convertAvroToSqlSchemaExpression(avroSchemaString: String, 
partitionColumns: Set[String]): String = {
+    val fields: Array[(String, String)] = 
getFieldNamesAndTypes(avroSchemaString)
+    val reorderedFields = fields.filter(field => 
!partitionColumns.contains(field._1)) ++
+      fields.filter(field => partitionColumns.contains(field._1))
+    reorderedFields.map(e => e._1 + " " + e._2).mkString(",\n")
+  }
+
+  /**
+   * Converts Avro schema in String to an array of field names.
+   *
+   * For example, given the Avro schema below:
+   * """
+   * 
{"type":"record","name":"triprec","fields":[{"name":"timestamp","type":"long"},
+   * 
{"name":"_row_key","type":"string"},{"name":"rider","type":"string"},{"name":"driver","type":"string"},
+   * 
{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},
+   * {"name":"end_lon","type":"double"},{"name":"fare","type":"double"},
+   * {"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
+   * """
+   * the output is
+   * ["timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", 
"end_lat", "end_lon",
+   * "fare", "_hoodie_is_deleted"]
+   *
+   * @param avroSchemaString Avro schema String
+   * @return an array of field names.
+   */
+  def convertAvroToFieldNames(avroSchemaString: String): Array[String] = {
+    getFieldNamesAndTypes(avroSchemaString).map(e => e._1)
+  }
+
+  /**
+   * Gets an array of field names and types from Avro schema String.
+   *
+   * For example, given the Avro schema below:
+   * """
+   * 
{"type":"record","name":"triprec","fields":[{"name":"timestamp","type":"long"},
+   * 
{"name":"_row_key","type":"string"},{"name":"rider","type":"string"},{"name":"driver","type":"string"},
+   * 
{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},
+   * {"name":"end_lon","type":"double"},{"name":"fare","type":"double"},
+   * {"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
+   * """
+   * the output is
+   * [("timestamp", "bigint"),
+   * ("_row_key", "string"),
+   * ("rider", "string",
+   * ("driver", "string"),
+   * ("begin_lat", "double"),
+   * ("begin_lon", "double"),
+   * ("end_lat", "double"),
+   * ("end_lon", "double"),
+   * ("fare", "double"),
+   * ("_hoodie_is_deleted", "boolean")]
+   *
+   * @param avroSchemaString Avro schema String
+   * @return an array of field names and types
+   */
+  def getFieldNamesAndTypes(avroSchemaString: String): Array[(String, String)] 
= {
+    val schema = new Schema.Parser().parse(avroSchemaString)
+    val structType = 
SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
+    structType.fields.map(field => (field.name, field.dataType.simpleString))
+  }
+
+  /**
+   * Logs the Spark SQL query to run.
+   *
+   * @param log   {@link Logger} instance to use.
+   * @param query query String.
+   */
+  def logQuery(log: Logger, query: String): Unit = {
+    log.warn("----- Running the following Spark SQL query -----")
+    log.warn(query)
+    log.warn("-" * 50)
+  }
+
+  /**
+   * Constructs the select query.
+   *
+   * For example, given the Avro schema below:
+   * """
+   * 
{"type":"record","name":"triprec","fields":[{"name":"timestamp","type":"long"},
+   * 
{"name":"_row_key","type":"string"},{"name":"rider","type":"string"},{"name":"driver","type":"string"},
+   * 
{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},
+   * {"name":"end_lon","type":"double"},{"name":"fare","type":"double"},
+   * {"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
+   * """
+   * and the partition columns Set("rider"),
+   * the output is
+   * """
+   * select timestamp, _row_key, driver, begin_lat, begin_lon, end_lat, 
end_lon, fare,
+   * _hoodie_is_deleted, rider from _temp_table
+   * """
+   *
+   * @param inputSchema      input Avro schema String.
+   * @param partitionColumns partition columns
+   * @param tableName        table name.
+   * @return select query String.
+   */
+  def constructSelectQuery(inputSchema: String, partitionColumns: Set[String], 
tableName: String): String = {
+    val fieldNames: Array[String] = 
SparkSqlUtils.convertAvroToFieldNames(inputSchema)
+    val reorderedFieldNames = fieldNames.filter(name => 
!partitionColumns.contains(name)) ++
+      fieldNames.filter(name => partitionColumns.contains(name))
+    constructSelectQuery(reorderedFieldNames, tableName)
+  }
+
+  /**
+   * Constructs the select query with {@link StructType} columns in the select.
+   *
+   * @param structType {@link StructType} instance.
+   * @param tableName  table name.
+   * @return select query String.
+   */
+  def constructSelectQuery(structType: StructType, tableName: String): String 
= {
+    constructSelectQuery(structType, Set.empty[String], tableName)
+  }
+
+  /**
+   * Constructs the select query with {@link StructType} columns in the select 
and the partition
+   * columns at the end.
+   *
+   * @param structType       {@link StructType} instance.
+   * @param partitionColumns partition columns in a {@link Set}
+   * @param tableName        table name.
+   * @return select query String.
+   */
+  def constructSelectQuery(structType: StructType, partitionColumns: 
Set[String], tableName: String): String = {
+    val fieldNames: Array[String] = structType.fields.map(field => field.name)
+    val reorderedFieldNames = fieldNames.filter(name => 
!partitionColumns.contains(name)) ++
+      fieldNames.filter(name => partitionColumns.contains(name))
+    constructSelectQuery(reorderedFieldNames, tableName)
+  }
+
+  /**
+   * Constructs the select query with a {@link Array} of String.
+   *
+   * @param fieldNames field names in String.
+   * @param tableName  table name.
+   * @return select query String.
+   */
+  def constructSelectQuery(fieldNames: Array[String], tableName: String): 
String = {
+    val selectQueryBuilder = new StringBuilder("select ");
+    selectQueryBuilder.append(fieldNames.mkString(", "))
+    selectQueryBuilder.append(" from ")
+    selectQueryBuilder.append(tableName)
+    selectQueryBuilder.toString()
+  }
+
+  /**
+   * Constructs the Spark SQL create table query based on the configs.
+   *
+   * @param config          DAG node configurations.
+   * @param targetTableName target table name.
+   * @param targetBasePath  target bash path for external table.
+   * @param inputSchema     input Avro schema String.
+   * @param inputTableName  name of the table containing input data.
+   * @return create table query.
+   */
+  def constructCreateTableQuery(config: Config, targetTableName: String, 
targetBasePath: String,
+                                inputSchema: String, inputTableName: String): 
String = {
+    // Constructs create table statement
+    val createTableQueryBuilder = new StringBuilder("create table ")
+    createTableQueryBuilder.append(targetTableName)
+    val partitionColumns: Set[String] =
+      if (config.getPartitionField.isPresent) 
Set(config.getPartitionField.get) else Set.empty
+    if (!config.shouldUseCtas) {
+      // Adds the schema statement if not using CTAS
+      createTableQueryBuilder.append(" (")
+      
createTableQueryBuilder.append(SparkSqlUtils.convertAvroToSqlSchemaExpression(inputSchema,
 partitionColumns))
+      createTableQueryBuilder.append("\n)")
+    }
+    createTableQueryBuilder.append(" using hudi")
+    val tableTypeOption = config.getTableType
+    val primaryKeyOption = config.getPrimaryKey
+    val preCombineFieldOption = config.getPreCombineField
+
+    // Adds location for external table
+    if (config.isTableExternal) {
+      createTableQueryBuilder.append("\nlocation '" + targetBasePath + "'")
+    }
+
+    // Adds options if set
+    var options = Array[String]()
+    if (tableTypeOption.isPresent) {
+      options :+= ("type = '" + tableTypeOption.get() + "'")
+    }
+    if (primaryKeyOption.isPresent) {
+      options :+= ("primaryKey = '" + primaryKeyOption.get() + "'")
+    }
+    if (preCombineFieldOption.isPresent) {
+      options :+= ("preCombineField = '" + preCombineFieldOption.get() + "'")
+    }
+    if (options.length > 0) {
+      createTableQueryBuilder.append(options.mkString("\noptions ( \n", ",\n", 
"\n)"))
+    }
+
+    // Adds partition fields if set
+    val partitionFieldOption = config.getPartitionField
+    if (partitionFieldOption.isPresent) {
+      createTableQueryBuilder.append("\npartitioned by (" + 
partitionFieldOption.get() + ")")
+    }
+
+    if (config.shouldUseCtas()) {
+      // Adds as select query
+      createTableQueryBuilder.append("\nas\n");
+      createTableQueryBuilder.append(constructSelectQuery(inputSchema, 
partitionColumns, inputTableName))
+    }
+    createTableQueryBuilder.toString()
+  }
+
+  /**
+   * Constructs the Spark SQL insert query based on the configs.
+   *
+   * @param insertType      the insert type, in one of two types: "into" or 
"overwrite".
+   * @param targetTableName target table name.
+   * @param schema          table schema to use
+   * @param inputTableName  name of the table containing input data.
+   * @return insert query.
+   */
+  def constructInsertQuery(insertType: String, targetTableName: String, 
schema: StructType,
+                           inputTableName: String): String = {
+    // Constructs insert statement
+    val insertQueryBuilder = new StringBuilder("insert ")
+    insertQueryBuilder.append(insertType)
+    insertQueryBuilder.append(" ")
+    insertQueryBuilder.append(targetTableName)
+    insertQueryBuilder.append(" ")
+    insertQueryBuilder.append(constructSelectQuery(schema, inputTableName))
+    insertQueryBuilder.toString()
+  }
+
+  /**
+   * Constructs the Spark SQL merge query based on the configs.
+   *
+   * @param config          DAG node configurations.
+   * @param targetTableName target table name.
+   * @param schema          table schema to use
+   * @param inputTableName  name of the table containing input data.
+   * @return merge query.
+   */
+  def constructMergeQuery(config: Config, targetTableName: String, schema: 
StructType,
+                          inputTableName: String): String = {
+    val mergeQueryBuilder = new StringBuilder("merge into ")
+    mergeQueryBuilder.append(targetTableName)
+    mergeQueryBuilder.append(" as target using (\n")
+    mergeQueryBuilder.append(constructSelectQuery(schema, inputTableName))
+    mergeQueryBuilder.append("\n) source\non ")
+    mergeQueryBuilder.append(config.getMergeCondition)
+    mergeQueryBuilder.append("\nwhen matched then ")
+    mergeQueryBuilder.append(config.getMatchedAction)
+    mergeQueryBuilder.append("\nwhen not matched then ")
+    mergeQueryBuilder.append(config.getNotMatchedAction)
+    mergeQueryBuilder.toString()
+  }
+
+  /**
+   * Constructs the Spark SQL update query based on the configs.
+   *
+   * @param config          DAG node configurations.
+   * @param sparkSession    Spark session.
+   * @param targetTableName target table name.
+   * @return update query.
+   */
+  def constructUpdateQuery(config: Config, sparkSession: SparkSession,
+                           targetTableName: String): String = {
+    val bounds = getLowerUpperBoundsFromPercentiles(config, sparkSession, 
targetTableName)
+    val updateQueryBuilder = new StringBuilder("update ")
+    updateQueryBuilder.append(targetTableName)
+    updateQueryBuilder.append(" set ")
+    updateQueryBuilder.append(config.getUpdateColumn)
+    updateQueryBuilder.append(" = ")
+    updateQueryBuilder.append(config.getUpdateColumn)
+    updateQueryBuilder.append(" * 1.6 ")
+    updateQueryBuilder.append(" where ")
+    updateQueryBuilder.append(config.getWhereConditionColumn)
+    updateQueryBuilder.append(" between ")
+    updateQueryBuilder.append(bounds._1)
+    updateQueryBuilder.append(" and ")
+    updateQueryBuilder.append(bounds._2)
+    updateQueryBuilder.toString()
+  }
+
+  /**
+   * Constructs the Spark SQL delete query based on the configs.
+   *
+   * @param config          DAG node configurations.
+   * @param sparkSession    Spark session.
+   * @param targetTableName target table name.
+   * @return delete query.
+   */
+  def constructDeleteQuery(config: Config, sparkSession: SparkSession,
+                           targetTableName: String): String = {
+    val bounds = getLowerUpperBoundsFromPercentiles(config, sparkSession, 
targetTableName)
+    val deleteQueryBuilder = new StringBuilder("delete from ")
+    deleteQueryBuilder.append(targetTableName)
+    deleteQueryBuilder.append(" where ")
+    deleteQueryBuilder.append(config.getWhereConditionColumn)
+    deleteQueryBuilder.append(" between ")
+    deleteQueryBuilder.append(bounds._1)
+    deleteQueryBuilder.append(" and ")
+    deleteQueryBuilder.append(bounds._2)
+    deleteQueryBuilder.toString()
+  }
+
+  /**
+   * Generates the pair of percentile levels based on the ratio in the config.
+   *
+   * For example, given ratio as 0.4, the output is (0.3, 0.7).
+   *
+   * @param config DAG node configurations.
+   * @return the lower bound and upper bound percentiles.
+   */
+  def generatePercentiles(config: Config): (Double, Double) = {
+    val ratio: Double = config.getRatioRecordsChange
+    (Math.max(0.5 - (ratio / 2.0), 0.0), Math.min(0.5 + (ratio / 2.0), 1.0))
+  }
+
+  /**
+   * @param number input double number
+   * @param mode   rounding mode
+   * @return rounded double
+   */
+  def roundDouble(number: Double, mode: RoundingMode): Double = {
+    BigDecimal(number).setScale(4, mode).toDouble
+  }
+
+  /**
+   * @param config          DAG node configurations.
+   * @param sparkSession    Spark session.
+   * @param targetTableName target table name.
+   * @return lower and upper bound values based on the percentiles.
+   */
+  def getLowerUpperBoundsFromPercentiles(config: Config, sparkSession: 
SparkSession,
+                                         targetTableName: String): (Double, 
Double) = {
+    val percentiles = generatePercentiles(config)
+    val result = sparkSession.sql(constructPercentileQuery(config, 
targetTableName, percentiles)).collect()(0)
+    (roundDouble(result.get(0).asInstanceOf[Double], 
BigDecimal.RoundingMode.HALF_DOWN),
+      roundDouble(result.get(1).asInstanceOf[Double], 
BigDecimal.RoundingMode.HALF_UP))
+  }
+
+  /**
+   * Constructs the query to get percentiles for the where condition.
+   *
+   * @param config          DAG node configurations.
+   * @param targetTableName target table name.
+   * @param percentiles     lower and upper percentiles.
+   * @return percentile query in String.
+   */
+  def constructPercentileQuery(config: Config, targetTableName: String,
+                               percentiles: (Double, Double)): String = {
+    val percentileQueryBuilder = new StringBuilder("select percentile(")
+    percentileQueryBuilder.append(config.getWhereConditionColumn)
+    percentileQueryBuilder.append(", ")
+    percentileQueryBuilder.append(percentiles._1)
+    percentileQueryBuilder.append("), percentile(")
+    percentileQueryBuilder.append(config.getWhereConditionColumn)
+    percentileQueryBuilder.append(", ")
+    percentileQueryBuilder.append(percentiles._2)
+    percentileQueryBuilder.append(") from ")
+    percentileQueryBuilder.append(targetTableName)
+    percentileQueryBuilder.toString()
+  }
+
+  /**
+   * Constructs the Spark SQL query to get update or delete records.
+   *
+   * @param config           DAG node configurations.
+   * @param targetTableName  target table name.
+   * @param avroSchemaString input Avro schema String.
+   * @param lowerBound       lower bound value for the where condition.
+   * @param upperBound       upper bound value for the where condition.
+   * @return delete query.
+   */
+  def constructChangedRecordQuery(config: Config, targetTableName: String, 
avroSchemaString: String,
+                                  lowerBound: Double, upperBound: Double): 
String = {
+    val recordQueryBuilder = new 
StringBuilder(constructSelectQuery(avroSchemaString, Set.empty[String], 
targetTableName))
+    recordQueryBuilder.append(" where ")
+    recordQueryBuilder.append(config.getWhereConditionColumn)
+    recordQueryBuilder.append(" between ")
+    recordQueryBuilder.append(lowerBound)
+    recordQueryBuilder.append(" and ")
+    recordQueryBuilder.append(upperBound)
+    recordQueryBuilder.toString()
+  }
+
+  /**
+   * Generates the exact same records to update based on the SQL derived from 
the
+   * configs for data validation.
+   *
+   * @param config           DAG node configurations.
+   * @param sparkSession     Spark session.
+   * @param avroSchemaString input Avro schema String.
+   * @param targetTableName  target table name.
+   * @param parallelism      parallelism for RDD
+   * @return records in {@link JavaRdd[ GenericRecord ]}.
+   */
+  def generateUpdateRecords(config: Config, sparkSession: SparkSession, 
avroSchemaString: String,
+                            targetTableName: String, parallelism: Int): 
JavaRDD[GenericRecord] = {
+    val bounds = getLowerUpperBoundsFromPercentiles(config, sparkSession, 
targetTableName)
+    val rows = sparkSession.sql(
+      constructChangedRecordQuery(config, targetTableName, avroSchemaString, 
bounds._1, bounds._2))
+
+    val rdd = HoodieSparkUtils
+      .createRdd(rows, RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME,
+        RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE, 
reconcileToLatestSchema = false, Option.empty())
+      .map(record => {
+        record.put(config.getUpdateColumn, 
record.get(config.getUpdateColumn).toString.toDouble * 1.6)
+        record
+      })
+      .toJavaRDD()
+    val repartitionedRdd = rdd.repartition(parallelism)
+    repartitionedRdd.persist(StorageLevel.DISK_ONLY)
+    repartitionedRdd
+  }
+
+  /**
+   * Generates the exact same records to delete based on the SQL derived from 
the
+   * configs for data validation.
+   *
+   * @param config           DAG node configurations.
+   * @param sparkSession     Spark session.
+   * @param avroSchemaString input Avro schema String.
+   * @param targetTableName  target table name.
+   * @param parallelism      parallelism for RDD
+   * @return records in {@link JavaRdd[ GenericRecord ]}.
+   */
+  def generateDeleteRecords(config: Config, sparkSession: SparkSession, 
avroSchemaString: String,
+                            targetTableName: String, parallelism: Int): 
JavaRDD[GenericRecord] = {
+    val bounds = getLowerUpperBoundsFromPercentiles(config, sparkSession, 
targetTableName)
+    val rows = sparkSession.sql(
+      constructChangedRecordQuery(config, targetTableName, avroSchemaString, 
bounds._1, bounds._2))
+
+    val rdd = HoodieSparkUtils
+      .createRdd(rows, RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME,
+        RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE, 
reconcileToLatestSchema = false, Option.empty())
+      .map(record => {
+        
record.put(GenericRecordFullPayloadGenerator.DEFAULT_HOODIE_IS_DELETED_COL, 
true)
+        record
+      })
+      .toJavaRDD()
+    val repartitionedRdd = rdd.repartition(parallelism)
+    repartitionedRdd.persist(StorageLevel.DISK_ONLY)
+    repartitionedRdd
+  }
+}
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
index be6e552..c32f44d 100644
--- 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
+++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
@@ -18,12 +18,6 @@
 
 package org.apache.hudi.integ.testsuite.job;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import java.util.UUID;
-import java.util.stream.Stream;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -37,21 +31,31 @@ import 
org.apache.hudi.integ.testsuite.dag.HiveSyncDagGeneratorMOR;
 import org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator;
 import org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector;
 import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
+import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
+import org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider;
 import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
 import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.AvroDFSSource;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 /**
  * Unit test against {@link HoodieTestSuiteJob}.
  */
@@ -72,6 +76,9 @@ public class TestHoodieTestSuiteJob extends UtilitiesTestBase 
{
   private static final String COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES = 
"unit-test-cow-dag-spark-datasource.yaml";
   private static final String COW_DAG_SPARK_DATASOURCE_NODES_RELATIVE_PATH = 
"/hudi-integ-test/src/test/resources/unit-test-cow-dag-spark-datasource.yaml";
 
+  private static final String SPARK_SQL_DAG_FILE_NAME = 
"unit-test-spark-sql-dag.yaml";
+  private static final String SPARK_SQL_DAG_SOURCE_PATH = 
"/hudi-integ-test/src/test/resources/" + SPARK_SQL_DAG_FILE_NAME;
+
   public static Stream<Arguments> configParams() {
     Object[][] data =
         new Object[][] {{false, "COPY_ON_WRITE"}};
@@ -102,6 +109,8 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
         + COW_DAG_SPARK_DATASOURCE_NODES_RELATIVE_PATH, dfs, dfsBasePath + "/" 
+ COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES);
     UtilitiesTestBase.Helpers.savePropsToDFS(getProperties(), dfs, dfsBasePath 
+ "/test-source"
         + ".properties");
+    
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir")
 + "/.."
+        + SPARK_SQL_DAG_SOURCE_PATH, dfs, dfsBasePath + "/" + 
SPARK_SQL_DAG_FILE_NAME);
 
     // Properties used for the delta-streamer which incrementally pulls from 
upstream DFS Avro source and
     // writes to downstream hudi table
@@ -269,22 +278,35 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
     
assertEquals(metaClient.getActiveTimeline().getCommitsTimeline().getInstants().count(),
 3);
   }
 
+  @Test
+  public void testSparkSqlDag() throws Exception {
+    boolean useDeltaStreamer = false;
+    this.cleanDFSDirs();
+    String inputBasePath = dfsBasePath + "/input";
+    String outputBasePath = dfsBasePath + "/result";
+    HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, 
useDeltaStreamer, HoodieTableType
+        .COPY_ON_WRITE.name());
+    cfg.workloadYamlPath = dfsBasePath + "/" + SPARK_SQL_DAG_FILE_NAME;
+    HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
+    hoodieTestSuiteJob.runTestSuite();
+  }
+
   protected HoodieTestSuiteConfig makeConfig(String inputBasePath, String 
outputBasePath, boolean useDeltaStream,
-      String tableType) {
+                                             String tableType) {
     HoodieTestSuiteConfig cfg = new HoodieTestSuiteConfig();
     cfg.targetBasePath = outputBasePath;
     cfg.inputBasePath = inputBasePath;
     cfg.targetTableName = "table1";
     cfg.tableType = tableType;
     cfg.sourceClassName = AvroDFSSource.class.getName();
-    cfg.sourceOrderingField = "timestamp";
+    cfg.sourceOrderingField = SchemaUtils.SOURCE_ORDERING_FIELD;
     cfg.propsFilePath = dfsBasePath + "/test-source.properties";
     cfg.outputTypeName = DeltaOutputMode.DFS.name();
     cfg.inputFormatName = DeltaInputType.AVRO.name();
     cfg.limitFileSize = 1024 * 1024L;
     cfg.sourceLimit = 20000000;
     cfg.workloadDagGenerator = WorkflowDagGenerator.class.getName();
-    cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
+    cfg.schemaProviderClassName = 
TestSuiteFileBasedSchemaProvider.class.getName();
     cfg.useDeltaStreamer = useDeltaStream;
     return cfg;
   }
diff --git a/hudi-integ-test/src/test/resources/unit-test-spark-sql-dag.yaml 
b/hudi-integ-test/src/test/resources/unit-test-spark-sql-dag.yaml
new file mode 100644
index 0000000..0b4ff07
--- /dev/null
+++ b/hudi-integ-test/src/test/resources/unit-test-spark-sql-dag.yaml
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: unit-test-spark-sql-dag.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+  create_table:
+    config:
+      table_type: cow
+      primary_key: _row_key
+      pre_combine_field: test_suite_source_ordering_field
+      partition_field: rider
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: spark.sql.SparkSqlCreateTableNode
+    deps: none
+  insert_records:
+    config:
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: spark.sql.SparkSqlInsertNode
+    deps: create_table
+  #merge_records:
+  #  config:
+  #    merge_condition: target._row_key = source._row_key
+  #    matched_action: update set *
+  #    not_matched_action: insert *
+  #    record_size: 1000
+  #    num_partitions_insert: 10
+  #    repeat_count: 1
+  #    num_records_upsert: 100
+  #    num_records_insert: 1000
+  #  type: spark.sql.SparkSqlMergeNode
+  #  deps: insert_records
+  #delete_records:
+  #  config:
+  #    condition_column: begin_lat
+  #    record_size: 1000
+  #    repeat_count: 1
+  #    ratio_records_change: 0.2
+  #  type: spark.sql.SparkSqlDeleteNode
+  #  deps: insert_records
+  #validate:
+  #  config:
+  #    delete_input_data: true
+  #  type: spark.sql.SparkSqlValidateDatasetNode
+  #  deps: delete_records

Reply via email to