This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1379300 [HUDI-3483] Adding insert override nodes to integ test suite and few clean ups (#4895)
1379300 is described below
commit 1379300b5b4357243ab7279f9a39011296731741
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Feb 26 08:00:15 2022 -0500
[HUDI-3483] Adding insert override nodes to integ test suite and few clean ups (#4895)
---
.../config/test-suite/insert-overwrite-table.yaml | 104 ++++++++++++++++++++
.../demo/config/test-suite/insert-overwrite.yaml | 106 ++++++++++++++++++++
.../demo/config/test-suite/spark-clustering.yaml | 73 ++++++++++++++
.../{test.properties => test-metadata.properties} | 31 +++---
docker/demo/config/test-suite/test.properties | 6 +-
.../integ/testsuite/configuration/DeltaConfig.java | 10 ++
.../dag/nodes/BaseValidateDatasetNode.java | 107 ++++++++++++---------
.../dag/nodes/DeleteInputDatasetNode.java | 56 +++++++++++
.../integ/testsuite/generator/DeltaGenerator.java | 4 +
.../testsuite/dag/nodes/SparkInsertNode.scala | 7 +-
.../dag/nodes/SparkInsertOverwriteNode.scala | 31 ++++++
.../dag/nodes/SparkInsertOverwriteTableNode.scala | 30 ++++++
.../testsuite/dag/nodes/SparkUpsertNode.scala | 40 +-------
13 files changed, 504 insertions(+), 101 deletions(-)
diff --git a/docker/demo/config/test-suite/insert-overwrite-table.yaml b/docker/demo/config/test-suite/insert-overwrite-table.yaml
new file mode 100644
index 0000000..8b5a26e
--- /dev/null
+++ b/docker/demo/config/test-suite/insert-overwrite-table.yaml
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: simple-deltastreamer.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+ first_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10000
+ type: SparkInsertNode
+ deps: none
+ first_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ num_records_insert: 1000
+ repeat_count: 1
+ num_records_upsert: 8000
+ num_partitions_upsert: 10
+ type: SparkUpsertNode
+ deps: first_insert
+ second_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10000
+ type: SparkInsertNode
+ deps: first_upsert
+ second_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ num_records_insert: 1000
+ repeat_count: 1
+ num_records_upsert: 8000
+ num_partitions_upsert: 10
+ type: SparkUpsertNode
+ deps: second_insert
+ first_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: second_upsert
+ first_insert_overwrite_table:
+ config:
+ record_size: 1000
+ repeat_count: 10
+ num_records_insert: 10
+ type: SparkInsertOverwriteTableNode
+ deps: first_hive_sync
+ delete_all_input_except_last:
+ config:
+ delete_input_data_except_latest: true
+ type: DeleteInputDatasetNode
+ deps: first_insert_overwrite_table
+ third_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10000
+ type: SparkInsertNode
+ deps: delete_all_input_except_last
+ third_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ num_records_insert: 1000
+ repeat_count: 1
+ num_records_upsert: 8000
+ num_partitions_upsert: 10
+ type: SparkUpsertNode
+ deps: third_insert
+ second_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: third_upsert
+ second_validate:
+ config:
+ validate_full_data : true
+ validate_hive: false
+ delete_input_data: false
+ type: ValidateDatasetNode
+ deps: second_hive_sync
diff --git a/docker/demo/config/test-suite/insert-overwrite.yaml b/docker/demo/config/test-suite/insert-overwrite.yaml
new file mode 100644
index 0000000..f2299c5
--- /dev/null
+++ b/docker/demo/config/test-suite/insert-overwrite.yaml
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: simple-deltastreamer.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+
+ first_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10000
+ type: SparkInsertNode
+ deps: none
+ first_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ num_records_insert: 1000
+ repeat_count: 1
+ num_records_upsert: 8000
+ num_partitions_upsert: 10
+ type: SparkUpsertNode
+ deps: first_insert
+ second_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10000
+ type: SparkInsertNode
+ deps: first_upsert
+ second_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ num_records_insert: 1000
+ repeat_count: 1
+ num_records_upsert: 8000
+ num_partitions_upsert: 10
+ type: SparkUpsertNode
+ deps: second_insert
+ first_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: second_upsert
+ first_insert_overwrite:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10
+ type: SparkInsertOverwriteNode
+ deps: first_hive_sync
+ delete_all_input_except_last:
+ config:
+ delete_input_data_except_latest: true
+ type: DeleteInputDatasetNode
+ deps: first_insert_overwrite
+ third_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10000
+ type: SparkInsertNode
+ deps: delete_all_input_except_last
+ third_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ num_records_insert: 1000
+ repeat_count: 1
+ num_records_upsert: 8000
+ num_partitions_upsert: 10
+ type: SparkUpsertNode
+ deps: third_insert
+ second_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: third_upsert
+ second_validate:
+ config:
+ validate_full_data : true
+ validate_hive: false
+ delete_input_data: false
+ type: ValidateDatasetNode
+ deps: second_hive_sync
diff --git a/docker/demo/config/test-suite/spark-clustering.yaml b/docker/demo/config/test-suite/spark-clustering.yaml
new file mode 100644
index 0000000..e8e722c
--- /dev/null
+++ b/docker/demo/config/test-suite/spark-clustering.yaml
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: cow-spark-simple.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+ first_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10000
+ type: SparkInsertNode
+ deps: none
+ first_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ num_records_insert: 1000
+ repeat_count: 1
+ num_records_upsert: 8000
+ num_partitions_upsert: 10
+ type: SparkUpsertNode
+ deps: first_insert
+ second_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10000
+ type: SparkInsertNode
+ deps: first_upsert
+ second_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ num_records_insert: 1000
+ repeat_count: 1
+ num_records_upsert: 8000
+ num_partitions_upsert: 10
+ type: SparkUpsertNode
+ deps: second_insert
+ first_delete:
+ config:
+ num_partitions_delete: 10
+ num_records_delete: 16000
+ type: SparkDeleteNode
+ deps: second_upsert
+ second_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: first_delete
+ second_validate:
+ config:
+ validate_hive: false
+ delete_input_data: false
+ type: ValidateDatasetNode
+ deps: second_hive_sync
\ No newline at end of file
diff --git a/docker/demo/config/test-suite/test.properties b/docker/demo/config/test-suite/test-metadata.properties
similarity index 69%
copy from docker/demo/config/test-suite/test.properties
copy to docker/demo/config/test-suite/test-metadata.properties
index 30cd1c1..48da77c 100644
--- a/docker/demo/config/test-suite/test.properties
+++ b/docker/demo/config/test-suite/test-metadata.properties
@@ -1,24 +1,29 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
hoodie.insert.shuffle.parallelism=100
hoodie.upsert.shuffle.parallelism=100
hoodie.bulkinsert.shuffle.parallelism=100
+hoodie.metadata.enable=true
+
hoodie.deltastreamer.source.test.num_partitions=100
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
hoodie.deltastreamer.source.test.max_unique_records=100000000
diff --git a/docker/demo/config/test-suite/test.properties b/docker/demo/config/test-suite/test.properties
index 30cd1c1..509b9f4 100644
--- a/docker/demo/config/test-suite/test.properties
+++ b/docker/demo/config/test-suite/test.properties
@@ -19,6 +19,8 @@ hoodie.insert.shuffle.parallelism=100
hoodie.upsert.shuffle.parallelism=100
hoodie.bulkinsert.shuffle.parallelism=100
+hoodie.metadata.enable=false
+
hoodie.deltastreamer.source.test.num_partitions=100
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
hoodie.deltastreamer.source.test.max_unique_records=100000000
@@ -32,10 +34,6 @@ hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp
-hoodie.clustering.plan.strategy.sort.columns=_row_key
-hoodie.clustering.plan.strategy.daybased.lookback.partitions=0
-hoodie.clustering.inline.max.commits=1
-
hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index 56aa390..2c39f5f 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -96,6 +96,8 @@ public class DeltaConfig implements Serializable {
private static String NUM_ROLLBACKS = "num_rollbacks";
private static String ENABLE_ROW_WRITING = "enable_row_writing";
private static String ENABLE_METADATA_VALIDATE = "enable_metadata_validate";
+ private static String VALIDATE_FULL_DATA = "validate_full_data";
+ private static String DELETE_INPUT_DATA_EXCEPT_LATEST = "delete_input_data_except_latest";
// Spark SQL Create Table
private static String TABLE_TYPE = "table_type";
@@ -206,10 +208,18 @@ public class DeltaConfig implements Serializable {
return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA, false).toString());
}
+ public boolean isDeleteInputDataExceptLatest() {
+ return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA_EXCEPT_LATEST, false).toString());
+ }
+
public boolean isValidateHive() {
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_HIVE, false).toString());
}
+ public boolean isValidateFullData() {
+ return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_FULL_DATA, false).toString());
+ }
+
public int getIterationCountToExecute() {
return Integer.valueOf(configsMap.getOrDefault(EXECUTE_ITR_COUNT, -1).toString());
}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
index 986e973..09d44d9 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
@@ -19,16 +19,14 @@
package org.apache.hudi.integ.testsuite.dag.nodes;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Dataset;
@@ -42,13 +40,13 @@ import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
+import java.util.List;
+import java.util.stream.Collectors;
+
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
-import java.util.List;
-import java.util.stream.Collectors;
-
/**
* This nodes validates contents from input path are in tact with Hudi. By default no configs are required for this node. But there is an
* optional config "delete_input_data" that you can set for this node. If set, once validation completes, contents from inputPath are deleted. This will come in handy for long running test suites.
@@ -78,6 +76,7 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
public void execute(ExecutionContext context, int curItrCount) throws Exception {
SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
+
// todo: Fix partitioning schemes. For now, assumes data based partitioning.
String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
log.warn("Validation using data from input path " + inputPath);
@@ -97,46 +96,60 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
// read from hudi and remove meta columns.
Dataset<Row> trimmedHudiDf = getDatasetToValidate(session, context, inputSnapshotDf.schema());
- Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
- long inputCount = inputSnapshotDf.count();
- long outputCount = trimmedHudiDf.count();
- log.debug("Input count: " + inputCount + "; output count: " + outputCount);
- // the intersected df should be same as inputDf. if not, there is some mismatch.
- if (outputCount == 0 || inputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) {
- log.error("Data set validation failed. Total count in hudi " + outputCount + ", input df count " + inputCount);
- throw new AssertionError("Hudi contents does not match contents input data. ");
- }
-
- if (config.isValidateHive()) {
- String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
- String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
- log.warn("Validating hive table with db : " + database + " and table : " + tableName);
- session.sql("REFRESH TABLE " + database + "." + tableName);
- Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
- "test_suite_source_ordering_field FROM " + database + "." + tableName);
- Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key","rider","driver","begin_lat","begin_lon","end_lat","end_lon","fare",
- "_hoodie_is_deleted","test_suite_source_ordering_field");
-
- Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
- outputCount = trimmedHudiDf.count();
- log.warn("Input count: " + inputCount + "; output count: " + outputCount);
+ if (config.isValidateFullData()) {
+ log.debug("Validating full dataset");
+ Dataset<Row> exceptInputDf = inputSnapshotDf.except(trimmedHudiDf);
+ Dataset<Row> exceptHudiDf = trimmedHudiDf.except(inputSnapshotDf);
+ long exceptInputCount = exceptInputDf.count();
+ long exceptHudiCount = exceptHudiDf.count();
+ log.debug("Except input df count " + exceptInputDf + ", except hudi count " + exceptHudiCount);
+ if (exceptInputCount != 0 || exceptHudiCount != 0) {
+ log.error("Data set validation failed. Total count in hudi " + trimmedHudiDf.count() + ", input df count " + inputSnapshotDf.count()
+ + ". InputDf except hudi df = " + exceptInputCount + ", Hudi df except Input df " + exceptHudiCount);
+ throw new AssertionError("Hudi contents does not match contents input data. ");
+ }
+ } else {
+ Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
+ long inputCount = inputSnapshotDf.count();
+ long outputCount = trimmedHudiDf.count();
+ log.debug("Input count: " + inputCount + "; output count: " + outputCount);
// the intersected df should be same as inputDf. if not, there is some mismatch.
- if (outputCount == 0 || reorderedInputDf.except(intersectedHiveDf).count() != 0) {
- log.error("Data set validation failed for COW hive table. Total count in hudi " + outputCount + ", input df count " + inputCount);
- throw new AssertionError("Hudi hive table contents does not match contents input data. ");
+ if (outputCount == 0 || inputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) {
+ log.error("Data set validation failed. Total count in hudi " + outputCount + ", input df count " + inputCount);
+ throw new AssertionError("Hudi contents does not match contents input data. ");
}
- }
- // if delete input data is enabled, erase input data.
- if (config.isDeleteInputData()) {
- // clean up input data for current group of writes.
- inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
- FileSystem fs = new Path(inputPathStr)
- .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
- FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
- for (FileStatus fileStatus : fileStatuses) {
- log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
- fs.delete(fileStatus.getPath(), true);
+ if (config.isValidateHive()) {
+ String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
+ String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
+ log.warn("Validating hive table with db : " + database + " and table : " + tableName);
+ session.sql("REFRESH TABLE " + database + "." + tableName);
+ Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
+ "test_suite_source_ordering_field FROM " + database + "." + tableName);
+ Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
+ "_hoodie_is_deleted", "test_suite_source_ordering_field");
+
+ Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
+ outputCount = trimmedHudiDf.count();
+ log.warn("Input count: " + inputCount + "; output count: " + outputCount);
+ // the intersected df should be same as inputDf. if not, there is some mismatch.
+ if (outputCount == 0 || reorderedInputDf.except(intersectedHiveDf).count() != 0) {
+ log.error("Data set validation failed for COW hive table. Total count in hudi " + outputCount + ", input df count " + inputCount);
+ throw new AssertionError("Hudi hive table contents does not match contents input data. ");
+ }
+ }
+
+ // if delete input data is enabled, erase input data.
+ if (config.isDeleteInputData()) {
+ // clean up input data for current group of writes.
+ inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
+ FileSystem fs = new Path(inputPathStr)
+ .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
+ FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
+ for (FileStatus fileStatus : fileStatuses) {
+ log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
+ fs.delete(fileStatus.getPath(), true);
+ }
}
}
}
@@ -149,8 +162,8 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
ExpressionEncoder encoder = getEncoder(inputDf.schema());
return inputDf.groupByKey(
- (MapFunction<Row, String>) value ->
- value.getAs(partitionPathField) + "+" + value.getAs(recordKeyField), Encoders.STRING())
+ (MapFunction<Row, String>) value ->
+ value.getAs(partitionPathField) + "+" + value.getAs(recordKeyField), Encoders.STRING())
.reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
int ts1 = v1.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
int ts2 = v2.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DeleteInputDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DeleteInputDatasetNode.java
new file mode 100644
index 0000000..2836f24
--- /dev/null
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DeleteInputDatasetNode.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes;
+
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Deletes all input except latest batch. Mostly used in insert_overwrite operations.
+ */
+public class DeleteInputDatasetNode extends DagNode<Boolean> {
+
+ public DeleteInputDatasetNode(DeltaConfig.Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public void execute(ExecutionContext context, int curItrCount) throws Exception {
+
+ String latestBatch = String.valueOf(context.getWriterContext().getDeltaGenerator().getBatchId());
+
+ if (config.isDeleteInputDataExceptLatest()) {
+ String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
+ FileSystem fs = new Path(inputPathStr)
+ .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
+ FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
+ for (FileStatus fileStatus : fileStatuses) {
+ if (!fileStatus.getPath().getName().equals(latestBatch)) {
+ log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
+ fs.delete(fileStatus.getPath(), true);
+ }
+ }
+ }
+ }
+}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
index 6d5bc4f..69e32df 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
@@ -110,6 +110,10 @@ public class DeltaGenerator implements Serializable {
return ws;
}
+ public int getBatchId() {
+ return batchId;
+ }
+
public JavaRDD<GenericRecord> generateInserts(Config operation) {
int numPartitions = operation.getNumInsertPartitions();
long recordsPerPartition = operation.getNumRecordsInsert();
diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
index db17a6e..b8c46ca 100644
--- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
+++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
@@ -54,13 +54,18 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
context.getWriterContext.getSparkSession)
inputDF.write.format("hudi")
.options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap))
+ .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "test_suite_source_ordering_field")
.option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
.option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType)
- .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.OPERATION.key, getOperation())
.option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key, "deltastreamer.checkpoint.key")
.option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse(""))
.option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
.mode(SaveMode.Append)
.save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
}
+
+ def getOperation(): String = {
+ DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL
+ }
}
diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteNode.scala
new file mode 100644
index 0000000..6dd2eac
--- /dev/null
+++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteNode.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
+
+class SparkInsertOverwriteNode(dagNodeConfig: Config) extends SparkInsertNode(dagNodeConfig) {
+
+ override def getOperation(): String = {
+ DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL
+ }
+
+}
diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteTableNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteTableNode.scala
new file mode 100644
index 0000000..a6b80b3
--- /dev/null
+++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteTableNode.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
+
+class SparkInsertOverwriteTableNode(dagNodeConfig: Config) extends SparkInsertNode(dagNodeConfig) {
+
+ override def getOperation(): String = {
+ DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+ }
+}
diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala
index 858827a..113de93 100644
--- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala
+++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala
@@ -18,49 +18,17 @@
package org.apache.hudi.integ.testsuite.dag.nodes
-import org.apache.hudi.client.WriteStatus
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
-import org.apache.hudi.integ.testsuite.dag.ExecutionContext
-import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SaveMode
-
-import scala.collection.JavaConverters._
/**
* Spark datasource based upsert node
*
* @param dagNodeConfig DAG node configurations.
*/
-class SparkUpsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
-
- config = dagNodeConfig
+class SparkUpsertNode(dagNodeConfig: Config) extends SparkInsertNode(dagNodeConfig) {
- /**
- * Execute the {@link DagNode}.
- *
- * @param context The context needed for an execution of a node.
- * @param curItrCount iteration count for executing the node.
- * @throws Exception Thrown if the execution failed.
- */
- override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
- if (!config.isDisableGenerate) {
- println("Generating input data for node {}", this.getName)
- context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count()
- }
- val inputDF = AvroConversionUtils.createDataFrame(context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch,
- context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
- context.getWriterContext.getSparkSession)
- inputDF.write.format("hudi")
- .options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap))
- .option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
- .option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType)
- .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
- .option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key, "deltastreamer.checkpoint.key")
- .option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse(""))
- .option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
- .mode(SaveMode.Append)
- .save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
+ override def getOperation(): String = {
+ DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL
}
}
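
With this refactor, any additional write operation can be wired into the test suite the same way: extend SparkInsertNode and override getOperation(). A minimal hypothetical sketch (the node below is illustrative only and is not part of this commit):

    package org.apache.hudi.integ.testsuite.dag.nodes

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config

    // Hypothetical example: a bulk-insert variant following the same pattern as
    // SparkUpsertNode and SparkInsertOverwriteNode above.
    class SparkBulkInsertExampleNode(dagNodeConfig: Config) extends SparkInsertNode(dagNodeConfig) {

      override def getOperation(): String = {
        DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL
      }
    }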