This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 316e38c  [HUDI-3659] Reducing the validation frequency with integ tests (#5067)
316e38c is described below

commit 316e38c71e2fec9d7a13417d409374f28875c4f1
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Mar 18 09:45:33 2022 -0700

    [HUDI-3659] Reducing the validation frequency with integ tests (#5067)
---
 .../config/test-suite/cow-spark-long-running.yaml  |   8 +-
 ...treamer-long-running-multi-partitions-hive.yaml |   8 +-
 ...mer-long-running-multi-partitions-metadata.yaml |   1 +
 ...eltastreamer-long-running-multi-partitions.yaml |   1 +
 ...ltastreamer-medium-full-dataset-validation.yaml |   1 +
 .../detlastreamer-long-running-example.yaml        |   8 +-
 .../integ/testsuite/configuration/DeltaConfig.java |   5 +
 .../dag/nodes/BaseValidateDatasetNode.java         | 143 +++++++++++----------
 8 files changed, 87 insertions(+), 88 deletions(-)

diff --git a/docker/demo/config/test-suite/cow-spark-long-running.yaml b/docker/demo/config/test-suite/cow-spark-long-running.yaml
index c25b95c..795a4a5 100644
--- a/docker/demo/config/test-suite/cow-spark-long-running.yaml
+++ b/docker/demo/config/test-suite/cow-spark-long-running.yaml
@@ -25,11 +25,6 @@ dag_content:
       num_records_insert: 10000
     type: SparkInsertNode
     deps: none
-  first_validate:
-    config:
-      validate_hive: false
-    type: ValidateDatasetNode
-    deps: first_insert
   first_upsert:
     config:
       record_size: 200
@@ -39,7 +34,7 @@ dag_content:
       num_records_upsert: 3000
       num_partitions_upsert: 50
     type: SparkUpsertNode
-    deps: first_validate
+    deps: first_insert
   first_delete:
     config:
       num_partitions_delete: 50
@@ -48,6 +43,7 @@ dag_content:
     deps: first_upsert
   second_validate:
     config:
+      validate_once_every_itr : 5
       validate_hive: false
       delete_input_data: true
     type: ValidateDatasetNode
diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
index 68d14a0..09dd616 100644
--- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
@@ -47,11 +47,6 @@ dag_content:
       engine: "mr"
     type: HiveSyncNode
     deps: third_insert
-  first_validate:
-    config:
-      validate_hive: false
-    type: ValidateDatasetNode
-    deps: first_hive_sync
   first_upsert:
     config:
       record_size: 1000
@@ -61,7 +56,7 @@ dag_content:
       num_records_upsert: 100
       num_partitions_upsert: 1
     type: UpsertNode
-    deps: first_validate
+    deps: first_hive_sync
   first_delete:
     config:
       num_partitions_delete: 50
@@ -76,6 +71,7 @@ dag_content:
     deps: first_delete
   second_validate:
     config:
+      validate_once_every_itr : 5
       validate_hive: true
       delete_input_data: true
     type: ValidateDatasetNode
diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
index 0212fdf..b2ab525 100644
--- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
@@ -59,6 +59,7 @@ dag_content:
     deps: first_upsert
   second_validate:
     config:
+      validate_once_every_itr : 5
       validate_hive: false
       delete_input_data: true
     type: ValidateDatasetNode
diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
index d7b1119..b8f2b68 100644
--- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
@@ -59,6 +59,7 @@ dag_content:
     deps: first_upsert
   second_validate:
     config:
+      validate_once_every_itr : 5
       validate_hive: false
       delete_input_data: true
     type: ValidateDatasetNode
diff --git a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
index 7789864..a20870f 100644
--- a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
@@ -62,6 +62,7 @@ dag_content:
     deps: first_upsert
   second_validate:
     config:
+      validate_once_every_itr : 5
       validate_hive: false
       delete_input_data: false
     type: ValidateDatasetNode
diff --git a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
index 4b2ee7a..1c2f44b 100644
--- a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
+++ b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
@@ -41,11 +41,6 @@ dag_content:
       num_records_insert: 300
     deps: second_insert
     type: InsertNode
-  first_validate:
-    config:
-      validate_hive: false
-    type: ValidateDatasetNode
-    deps: third_insert
   first_upsert:
     config:
       record_size: 1000
@@ -55,7 +50,7 @@ dag_content:
       num_records_upsert: 100
       num_partitions_upsert: 1
     type: UpsertNode
-    deps: first_validate
+    deps: third_insert
   first_delete:
     config:
       num_partitions_delete: 1
@@ -64,6 +59,7 @@ dag_content:
     deps: first_upsert
   second_validate:
     config:
+      validate_once_every_itr : 5
       validate_hive: false
       delete_input_data: true
     type: ValidateDatasetNode
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index 2c39f5f..d728040 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -89,6 +89,7 @@ public class DeltaConfig implements Serializable {
     private static String START_PARTITION = "start_partition";
     private static String DELETE_INPUT_DATA = "delete_input_data";
     private static String VALIDATE_HIVE = "validate_hive";
+    private static String VALIDATE_ONCE_EVERY_ITR = "validate_once_every_itr";
     private static String EXECUTE_ITR_COUNT = "execute_itr_count";
     private static String VALIDATE_ARCHIVAL = "validate_archival";
     private static String VALIDATE_CLEAN = "validate_clean";
@@ -216,6 +217,10 @@ public class DeltaConfig implements Serializable {
      return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_HIVE, false).toString());
     }
 
+    public int validateOnceEveryIteration() {
+      return Integer.valueOf(configsMap.getOrDefault(VALIDATE_ONCE_EVERY_ITR, 1).toString());
+    }
+
     public boolean isValidateFullData() {
      return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_FULL_DATA, false).toString());
     }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
index 09d44d9..de58bf6 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
@@ -74,81 +74,84 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
 
   @Override
  public void execute(ExecutionContext context, int curItrCount) throws Exception {
-
-    SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
-
-    // todo: Fix partitioning schemes. For now, assumes data based partitioning.
-    String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
-    log.warn("Validation using data from input path " + inputPath);
-    // listing batches to be validated
-    String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
-    if (log.isDebugEnabled()) {
-      FileSystem fs = new Path(inputPathStr)
-          .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
-      FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
-      log.info("fileStatuses length: " + fileStatuses.length);
-      for (FileStatus fileStatus : fileStatuses) {
-        log.debug("Listing all Micro batches to be validated :: " + fileStatus.getPath().toString());
-      }
-    }
-
-    Dataset<Row> inputSnapshotDf = getInputDf(context, session, inputPath);
-
-    // read from hudi and remove meta columns.
-    Dataset<Row> trimmedHudiDf = getDatasetToValidate(session, context, inputSnapshotDf.schema());
-    if (config.isValidateFullData()) {
-      log.debug("Validating full dataset");
-      Dataset<Row> exceptInputDf = inputSnapshotDf.except(trimmedHudiDf);
-      Dataset<Row> exceptHudiDf = trimmedHudiDf.except(inputSnapshotDf);
-      long exceptInputCount = exceptInputDf.count();
-      long exceptHudiCount = exceptHudiDf.count();
-      log.debug("Except input df count " + exceptInputDf + ", except hudi count " + exceptHudiCount);
-      if (exceptInputCount != 0 || exceptHudiCount != 0) {
-        log.error("Data set validation failed. Total count in hudi " + trimmedHudiDf.count() + ", input df count " + inputSnapshotDf.count()
-            + ". InputDf except hudi df = " + exceptInputCount + ", Hudi df except Input df " + exceptHudiCount);
-        throw new AssertionError("Hudi contents does not match contents input data. ");
-      }
-    } else {
-      Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
-      long inputCount = inputSnapshotDf.count();
-      long outputCount = trimmedHudiDf.count();
-      log.debug("Input count: " + inputCount + "; output count: " + outputCount);
-      // the intersected df should be same as inputDf. if not, there is some mismatch.
-      if (outputCount == 0 || inputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) {
-        log.error("Data set validation failed. Total count in hudi " + outputCount + ", input df count " + inputCount);
-        throw new AssertionError("Hudi contents does not match contents input data. ");
+    int validateOnceEveryItr = config.validateOnceEveryIteration();
+    int itrCountToExecute = config.getIterationCountToExecute();
+    if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount) ||
+        (itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) {
+      SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
+      // todo: Fix partitioning schemes. For now, assumes data based partitioning.
+      String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
+      log.warn("Validation using data from input path " + inputPath);
+      // listing batches to be validated
+      String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
+      if (log.isDebugEnabled()) {
+        FileSystem fs = new Path(inputPathStr)
+            .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
+        FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
+        log.info("fileStatuses length: " + fileStatuses.length);
+        for (FileStatus fileStatus : fileStatuses) {
+          log.debug("Listing all Micro batches to be validated :: " + fileStatus.getPath().toString());
+        }
       }
 
-      if (config.isValidateHive()) {
-        String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
-        String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
-        log.warn("Validating hive table with db : " + database + " and table : " + tableName);
-        session.sql("REFRESH TABLE " + database + "." + tableName);
-        Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
-            "test_suite_source_ordering_field FROM " + database + "." + tableName);
-        Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
-            "_hoodie_is_deleted", "test_suite_source_ordering_field");
-
-        Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
-        outputCount = trimmedHudiDf.count();
-        log.warn("Input count: " + inputCount + "; output count: " + outputCount);
+      Dataset<Row> inputSnapshotDf = getInputDf(context, session, inputPath);
+
+      // read from hudi and remove meta columns.
+      Dataset<Row> trimmedHudiDf = getDatasetToValidate(session, context, inputSnapshotDf.schema());
+      if (config.isValidateFullData()) {
+        log.debug("Validating full dataset");
+        Dataset<Row> exceptInputDf = inputSnapshotDf.except(trimmedHudiDf);
+        Dataset<Row> exceptHudiDf = trimmedHudiDf.except(inputSnapshotDf);
+        long exceptInputCount = exceptInputDf.count();
+        long exceptHudiCount = exceptHudiDf.count();
+        log.debug("Except input df count " + exceptInputDf + ", except hudi count " + exceptHudiCount);
+        if (exceptInputCount != 0 || exceptHudiCount != 0) {
+          log.error("Data set validation failed. Total count in hudi " + trimmedHudiDf.count() + ", input df count " + inputSnapshotDf.count()
+              + ". InputDf except hudi df = " + exceptInputCount + ", Hudi df except Input df " + exceptHudiCount);
+          throw new AssertionError("Hudi contents does not match contents input data. ");
+        }
+      } else {
+        Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
+        long inputCount = inputSnapshotDf.count();
+        long outputCount = trimmedHudiDf.count();
+        log.debug("Input count: " + inputCount + "; output count: " + outputCount);
         // the intersected df should be same as inputDf. if not, there is some mismatch.
-        if (outputCount == 0 || reorderedInputDf.except(intersectedHiveDf).count() != 0) {
-          log.error("Data set validation failed for COW hive table. Total count in hudi " + outputCount + ", input df count " + inputCount);
-          throw new AssertionError("Hudi hive table contents does not match contents input data. ");
+        if (outputCount == 0 || inputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) {
+          log.error("Data set validation failed. Total count in hudi " + outputCount + ", input df count " + inputCount);
+          throw new AssertionError("Hudi contents does not match contents input data. ");
         }
-      }
 
-      // if delete input data is enabled, erase input data.
-      if (config.isDeleteInputData()) {
-        // clean up input data for current group of writes.
-        inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
-        FileSystem fs = new Path(inputPathStr)
-            .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
-        FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
-        for (FileStatus fileStatus : fileStatuses) {
-          log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
-          fs.delete(fileStatus.getPath(), true);
+        if (config.isValidateHive()) {
+          String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
+          String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
+          log.warn("Validating hive table with db : " + database + " and table : " + tableName);
+          session.sql("REFRESH TABLE " + database + "." + tableName);
+          Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
+              "test_suite_source_ordering_field FROM " + database + "." + tableName);
+          Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
+              "_hoodie_is_deleted", "test_suite_source_ordering_field");
+
+          Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
+          outputCount = trimmedHudiDf.count();
+          log.warn("Input count: " + inputCount + "; output count: " + outputCount);
+          // the intersected df should be same as inputDf. if not, there is some mismatch.
+          if (outputCount == 0 || reorderedInputDf.except(intersectedHiveDf).count() != 0) {
+            log.error("Data set validation failed for COW hive table. Total count in hudi " + outputCount + ", input df count " + inputCount);
+            throw new AssertionError("Hudi hive table contents does not match contents input data. ");
+          }
+        }
+
+        // if delete input data is enabled, erase input data.
+        if (config.isDeleteInputData()) {
+          // clean up input data for current group of writes.
+          inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
+          FileSystem fs = new Path(inputPathStr)
+              .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
+          FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
+          for (FileStatus fileStatus : fileStatuses) {
+            log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
+            fs.delete(fileStatus.getPath(), true);
+          }
         }
       }
     }
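
For reference: with this change, a ValidateDatasetNode only runs its checks when execute_itr_count matches the current iteration, or, when execute_itr_count is -1, on iterations that are a multiple of validate_once_every_itr (which defaults to 1, i.e. validate every iteration). Below is a minimal sketch of how the new knob sits in a test-suite DAG yaml; the node name, the interval of 5, and the deps wiring are illustrative, mirroring the configs touched above rather than prescribing required values.

  second_validate:
    config:
      validate_once_every_itr : 5   # run validation only on every 5th iteration
      validate_hive: false          # skip hive-side validation for this node
      delete_input_data: true       # clean up consumed input batches after validating
    type: ValidateDatasetNode
    deps: first_delete              # illustrative dependency on an upstream node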
