This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8cf6a72 [HUDI-1331] Adding support for validating entire dataset and
long running tests in test suite framework (#2168)
8cf6a72 is described below
commit 8cf6a7223f5fa6c2014a7351198ac6d1ebd26621
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Dec 26 12:29:24 2020 -0500
[HUDI-1331] Adding support for validating entire dataset and long running
tests in test suite framework (#2168)
* trigger rebuild
* [HUDI-1156] Remove unused dependencies from HoodieDeltaStreamerWrapper
Class (#1927)
* Adding support for validating records and long running tests in test
suite framework
* Adding partial validate node
* Fixing spark session initiation in Validate nodes
* Fixing validation
* Adding hive table validation to ValidateDatasetNode
* Rebasing with latest commits from master
* Addressing feedback
* Addressing comments
Co-authored-by: lamber-ken <[email protected]>
Co-authored-by: linshan-ma <[email protected]>
---
docker/demo/config/test-suite/complex-dag-cow.yaml | 172 ++++++-----------
docker/demo/config/test-suite/complex-dag-mor.yaml | 204 +++++++++++----------
.../test-suite/cow-long-running-example.yaml | 68 +++++++
.../demo/config/test-suite/test-source.properties | 37 ----
docker/demo/config/test-suite/test.properties | 8 +
hudi-integ-test/README.md | 187 ++++++++++++++++++-
.../testsuite/HoodieDeltaStreamerWrapper.java | 6 +-
.../hudi/integ/testsuite/HoodieTestSuiteJob.java | 34 ++--
.../integ/testsuite/configuration/DeltaConfig.java | 5 +
.../integ/testsuite/converter/UpdateConverter.java | 8 +-
.../apache/hudi/integ/testsuite/dag/DagUtils.java | 36 +++-
.../testsuite/dag/SimpleWorkflowDagGenerator.java | 8 +-
.../hudi/integ/testsuite/dag/WorkflowDag.java | 29 ++-
.../hudi/integ/testsuite/dag/WriterContext.java | 11 +-
.../hudi/integ/testsuite/dag/nodes/DagNode.java | 11 ++
.../dag/{WorkflowDag.java => nodes/DelayNode.java} | 26 +--
.../testsuite/dag/nodes/ValidateDatasetNode.java | 148 +++++++++++++++
.../testsuite/dag/scheduler/DagScheduler.java | 57 +++---
.../integ/testsuite/generator/DeltaGenerator.java | 19 +-
.../FlexibleSchemaRecordGenerationIterator.java | 7 +-
.../GenericRecordFullPayloadGenerator.java | 14 +-
.../generator/UpdateGeneratorIterator.java | 10 +-
.../helpers/DFSTestSuitePathSelector.java | 32 ++--
.../reader/DFSHoodieDatasetInputReader.java | 82 +++++----
.../WorkflowDag.java => schema/SchemaUtils.java} | 20 +-
.../schema/TestSuiteFileBasedSchemaProvider.java | 66 +++++++
.../testsuite/writer/DFSDeltaWriterAdapter.java | 29 ++-
.../integ/testsuite/writer/DeltaWriterFactory.java | 13 +-
.../configuration/TestWorkflowBuilder.java | 15 +-
.../hudi/integ/testsuite/dag/TestDagUtils.java | 3 +
.../src/test/resources/unit-test-cow-dag.yaml | 114 ++++++------
.../src/test/resources/unit-test-mor-dag.yaml | 114 ++++++------
.../utilities/schema/FilebasedSchemaProvider.java | 4 +-
33 files changed, 1051 insertions(+), 546 deletions(-)
diff --git a/docker/demo/config/test-suite/complex-dag-cow.yaml
b/docker/demo/config/test-suite/complex-dag-cow.yaml
index a10026c..5fa8596 100644
--- a/docker/demo/config/test-suite/complex-dag-cow.yaml
+++ b/docker/demo/config/test-suite/complex-dag-cow.yaml
@@ -13,122 +13,56 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-first_insert:
- config:
- record_size: 70000
- num_insert_partitions: 1
- repeat_count: 1
- num_records_insert: 1000
- type: InsertNode
- deps: none
-second_insert:
- config:
- record_size: 70000
- num_insert_partitions: 1
- repeat_count: 1
- num_records_insert: 10000
- deps: first_insert
- type: InsertNode
-third_insert:
- config:
- record_size: 70000
- num_insert_partitions: 1
- repeat_count: 1
- num_records_insert: 300
- deps: second_insert
- type: InsertNode
-first_rollback:
- config:
- deps: third_insert
- type: RollbackNode
-first_upsert:
- config:
- record_size: 70000
- num_insert_partitions: 1
- num_records_insert: 300
- repeat_count: 1
- num_records_upsert: 100
- num_upsert_partitions: 10
- type: UpsertNode
- deps: first_rollback
-first_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: first_upsert
-first_hive_query:
- config:
- queue_name: "adhoc"
- engine: "mr"
- hive_queries:
- query1: "select count(*) from testdb.table1 group by `_row_key` having
count(*) > 1"
- result1: 0
- query2: "select count(*) from testdb.table1"
- result2: 11300
- type: HiveQueryNode
- deps: first_hive_sync
-second_upsert:
- config:
- record_size: 70000
- num_insert_partitions: 1
- num_records_insert: 300
- repeat_count: 1
- num_records_upsert: 100
- num_upsert_partitions: 10
- type: UpsertNode
- deps: first_hive_query
-second_hive_query:
- config:
- queue_name: "adhoc"
- engine: "mr"
- hive_queries:
- query1: "select count(*) from testdb.table1 group by `_row_key` having
count(*) > 1"
- result1: 0
- query2: "select count(*) from testdb.table1"
- result2: 11600
- type: HiveQueryNode
- deps: second_upsert
-fourth_insert:
- config:
- record_size: 70000
- num_insert_partitions: 1
- repeat_count: 1
- num_records_insert: 1000
- deps: second_hive_query
- type: InsertNode
-third_hive_query:
- config:
- queue_name: "adhoc"
- engine: "mr"
- hive_queries:
- query1: "select count(*) from testdb.table1 group by `_row_key` having
count(*) > 1"
- result1: 0
- query2: "select count(*) from testdb.table1"
- result2: 12600
- type: HiveQueryNode
- deps: fourth_insert
-first_delete:
- config:
- record_size: 70000
- num_partitions_delete: 1
- num_records_delete: 200
- deps: third_hive_query
- type: DeleteNode
-fourth_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: first_delete
-fourth_hive_query:
- config:
- queue_name: "adhoc"
- engine: "mr"
- hive_queries:
- query1: "select count(*) from testdb.table1 group by `_row_key` having
count(*) > 1"
- result1: 0
- query2: "select count(*) from testdb.table1"
- result2: 12400
- type: HiveQueryNode
- deps: fourth_hive_sync
\ No newline at end of file
+dag_name: cow-long-running-example.yaml
+dag_rounds: 2
+dag_intermittent_delay_mins: 1
+dag_content:
+ first_insert:
+ config:
+ record_size: 100
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 1000
+ type: InsertNode
+ deps: none
+ second_insert:
+ config:
+ record_size: 100
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 10000
+ deps: first_insert
+ type: InsertNode
+ third_insert:
+ config:
+ record_size: 100
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 300
+ deps: second_insert
+ type: InsertNode
+ first_validate:
+ config:
+ type: ValidateDatasetNode
+ deps: third_insert
+ first_upsert:
+ config:
+ record_size: 100
+ num_partitions_insert: 1
+ num_records_insert: 300
+ repeat_count: 1
+ num_records_upsert: 100
+ num_partitions_upsert: 1
+ type: UpsertNode
+ deps: first_validate
+ first_delete:
+ config:
+ num_partitions_delete: 1
+ num_records_delete: 2000
+ type: DeleteNode
+ deps: first_upsert
+ second_validate:
+ config:
+ delete_input_data: true
+ type: ValidateDatasetNode
+ deps: first_delete
\ No newline at end of file
diff --git a/docker/demo/config/test-suite/complex-dag-mor.yaml
b/docker/demo/config/test-suite/complex-dag-mor.yaml
index 2652b03..505e5e2 100644
--- a/docker/demo/config/test-suite/complex-dag-mor.yaml
+++ b/docker/demo/config/test-suite/complex-dag-mor.yaml
@@ -13,103 +13,107 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-first_insert:
- config:
- record_size: 70000
- num_insert_partitions: 1
- repeat_count: 1
- num_records_insert: 100
- type: InsertNode
- deps: none
-second_insert:
- config:
- record_size: 70000
- num_insert_partitions: 1
- repeat_count: 1
- num_records_insert: 100
- deps: first_insert
- type: InsertNode
-third_insert:
- config:
- record_size: 70000
- num_insert_partitions: 1
- repeat_count: 1
- num_records_insert: 300
- deps: second_insert
- type: InsertNode
-first_rollback:
- config:
- deps: third_insert
- type: RollbackNode
-first_upsert:
- config:
- record_size: 70000
- num_insert_partitions: 1
- num_records_insert: 300
- repeat_count: 1
- num_records_upsert: 100
- num_upsert_partitions: 10
- type: UpsertNode
- deps: first_rollback
-first_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: first_upsert
-first_hive_query:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveQueryNode
- deps: first_hive_sync
-second_upsert:
- config:
- record_size: 70000
- num_insert_partitions: 1
- num_records_insert: 300
- repeat_count: 1
- num_records_upsert: 100
- num_upsert_partitions: 10
- type: UpsertNode
- deps: first_hive_query
-second_hive_query:
- config:
- queue_name: "adhoc"
- engine: "mr"
- hive_queries:
- query1: "select count(*) from testdb.table1 group by `_row_key` having
count(*) > 1"
- result1: 0
- query2: "select count(*) from testdb.table1"
- result2: 1100
- type: HiveQueryNode
- deps: second_upsert
-first_schedule_compact:
- config:
- type: ScheduleCompactNode
- deps: second_hive_query
-third_upsert:
- config:
- record_size: 70000
- num_insert_partitions: 1
- num_records_insert: 300
- repeat_count: 1
- num_records_upsert: 100
- num_upsert_partitions: 10
- type: UpsertNode
- deps: first_schedule_compact
-first_compact:
- config:
- type: CompactNode
- deps: first_schedule_compact
-third_hive_query:
- config:
- queue_name: "adhoc"
- engine: "mr"
- hive_queries:
- query1: "select count(*) from testdb.table1 group by `_row_key` having
count(*) > 1"
- result1: 0
- query2: "select count(*) from testdb.table1"
- result2: 1400
- type: HiveQueryNode
- deps: first_compact
+dag_name: complex-dag-mor.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 10
+dag_content:
+ first_insert:
+ config:
+ record_size: 70000
+ num_partitions_insert: 1
+ repeat_count: 5
+ num_records_insert: 100
+ type: InsertNode
+ deps: none
+ second_insert:
+ config:
+ record_size: 70000
+ num_partitions_insert: 1
+ repeat_count: 5
+ num_records_insert: 100
+ deps: first_insert
+ type: InsertNode
+ third_insert:
+ config:
+ record_size: 70000
+ num_partitions_insert: 1
+ repeat_count: 2
+ num_records_insert: 300
+ deps: second_insert
+ type: InsertNode
+ first_rollback:
+ config:
+ deps: third_insert
+ type: RollbackNode
+ first_upsert:
+ config:
+ record_size: 70000
+ num_partitions_insert: 1
+ num_records_insert: 300
+ repeat_count: 1
+ num_records_upsert: 100
+ num_partitions_upsert: 10
+ type: UpsertNode
+ deps: first_rollback
+ first_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: first_upsert
+ first_hive_query:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveQueryNode
+ deps: first_hive_sync
+ second_upsert:
+ config:
+ record_size: 70000
+ num_partitions_insert: 1
+ num_records_insert: 300
+ repeat_count: 1
+ num_records_upsert: 100
+ num_partitions_upsert: 10
+ type: UpsertNode
+ deps: first_hive_query
+ second_hive_query:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ hive_queries:
+ query1: "select count(*) from testdb.table1 group by `_row_key` having
count(*) > 1"
+ result1: 0
+ query2: "select count(*) from testdb.table1"
+ result2: 1100
+ type: HiveQueryNode
+ deps: second_upsert
+ first_schedule_compact:
+ config:
+ type: ScheduleCompactNode
+ deps: second_hive_query
+ third_upsert:
+ config:
+ record_size: 70000
+ num_partitions_insert: 1
+ num_records_insert: 300
+ repeat_count: 1
+ num_records_upsert: 100
+ num_partitions_upsert: 10
+ type: UpsertNode
+ deps: first_schedule_compact
+ first_compact:
+ config:
+ type: CompactNode
+ deps: first_schedule_compact
+ third_hive_query:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ hive_queries:
+ query1: "select count(*) from testdb.table1 group by `_row_key` having
count(*) > 1"
+ result1: 0
+ query2: "select count(*) from testdb.table1"
+ result2: 1400
+ type: HiveQueryNode
+ deps: first_compact
diff --git a/docker/demo/config/test-suite/cow-long-running-example.yaml
b/docker/demo/config/test-suite/cow-long-running-example.yaml
new file mode 100644
index 0000000..b7026f2
--- /dev/null
+++ b/docker/demo/config/test-suite/cow-long-running-example.yaml
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: cow-long-running-example.yaml
+dag_rounds: 20
+dag_intermittent_delay_mins: 10
+dag_content:
+ first_insert:
+ config:
+ record_size: 100
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 1000
+ type: InsertNode
+ deps: none
+ second_insert:
+ config:
+ record_size: 100
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 10000
+ deps: first_insert
+ type: InsertNode
+ third_insert:
+ config:
+ record_size: 100
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 300
+ deps: second_insert
+ type: InsertNode
+ first_validate:
+ config:
+ type: ValidateDatasetNode
+ deps: third_insert
+ first_upsert:
+ config:
+ record_size: 100
+ num_partitions_insert: 1
+ num_records_insert: 300
+ repeat_count: 1
+ num_records_upsert: 100
+ num_partitions_upsert: 1
+ type: UpsertNode
+ deps: first_validate
+ first_delete:
+ config:
+ num_partitions_delete: 1
+ num_records_delete: 2000
+ type: DeleteNode
+ deps: first_upsert
+ second_validate:
+ config:
+ delete_input_data: true
+ type: ValidateDatasetNode
+ deps: first_delete
\ No newline at end of file
diff --git a/docker/demo/config/test-suite/test-source.properties
b/docker/demo/config/test-suite/test-source.properties
deleted file mode 100644
index cc18a39..0000000
--- a/docker/demo/config/test-suite/test-source.properties
+++ /dev/null
@@ -1,37 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# write configs
-hoodie.datasource.write.recordkey.field=_row_key
-hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
-hoodie.datasource.write.partitionpath.field=timestamp
-
-
-# deltastreamer configs
-hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
-hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
-hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-bench/input
-hoodie.deltastreamer.schemaprovider.source.schema.file=/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
-hoodie.deltastreamer.schemaprovider.target.schema.file=/var/hoodie/ws/docker/demo/config/bench/source.avsc
-
-#hive sync
-hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
-hoodie.datasource.hive_sync.database=testdb
-hoodie.datasource.hive_sync.table=table1
-hoodie.datasource.hive_sync.use_jdbc=false
-hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
-hoodie.datasource.hive_sync.assume_date_partitioning=true
-hoodie.datasource.hive_sync.use_pre_apache_input_format=true
diff --git a/docker/demo/config/test-suite/test.properties
b/docker/demo/config/test-suite/test.properties
index a7fd398..0aa0f45 100644
--- a/docker/demo/config/test-suite/test.properties
+++ b/docker/demo/config/test-suite/test.properties
@@ -1,3 +1,4 @@
+
hoodie.insert.shuffle.parallelism=100
hoodie.upsert.shuffle.parallelism=100
hoodie.bulkinsert.shuffle.parallelism=100
@@ -8,6 +9,13 @@ hoodie.deltastreamer.source.test.max_unique_records=100000000
hoodie.embed.timeline.server=false
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+hoodie.insert.shuffle.parallelism=100
+hoodie.upsert.shuffle.parallelism=100
+hoodie.bulkinsert.shuffle.parallelism=100
+
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+hoodie.datasource.hive_sync.skip_ro_suffix=true
+
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp
diff --git a/hudi-integ-test/README.md b/hudi-integ-test/README.md
index a6cdd08..ff64ed1 100644
--- a/hudi-integ-test/README.md
+++ b/hudi-integ-test/README.md
@@ -142,7 +142,9 @@ Start the Hudi Docker demo:
docker/setup_demo.sh
```
-NOTE: We need to make a couple of environment changes for Hive 2.x support.
This will be fixed once Hudi moves to Spark 3.x
+NOTE: We need to make a couple of environment changes for Hive 2.x support.
This will be fixed once Hudi moves to Spark 3.x.
+Execute below if you are using Hudi query node in your dag. If not, below
section is not required.
+Also, for longer running tests, go to next section.
```
docker exec -it adhoc-2 bash
@@ -214,7 +216,7 @@ spark-submit \
--conf spark.sql.catalogImplementation=hive \
--class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \
/opt/hudi-integ-test-bundle-0.6.1-SNAPSHOT.jar \
---source-ordering-field timestamp \
+--source-ordering-field test_suite_source_ordering_field \
--use-deltastreamer \
--target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \
--input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \
@@ -253,7 +255,7 @@ spark-submit \
--conf spark.sql.catalogImplementation=hive \
--class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \
/opt/hudi-integ-test-bundle-0.6.1-SNAPSHOT.jar \
---source-ordering-field timestamp \
+--source-ordering-field test_suite_source_ordering_field \
--use-deltastreamer \
--target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \
--input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \
@@ -267,3 +269,182 @@ spark-submit \
--table-type MERGE_ON_READ \
--compact-scheduling-minshare 1
```
+
+For long running test suite, validation has to be done differently. Idea is to
run same dag in a repeated manner.
+Hence "ValidateDatasetNode" is introduced which will read entire input data
and compare it with hudi contents both via
+spark datasource and hive table via spark sql engine.
+
+If you have "ValidateDatasetNode" in your dag, do not replace hive jars as
instructed above. Spark sql engine does not
+go well w/ hive2* jars. So, after running docker setup, just copy
test.properties and your dag of interest and you are
+good to go ahead.
+
+For repeated runs, two additional configs need to be set. "dag_rounds" and
"dag_intermittent_delay_mins".
+This means that your dag will be repeated for N times w/ a delay of Y mins
between each round.
+
+Also, ValidateDatasetNode can be configured in two ways. Either with
"delete_input_data: true" set or not set.
+When "delete_input_data" is set for ValidateDatasetNode, once validation is
complete, entire input data will be deleted.
+So, suggestion is to use this ValidateDatasetNode as the last node in the dag
with "delete_input_data".
+Example dag:
+```
+ Insert
+ Upsert
+ ValidateDatasetNode with delete_input_data = true
+```
+
+If above dag is run with "dag_rounds" = 10 and "dag_intermittent_delay_mins" =
10, then this dag will run for 10 times
+with 10 mins delay between every run. At the end of every run, records written
as part of this round will be validated.
+At the end of each validation, all contents of input are deleted.
+For example, in case of the above dag,
+```
+Round1:
+ insert => inputPath/batch1
+ upsert -> inputPath/batch2
+ Validate with delete_input_data = true
+ Validates contents from batch1 and batch2 are in hudi and
ensures Row equality
+ Since "delete_input_data" is set, deletes contents from batch1
and batch2.
+Round2:
+ insert => inputPath/batch3
+ upsert -> inputPath/batch4
+ Validate with delete_input_data = true
+ Validates contents from batch3 and batch4 are in hudi and
ensures Row equality
+ Since "delete_input_data" is set, deletes contents from batch3
and batch4.
+Round3:
+ insert => inputPath/batch5
+ upsert -> inputPath/batch6
+ Validate with delete_input_data = true
+ Validates contents from batch5 and batch6 are in hudi and
ensures Row equality
+ Since "delete_input_data" is set, deletes contents from batch5
and batch6.
+.
+.
+```
+If you wish to do a cumulative validation, do not set delete_input_data in
ValidateDatasetNode. But remember that this
+may not scale beyond a certain point since input data as well as hudi contents
keep occupying the disk and grow with
+every cycle.
+
+Let's see an example where you don't set "delete_input_data" as part of
Validation.
+```
+Round1:
+ insert => inputPath/batch1
+ upsert -> inputPath/batch2
+ Validate: validates contents from batch1 and batch2 are in hudi and
ensures Row equality
+Round2:
+ insert => inputPath/batch3
+ upsert -> inputPath/batch4
+ Validate: validates contents from batch1 to batch4 are in hudi and ensures
Row equality
+Round3:
+ insert => inputPath/batch5
+ upsert -> inputPath/batch6
+ Validate: validates contents from batch1 to batch6 are in hudi and
ensures Row equality
+.
+.
+```
+
+You could also have validations in the middle of your dag and not set the
"delete_input_data". But set it only in the
+last node in the dag.
+```
+Round1:
+ insert => inputPath/batch1
+ upsert -> inputPath/batch2
+ Validate: validates contents from batch1 and batch2 are in hudi and
ensures Row equality
+ insert => inputPath/batch3
+ upsert -> inputPath/batch4
+ Validate with delete_input_data = true
+ Validates contents from batch1 to batch4 are in hudi and ensures
Row equality
+ since "delete_input_data" is set to true, this node deletes
contents from batch1 to batch4.
+Round2:
+ insert => inputPath/batch5
+ upsert -> inputPath/batch6
+ Validate: validates contents from batch5 and batch6 are in hudi and
ensures Row equality
+ insert => inputPath/batch7
+ upsert -> inputPath/batch8
+ Validate: validates contents from batch5 to batch8 are in hudi and ensures
Row equality
+ since "delete_input_data" is set to true, this node deletes
contents from batch5 to batch8.
+Round3:
+ insert => inputPath/batch9
+ upsert -> inputPath/batch10
+ Validate: validates contents from batch9 and batch10 are in hudi and
ensures Row equality
+ insert => inputPath/batch11
+ upsert -> inputPath/batch12
+ Validate with delete_input_data = true
+ Validates contents from batch9 to batch12 are in hudi and ensures
Row equality
+ Since "delete_input_data" is set to true, this node deletes contents
from batch9 to batch12.
+.
+.
+```
+Above dag was just an example for illustration purposes. But you can make it
complex as per your needs.
+```
+ Insert
+ Upsert
+ Delete
+ Validate w/o deleting
+ Insert
+ Rollback
+ Validate w/o deleting
+ Upsert
+ Validate w/ deletion
+```
+With this dag, you can set the two additional configs "dag_rounds" and
"dag_intermittent_delay_mins" and have a long
+running test suite.
+
+```
+dag_rounds: 1
+dag_intermittent_delay_mins: 10
+dag_content:
+ Insert
+ Upsert
+ Delete
+ Validate w/o deleting
+ Insert
+ Rollback
+ Validate w/o deleting
+ Upsert
+ Validate w/ deletion
+
+```
+
+Sample COW command with repeated runs.
+```
+spark-submit \
+--packages org.apache.spark:spark-avro_2.11:2.4.0 \
+--conf spark.task.cpus=1 \
+--conf spark.executor.cores=1 \
+--conf spark.task.maxFailures=100 \
+--conf spark.memory.fraction=0.4 \
+--conf spark.rdd.compress=true \
+--conf spark.kryoserializer.buffer.max=2000m \
+--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+--conf spark.memory.storageFraction=0.1 \
+--conf spark.shuffle.service.enabled=true \
+--conf spark.sql.hive.convertMetastoreParquet=false \
+--conf spark.driver.maxResultSize=12g \
+--conf spark.executor.heartbeatInterval=120s \
+--conf spark.network.timeout=600s \
+--conf spark.yarn.max.executor.failures=10 \
+--conf spark.sql.catalogImplementation=hive \
+--conf spark.driver.extraClassPath=/var/demo/jars/* \
+--conf spark.executor.extraClassPath=/var/demo/jars/* \
+--class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \
+/opt/hudi-integ-test-bundle-0.6.1-SNAPSHOT.jar \
+--source-ordering-field test_suite_source_ordering_field \
+--use-deltastreamer \
+--target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \
+--input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \
+--target-table table1 \
+--props test.properties \
+--schemaprovider-class
org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
+--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
+--input-file-size 125829120 \
+--workload-yaml-path
file:/var/hoodie/ws/docker/demo/config/test-suite/complex-dag-cow.yaml \
+--workload-generator-classname
org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \
+--table-type COPY_ON_WRITE \
+--compact-scheduling-minshare 1
+```
+
+A ready to use dag is available under docker/demo/config/test-suite/ that
could give you an idea for long running
+dags.
+cow-long-running-example.yaml
+
+As of now, "ValidateDatasetNode" uses spark data source and hive tables for
comparison. Hence COW and real time view in
+MOR can be tested.
+
+
\ No newline at end of file
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index 0387731..8d2f79d 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -25,13 +25,13 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.deltastreamer.DeltaSync;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.SchemaProvider;
+
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
- * Extends the {@link HoodieDeltaStreamer} to expose certain operations
helpful in running the Test Suite.
- * This is done to achieve 2 things 1) Leverage some components of {@link
HoodieDeltaStreamer} 2)
- * Piggyback on the suite to test {@link HoodieDeltaStreamer}
+ * Extends the {@link HoodieDeltaStreamer} to expose certain operations
helpful in running the Test Suite. This is done to achieve 2 things 1) Leverage
some components of {@link HoodieDeltaStreamer}
+ * 2) Piggyback on the suite to test {@link HoodieDeltaStreamer}
*/
public class HoodieDeltaStreamerWrapper extends HoodieDeltaStreamer {
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index 7b3324e..b5037e9 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -18,13 +18,6 @@
package org.apache.hudi.integ.testsuite;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
@@ -35,23 +28,30 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.dag.DagUtils;
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator;
+import org.apache.hudi.integ.testsuite.dag.WriterContext;
import org.apache.hudi.integ.testsuite.dag.scheduler.DagScheduler;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
-import org.apache.hudi.integ.testsuite.dag.WriterContext;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
-import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
/**
- * This is the entry point for running a Hudi Test Suite. Although this class
has similarities with
- * {@link HoodieDeltaStreamer} this class does not extend it since do not want
to create a dependency on the changes in
- * DeltaStreamer.
+ * This is the entry point for running a Hudi Test Suite. Although this class
has similarities with {@link HoodieDeltaStreamer} this class does not extend it
since do not want to create a dependency
+ * on the changes in DeltaStreamer.
*/
public class HoodieTestSuiteJob {
@@ -133,10 +133,10 @@ public class HoodieTestSuiteJob {
public WorkflowDag createWorkflowDag() throws IOException {
WorkflowDag workflowDag = this.cfg.workloadYamlPath == null ?
((WorkflowDagGenerator) ReflectionUtils
- .loadClass((this.cfg).workloadDagGenerator)).build()
- : DagUtils.convertYamlPathToDag(
- FSUtils.getFs(this.cfg.workloadYamlPath, jsc.hadoopConfiguration(),
true),
- this.cfg.workloadYamlPath);
+ .loadClass((this.cfg).workloadDagGenerator)).build()
+ : DagUtils.convertYamlPathToDag(
+ FSUtils.getFs(this.cfg.workloadYamlPath,
jsc.hadoopConfiguration(), true),
+ this.cfg.workloadYamlPath);
return workflowDag;
}
@@ -147,7 +147,7 @@ public class HoodieTestSuiteJob {
long startTime = System.currentTimeMillis();
WriterContext writerContext = new WriterContext(jsc, props, cfg,
keyGenerator, sparkSession);
writerContext.initContext(jsc);
- DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext);
+ DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext,
jsc);
dagScheduler.schedule();
log.info("Finished scheduling all tasks, Time taken {}",
System.currentTimeMillis() - startTime);
} catch (Exception e) {
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index 81f406b..329ef16 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -87,6 +87,7 @@ public class DeltaConfig implements Serializable {
private static String HIVE_LOCAL = "hive_local";
private static String REINIT_CONTEXT = "reinitialize_context";
private static String START_PARTITION = "start_partition";
+ private static String DELETE_INPUT_DATA = "delete_input_data";
private Map<String, Object> configsMap;
@@ -154,6 +155,10 @@ public class DeltaConfig implements Serializable {
return Boolean.valueOf(configsMap.getOrDefault(REINIT_CONTEXT,
false).toString());
}
+ public boolean isDeleteInputData() {
+ return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA,
false).toString());
+ }
+
public Map<String, Object> getOtherConfigs() {
if (configsMap == null) {
return new HashMap<>();
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/UpdateConverter.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/UpdateConverter.java
index 24520a3..1e8acf5 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/UpdateConverter.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/UpdateConverter.java
@@ -18,12 +18,14 @@
package org.apache.hudi.integ.testsuite.converter;
-import java.util.List;
-import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.integ.testsuite.generator.LazyRecordGeneratorIterator;
import org.apache.hudi.integ.testsuite.generator.UpdateGeneratorIterator;
+
+import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
+import java.util.List;
+
/**
* This converter creates an update {@link GenericRecord} from an existing
{@link GenericRecord}.
*/
@@ -36,7 +38,7 @@ public class UpdateConverter implements
Converter<GenericRecord, GenericRecord>
private final List<String> recordKeyFields;
private final int minPayloadSize;
- public UpdateConverter(String schemaStr, int minPayloadSize, List<String>
partitionPathFields,
+ public UpdateConverter(String schemaStr, int minPayloadSize, List<String>
partitionPathFields,
List<String> recordKeyFields) {
this.schemaStr = schemaStr;
this.partitionPathFields = partitionPathFields;
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java
index d535823..1211c00 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java
@@ -48,6 +48,15 @@ import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
*/
public class DagUtils {
+ public static final String DAG_NAME = "dag_name";
+ public static final String DAG_ROUNDS = "dag_rounds";
+ public static final String DAG_INTERMITTENT_DELAY_MINS =
"dag_intermittent_delay_mins";
+ public static final String DAG_CONTENT = "dag_content";
+
+ public static int DEFAULT_DAG_ROUNDS = 1;
+ public static int DEFAULT_INTERMITTENT_DELAY_MINS = 10;
+ public static String DEFAULT_DAG_NAME = "TestDagName";
+
static final ObjectMapper MAPPER = new ObjectMapper();
/**
@@ -62,15 +71,38 @@ public class DagUtils {
* Converts a YAML representation to {@link WorkflowDag}.
*/
public static WorkflowDag convertYamlToDag(String yaml) throws IOException {
+ int dagRounds = DEFAULT_DAG_ROUNDS;
+ int intermittentDelayMins = DEFAULT_INTERMITTENT_DELAY_MINS;
+ String dagName = DEFAULT_DAG_NAME;
Map<String, DagNode> allNodes = new HashMap<>();
final ObjectMapper yamlReader = new ObjectMapper(new YAMLFactory());
final JsonNode jsonNode = yamlReader.readTree(yaml);
Iterator<Entry<String, JsonNode>> itr = jsonNode.fields();
while (itr.hasNext()) {
Entry<String, JsonNode> dagNode = itr.next();
- allNodes.put(dagNode.getKey(), convertJsonToDagNode(allNodes,
dagNode.getKey(), dagNode.getValue()));
+ String key = dagNode.getKey();
+ // Top-level keys carry the dag metadata; the nodes themselves live under DAG_CONTENT.
+ switch (key) {
+ case DAG_NAME:
+ dagName = dagNode.getValue().asText();
+ break;
+ case DAG_ROUNDS:
+ dagRounds = dagNode.getValue().asInt();
+ break;
+ case DAG_INTERMITTENT_DELAY_MINS:
+ intermittentDelayMins = dagNode.getValue().asInt();
+ break;
+ case DAG_CONTENT:
+ JsonNode dagContent = dagNode.getValue();
+ Iterator<Entry<String, JsonNode>> contentItr = dagContent.fields();
+ while (contentItr.hasNext()) {
+ Entry<String, JsonNode> dagContentNode = contentItr.next();
+ allNodes.put(dagContentNode.getKey(),
convertJsonToDagNode(allNodes, dagContentNode.getKey(),
dagContentNode.getValue()));
+ }
+ // explicit break: do not rely on falling through into default
+ break;
+ default:
+ // any other top-level key is ignored
+ break;
+ }
}
- return new WorkflowDag(findRootNodes(allNodes));
+ return new WorkflowDag(dagName, dagRounds, intermittentDelayMins,
findRootNodes(allNodes));
}
/**
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/SimpleWorkflowDagGenerator.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/SimpleWorkflowDagGenerator.java
index ad6e9cb..1fe2294 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/SimpleWorkflowDagGenerator.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/SimpleWorkflowDagGenerator.java
@@ -18,8 +18,6 @@
package org.apache.hudi.integ.testsuite.dag;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
@@ -27,9 +25,11 @@ import
org.apache.hudi.integ.testsuite.dag.nodes.HiveQueryNode;
import org.apache.hudi.integ.testsuite.dag.nodes.InsertNode;
import org.apache.hudi.integ.testsuite.dag.nodes.UpsertNode;
+import java.util.ArrayList;
+import java.util.List;
+
/**
- * An example of how to generate a workflow dag programmatically. This is also
used as the default workflow dag if
- * none is provided.
+ * An example of how to generate a workflow dag programmatically. This is also
used as the default workflow dag if none is provided.
*/
public class SimpleWorkflowDagGenerator implements WorkflowDagGenerator {
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java
index e9171fc..f622bb7 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java
@@ -18,20 +18,47 @@
package org.apache.hudi.integ.testsuite.dag;
-import java.util.List;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
+import java.util.List;
+
+import static org.apache.hudi.integ.testsuite.dag.DagUtils.DEFAULT_DAG_NAME;
+import static org.apache.hudi.integ.testsuite.dag.DagUtils.DEFAULT_DAG_ROUNDS;
+import static
org.apache.hudi.integ.testsuite.dag.DagUtils.DEFAULT_INTERMITTENT_DELAY_MINS;
+
/**
* Workflow dag that encapsulates all execute nodes.
*/
public class WorkflowDag<O> {
+ private String dagName;
+ private int rounds;
+ private int intermittentDelayMins;
private List<DagNode<O>> nodeList;
public WorkflowDag(List<DagNode<O>> nodeList) {
+ this(DEFAULT_DAG_NAME, DEFAULT_DAG_ROUNDS,
DEFAULT_INTERMITTENT_DELAY_MINS, nodeList);
+ }
+
+ public WorkflowDag(String dagName, int rounds, int intermittentDelayMins,
List<DagNode<O>> nodeList) {
+ this.dagName = dagName;
+ this.rounds = rounds;
+ this.intermittentDelayMins = intermittentDelayMins;
this.nodeList = nodeList;
}
+ public String getDagName() {
+ return dagName;
+ }
+
+ public int getRounds() {
+ return rounds;
+ }
+
+ public int getIntermittentDelayMins() {
+ return intermittentDelayMins;
+ }
+
public List<DagNode<O>> getNodeList() {
return nodeList;
}
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
index e457f0a..650ab1e 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
@@ -21,15 +21,16 @@ package org.apache.hudi.integ.testsuite.dag;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
+import
org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
+import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
-import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
-import
org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.SchemaProvider;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
@@ -38,8 +39,7 @@ import org.apache.spark.sql.SparkSession;
import java.util.Map;
/**
- * WriterContext wraps the delta writer/data generator related configuration
needed
- * to init/reinit.
+ * WriterContext wraps the delta writer/data generator related configuration
needed to init/reinit.
*/
public class WriterContext {
@@ -53,8 +53,9 @@ public class WriterContext {
private BuiltinKeyGenerator keyGenerator;
private transient SparkSession sparkSession;
private transient JavaSparkContext jsc;
+
public WriterContext(JavaSparkContext jsc, TypedProperties props,
HoodieTestSuiteConfig cfg,
- BuiltinKeyGenerator keyGenerator, SparkSession
sparkSession) {
+ BuiltinKeyGenerator keyGenerator, SparkSession sparkSession) {
this.cfg = cfg;
this.props = props;
this.keyGenerator = keyGenerator;
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java
index df54b4c..05ac242 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java
@@ -41,6 +41,17 @@ public abstract class DagNode<O> implements
Comparable<DagNode<O>> {
protected Config config;
private boolean isCompleted;
+ /**
+ * Resets this node so that it can be executed again (the scheduler calls this
+ * once per round before queueing the node).
+ * NOTE(review): despite the name, no copy is made here. The node clears its own
+ * result and isCompleted flag, rebuilds its childNodes list from the result of
+ * calling clone() on each child, and returns {@code this}.
+ */
+ public DagNode clone() {
+ List<DagNode<O>> tempChildNodes = new ArrayList<>();
+ for(DagNode dagNode: childNodes) {
+ tempChildNodes.add(dagNode.clone());
+ }
+ this.childNodes = tempChildNodes;
+ this.result = null;
+ this.isCompleted = false;
+ return this;
+ }
+
public DagNode<O> addChildNode(DagNode childNode) {
childNode.getParentNodes().add(this);
getChildNodes().add(childNode);
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java
similarity index 55%
copy from
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java
copy to
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java
index e9171fc..c0671e8 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java
@@ -16,24 +16,28 @@
* limitations under the License.
*/
-package org.apache.hudi.integ.testsuite.dag;
+package org.apache.hudi.integ.testsuite.dag.nodes;
-import java.util.List;
-import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Workflow dag that encapsulates all execute nodes.
+ * Delay Node to add delays between each group of test runs.
*/
-public class WorkflowDag<O> {
+public class DelayNode extends DagNode<Boolean> {
- private List<DagNode<O>> nodeList;
+ private static Logger log =
LoggerFactory.getLogger(DelayNode.class);
+ private int delayMins;
- public WorkflowDag(List<DagNode<O>> nodeList) {
- this.nodeList = nodeList;
+ public DelayNode(int delayMins) {
+ this.delayMins = delayMins;
}
- public List<DagNode<O>> getNodeList() {
- return nodeList;
+ @Override
+ public void execute(ExecutionContext context) throws Exception {
+ log.warn("Waiting for "+ delayMins+" mins before going for next test run");
+ Thread.sleep(delayMins * 60 * 1000L); // long arithmetic: int math overflows for large delays
}
-
}
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
new file mode 100644
index 0000000..12fc525
--- /dev/null
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.ReduceFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
+
+/**
+ * This node validates that the contents of the input path match what is stored
+ * in Hudi. It uses the spark datasource for comparison purposes. By default no
+ * configs are required for this node, but there is an optional config
+ * "delete_input_data" that you can set. If set, contents of inputPath are
+ * deleted once validation completes, which comes in handy for long running
+ * test suites.
+ * README has more details under docker set up for usages of this node.
+ */
+public class ValidateDatasetNode extends DagNode<Boolean> {
+
+ private static Logger log =
LoggerFactory.getLogger(ValidateDatasetNode.class);
+
+ public ValidateDatasetNode(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public void execute(ExecutionContext context) throws Exception {
+
+ SparkSession session =
SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
+
+ // todo: Fix partitioning schemes. For now, assumes data based
partitioning.
+ String inputPath =
context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
+ String hudiPath =
context.getHoodieTestSuiteWriter().getCfg().targetBasePath + "/*/*/*";
+ log.warn("ValidateDataset Node: Input path " + inputPath + ", hudi path "
+ hudiPath);
+ // listing batches to be validated
+ String inputPathStr =
context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
+ FileSystem fs = new Path(inputPathStr)
+ .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
+ FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
+ for (FileStatus fileStatus : fileStatuses) {
+ log.debug("Listing all Micro batches to be validated :: " +
fileStatus.getPath().toString());
+ }
+
+ String recordKeyField =
context.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY());
+ String partitionPathField =
context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY());
+ // todo: fix hard coded fields from configs.
+ // read input and resolve insert, updates, etc. Rows are grouped by the VALUES of
+ // the partition path and record key fields; the row with the highest ordering
+ // field wins within each group.
+ Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
+ ExpressionEncoder encoder = getEncoder(inputDf.schema());
+ Dataset<Row> inputSnapshotDf = inputDf.groupByKey(
+ (MapFunction<Row, String>) value -> value.getAs(partitionPathField) + "+" +
value.getAs(recordKeyField), Encoders.STRING())
+ .reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
+ int ts1 = v1.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
+ int ts2 = v2.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
+ if (ts1 > ts2) {
+ return v1;
+ } else {
+ return v2;
+ }
+ })
+ .map((MapFunction<Tuple2<String, Row>, Row>) value -> value._2,
encoder)
+ .filter("_hoodie_is_deleted is NULL");
+
+ // read from hudi and remove meta columns.
+ Dataset<Row> hudiDf = session.read().format("hudi").load(hudiPath);
+ Dataset<Row> trimmedDf =
hudiDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
+
+ Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedDf);
+ // the intersected df should be same as inputDf. if not, there is some
mismatch.
+ if (inputSnapshotDf.except(intersectionDf).count() != 0) {
+ log.error("Data set validation failed. Total count in hudi " +
trimmedDf.count() + ", input df count " + inputSnapshotDf.count());
+ throw new AssertionError("Hudi contents does not match contents input
data. ");
+ }
+
+ String database =
context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY());
+ String tableName =
context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY());
+ log.warn("Validating hive table with db : " + database + " and table : " +
tableName);
+ Dataset<Row> cowDf = session.sql("SELECT * FROM " + database + "." +
tableName);
+ Dataset<Row> trimmedCowDf =
cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
+ // compare against the rows read through hive, not the datasource read above.
+ intersectionDf = inputSnapshotDf.intersect(trimmedCowDf);
+ // the intersected df should be same as inputDf. if not, there is some
mismatch.
+ if (inputSnapshotDf.except(intersectionDf).count() != 0) {
+ log.error("Data set validation failed for COW hive table. Total count in
hudi " + trimmedCowDf.count() + ", input df count " + inputSnapshotDf.count());
+ throw new AssertionError("Hudi hive table contents does not match
contents input data. ");
+ }
+
+ // if delete input data is enabled, erase input data.
+ if (config.isDeleteInputData()) {
+ // clean up input data for current group of writes.
+ inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
+ fs = new Path(inputPathStr)
+
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
+ fileStatuses = fs.listStatus(new Path(inputPathStr));
+ for (FileStatus fileStatus : fileStatuses) {
+ log.debug("Micro batch to be deleted " +
fileStatus.getPath().toString());
+ fs.delete(fileStatus.getPath(), true);
+ }
+ }
+ }
+
+ private ExpressionEncoder getEncoder(StructType schema) {
+ List<Attribute> attributes =
JavaConversions.asJavaCollection(schema.toAttributes()).stream()
+ .map(Attribute::toAttribute).collect(Collectors.toList());
+ return RowEncoder.apply(schema)
+
.resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(),
+ SimpleAnalyzer$.MODULE$);
+ }
+}
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
index 5c70ea1..d4074bc 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
@@ -23,8 +23,10 @@ import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.WriterContext;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
+import org.apache.hudi.integ.testsuite.dag.nodes.DelayNode;
import org.apache.hudi.metrics.Metrics;
+import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,9 +52,9 @@ public class DagScheduler {
private WorkflowDag workflowDag;
private ExecutionContext executionContext;
- public DagScheduler(WorkflowDag workflowDag, WriterContext writerContext) {
+ public DagScheduler(WorkflowDag workflowDag, WriterContext writerContext,
JavaSparkContext jsc) {
this.workflowDag = workflowDag;
- this.executionContext = new ExecutionContext(null, writerContext);
+ this.executionContext = new ExecutionContext(jsc, writerContext);
}
/**
@@ -63,7 +65,7 @@ public class DagScheduler {
public void schedule() throws Exception {
ExecutorService service = Executors.newFixedThreadPool(2);
try {
- execute(service, workflowDag.getNodeList());
+ execute(service, workflowDag);
service.shutdown();
} finally {
if (!service.isShutdown()) {
@@ -77,33 +79,47 @@ public class DagScheduler {
* Method to start executing the nodes in workflow DAGs.
*
* @param service ExecutorService
- * @param nodes Nodes to be executed
+ * @param workflowDag instance of workflow dag that needs to be executed
* @throws Exception will be thrown if ant error occurred
*/
- private void execute(ExecutorService service, List<DagNode> nodes) throws
Exception {
+ private void execute(ExecutorService service, WorkflowDag workflowDag)
throws Exception {
// Nodes at the same level are executed in parallel
- Queue<DagNode> queue = new PriorityQueue<>(nodes);
log.info("Running workloads");
+ List<DagNode> nodes = workflowDag.getNodeList();
+ int curRound = 1;
do {
- List<Future> futures = new ArrayList<>();
- Set<DagNode> childNodes = new HashSet<>();
- while (queue.size() > 0) {
- DagNode nodeToExecute = queue.poll();
- log.info("Node to execute in dag scheduler " +
nodeToExecute.getConfig().toString());
- futures.add(service.submit(() -> executeNode(nodeToExecute)));
- if (nodeToExecute.getChildNodes().size() > 0) {
- childNodes.addAll(nodeToExecute.getChildNodes());
- }
+
log.warn("===================================================================");
+ log.warn("Running workloads for round num " + curRound);
+
log.warn("===================================================================");
+ Queue<DagNode> queue = new PriorityQueue<>();
+ for (DagNode dagNode : nodes) {
+ queue.add(dagNode.clone());
}
- queue.addAll(childNodes);
- childNodes.clear();
- for (Future future : futures) {
- future.get(1, TimeUnit.HOURS);
+ do {
+ List<Future> futures = new ArrayList<>();
+ Set<DagNode> childNodes = new HashSet<>();
+ while (queue.size() > 0) {
+ DagNode nodeToExecute = queue.poll();
+ log.warn("Executing node \"" +
nodeToExecute.getConfig().getOtherConfigs().get(CONFIG_NAME) + "\" :: " +
nodeToExecute.getConfig());
+ futures.add(service.submit(() -> executeNode(nodeToExecute)));
+ if (nodeToExecute.getChildNodes().size() > 0) {
+ childNodes.addAll(nodeToExecute.getChildNodes());
+ }
+ }
+ queue.addAll(childNodes);
+ childNodes.clear();
+ for (Future future : futures) {
+ future.get(1, TimeUnit.HOURS);
+ }
+ } while (queue.size() > 0);
+ log.info("Finished workloads for round num " + curRound);
+ if (curRound < workflowDag.getRounds()) {
+ new
DelayNode(workflowDag.getIntermittentDelayMins()).execute(executionContext);
}
// After each level, report and flush the metrics
Metrics.flush();
- } while (queue.size() > 0);
+ } while (curRound++ < workflowDag.getRounds());
log.info("Finished workloads");
}
@@ -119,7 +135,6 @@ public class DagScheduler {
try {
int repeatCount = node.getConfig().getRepeatCount();
while (repeatCount > 0) {
- log.warn("executing node: \"" +
node.getConfig().getOtherConfigs().get(CONFIG_NAME) + "\" of type: " +
node.getClass() + " :: " + node.getConfig().toString());
node.execute(executionContext);
log.info("Finished executing {}", node.getName());
repeatCount--;
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
index 53af8eb..6242cbf 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
@@ -41,6 +41,10 @@ import
org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.converter.Converter;
import org.apache.hudi.integ.testsuite.converter.DeleteConverter;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.converter.UpdateConverter;
import org.apache.hudi.integ.testsuite.reader.DFSAvroDeltaInputReader;
import org.apache.hudi.integ.testsuite.reader.DFSHoodieDatasetInputReader;
@@ -51,6 +55,7 @@ import
org.apache.hudi.integ.testsuite.writer.DeltaWriterAdapter;
import org.apache.hudi.integ.testsuite.writer.DeltaWriterFactory;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
+import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@@ -58,6 +63,17 @@ import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.StreamSupport;
import scala.Tuple2;
@@ -77,7 +93,7 @@ public class DeltaGenerator implements Serializable {
private int batchId;
public DeltaGenerator(DFSDeltaConfig deltaOutputConfig, JavaSparkContext
jsc, SparkSession sparkSession,
- String schemaStr, BuiltinKeyGenerator keyGenerator) {
+ String schemaStr, BuiltinKeyGenerator keyGenerator) {
this.deltaOutputConfig = deltaOutputConfig;
this.jsc = jsc;
this.sparkSession = sparkSession;
@@ -167,7 +183,6 @@ public class DeltaGenerator implements Serializable {
log.info("Repartitioning records into " + numPartition + " partitions
for updates");
adjustedRDD = adjustedRDD.repartition(numPartition);
log.info("Repartitioning records done for updates");
-
UpdateConverter converter = new UpdateConverter(schemaStr,
config.getRecordSize(),
partitionPathFieldNames, recordRowKeyFieldNames);
JavaRDD<GenericRecord> updates = converter.convert(adjustedRDD);
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
index 256dfa4..5477371 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
@@ -20,6 +20,11 @@ package org.apache.hudi.integ.testsuite.generator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
import java.util.HashSet;
import java.util.Iterator;
@@ -67,7 +72,7 @@ public class FlexibleSchemaRecordGenerationIterator
implements Iterator<GenericR
lastRecord = record;
return record;
} else {
- return this.generator.randomize(lastRecord, partitionPathFieldNames);
+ return this.generator.randomize(lastRecord,
this.partitionPathFieldNames);
}
}
}
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
index 7d5ca08..49a5f31 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
@@ -134,12 +134,16 @@ public class GenericRecordFullPayloadGenerator implements
Serializable {
protected GenericRecord create(Schema schema, Set<String>
partitionPathFieldNames) {
GenericRecord result = new GenericData.Record(schema);
for (Schema.Field f : schema.getFields()) {
- if (isPartialLongField(f, partitionPathFieldNames)) {
- // This is a long field used as partition field. Set it to seconds
since epoch.
- long value = TimeUnit.SECONDS.convert(partitionIndex, TimeUnit.DAYS);
- result.put(f.name(), (long) value);
+ if (f.name().equals(DEFAULT_HOODIE_IS_DELETED_COL)) {
+ result.put(f.name(), false);
} else {
- result.put(f.name(), typeConvert(f));
+ if (isPartialLongField(f, partitionPathFieldNames)) {
+ // This is a long field used as partition field. Set it to seconds
since epoch.
+ long value = TimeUnit.SECONDS.convert(partitionIndex, TimeUnit.DAYS);
+ result.put(f.name(), (long) value);
+ } else {
+ result.put(f.name(), typeConvert(f));
+ }
}
}
return result;
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java
index 51b1fd9..89cda65 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java
@@ -18,16 +18,16 @@
package org.apache.hudi.integ.testsuite.generator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
/**
* A lazy update payload generator to generate {@link GenericRecord}s lazily.
*/
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
index b7d71f5..94ff3a3 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
@@ -18,20 +18,6 @@
package org.apache.hudi.integ.testsuite.helpers;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FsStatus;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
@@ -40,13 +26,26 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
/**
* A custom dfs path selector used only for the hudi test suite. To be used
only if workload is not run inline.
*/
public class DFSTestSuitePathSelector extends DFSPathSelector {
+
private static volatile Logger log =
LoggerFactory.getLogger(HoodieTestSuiteJob.class);
public DFSTestSuitePathSelector(TypedProperties props, Configuration
hadoopConf) {
@@ -67,6 +66,7 @@ public class DFSTestSuitePathSelector extends DFSPathSelector
{
lastBatchId = 0;
nextBatchId = 1;
}
+
// obtain all eligible files for the batch
List<FileStatus> eligibleFiles = new ArrayList<>();
FileStatus[] fileStatuses = fs.globStatus(
@@ -87,7 +87,8 @@ public class DFSTestSuitePathSelector extends DFSPathSelector
{
if (!fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream()
.anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
continue;
- } else if
(fileStatus.getPath().getName().compareTo(lastBatchId.toString()) > 0) {
+ } else if (Integer.parseInt(fileStatus.getPath().getName()) >
lastBatchId && Integer.parseInt(fileStatus.getPath()
+ .getName()) <= nextBatchId) {
RemoteIterator<LocatedFileStatus> files =
fs.listFiles(fileStatus.getPath(), true);
while (files.hasNext()) {
eligibleFiles.add(files.next());
@@ -95,7 +96,6 @@ public class DFSTestSuitePathSelector extends DFSPathSelector
{
}
}
- log.info("Reading " + eligibleFiles.size() + " files. ");
// no data to readAvro
if (eligibleFiles.size() == 0) {
return new ImmutablePair<>(Option.empty(),
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index bc7803d..43d5fde 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -18,26 +18,6 @@
package org.apache.hudi.integ.testsuite.reader;
-import static java.util.Map.Entry.comparingByValue;
-import static java.util.stream.Collectors.toMap;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
@@ -51,6 +31,12 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieMemoryConfig;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.spark.api.java.JavaPairRDD;
@@ -59,11 +45,27 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
import scala.Tuple2;
+import static java.util.Map.Entry.comparingByValue;
+import static java.util.stream.Collectors.toMap;
+
/**
- * This class helps to generate updates from an already existing hoodie
dataset. It supports generating updates in
- * across partitions, files and records.
+ * This class helps to generate updates from an already existing hoodie
dataset. It supports generating updates across partitions, files and records.
*/
public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
@@ -148,16 +150,22 @@ public class DFSHoodieDatasetInputReader extends
DFSDeltaInputReader {
long recordsInSingleFile =
iteratorSize(readParquetOrLogFiles(getSingleSliceFromRDD(partitionToFileSlice)));
int numFilesToUpdate;
long numRecordsToUpdatePerFile;
- if (!numFiles.isPresent() || numFiles.get() == 0) {
+ if (!numFiles.isPresent() || numFiles.get() <= 0) {
// If num files are not passed, find the number of files to update based
on total records to update and records
// per file
- numFilesToUpdate = (int)Math.ceil((double)numRecordsToUpdate.get() /
recordsInSingleFile);
- // recordsInSingleFile is not average so we still need to account for
bias is records distribution
- // in the files. Limit to the maximum number of files available.
- int totalExistingFilesCount =
partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get();
- numFilesToUpdate = Math.min(numFilesToUpdate, totalExistingFilesCount);
- log.info("Files to update {}", numFilesToUpdate);
- numRecordsToUpdatePerFile = recordsInSingleFile;
+ numFilesToUpdate = (int) Math.floor((double) numRecordsToUpdate.get() /
recordsInSingleFile);
+ if (numFilesToUpdate > 0) {
+ // recordsInSingleFile is not average so we still need to account for
bias in records distribution
+ // in the files. Limit to the maximum number of files available.
+ int totalExistingFilesCount =
partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get();
+ numFilesToUpdate = Math.min(numFilesToUpdate, totalExistingFilesCount);
+ log.info("Files to update {}, records to update per file {}",
numFilesToUpdate, recordsInSingleFile);
+ numRecordsToUpdatePerFile = recordsInSingleFile;
+ } else {
+ numFilesToUpdate = 1;
+ numRecordsToUpdatePerFile = numRecordsToUpdate.get();
+ log.info("Total records passed in < records in single file. Hence
setting numFilesToUpdate to 1 and numRecordsToUpdate to {} ",
numRecordsToUpdatePerFile);
+ }
} else {
// If num files is passed, find the number of records per file based on
either percentage or total records to
// update and num files passed
@@ -171,6 +179,7 @@ public class DFSHoodieDatasetInputReader extends
DFSDeltaInputReader {
partitionPaths.size(), numFilesToUpdate, partitionToFileIdCountMap);
JavaRDD<GenericRecord> updates =
projectSchema(generateUpdates(adjustedPartitionToFileIdCountMap,
partitionToFileSlice, numFilesToUpdate, (int)
numRecordsToUpdatePerFile));
+
if (numRecordsToUpdate.isPresent() && numFiles.isPresent() &&
numFiles.get() != 0 && numRecordsToUpdate.get()
!= numRecordsToUpdatePerFile * numFiles.get()) {
long remainingRecordsToAdd = (numRecordsToUpdate.get() -
(numRecordsToUpdatePerFile * numFiles.get()));
@@ -215,7 +224,7 @@ public class DFSHoodieDatasetInputReader extends
DFSDeltaInputReader {
LinkedHashMap::new));
// Limit files to be read per partition
- int numFilesPerPartition = (int) Math.ceil((double)numFiles /
numPartitions);
+ int numFilesPerPartition = (int) Math.ceil((double) numFiles /
numPartitions);
Map<String, Integer> adjustedPartitionToFileIdCountMap = new HashMap<>();
partitionToFileIdCountSortedMap.entrySet().stream().forEach(e -> {
if (e.getValue() <= numFilesPerPartition) {
@@ -283,9 +292,7 @@ public class DFSHoodieDatasetInputReader extends
DFSDeltaInputReader {
}
/**
- * Returns the number of elements remaining in {@code iterator}. The iterator
- * will be left exhausted: its {@code hasNext()} method will return
- * {@code false}.
+ * Returns the number of elements remaining in {@code iterator}. The
iterator will be left exhausted: its {@code hasNext()} method will return
{@code false}.
*/
private static int iteratorSize(Iterator<?> iterator) {
int count = 0;
@@ -297,11 +304,8 @@ public class DFSHoodieDatasetInputReader extends
DFSDeltaInputReader {
}
/**
- * Creates an iterator returning the first {@code limitSize} elements of the
- * given iterator. If the original iterator does not contain that many
- * elements, the returned iterator will have the same behavior as the
original
- * iterator. The returned iterator supports {@code remove()} if the original
- * iterator does.
+ * Creates an iterator returning the first {@code limitSize} elements of the
given iterator. If the original iterator does not contain that many elements,
the returned iterator will have the same
+ * behavior as the original iterator. The returned iterator supports {@code
remove()} if the original iterator does.
*
* @param iterator the iterator to limit
* @param limitSize the maximum number of elements in the returned iterator
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/SchemaUtils.java
similarity index 66%
copy from
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java
copy to
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/SchemaUtils.java
index e9171fc..2de9452 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/SchemaUtils.java
@@ -16,24 +16,10 @@
* limitations under the License.
*/
-package org.apache.hudi.integ.testsuite.dag;
+package org.apache.hudi.integ.testsuite.schema;
-import java.util.List;
-import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
+public class SchemaUtils {
-/**
- * Workflow dag that encapsulates all execute nodes.
- */
-public class WorkflowDag<O> {
-
- private List<DagNode<O>> nodeList;
-
- public WorkflowDag(List<DagNode<O>> nodeList) {
- this.nodeList = nodeList;
- }
-
- public List<DagNode<O>> getNodeList() {
- return nodeList;
- }
+ public static final String SOURCE_ORDERING_FIELD =
"test_suite_source_ordering_field";
}
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/TestSuiteFileBasedSchemaProvider.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/TestSuiteFileBasedSchemaProvider.java
new file mode 100644
index 0000000..e67c5af
--- /dev/null
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/TestSuiteFileBasedSchemaProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.schema;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.integ.testsuite.dag.WriterContext;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Appends source ordering field to both source and target schemas. This is
required to assist in validation to differentiate records written in different
batches.
+ */
+public class TestSuiteFileBasedSchemaProvider extends FilebasedSchemaProvider {
+
+ protected static Logger log = LogManager.getLogger(WriterContext.class);
+
+ public TestSuiteFileBasedSchemaProvider(TypedProperties props,
JavaSparkContext jssc) {
+ super(props, jssc);
+ this.sourceSchema = addSourceOrderingFieldToSchema(sourceSchema);
+ this.targetSchema = addSourceOrderingFieldToSchema(targetSchema);
+ }
+
+ private Schema addSourceOrderingFieldToSchema(Schema schema) {
+ List<Field> fields = new ArrayList<>();
+ for (Schema.Field field : schema.getFields()) {
+ Schema.Field newField = new Schema.Field(field.name(), field.schema(),
field.doc(), field.defaultVal());
+ for (Map.Entry<String, Object> prop : field.getObjectProps().entrySet())
{
+ newField.addProp(prop.getKey(), prop.getValue());
+ }
+ fields.add(newField);
+ }
+ Schema.Field sourceOrderingField =
+ new Schema.Field(SchemaUtils.SOURCE_ORDERING_FIELD,
Schema.create(Type.INT), "", 0);
+ fields.add(sourceOrderingField);
+ Schema mergedSchema = Schema.createRecord(schema.getName(),
schema.getDoc(), schema.getNamespace(), false);
+ mergedSchema.setFields(fields);
+ return mergedSchema;
+ }
+
+}
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DFSDeltaWriterAdapter.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DFSDeltaWriterAdapter.java
index 65e4ee1..4bd096a 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DFSDeltaWriterAdapter.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DFSDeltaWriterAdapter.java
@@ -18,6 +18,8 @@
package org.apache.hudi.integ.testsuite.writer;
+import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
+
import org.apache.avro.generic.GenericRecord;
import java.io.IOException;
@@ -30,22 +32,29 @@ import java.util.List;
*/
public class DFSDeltaWriterAdapter implements
DeltaWriterAdapter<GenericRecord> {
- private DeltaInputWriter deltaInputGenerator;
+ private DeltaInputWriter deltaInputWriter;
private List<DeltaWriteStats> metrics = new ArrayList<>();
+ private int preCombineFieldVal = 0;
+
+ public DFSDeltaWriterAdapter(DeltaInputWriter<GenericRecord>
deltaInputWriter, int preCombineFieldVal) {
+ this.deltaInputWriter = deltaInputWriter;
+ this.preCombineFieldVal = preCombineFieldVal;
+ }
- public DFSDeltaWriterAdapter(DeltaInputWriter<GenericRecord>
deltaInputGenerator) {
- this.deltaInputGenerator = deltaInputGenerator;
+ public DFSDeltaWriterAdapter(DeltaInputWriter<GenericRecord>
deltaInputWriter) {
+ this.deltaInputWriter = deltaInputWriter;
}
@Override
public List<DeltaWriteStats> write(Iterator<GenericRecord> input) throws
IOException {
while (input.hasNext()) {
GenericRecord next = input.next();
- if (this.deltaInputGenerator.canWrite()) {
- this.deltaInputGenerator.writeData(next);
- } else if (input.hasNext()) {
+ next.put(SchemaUtils.SOURCE_ORDERING_FIELD, preCombineFieldVal);
+ if (this.deltaInputWriter.canWrite()) {
+ this.deltaInputWriter.writeData(next);
+ } else {
rollOver();
- this.deltaInputGenerator.writeData(next);
+ this.deltaInputWriter.writeData(next);
}
}
close();
@@ -54,11 +63,11 @@ public class DFSDeltaWriterAdapter implements
DeltaWriterAdapter<GenericRecord>
public void rollOver() throws IOException {
close();
- this.deltaInputGenerator = this.deltaInputGenerator.getNewWriter();
+ this.deltaInputWriter = this.deltaInputWriter.getNewWriter();
}
private void close() throws IOException {
- this.deltaInputGenerator.close();
- this.metrics.add(this.deltaInputGenerator.getDeltaWriteStats());
+ this.deltaInputWriter.close();
+ this.metrics.add(this.deltaInputWriter.getDeltaWriteStats());
}
}
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DeltaWriterFactory.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DeltaWriterFactory.java
index b4d9b9f..a00e8e1 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DeltaWriterFactory.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DeltaWriterFactory.java
@@ -18,16 +18,17 @@
package org.apache.hudi.integ.testsuite.writer;
-import java.io.IOException;
-import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
+import org.apache.avro.generic.GenericRecord;
+
+import java.io.IOException;
+
/**
- * A factory to help instantiate different {@link DeltaWriterAdapter}s
depending on the {@link DeltaOutputMode} and
- * {@link DeltaInputType}.
+ * A factory to help instantiate different {@link DeltaWriterAdapter}s
depending on the {@link DeltaOutputMode} and {@link DeltaInputType}.
*/
public class DeltaWriterFactory {
@@ -44,9 +45,9 @@ public class DeltaWriterFactory {
DeltaInputWriter<GenericRecord> fileDeltaInputGenerator = new
AvroFileDeltaInputWriter(
dfsDeltaConfig.getConfiguration(),
StringUtils
- .join(new String[]{dfsDeltaConfig.getDeltaBasePath(),
dfsDeltaConfig.getBatchId().toString()},
+ .join(new String[] {dfsDeltaConfig.getDeltaBasePath(),
dfsDeltaConfig.getBatchId().toString()},
"/"), dfsDeltaConfig.getSchemaStr(),
dfsDeltaConfig.getMaxFileSize());
- return new DFSDeltaWriterAdapter(fileDeltaInputGenerator);
+ return new DFSDeltaWriterAdapter(fileDeltaInputGenerator, batchId);
default:
throw new IllegalArgumentException("Invalid delta input format " +
config.getDeltaInputType());
}
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/configuration/TestWorkflowBuilder.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/configuration/TestWorkflowBuilder.java
index 1e5ca68..8235099 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/configuration/TestWorkflowBuilder.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/configuration/TestWorkflowBuilder.java
@@ -18,18 +18,19 @@
package org.apache.hudi.integ.testsuite.configuration;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.TestCase.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
+import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.dag.nodes.InsertNode;
import org.apache.hudi.integ.testsuite.dag.nodes.UpsertNode;
-import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
+
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
+import java.util.List;
+
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.TestCase.assertEquals;
+
/**
* Unit test for the build process of {@link DagNode} and {@link WorkflowDag}.
*/
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/TestDagUtils.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/TestDagUtils.java
index d941744..70e6da7 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/TestDagUtils.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/TestDagUtils.java
@@ -47,6 +47,9 @@ public class TestDagUtils {
public void testConvertYamlToDag() throws Exception {
WorkflowDag dag = DagUtils.convertYamlToDag(UtilitiesTestBase.Helpers
.readFileFromAbsolutePath((System.getProperty("user.dir") + "/.." +
COW_DAG_DOCKER_DEMO_RELATIVE_PATH)));
+ assertEquals(dag.getDagName(), "unit-test-cow-dag");
+ assertEquals(dag.getRounds(), 1);
+ assertEquals(dag.getIntermittentDelayMins(), 10);
assertEquals(dag.getNodeList().size(), 1);
Assertions.assertEquals(((DagNode)
dag.getNodeList().get(0)).getParentNodes().size(), 0);
assertEquals(((DagNode) dag.getNodeList().get(0)).getChildNodes().size(),
1);
diff --git a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml
b/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml
index 96a6c82..2369165 100644
--- a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml
+++ b/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml
@@ -13,58 +13,62 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-first_insert:
- config:
- record_size: 70000
- num_partitions_insert: 1
- repeat_count: 2
- num_records_insert: 100
- type: InsertNode
- deps: none
-second_insert:
- config:
- record_size: 70000
- num_partitions_insert: 1
- repeat_count: 1
- num_records_insert: 100
- type: InsertNode
- deps: first_insert
-first_rollback:
- config:
- deps: second_insert
- type: RollbackNode
-third_insert:
- config:
- record_size: 70000
- num_partitions_insert: 1
- repeat_count: 1
- num_records_insert: 100
- type: InsertNode
- deps: first_rollback
-first_upsert:
- config:
- record_size: 70000
- num_partitions_upsert: 1
- repeat_count: 1
- num_records_upsert: 100
- type: UpsertNode
- deps: third_insert
-first_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: first_upsert
-first_hive_query:
- config:
- hive_props:
- prop2: "set spark.yarn.queue="
- prop3: "set hive.strict.checks.large.query=false"
- prop4: "set hive.stats.autogather=false"
- hive_queries:
- query1: "select count(*) from testdb1.table1"
- result1: 300
- query2: "select count(*) from testdb1.table1 group by `_row_key`
having count(*) > 1"
- result2: 0
- type: HiveQueryNode
- deps: first_hive_sync
\ No newline at end of file
+dag_name: unit-test-cow-dag
+dag_rounds: 1
+dag_intermittent_delay_mins: 10
+dag_content:
+ first_insert:
+ config:
+ record_size: 70000
+ num_partitions_insert: 1
+ repeat_count: 2
+ num_records_insert: 100
+ type: InsertNode
+ deps: none
+ second_insert:
+ config:
+ record_size: 70000
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 100
+ type: InsertNode
+ deps: first_insert
+ first_rollback:
+ config:
+ deps: second_insert
+ type: RollbackNode
+ third_insert:
+ config:
+ record_size: 70000
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 100
+ type: InsertNode
+ deps: first_rollback
+ first_upsert:
+ config:
+ record_size: 70000
+ num_partitions_upsert: 1
+ repeat_count: 1
+ num_records_upsert: 100
+ type: UpsertNode
+ deps: third_insert
+ first_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: first_upsert
+ first_hive_query:
+ config:
+ hive_props:
+ prop2: "set spark.yarn.queue="
+ prop3: "set hive.strict.checks.large.query=false"
+ prop4: "set hive.stats.autogather=false"
+ hive_queries:
+ query1: "select count(*) from testdb1.table1"
+ result1: 300
+ query2: "select count(*) from testdb1.table1 group by `_row_key`
having count(*) > 1"
+ result2: 0
+ type: HiveQueryNode
+ deps: first_hive_sync
\ No newline at end of file
diff --git a/hudi-integ-test/src/test/resources/unit-test-mor-dag.yaml
b/hudi-integ-test/src/test/resources/unit-test-mor-dag.yaml
index 96a6c82..2ba4245 100644
--- a/hudi-integ-test/src/test/resources/unit-test-mor-dag.yaml
+++ b/hudi-integ-test/src/test/resources/unit-test-mor-dag.yaml
@@ -13,58 +13,62 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-first_insert:
- config:
- record_size: 70000
- num_partitions_insert: 1
- repeat_count: 2
- num_records_insert: 100
- type: InsertNode
- deps: none
-second_insert:
- config:
- record_size: 70000
- num_partitions_insert: 1
- repeat_count: 1
- num_records_insert: 100
- type: InsertNode
- deps: first_insert
-first_rollback:
- config:
- deps: second_insert
- type: RollbackNode
-third_insert:
- config:
- record_size: 70000
- num_partitions_insert: 1
- repeat_count: 1
- num_records_insert: 100
- type: InsertNode
- deps: first_rollback
-first_upsert:
- config:
- record_size: 70000
- num_partitions_upsert: 1
- repeat_count: 1
- num_records_upsert: 100
- type: UpsertNode
- deps: third_insert
-first_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: first_upsert
-first_hive_query:
- config:
- hive_props:
- prop2: "set spark.yarn.queue="
- prop3: "set hive.strict.checks.large.query=false"
- prop4: "set hive.stats.autogather=false"
- hive_queries:
- query1: "select count(*) from testdb1.table1"
- result1: 300
- query2: "select count(*) from testdb1.table1 group by `_row_key`
having count(*) > 1"
- result2: 0
- type: HiveQueryNode
- deps: first_hive_sync
\ No newline at end of file
+dag_name: unit-test-mor-dag
+dag_rounds: 1
+dag_intermittent_delay_mins: 10
+dag_content:
+ first_insert:
+ config:
+ record_size: 70000
+ num_partitions_insert: 1
+ repeat_count: 2
+ num_records_insert: 100
+ type: InsertNode
+ deps: none
+ second_insert:
+ config:
+ record_size: 70000
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 100
+ type: InsertNode
+ deps: first_insert
+ first_rollback:
+ config:
+ deps: second_insert
+ type: RollbackNode
+ third_insert:
+ config:
+ record_size: 70000
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 100
+ type: InsertNode
+ deps: first_rollback
+ first_upsert:
+ config:
+ record_size: 70000
+ num_partitions_upsert: 1
+ repeat_count: 1
+ num_records_upsert: 100
+ type: UpsertNode
+ deps: third_insert
+ first_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: first_upsert
+ first_hive_query:
+ config:
+ hive_props:
+ prop2: "set spark.yarn.queue="
+ prop3: "set hive.strict.checks.large.query=false"
+ prop4: "set hive.stats.autogather=false"
+ hive_queries:
+ query1: "select count(*) from testdb1.table1"
+ result1: 300
+ query2: "select count(*) from testdb1.table1 group by `_row_key`
having count(*) > 1"
+ result2: 0
+ type: HiveQueryNode
+ deps: first_hive_sync
\ No newline at end of file
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
index 43f2ff2..7542755 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
@@ -46,9 +46,9 @@ public class FilebasedSchemaProvider extends SchemaProvider {
private final FileSystem fs;
- private final Schema sourceSchema;
+ protected Schema sourceSchema;
- private Schema targetSchema;
+ protected Schema targetSchema;
public FilebasedSchemaProvider(TypedProperties props, JavaSparkContext jssc)
{
super(props, jssc);