This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 316e38c [HUDI-3659] Reducing the validation frequency with integ
tests (#5067)
316e38c is described below
commit 316e38c71e2fec9d7a13417d409374f28875c4f1
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Mar 18 09:45:33 2022 -0700
[HUDI-3659] Reducing the validation frequency with integ tests (#5067)
---
.../config/test-suite/cow-spark-long-running.yaml | 8 +-
...treamer-long-running-multi-partitions-hive.yaml | 8 +-
...mer-long-running-multi-partitions-metadata.yaml | 1 +
...eltastreamer-long-running-multi-partitions.yaml | 1 +
...ltastreamer-medium-full-dataset-validation.yaml | 1 +
.../detlastreamer-long-running-example.yaml | 8 +-
.../integ/testsuite/configuration/DeltaConfig.java | 5 +
.../dag/nodes/BaseValidateDatasetNode.java | 143 +++++++++++----------
8 files changed, 87 insertions(+), 88 deletions(-)
diff --git a/docker/demo/config/test-suite/cow-spark-long-running.yaml
b/docker/demo/config/test-suite/cow-spark-long-running.yaml
index c25b95c..795a4a5 100644
--- a/docker/demo/config/test-suite/cow-spark-long-running.yaml
+++ b/docker/demo/config/test-suite/cow-spark-long-running.yaml
@@ -25,11 +25,6 @@ dag_content:
num_records_insert: 10000
type: SparkInsertNode
deps: none
- first_validate:
- config:
- validate_hive: false
- type: ValidateDatasetNode
- deps: first_insert
first_upsert:
config:
record_size: 200
@@ -39,7 +34,7 @@ dag_content:
num_records_upsert: 3000
num_partitions_upsert: 50
type: SparkUpsertNode
- deps: first_validate
+ deps: first_insert
first_delete:
config:
num_partitions_delete: 50
@@ -48,6 +43,7 @@ dag_content:
deps: first_upsert
second_validate:
config:
+ validate_once_every_itr : 5
validate_hive: false
delete_input_data: true
type: ValidateDatasetNode
diff --git
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
index 68d14a0..09dd616 100644
---
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
+++
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
@@ -47,11 +47,6 @@ dag_content:
engine: "mr"
type: HiveSyncNode
deps: third_insert
- first_validate:
- config:
- validate_hive: false
- type: ValidateDatasetNode
- deps: first_hive_sync
first_upsert:
config:
record_size: 1000
@@ -61,7 +56,7 @@ dag_content:
num_records_upsert: 100
num_partitions_upsert: 1
type: UpsertNode
- deps: first_validate
+ deps: first_hive_sync
first_delete:
config:
num_partitions_delete: 50
@@ -76,6 +71,7 @@ dag_content:
deps: first_delete
second_validate:
config:
+ validate_once_every_itr : 5
validate_hive: true
delete_input_data: true
type: ValidateDatasetNode
diff --git
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
index 0212fdf..b2ab525 100644
---
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
+++
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
@@ -59,6 +59,7 @@ dag_content:
deps: first_upsert
second_validate:
config:
+ validate_once_every_itr : 5
validate_hive: false
delete_input_data: true
type: ValidateDatasetNode
diff --git
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
index d7b1119..b8f2b68 100644
---
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
+++
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
@@ -59,6 +59,7 @@ dag_content:
deps: first_upsert
second_validate:
config:
+ validate_once_every_itr : 5
validate_hive: false
delete_input_data: true
type: ValidateDatasetNode
diff --git
a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
index 7789864..a20870f 100644
---
a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
+++
b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
@@ -62,6 +62,7 @@ dag_content:
deps: first_upsert
second_validate:
config:
+ validate_once_every_itr : 5
validate_hive: false
delete_input_data: false
type: ValidateDatasetNode
diff --git
a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
index 4b2ee7a..1c2f44b 100644
--- a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
+++ b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
@@ -41,11 +41,6 @@ dag_content:
num_records_insert: 300
deps: second_insert
type: InsertNode
- first_validate:
- config:
- validate_hive: false
- type: ValidateDatasetNode
- deps: third_insert
first_upsert:
config:
record_size: 1000
@@ -55,7 +50,7 @@ dag_content:
num_records_upsert: 100
num_partitions_upsert: 1
type: UpsertNode
- deps: first_validate
+ deps: third_insert
first_delete:
config:
num_partitions_delete: 1
@@ -64,6 +59,7 @@ dag_content:
deps: first_upsert
second_validate:
config:
+ validate_once_every_itr : 5
validate_hive: false
delete_input_data: true
type: ValidateDatasetNode
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index 2c39f5f..d728040 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -89,6 +89,7 @@ public class DeltaConfig implements Serializable {
private static String START_PARTITION = "start_partition";
private static String DELETE_INPUT_DATA = "delete_input_data";
private static String VALIDATE_HIVE = "validate_hive";
+ private static String VALIDATE_ONCE_EVERY_ITR = "validate_once_every_itr";
private static String EXECUTE_ITR_COUNT = "execute_itr_count";
private static String VALIDATE_ARCHIVAL = "validate_archival";
private static String VALIDATE_CLEAN = "validate_clean";
@@ -216,6 +217,10 @@ public class DeltaConfig implements Serializable {
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_HIVE,
false).toString());
}
+ public int validateOnceEveryIteration() {
+ return Integer.valueOf(configsMap.getOrDefault(VALIDATE_ONCE_EVERY_ITR,
1).toString());
+ }
+
public boolean isValidateFullData() {
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_FULL_DATA,
false).toString());
}
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
index 09d44d9..de58bf6 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
@@ -74,81 +74,84 @@ public abstract class BaseValidateDatasetNode extends
DagNode<Boolean> {
@Override
public void execute(ExecutionContext context, int curItrCount) throws
Exception {
-
- SparkSession session =
SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
-
- // todo: Fix partitioning schemes. For now, assumes data based
partitioning.
- String inputPath =
context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
- log.warn("Validation using data from input path " + inputPath);
- // listing batches to be validated
- String inputPathStr =
context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
- if (log.isDebugEnabled()) {
- FileSystem fs = new Path(inputPathStr)
-
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
- FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
- log.info("fileStatuses length: " + fileStatuses.length);
- for (FileStatus fileStatus : fileStatuses) {
- log.debug("Listing all Micro batches to be validated :: " +
fileStatus.getPath().toString());
- }
- }
-
- Dataset<Row> inputSnapshotDf = getInputDf(context, session, inputPath);
-
- // read from hudi and remove meta columns.
- Dataset<Row> trimmedHudiDf = getDatasetToValidate(session, context,
inputSnapshotDf.schema());
- if (config.isValidateFullData()) {
- log.debug("Validating full dataset");
- Dataset<Row> exceptInputDf = inputSnapshotDf.except(trimmedHudiDf);
- Dataset<Row> exceptHudiDf = trimmedHudiDf.except(inputSnapshotDf);
- long exceptInputCount = exceptInputDf.count();
- long exceptHudiCount = exceptHudiDf.count();
- log.debug("Except input df count " + exceptInputDf + ", except hudi
count " + exceptHudiCount);
- if (exceptInputCount != 0 || exceptHudiCount != 0) {
- log.error("Data set validation failed. Total count in hudi " +
trimmedHudiDf.count() + ", input df count " + inputSnapshotDf.count()
- + ". InputDf except hudi df = " + exceptInputCount + ", Hudi df
except Input df " + exceptHudiCount);
- throw new AssertionError("Hudi contents does not match contents input
data. ");
- }
- } else {
- Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
- long inputCount = inputSnapshotDf.count();
- long outputCount = trimmedHudiDf.count();
- log.debug("Input count: " + inputCount + "; output count: " +
outputCount);
- // the intersected df should be same as inputDf. if not, there is some
mismatch.
- if (outputCount == 0 || inputCount == 0 ||
inputSnapshotDf.except(intersectionDf).count() != 0) {
- log.error("Data set validation failed. Total count in hudi " +
outputCount + ", input df count " + inputCount);
- throw new AssertionError("Hudi contents does not match contents input
data. ");
+ int validateOnceEveryItr = config.validateOnceEveryIteration();
+ int itrCountToExecute = config.getIterationCountToExecute();
+ if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount) ||
+ (itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) ==
0))) {
+ SparkSession session =
SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
+ // todo: Fix partitioning schemes. For now, assumes data based
partitioning.
+ String inputPath =
context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
+ log.warn("Validation using data from input path " + inputPath);
+ // listing batches to be validated
+ String inputPathStr =
context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
+ if (log.isDebugEnabled()) {
+ FileSystem fs = new Path(inputPathStr)
+
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
+ FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
+ log.info("fileStatuses length: " + fileStatuses.length);
+ for (FileStatus fileStatus : fileStatuses) {
+ log.debug("Listing all Micro batches to be validated :: " +
fileStatus.getPath().toString());
+ }
}
- if (config.isValidateHive()) {
- String database =
context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
- String tableName =
context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
- log.warn("Validating hive table with db : " + database + " and table :
" + tableName);
- session.sql("REFRESH TABLE " + database + "." + tableName);
- Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver,
begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
- "test_suite_source_ordering_field FROM " + database + "." +
tableName);
- Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key",
"rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
- "_hoodie_is_deleted", "test_suite_source_ordering_field");
-
- Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
- outputCount = trimmedHudiDf.count();
- log.warn("Input count: " + inputCount + "; output count: " +
outputCount);
+ Dataset<Row> inputSnapshotDf = getInputDf(context, session, inputPath);
+
+ // read from hudi and remove meta columns.
+ Dataset<Row> trimmedHudiDf = getDatasetToValidate(session, context,
inputSnapshotDf.schema());
+ if (config.isValidateFullData()) {
+ log.debug("Validating full dataset");
+ Dataset<Row> exceptInputDf = inputSnapshotDf.except(trimmedHudiDf);
+ Dataset<Row> exceptHudiDf = trimmedHudiDf.except(inputSnapshotDf);
+ long exceptInputCount = exceptInputDf.count();
+ long exceptHudiCount = exceptHudiDf.count();
+ log.debug("Except input df count " + exceptInputDf + ", except hudi
count " + exceptHudiCount);
+ if (exceptInputCount != 0 || exceptHudiCount != 0) {
+ log.error("Data set validation failed. Total count in hudi " +
trimmedHudiDf.count() + ", input df count " + inputSnapshotDf.count()
+ + ". InputDf except hudi df = " + exceptInputCount + ", Hudi df
except Input df " + exceptHudiCount);
+ throw new AssertionError("Hudi contents does not match contents
input data. ");
+ }
+ } else {
+ Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
+ long inputCount = inputSnapshotDf.count();
+ long outputCount = trimmedHudiDf.count();
+ log.debug("Input count: " + inputCount + "; output count: " +
outputCount);
// the intersected df should be same as inputDf. if not, there is some
mismatch.
- if (outputCount == 0 ||
reorderedInputDf.except(intersectedHiveDf).count() != 0) {
- log.error("Data set validation failed for COW hive table. Total
count in hudi " + outputCount + ", input df count " + inputCount);
- throw new AssertionError("Hudi hive table contents does not match
contents input data. ");
+ if (outputCount == 0 || inputCount == 0 ||
inputSnapshotDf.except(intersectionDf).count() != 0) {
+ log.error("Data set validation failed. Total count in hudi " +
outputCount + ", input df count " + inputCount);
+ throw new AssertionError("Hudi contents does not match contents
input data. ");
}
- }
- // if delete input data is enabled, erase input data.
- if (config.isDeleteInputData()) {
- // clean up input data for current group of writes.
- inputPathStr =
context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
- FileSystem fs = new Path(inputPathStr)
-
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
- FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
- for (FileStatus fileStatus : fileStatuses) {
- log.debug("Micro batch to be deleted " +
fileStatus.getPath().toString());
- fs.delete(fileStatus.getPath(), true);
+ if (config.isValidateHive()) {
+ String database =
context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
+ String tableName =
context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
+ log.warn("Validating hive table with db : " + database + " and table
: " + tableName);
+ session.sql("REFRESH TABLE " + database + "." + tableName);
+ Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver,
begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
+ "test_suite_source_ordering_field FROM " + database + "." +
tableName);
+ Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key",
"rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
+ "_hoodie_is_deleted", "test_suite_source_ordering_field");
+
+ Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
+ outputCount = trimmedHudiDf.count();
+ log.warn("Input count: " + inputCount + "; output count: " +
outputCount);
+ // the intersected df should be same as inputDf. if not, there is
some mismatch.
+ if (outputCount == 0 ||
reorderedInputDf.except(intersectedHiveDf).count() != 0) {
+ log.error("Data set validation failed for COW hive table. Total
count in hudi " + outputCount + ", input df count " + inputCount);
+ throw new AssertionError("Hudi hive table contents does not match
contents input data. ");
+ }
+ }
+
+ // if delete input data is enabled, erase input data.
+ if (config.isDeleteInputData()) {
+ // clean up input data for current group of writes.
+ inputPathStr =
context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
+ FileSystem fs = new Path(inputPathStr)
+
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
+ FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
+ for (FileStatus fileStatus : fileStatuses) {
+ log.debug("Micro batch to be deleted " +
fileStatus.getPath().toString());
+ fs.delete(fileStatus.getPath(), true);
+ }
}
}
}