This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d5f2028 Adding fixes to test suite framework. Adding clustering node and validate async operations node. (#2400)
d5f2028 is described below
commit d5f202821b11659693a2e934d4b8f4c99f19755e
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Feb 12 12:29:21 2021 -0500
Adding fixes to test suite framework. Adding clustering node and validate async operations node. (#2400)
---
docker/demo/config/test-suite/complex-dag-cow.yaml | 30 +++--
docker/demo/config/test-suite/complex-dag-mor.yaml | 91 +++++----------
...ex-dag-cow.yaml => cow-clustering-example.yaml} | 52 +++++----
.../test-suite/cow-long-running-example.yaml | 39 +++++--
...yaml => cow-long-running-multi-partitions.yaml} | 49 +++++---
docker/demo/config/test-suite/test.properties | 4 +
.../hudi/client/AbstractHoodieWriteClient.java | 3 +-
hudi-integ-test/pom.xml | 11 ++
.../hudi/integ/testsuite/HoodieTestSuiteJob.java | 1 +
.../integ/testsuite/HoodieTestSuiteWriter.java | 39 +++++--
.../integ/testsuite/configuration/DeltaConfig.java | 20 ++++
.../hudi/integ/testsuite/dag/ExecutionContext.java | 2 +-
.../hudi/integ/testsuite/dag/nodes/CleanNode.java | 2 +-
.../nodes/{CleanNode.java => ClusteringNode.java} | 22 ++--
.../integ/testsuite/dag/nodes/CompactNode.java | 3 +-
.../hudi/integ/testsuite/dag/nodes/DagNode.java | 3 +-
.../hudi/integ/testsuite/dag/nodes/DelayNode.java | 2 +-
.../integ/testsuite/dag/nodes/HiveQueryNode.java | 2 +-
.../integ/testsuite/dag/nodes/HiveSyncNode.java | 2 +-
.../hudi/integ/testsuite/dag/nodes/InsertNode.java | 2 +-
.../integ/testsuite/dag/nodes/RollbackNode.java | 4 +-
.../testsuite/dag/nodes/ScheduleCompactNode.java | 4 +-
.../testsuite/dag/nodes/SparkSQLQueryNode.java | 3 +-
.../dag/nodes/ValidateAsyncOperations.java | 124 +++++++++++++++++++++
.../testsuite/dag/nodes/ValidateDatasetNode.java | 26 +++--
.../integ/testsuite/dag/nodes/ValidateNode.java | 3 +-
.../testsuite/dag/scheduler/DagScheduler.java | 9 +-
.../FlexibleSchemaRecordGenerationIterator.java | 2 +-
.../GenericRecordFullPayloadGenerator.java | 2 +-
.../hudi/utilities/deltastreamer/DeltaSync.java | 7 +-
packaging/hudi-integ-test-bundle/pom.xml | 1 +
31 files changed, 398 insertions(+), 166 deletions(-)
diff --git a/docker/demo/config/test-suite/complex-dag-cow.yaml b/docker/demo/config/test-suite/complex-dag-cow.yaml
index 5fa8596..acbe287 100644
--- a/docker/demo/config/test-suite/complex-dag-cow.yaml
+++ b/docker/demo/config/test-suite/complex-dag-cow.yaml
@@ -13,13 +13,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-dag_name: cow-long-running-example.yaml
-dag_rounds: 2
+dag_name: complex-dag-cow.yaml
+dag_rounds: 1
dag_intermittent_delay_mins: 1
dag_content:
first_insert:
config:
- record_size: 100
+ record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 1000
@@ -27,7 +27,7 @@ dag_content:
deps: none
second_insert:
config:
- record_size: 100
+ record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 10000
@@ -35,19 +35,26 @@ dag_content:
type: InsertNode
third_insert:
config:
- record_size: 100
+ record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 300
deps: second_insert
type: InsertNode
+ first_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: third_insert
first_validate:
config:
+ validate_hive: true
type: ValidateDatasetNode
- deps: third_insert
+ deps: first_hive_sync
first_upsert:
config:
- record_size: 100
+ record_size: 1000
num_partitions_insert: 1
num_records_insert: 300
repeat_count: 1
@@ -61,8 +68,15 @@ dag_content:
num_records_delete: 2000
type: DeleteNode
deps: first_upsert
+ second_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: first_delete
second_validate:
config:
+ validate_hive: true
delete_input_data: true
type: ValidateDatasetNode
- deps: first_delete
\ No newline at end of file
+ deps: second_hive_sync
diff --git a/docker/demo/config/test-suite/complex-dag-mor.yaml b/docker/demo/config/test-suite/complex-dag-mor.yaml
index 505e5e2..24f3a9c 100644
--- a/docker/demo/config/test-suite/complex-dag-mor.yaml
+++ b/docker/demo/config/test-suite/complex-dag-mor.yaml
@@ -15,105 +15,70 @@
# limitations under the License.
dag_name: complex-dag-mor.yaml
dag_rounds: 1
-dag_intermittent_delay_mins: 10
+dag_intermittent_delay_mins: 1
dag_content:
first_insert:
config:
- record_size: 70000
+ record_size: 1000
num_partitions_insert: 1
- repeat_count: 5
+ repeat_count: 1
num_records_insert: 100
type: InsertNode
deps: none
second_insert:
config:
- record_size: 70000
+ record_size: 1000
num_partitions_insert: 1
- repeat_count: 5
- num_records_insert: 100
+ repeat_count: 1
+ num_records_insert: 1000
deps: first_insert
type: InsertNode
third_insert:
config:
- record_size: 70000
+ record_size: 1000
num_partitions_insert: 1
- repeat_count: 2
+ repeat_count: 1
num_records_insert: 300
deps: second_insert
type: InsertNode
- first_rollback:
- config:
- deps: third_insert
- type: RollbackNode
- first_upsert:
- config:
- record_size: 70000
- num_partitions_insert: 1
- num_records_insert: 300
- repeat_count: 1
- num_records_upsert: 100
- num_partitions_upsert: 10
- type: UpsertNode
- deps: first_rollback
first_hive_sync:
config:
queue_name: "adhoc"
engine: "mr"
type: HiveSyncNode
- deps: first_upsert
- first_hive_query:
+ deps: third_insert
+ first_validate:
config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveQueryNode
+ type: ValidateDatasetNode
deps: first_hive_sync
- second_upsert:
+ first_upsert:
config:
- record_size: 70000
+ record_size: 1000
num_partitions_insert: 1
num_records_insert: 300
repeat_count: 1
num_records_upsert: 100
- num_partitions_upsert: 10
+ num_partitions_upsert: 1
type: UpsertNode
- deps: first_hive_query
- second_hive_query:
- config:
- queue_name: "adhoc"
- engine: "mr"
- hive_queries:
- query1: "select count(*) from testdb.table1 group by `_row_key` having
count(*) > 1"
- result1: 0
- query2: "select count(*) from testdb.table1"
- result2: 1100
- type: HiveQueryNode
- deps: second_upsert
+ deps: first_validate
first_schedule_compact:
config:
type: ScheduleCompactNode
- deps: second_hive_query
- third_upsert:
- config:
- record_size: 70000
- num_partitions_insert: 1
- num_records_insert: 300
- repeat_count: 1
- num_records_upsert: 100
- num_partitions_upsert: 10
- type: UpsertNode
- deps: first_schedule_compact
- first_compact:
+ deps: first_upsert
+ first_delete:
config:
- type: CompactNode
+ num_partitions_delete: 1
+ num_records_delete: 500
+ type: DeleteNode
deps: first_schedule_compact
- third_hive_query:
+ second_hive_sync:
config:
queue_name: "adhoc"
engine: "mr"
- hive_queries:
- query1: "select count(*) from testdb.table1 group by `_row_key` having
count(*) > 1"
- result1: 0
- query2: "select count(*) from testdb.table1"
- result2: 1400
- type: HiveQueryNode
- deps: first_compact
+ type: HiveSyncNode
+ deps: first_delete
+ second_validate:
+ config:
+ delete_input_data: true
+ type: ValidateDatasetNode
+ deps: second_hive_sync
diff --git a/docker/demo/config/test-suite/complex-dag-cow.yaml b/docker/demo/config/test-suite/cow-clustering-example.yaml
similarity index 71%
copy from docker/demo/config/test-suite/complex-dag-cow.yaml
copy to docker/demo/config/test-suite/cow-clustering-example.yaml
index 5fa8596..939e16f 100644
--- a/docker/demo/config/test-suite/complex-dag-cow.yaml
+++ b/docker/demo/config/test-suite/cow-clustering-example.yaml
@@ -13,13 +13,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-dag_name: cow-long-running-example.yaml
-dag_rounds: 2
-dag_intermittent_delay_mins: 1
+dag_name: cow-clustering-example.yaml
+dag_rounds: 3
+dag_intermittent_delay_mins: 0
dag_content:
first_insert:
config:
- record_size: 100
+ record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 1000
@@ -27,7 +27,7 @@ dag_content:
deps: none
second_insert:
config:
- record_size: 100
+ record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 10000
@@ -35,34 +35,42 @@ dag_content:
type: InsertNode
third_insert:
config:
- record_size: 100
+ record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 300
deps: second_insert
type: InsertNode
+ first_delete:
+ config:
+ num_partitions_delete: 1
+ num_records_delete: 9000
+ type: DeleteNode
+ deps: third_insert
+ first_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: first_delete
first_validate:
config:
+ validate_hive: true
type: ValidateDatasetNode
- deps: third_insert
- first_upsert:
+ deps: first_hive_sync
+ first_cluster:
config:
- record_size: 100
- num_partitions_insert: 1
- num_records_insert: 300
- repeat_count: 1
- num_records_upsert: 100
- num_partitions_upsert: 1
- type: UpsertNode
+ execute_itr_count: 2
+ type: ClusteringNode
deps: first_validate
- first_delete:
+ second_hive_sync:
config:
- num_partitions_delete: 1
- num_records_delete: 2000
- type: DeleteNode
- deps: first_upsert
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: first_cluster
second_validate:
config:
- delete_input_data: true
+ validate_hive: true
type: ValidateDatasetNode
- deps: first_delete
\ No newline at end of file
+ deps: second_hive_sync
diff --git a/docker/demo/config/test-suite/cow-long-running-example.yaml b/docker/demo/config/test-suite/cow-long-running-example.yaml
index b7026f2..71a34f8 100644
--- a/docker/demo/config/test-suite/cow-long-running-example.yaml
+++ b/docker/demo/config/test-suite/cow-long-running-example.yaml
@@ -14,12 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: cow-long-running-example.yaml
-dag_rounds: 20
-dag_intermittent_delay_mins: 10
+dag_rounds: 50
+dag_intermittent_delay_mins: 1
dag_content:
first_insert:
config:
- record_size: 100
+ record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 1000
@@ -27,7 +27,7 @@ dag_content:
deps: none
second_insert:
config:
- record_size: 100
+ record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 10000
@@ -35,19 +35,26 @@ dag_content:
type: InsertNode
third_insert:
config:
- record_size: 100
+ record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 300
deps: second_insert
type: InsertNode
+ first_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: third_insert
first_validate:
config:
+ validate_hive: true
type: ValidateDatasetNode
- deps: third_insert
+ deps: first_hive_sync
first_upsert:
config:
- record_size: 100
+ record_size: 1000
num_partitions_insert: 1
num_records_insert: 300
repeat_count: 1
@@ -58,11 +65,25 @@ dag_content:
first_delete:
config:
num_partitions_delete: 1
- num_records_delete: 2000
+ num_records_delete: 8000
type: DeleteNode
deps: first_upsert
+ second_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: first_delete
second_validate:
config:
+ validate_hive: true
delete_input_data: true
type: ValidateDatasetNode
- deps: first_delete
\ No newline at end of file
+ deps: second_hive_sync
+ last_validate:
+ config:
+ execute_itr_count: 50
+ validate_clean: true
+ validate_archival: true
+ type: ValidateAsyncOperations
+ deps: second_validate
diff --git a/docker/demo/config/test-suite/complex-dag-cow.yaml b/docker/demo/config/test-suite/cow-long-running-multi-partitions.yaml
similarity index 65%
copy from docker/demo/config/test-suite/complex-dag-cow.yaml
copy to docker/demo/config/test-suite/cow-long-running-multi-partitions.yaml
index 5fa8596..b071c46 100644
--- a/docker/demo/config/test-suite/complex-dag-cow.yaml
+++ b/docker/demo/config/test-suite/cow-long-running-multi-partitions.yaml
@@ -13,42 +13,49 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-dag_name: cow-long-running-example.yaml
-dag_rounds: 2
+dag_name: cow-long-running-multi-partitions.yaml
+dag_rounds: 50
dag_intermittent_delay_mins: 1
dag_content:
first_insert:
config:
- record_size: 100
- num_partitions_insert: 1
+ record_size: 1000
+ num_partitions_insert: 5
repeat_count: 1
num_records_insert: 1000
type: InsertNode
deps: none
second_insert:
config:
- record_size: 100
- num_partitions_insert: 1
+ record_size: 1000
+ num_partitions_insert: 50
repeat_count: 1
num_records_insert: 10000
deps: first_insert
type: InsertNode
third_insert:
config:
- record_size: 100
- num_partitions_insert: 1
+ record_size: 1000
+ num_partitions_insert: 2
repeat_count: 1
num_records_insert: 300
deps: second_insert
type: InsertNode
+ first_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: third_insert
first_validate:
config:
+ validate_hive: true
type: ValidateDatasetNode
- deps: third_insert
+ deps: first_hive_sync
first_upsert:
config:
- record_size: 100
- num_partitions_insert: 1
+ record_size: 1000
+ num_partitions_insert: 2
num_records_insert: 300
repeat_count: 1
num_records_upsert: 100
@@ -57,12 +64,26 @@ dag_content:
deps: first_validate
first_delete:
config:
- num_partitions_delete: 1
- num_records_delete: 2000
+ num_partitions_delete: 50
+ num_records_delete: 8000
type: DeleteNode
deps: first_upsert
+ second_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: first_delete
second_validate:
config:
+ validate_hive: true
delete_input_data: true
type: ValidateDatasetNode
- deps: first_delete
\ No newline at end of file
+ deps: second_hive_sync
+ last_validate:
+ config:
+ execute_itr_count: 50
+ validate_clean: true
+ validate_archival: true
+ type: ValidateAsyncOperations
+ deps: second_validate
diff --git a/docker/demo/config/test-suite/test.properties b/docker/demo/config/test-suite/test.properties
index 0aa0f45..9dfb465 100644
--- a/docker/demo/config/test-suite/test.properties
+++ b/docker/demo/config/test-suite/test.properties
@@ -20,6 +20,10 @@ hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp
+hoodie.clustering.plan.strategy.sort.columns=_row_key
+hoodie.clustering.plan.strategy.daybased.lookback.partitions=0
+hoodie.clustering.inline.max.commits=1
+
hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
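The three properties added above are what enable the inline clustering exercised by the new ClusteringNode. A minimal standalone sketch of reading them back (pure JDK; the file path is the one from this repo, the class name is illustrative):

    import java.io.FileInputStream;
    import java.util.Properties;

    public class ClusteringPropsSketch {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream("docker/demo/config/test-suite/test.properties")) {
          props.load(in);
        }
        // Sort clustered file groups by the record key; cluster after every commit.
        System.out.println(props.getProperty("hoodie.clustering.plan.strategy.sort.columns")); // _row_key
        System.out.println(props.getProperty("hoodie.clustering.inline.max.commits"));         // 1
      }
    }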
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 63d0bff..25ce039 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -371,7 +371,6 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
* Common method containing steps to be performed before write
(upsert/insert/...
*
* @param instantTime Instant Time
- * @param hoodieTable Hoodie Table
* @return Write Status
*/
protected void preWrite(String instantTime, WriteOperationType writeOperationType) {
@@ -719,7 +718,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
*/
protected abstract void completeCompaction(HoodieCommitMetadata metadata, O writeStatuses,
HoodieTable<T, I, K, O> table, String compactionCommitTime);
-
+
/**
* Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
*
diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml
index b48dd81..d93c663 100644
--- a/hudi-integ-test/pom.xml
+++ b/hudi-integ-test/pom.xml
@@ -92,6 +92,11 @@
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-spark-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
<artifactId>hudi-utilities_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
@@ -201,6 +206,12 @@
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-spark-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
<!-- Fasterxml -->
<dependency>
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index b5037e9..aeb8748 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -82,6 +82,7 @@ public class HoodieTestSuiteJob {
private BuiltinKeyGenerator keyGenerator;
public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throws IOException {
+ log.warn("Running spark job w/ app id " + jsc.sc().applicationId());
this.cfg = cfg;
this.jsc = jsc;
cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString();
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
index a06c281..3c61291 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
@@ -18,8 +18,6 @@
package org.apache.hudi.integ.testsuite;
-import org.apache.hadoop.conf.Configuration;
-
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
@@ -30,6 +28,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
@@ -41,9 +40,14 @@ import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashMap;
@@ -53,13 +57,15 @@ import java.util.Properties;
import java.util.Set;
/**
- * A writer abstraction for the Hudi test suite. This class wraps different implementations of writers used to perform
- * write operations into the target hudi dataset. Current supported writers are {@link HoodieDeltaStreamerWrapper}
- * and {@link SparkRDDWriteClient}.
+ * A writer abstraction for the Hudi test suite. This class wraps different implementations of writers used to perform write operations into the target hudi dataset. Current supported writers are
+ * {@link HoodieDeltaStreamerWrapper} and {@link SparkRDDWriteClient}.
*/
public class HoodieTestSuiteWriter {
+ private static Logger log = LoggerFactory.getLogger(HoodieTestSuiteWriter.class);
+
private HoodieDeltaStreamerWrapper deltaStreamerWrapper;
+ private HoodieWriteConfig writeConfig;
private SparkRDDWriteClient writeClient;
protected HoodieTestSuiteConfig cfg;
private Option<String> lastCheckpoint;
@@ -85,8 +91,9 @@ public class HoodieTestSuiteWriter {
HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
this.deltaStreamerWrapper = new HoodieDeltaStreamerWrapper(cfg, jsc);
this.hoodieReadClient = new HoodieReadClient(context, cfg.targetBasePath);
+ this.writeConfig = getHoodieClientConfig(cfg, props, schema);
if (!cfg.useDeltaStreamer) {
- this.writeClient = new SparkRDDWriteClient(context, getHoodieClientConfig(cfg, props, schema), rollbackInflight);
+ this.writeClient = new SparkRDDWriteClient(context, writeConfig, rollbackInflight);
}
this.cfg = cfg;
this.configuration = jsc.hadoopConfiguration();
@@ -95,6 +102,10 @@ public class HoodieTestSuiteWriter {
this.schema = schema;
}
+ public HoodieWriteConfig getWriteConfig() {
+ return this.writeConfig;
+ }
+
private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteConfig cfg, Properties props, String schema) {
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
@@ -178,6 +189,20 @@ public class HoodieTestSuiteWriter {
}
}
+ public void inlineClustering() {
+ if (!cfg.useDeltaStreamer) {
+ Option<String> clusteringInstantOpt = writeClient.scheduleClustering(Option.empty());
+ clusteringInstantOpt.ifPresent(clusteringInstant -> {
+ // inline cluster should auto commit as the user is never given control
+ log.warn("Clustering instant :: " + clusteringInstant);
+ writeClient.cluster(clusteringInstant, true);
+ });
+ } else {
+ // TODO: fix clustering to be done async https://issues.apache.org/jira/browse/HUDI-1590
+ throw new IllegalArgumentException("Clustering cannot be triggered with deltastreamer");
+ }
+ }
+
public Option<String> scheduleCompaction(Option<Map<String, String>> previousCommitExtraMetadata) throws Exception {
if (!cfg.useDeltaStreamer) {
@@ -189,7 +214,7 @@ public class HoodieTestSuiteWriter {
}
public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
- Option<String> instantTime) {
+ Option<String> instantTime) {
if (!cfg.useDeltaStreamer) {
Map<String, String> extraMetadata = new HashMap<>();
/** Store the checkpoint in the commit metadata just like
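A hedged sketch of how the two new entry points above are meant to be driven from a DAG node (it mirrors the ClusteringNode further down; the helper method and its parameters are illustrative, and only HoodieTestSuiteWriter, getWriteConfig() and inlineClustering() come from this diff):

    import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;

    public class ClusteringDriverSketch {
      // Trigger inline clustering only on the configured iteration of a multi-round DAG.
      static void maybeCluster(HoodieTestSuiteWriter writer, int curItrCount, int targetItr) {
        if (curItrCount == targetItr) {
          writer.inlineClustering(); // schedules a plan and executes it inline (auto-commit)
        }
        // Validation nodes elsewhere read cleaner settings through the new accessor:
        int maxCommitsRetained = writer.getWriteConfig().getCleanerCommitsRetained() + 1;
        System.out.println("retention bound: " + maxCommitsRetained);
      }
    }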
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index 329ef16..193bf2c 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -88,6 +88,10 @@ public class DeltaConfig implements Serializable {
private static String REINIT_CONTEXT = "reinitialize_context";
private static String START_PARTITION = "start_partition";
private static String DELETE_INPUT_DATA = "delete_input_data";
+ private static String VALIDATE_HIVE = "validate_hive";
+ private static String EXECUTE_ITR_COUNT = "execute_itr_count";
+ private static String VALIDATE_ARCHIVAL = "validate_archival";
+ private static String VALIDATE_CLEAN = "validate_clean";
private Map<String, Object> configsMap;
@@ -159,6 +163,22 @@ public class DeltaConfig implements Serializable {
return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA, false).toString());
}
+ public boolean isValidateHive() {
+ return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_HIVE, false).toString());
+ }
+
+ public int getIterationCountToExecute() {
+ return Integer.valueOf(configsMap.getOrDefault(EXECUTE_ITR_COUNT, -1).toString());
+ }
+
+ public boolean validateArchival() {
+ return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_ARCHIVAL, false).toString());
+ }
+
+ public boolean validateClean() {
+ return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_CLEAN, false).toString());
+ }
+
public Map<String, Object> getOtherConfigs() {
if (configsMap == null) {
return new HashMap<>();
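All four new getters follow the same getOrDefault-over-configsMap pattern as the existing ones; a standalone illustration of the defaulting behaviour with plain collections (the key strings come from the diff, the values are made up):

    import java.util.HashMap;
    import java.util.Map;

    public class DeltaConfigDefaultsSketch {
      public static void main(String[] args) {
        Map<String, Object> configsMap = new HashMap<>();
        configsMap.put("execute_itr_count", 50); // as set by the long-running DAGs above

        // A present key is parsed from its toString() form.
        int itr = Integer.valueOf(configsMap.getOrDefault("execute_itr_count", -1).toString());
        // Absent keys fall back to the documented defaults (-1 / false).
        boolean clean = Boolean.valueOf(configsMap.getOrDefault("validate_clean", false).toString());

        System.out.println(itr + " " + clean); // prints: 50 false
      }
    }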
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java
index 17148f5..e4cf84a 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java
@@ -26,7 +26,7 @@ import org.apache.spark.api.java.JavaSparkContext;
/**
* This wraps the context needed for an execution of
- * a {@link DagNode#execute(ExecutionContext)}.
+ * a {@link DagNode#execute(ExecutionContext, int)}.
*/
public class ExecutionContext implements Serializable {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java
index 83a8d5e..0f449a8 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java
@@ -32,7 +32,7 @@ public class CleanNode extends DagNode<Boolean> {
}
@Override
- public void execute(ExecutionContext executionContext) throws Exception {
+ public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing clean node {}", this.getName());
executionContext.getHoodieTestSuiteWriter().getWriteClient(this).clean();
}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ClusteringNode.java
similarity index 55%
copy from hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java
copy to hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ClusteringNode.java
index 83a8d5e..9ee5ca2 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ClusteringNode.java
@@ -18,23 +18,31 @@
package org.apache.hudi.integ.testsuite.dag.nodes;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
/**
- * Represents a clean node in the DAG of operations for a workflow. Clean up any stale/old files/data lying around
- * (either on file storage or index storage) based on configurations and CleaningPolicy used.
+ * Triggers inline clustering. Works only with the write client. Add this as the last node and end with validation if
+ * possible; as of now, further inserts/upserts after clustering may not work since we call deltaStreamer.
-public class CleanNode extends DagNode<Boolean> {
+public class ClusteringNode extends DagNode<Option<String>> {
- public CleanNode(Config config) {
+ public ClusteringNode(Config config) {
this.config = config;
}
@Override
- public void execute(ExecutionContext executionContext) throws Exception {
- log.info("Executing clean node {}", this.getName());
- executionContext.getHoodieTestSuiteWriter().getWriteClient(this).clean();
+ public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
+ if (config.getIterationCountToExecute() == curItrCount) {
+ try {
+ log.warn("Executing ClusteringNode node {}", this.getName());
+ executionContext.getHoodieTestSuiteWriter().inlineClustering();
+ } catch (Exception e) {
+ log.warn("Exception thrown in ClusteringNode Node :: " + e.getCause()
+ ", msg :: " + e.getMessage());
+ throw e;
+ }
+ }
}
}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java
index 4c3ad61..7c9090d 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java
@@ -40,10 +40,11 @@ public class CompactNode extends DagNode<JavaRDD<WriteStatus>> {
* if it has one.
*
* @param executionContext Execution context to run this compaction
+ * @param curItrCount current iteration count.
* @throws Exception will be thrown if any error occurred.
*/
@Override
- public void execute(ExecutionContext executionContext) throws Exception {
+ public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(executionContext.getHoodieTestSuiteWriter().getConfiguration(),
executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath);
Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline()
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java
index 05ac242..8630243 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java
@@ -91,9 +91,10 @@ public abstract class DagNode<O> implements Comparable<DagNode<O>> {
* Execute the {@link DagNode}.
*
* @param context The context needed for an execution of a node.
+ * @param curItrCount iteration count for executing the node.
* @throws Exception Thrown if the execution failed.
*/
- public abstract void execute(ExecutionContext context) throws Exception;
+ public abstract void execute(ExecutionContext context, int curItrCount) throws Exception;
public boolean isCompleted() {
return isCompleted;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java
index c0671e8..01b8d4c 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java
@@ -36,7 +36,7 @@ public class DelayNode extends DagNode<Boolean> {
}
@Override
- public void execute(ExecutionContext context) throws Exception {
+ public void execute(ExecutionContext context, int curItrCount) throws Exception {
log.warn("Waiting for "+ delayMins+" mins before going for next test run");
Thread.sleep(delayMins * 60 * 1000);
}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
index f36b7d4..bdde58a 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
@@ -43,7 +43,7 @@ public class HiveQueryNode extends DagNode<Boolean> {
}
@Override
- public void execute(ExecutionContext executionContext) throws Exception {
+ public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing hive query node {}", this.getName());
this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration());
HiveSyncConfig hiveSyncConfig = DataSourceUtils
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java
index a2b4ee5..97a1bee 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java
@@ -35,7 +35,7 @@ public class HiveSyncNode extends DagNode<Boolean> {
}
@Override
- public void execute(ExecutionContext executionContext) throws Exception {
+ public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing hive sync node");
this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration());
this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
index 1571349..5ca98cc 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
@@ -39,7 +39,7 @@ public class InsertNode extends DagNode<JavaRDD<WriteStatus>> {
}
@Override
- public void execute(ExecutionContext executionContext) throws Exception {
+ public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
// if the insert node has schema override set, reinitialize the table with new schema.
if (this.config.getReinitContext()) {
log.info(String.format("Reinitializing table with %s", this.config.getOtherConfigs().toString()));
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
index 12588ac..1824cb8 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector;
@@ -42,10 +41,11 @@ public class RollbackNode extends DagNode<Option<HoodieInstant>> {
* Method helps to rollback the last commit instant in the timeline, if it has one.
*
* @param executionContext Execution context to perform this rollback
+ * @param curItrCount current iteration count.
* @throws Exception will be thrown if any error occurred
*/
@Override
- public void execute(ExecutionContext executionContext) throws Exception {
+ public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing rollback node {}", this.getName());
// Can only be done with an instantiation of a new WriteClient hence cannot be done during DeltaStreamer
// testing for now
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java
index 0aa67f4..c54b25a 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java
@@ -35,7 +35,7 @@ public class ScheduleCompactNode extends DagNode<Option<String>> {
}
@Override
- public void execute(ExecutionContext executionContext) throws Exception {
+ public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing schedule compact node {}", this.getName());
// Can only be done with an instantiation of a new WriteClient hence cannot be done during DeltaStreamer
// testing for now
@@ -48,7 +48,7 @@ public class ScheduleCompactNode extends DagNode<Option<String>> {
HoodieCommitMetadata metadata = org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(metaClient
.getActiveTimeline().getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
Option<String> scheduledInstant = executionContext.getHoodieTestSuiteWriter().scheduleCompaction(Option.of(metadata
- .getExtraMetadata()));
+ .getExtraMetadata()));
if (scheduledInstant.isPresent()) {
log.info("Scheduling compaction instant {}", scheduledInstant.get());
}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/SparkSQLQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/SparkSQLQueryNode.java
index e06d6de..8efd96c 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/SparkSQLQueryNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/SparkSQLQueryNode.java
@@ -42,10 +42,11 @@ public class SparkSQLQueryNode extends DagNode<Boolean> {
* Method helps to execute a sparkSql query from a hive table.
*
* @param executionContext Execution context to perform this query.
+ * @param curItrCount current iteration count.
* @throws Exception will be thrown if any error occurred
*/
@Override
- public void execute(ExecutionContext executionContext) throws Exception {
+ public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing spark sql query node");
this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration());
this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java
new file mode 100644
index 0000000..de8855f
--- /dev/null
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Node to validate dataset sanity, e.g. total file versions retained, whether cleaning has happened, whether archival has happened, etc.
+ */
+public class ValidateAsyncOperations extends DagNode<Option<String>> {
+
+ private static Logger log = LoggerFactory.getLogger(ValidateAsyncOperations.class);
+
+ public ValidateAsyncOperations(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
+ if (config.getIterationCountToExecute() == curItrCount) {
+ try {
+ log.warn("Executing ValidateHoodieAsyncOperations node {} with target
base path {} ", this.getName(),
+
executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath);
+ String basePath =
executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath;
+
+ int maxCommitsRetained = executionContext.getHoodieTestSuiteWriter().getWriteConfig().getCleanerCommitsRetained() + 1;
+ FileSystem fs = FSUtils.getFs(basePath, executionContext.getHoodieTestSuiteWriter().getConfiguration());
+ Map<String, Integer> fileIdCount = new HashMap<>();
+
+ AtomicInteger maxVal = new AtomicInteger();
+ List<String> partitionPaths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, basePath);
+ for (String partitionPath : partitionPaths) {
+ List<FileStatus> fileStatuses = Arrays.stream(FSUtils.getAllDataFilesInPartition(fs, new Path(basePath + "/" + partitionPath))).collect(Collectors.toList());
+ fileStatuses.forEach(entry -> {
+ String fileId = FSUtils.getFileId(entry.getPath().getName());
+ fileIdCount.computeIfAbsent(fileId, k -> 0);
+ fileIdCount.put(fileId, fileIdCount.get(fileId) + 1);
+ maxVal.set(Math.max(maxVal.get(), fileIdCount.get(fileId)));
+ });
+ }
+ if (maxVal.get() > maxCommitsRetained) {
+ throw new AssertionError("Total commits (" + maxVal + ") retained
exceeds max value of " + maxCommitsRetained + ", total commits : ");
+ }
+
+ if (config.validateArchival() || config.validateClean()) {
+ Pattern ARCHIVE_FILE_PATTERN =
+ Pattern.compile("\\.commits_\\.archive\\..*");
+ Pattern CLEAN_FILE_PATTERN =
+ Pattern.compile(".*\\.clean\\..*");
+
+ String metadataPath = executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath + "/.hoodie";
+ FileStatus[] metaFileStatuses = fs.listStatus(new Path(metadataPath));
+ boolean archFound = false;
+ boolean cleanFound = false;
+ for (FileStatus fileStatus : metaFileStatuses) {
+ Matcher archFileMatcher = ARCHIVE_FILE_PATTERN.matcher(fileStatus.getPath().getName());
+ if (archFileMatcher.matches()) {
+ archFound = true;
+ if (config.validateArchival() && !config.validateClean()) {
+ break;
+ }
+ }
+ Matcher cleanFileMatcher = CLEAN_FILE_PATTERN.matcher(fileStatus.getPath().getName());
+ if (cleanFileMatcher.matches()) {
+ cleanFound = true;
+ if (!config.validateArchival() && config.validateClean()) {
+ break;
+ }
+ }
+ if (config.validateClean() && config.validateArchival()) {
+ if (archFound && cleanFound) {
+ break;
+ }
+ }
+ }
+ if (config.validateArchival() && !archFound) {
+ throw new AssertionError("Archival NotFound in " + metadataPath);
+ }
+
+ if (config.validateClean() && !cleanFound) {
+ throw new AssertionError("Clean commits NotFound in " +
metadataPath);
+ }
+ }
+ } catch (Exception e) {
+ log.warn("Exception thrown in ValidateHoodieAsyncOperations Node :: "
+ e.getCause() + ", msg :: " + e.getMessage());
+ throw e;
+ }
+ }
+ }
+}
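The core retention check above just counts data files per file ID across all partitions and compares the maximum against cleaner retention; a standalone sketch of that arithmetic with made-up file names (a real run extracts the file ID with FSUtils.getFileId rather than a substring):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RetentionCheckSketch {
      public static void main(String[] args) {
        int cleanerCommitsRetained = 2;                      // hoodie.cleaner.commits.retained
        int maxCommitsRetained = cleanerCommitsRetained + 1;

        // Base-file names are of the form <fileId>_<writeToken>_<instantTime>.parquet.
        List<String> dataFiles = Arrays.asList(
            "f1_1-0-1_001.parquet", "f1_1-0-1_002.parquet", "f1_1-0-1_003.parquet",
            "f2_1-0-1_003.parquet");

        Map<String, Integer> fileIdCount = new HashMap<>();
        int maxVal = 0;
        for (String name : dataFiles) {
          String fileId = name.substring(0, name.indexOf('_'));
          maxVal = Math.max(maxVal, fileIdCount.merge(fileId, 1, Integer::sum));
        }
        // f1 has 3 versions; 3 > 3 is false, so sitting exactly at the bound passes.
        if (maxVal > maxCommitsRetained) {
          throw new AssertionError("File versions (" + maxVal + ") exceed " + maxCommitsRetained);
        }
      }
    }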
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
index 12fc525..22fea92 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
@@ -62,7 +62,7 @@ public class ValidateDatasetNode extends DagNode<Boolean> {
}
@Override
- public void execute(ExecutionContext context) throws Exception {
+ public void execute(ExecutionContext context, int curItrCount) throws Exception {
SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
@@ -111,17 +111,19 @@ public class ValidateDatasetNode extends DagNode<Boolean> {
throw new AssertionError("Hudi contents does not match contents input
data. ");
}
- String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY());
- String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY());
- log.warn("Validating hive table with db : " + database + " and table : " + tableName);
- Dataset<Row> cowDf = session.sql("SELECT * FROM " + database + "." + tableName);
- Dataset<Row> trimmedCowDf = cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
- .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
- intersectionDf = inputSnapshotDf.intersect(trimmedDf);
- // the intersected df should be same as inputDf. if not, there is some mismatch.
- if (inputSnapshotDf.except(intersectionDf).count() != 0) {
- log.error("Data set validation failed for COW hive table. Total count in hudi " + trimmedCowDf.count() + ", input df count " + inputSnapshotDf.count());
- throw new AssertionError("Hudi hive table contents does not match contents input data. ");
+ if (config.isValidateHive()) {
+ String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY());
+ String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY());
+ log.warn("Validating hive table with db : " + database + " and table : " + tableName);
+ Dataset<Row> cowDf = session.sql("SELECT * FROM " + database + "." + tableName);
+ Dataset<Row> trimmedCowDf = cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+ .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
+ intersectionDf = inputSnapshotDf.intersect(trimmedCowDf);
+ // the intersected df should be the same as inputDf; if not, there is some mismatch.
+ if (inputSnapshotDf.except(intersectionDf).count() != 0) {
+ log.error("Data set validation failed for COW hive table. Total count in hudi " + trimmedCowDf.count() + ", input df count " + inputSnapshotDf.count());
+ throw new AssertionError("Hudi hive table contents do not match input data.");
+ }
}
// if delete input data is enabled, erase input data.
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateNode.java
index 37244c0..e4c4adb 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateNode.java
@@ -40,9 +40,10 @@ public class ValidateNode<R> extends DagNode {
* was set to true or default, but the parent nodes have not completed yet.
*
* @param executionContext Context to execute this node
+ * @param curItrCount current iteration count.
*/
@Override
- public void execute(ExecutionContext executionContext) {
+ public void execute(ExecutionContext executionContext, int curItrCount) {
if (this.getParentNodes().size() > 0 && (Boolean) this.config.getOtherConfigs().getOrDefault("WAIT_FOR_PARENTS", true)) {
for (DagNode node : (List<DagNode>) this.getParentNodes()) {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
index d4074bc..34cb9bc 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
@@ -101,7 +101,8 @@ public class DagScheduler {
while (queue.size() > 0) {
DagNode nodeToExecute = queue.poll();
log.warn("Executing node \"" +
nodeToExecute.getConfig().getOtherConfigs().get(CONFIG_NAME) + "\" :: " +
nodeToExecute.getConfig());
- futures.add(service.submit(() -> executeNode(nodeToExecute)));
+ int finalCurRound = curRound;
+ futures.add(service.submit(() -> executeNode(nodeToExecute, finalCurRound)));
if (nodeToExecute.getChildNodes().size() > 0) {
childNodes.addAll(nodeToExecute.getChildNodes());
}
@@ -114,7 +115,7 @@ public class DagScheduler {
} while (queue.size() > 0);
log.info("Finished workloads for round num " + curRound);
if (curRound < workflowDag.getRounds()) {
- new DelayNode(workflowDag.getIntermittentDelayMins()).execute(executionContext);
+ new DelayNode(workflowDag.getIntermittentDelayMins()).execute(executionContext, curRound);
}
// After each level, report and flush the metrics
@@ -128,14 +129,14 @@ public class DagScheduler {
*
* @param node The node to be executed
*/
- private void executeNode(DagNode node) {
+ private void executeNode(DagNode node, int curRound) {
if (node.isCompleted()) {
throw new RuntimeException("DagNode already completed! Cannot
re-execute");
}
try {
int repeatCount = node.getConfig().getRepeatCount();
while (repeatCount > 0) {
- node.execute(executionContext);
+ node.execute(executionContext, curRound);
log.info("Finished executing {}", node.getName());
repeatCount--;
}
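The extra finalCurRound local above is needed because Java lambdas may only capture (effectively) final variables, and curRound is mutated between rounds; a minimal standalone illustration:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class EffectivelyFinalCaptureSketch {
      public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(2);
        int curRound = 1;
        while (curRound <= 3) {
          int finalCurRound = curRound; // snapshot; capturing curRound directly would not compile
          service.submit(() -> System.out.println("executing round " + finalCurRound));
          curRound++;                   // this mutation makes curRound not effectively final
        }
        service.shutdown();
      }
    }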
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
index 787ec84..00928f3 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
@@ -70,7 +70,7 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator<GenericR
if (lastRecord == null) {
GenericRecord record = partitionPathsNonEmpty
? this.generator.getNewPayloadWithTimestamp(this.firstPartitionPathField)
- : this.generator.getNewPayload();
+ : this.generator.getNewPayload(partitionPathFieldNames);
lastRecord = record;
return record;
} else {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
index 510fc49..c900a8e 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
@@ -331,7 +331,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
*/
public GenericRecord updateTimestamp(GenericRecord record, String fieldName) {
long delta = TimeUnit.MILLISECONDS.convert(++partitionIndex % numDatePartitions, TimeUnit.DAYS);
- record.put(fieldName, System.currentTimeMillis() - delta);
+ record.put(fieldName, (System.currentTimeMillis() - delta)/1000);
return record;
}
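The /1000 above converts the generated epoch-millisecond timestamp into epoch seconds (our reading is that this lines up with the TimestampBasedKeyGenerator configured in test.properties; the diff itself does not state the motivation). The unit arithmetic in isolation:

    import java.util.concurrent.TimeUnit;

    public class TimestampUnitsSketch {
      public static void main(String[] args) {
        long delta = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS); // 86_400_000 ms in a day
        long millis = System.currentTimeMillis() - delta;             // one day in the past, in ms
        long seconds = millis / 1000;                                 // same instant, epoch seconds
        System.out.println(millis + " ms -> " + seconds + " s");
      }
    }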
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 0ce9aca..9eb5c9d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -309,7 +309,10 @@ public class DeltaSync implements Serializable {
if (!commitMetadata.getMetadata(CHECKPOINT_KEY).isEmpty()) {
resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
}
- } else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
+ } else if (commitMetadata.getOperationType() == WriteOperationType.CLUSTER) {
+ // in case of a CLUSTER commit, no checkpoint will be available in metadata.
+ resumeCheckpointStr = Option.empty();
+ } else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
throw new HoodieDeltaStreamerException(
"Unable to find previous checkpoint. Please double check if this
table "
@@ -373,7 +376,7 @@ public class DeltaSync implements Serializable {
if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
LOG.info("No new data, source checkpoint has not changed. Nothing to
commit. Old checkpoint=("
- + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
+ + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr +
")");
return null;
}
diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml
index f11e95d..db2635a 100644
--- a/packaging/hudi-integ-test-bundle/pom.xml
+++ b/packaging/hudi-integ-test-bundle/pom.xml
@@ -70,6 +70,7 @@
<include>org.apache.hudi:hudi-common</include>
<include>org.apache.hudi:hudi-client-common</include>
<include>org.apache.hudi:hudi-spark-client</include>
+ <include>org.apache.hudi:hudi-spark-common</include>
<include>org.apache.hudi:hudi-utilities_${scala.binary.version}</include>
<include>org.apache.hudi:hudi-spark_${scala.binary.version}</include>
<include>org.apache.hudi:hudi-spark2_${scala.binary.version}</include>