This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new af1128acf9 [HUDI-4084] Add support to test async table services with integ test suite framework (#5557)
af1128acf9 is described below
commit af1128acf95ade0e52920638c2a7a0badaf53869
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon May 23 23:05:56 2022 -0400
[HUDI-4084] Add support to test async table services with integ test suite framework (#5557)
* Add support to test async table services with integ test suite framework
* Make await time for validation configurable
---
...treamer-long-running-multi-partitions-hive.yaml | 2 +
...mer-long-running-multi-partitions-metadata.yaml | 2 +
...eltastreamer-long-running-multi-partitions.yaml | 2 +
.../deltastreamer-medium-clustering.yaml | 2 +
...ltastreamer-medium-full-dataset-validation.yaml | 2 +
.../detlastreamer-long-running-example.yaml | 2 +
.../spark-long-running-non-partitioned.yaml | 2 +
.../demo/config/test-suite/spark-long-running.yaml | 2 +
hudi-integ-test/README.md | 50 +++++
.../testsuite/HoodieContinousTestSuiteWriter.java | 157 ++++++++++++++++
...riter.java => HoodieInlineTestSuiteWriter.java} | 118 +-----------
.../hudi/integ/testsuite/HoodieTestSuiteJob.java | 9 +-
.../integ/testsuite/HoodieTestSuiteWriter.java | 206 ++++-----------------
.../integ/testsuite/configuration/DeltaConfig.java | 5 +
.../hudi/integ/testsuite/dag/WriterContext.java | 43 ++++-
.../dag/nodes/BaseValidateDatasetNode.java | 66 ++++++-
.../integ/testsuite/generator/DeltaGenerator.java | 23 ++-
17 files changed, 395 insertions(+), 298 deletions(-)
diff --git
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
index 7617220386..8b82415982 100644
---
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
+++
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
@@ -74,10 +74,12 @@ dag_content:
validate_once_every_itr : 5
validate_hive: true
delete_input_data: true
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: second_hive_sync
last_validate:
config:
execute_itr_count: 50
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
deps: second_validate
diff --git
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
index dc1e99a431..031664cd15 100644
---
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
+++
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
@@ -62,10 +62,12 @@ dag_content:
validate_once_every_itr : 5
validate_hive: false
delete_input_data: true
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
last_validate:
config:
execute_itr_count: 30
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
deps: second_validate
diff --git
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
index eca4eac1c7..c23775b2ce 100644
---
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
+++
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
@@ -62,10 +62,12 @@ dag_content:
validate_once_every_itr : 5
validate_hive: false
delete_input_data: true
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
last_validate:
config:
execute_itr_count: 50
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
deps: second_validate
diff --git a/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml
b/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml
index 81c21a7be6..2fc68596d8 100644
--- a/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml
@@ -64,10 +64,12 @@ dag_content:
config:
validate_hive: false
delete_input_data: true
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
last_validate:
config:
execute_itr_count: 20
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
deps: second_validate
diff --git
a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
index a2d85a7a4d..db7edb8f8f 100644
---
a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
+++
b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
@@ -65,10 +65,12 @@ dag_content:
validate_once_every_itr : 5
validate_hive: false
delete_input_data: false
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
last_validate:
config:
execute_itr_count: 20
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
deps: second_validate
diff --git
a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
index 1c2f44b060..102807ec43 100644
--- a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
+++ b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
@@ -62,10 +62,12 @@ dag_content:
validate_once_every_itr : 5
validate_hive: false
delete_input_data: true
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
last_validate:
config:
execute_itr_count: 50
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
deps: second_validate
diff --git
a/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml
b/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml
index dfbfba0a15..947bbdab86 100644
--- a/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml
+++ b/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml
@@ -45,10 +45,12 @@ dag_content:
config:
validate_hive: false
delete_input_data: true
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
last_validate:
config:
execute_itr_count: 6
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
deps: second_validate
diff --git a/docker/demo/config/test-suite/spark-long-running.yaml
b/docker/demo/config/test-suite/spark-long-running.yaml
index 00fea43f45..2ffef55781 100644
--- a/docker/demo/config/test-suite/spark-long-running.yaml
+++ b/docker/demo/config/test-suite/spark-long-running.yaml
@@ -46,10 +46,12 @@ dag_content:
validate_once_every_itr : 5
validate_hive: false
delete_input_data: true
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
last_validate:
config:
execute_itr_count: 30
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
deps: second_validate
diff --git a/hudi-integ-test/README.md b/hudi-integ-test/README.md
index 5d26d03a20..687ad9a2a9 100644
--- a/hudi-integ-test/README.md
+++ b/hudi-integ-test/README.md
@@ -593,6 +593,56 @@ Sample spark-submit command to test one delta streamer and
a spark data source w
--use-hudi-data-to-generate-updates
```
+### Testing async table services
+We can test async table services with deltastreamer using the command below. Three additional arguments are required to test async table services compared to the previous command.
+
+```shell
+--continuous \
+--test-continuous-mode \
+--min-sync-interval-seconds 20
+```
+
+Here is the full command:
+```shell
+./bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 \
+ --conf spark.task.cpus=1 --conf spark.executor.cores=1 \
+--conf spark.task.maxFailures=100 \
+--conf spark.memory.fraction=0.4 \
+--conf spark.rdd.compress=true \
+--conf spark.kryoserializer.buffer.max=2000m \
+--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+--conf spark.memory.storageFraction=0.1 \
+--conf spark.shuffle.service.enabled=true \
+--conf spark.sql.hive.convertMetastoreParquet=false \
+--conf spark.driver.maxResultSize=12g \
+--conf spark.executor.heartbeatInterval=120s \
+--conf spark.network.timeout=600s \
+--conf spark.yarn.max.executor.failures=10 \
+--conf spark.sql.catalogImplementation=hive \
+--class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob
<PATH_TO_BUNDLE>/hudi-integ-test-bundle-0.12.0-SNAPSHOT.jar \
+--source-ordering-field test_suite_source_ordering_field \
+--use-deltastreamer \
+--target-base-path /tmp/hudi/output \
+--input-base-path /tmp/hudi/input \
+--target-table table1 \
+-props file:/tmp/test.properties \
+--schemaprovider-class
org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider \
+--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
+--input-file-size 125829120 \
+--workload-yaml-path file:/tmp/simple-deltastreamer.yaml \
+--workload-generator-classname
org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \
+--table-type COPY_ON_WRITE \
+--compact-scheduling-minshare 1 \
+--clean-input \
+--clean-output \
+--continuous \
+--test-continuous-mode \
+--min-sync-interval-seconds 20
+```
+
+We can use any yaml and properties file with the above spark-submit command to test deltastreamer with async table services; an illustrative workload fragment follows.
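+
+For reference, a workload yaml used with continuous mode can cap how long validation waits for the deltastreamer to catch up via the new `max_wait_time_for_deltastreamer_catch_up_ms` config (defaults to 5 minutes). An illustrative validate-node fragment (node names are examples, taken from the bundled yamls):
+```yaml
+second_validate:
+  config:
+    validate_once_every_itr : 5
+    validate_hive: false
+    delete_input_data: true
+    max_wait_time_for_deltastreamer_catch_up_ms: 600000
+  type: ValidateDatasetNode
+  deps: first_delete
+```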
## Automated tests for N no of yamls in Local Docker environment
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieContinousTestSuiteWriter.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieContinousTestSuiteWriter.java
new file mode 100644
index 0000000000..1bf69aaf83
--- /dev/null
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieContinousTestSuiteWriter.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
+import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.rdd.RDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Test suite writer that assists in testing async table operations with Deltastreamer continuous mode.
+ *
+ * Sample command
+ * ./bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 \
+ * --conf spark.task.cpus=1 --conf spark.executor.cores=1 \
+ * --conf spark.task.maxFailures=100 \
+ * --conf spark.memory.fraction=0.4 \
+ * --conf spark.rdd.compress=true \
+ * --conf spark.kryoserializer.buffer.max=2000m \
+ * --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+ * --conf spark.memory.storageFraction=0.1 \
+ * --conf spark.shuffle.service.enabled=true \
+ * --conf spark.sql.hive.convertMetastoreParquet=false \
+ * --conf spark.driver.maxResultSize=12g \
+ * --conf spark.executor.heartbeatInterval=120s \
+ * --conf spark.network.timeout=600s \
+ * --conf spark.yarn.max.executor.failures=10 \
+ * --conf spark.sql.catalogImplementation=hive \
+ * --class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob
<PATH_TO_BUNDLE>/hudi-integ-test-bundle-0.12.0-SNAPSHOT.jar \
+ * --source-ordering-field test_suite_source_ordering_field \
+ * --use-deltastreamer \
+ * --target-base-path /tmp/hudi/output \
+ * --input-base-path /tmp/hudi/input \
+ * --target-table table1 \
+ * -props file:/tmp/test.properties \
+ * --schemaprovider-class
org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider \
+ * --source-class org.apache.hudi.utilities.sources.AvroDFSSource \
+ * --input-file-size 125829120 \
+ * --workload-yaml-path file:/tmp/simple-deltastreamer.yaml \
+ * --workload-generator-classname
org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \
+ * --table-type COPY_ON_WRITE \
+ * --compact-scheduling-minshare 1 \
+ * --clean-input \
+ * --clean-output \
+ * --continuous \
+ * --test-continuous-mode \
+ * --min-sync-interval-seconds 20
+ */
+public class HoodieContinousTestSuiteWriter extends HoodieTestSuiteWriter {
+
+ private static Logger log =
LoggerFactory.getLogger(HoodieContinousTestSuiteWriter.class);
+
+ public HoodieContinousTestSuiteWriter(JavaSparkContext jsc, Properties
props, HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, String schema) throws
Exception {
+ super(jsc, props, cfg, schema);
+ }
+
+ @Override
+ public void shutdownResources() {
+ log.info("Shutting down deltastreamer gracefully ");
+ this.deltaStreamerWrapper.shutdownGracefully();
+ }
+
+ @Override
+ public RDD<GenericRecord> getNextBatch() throws Exception {
+ return null;
+ }
+
+ @Override
+ public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>>
fetchSource() throws Exception {
+ return null;
+ }
+
+ @Override
+ public Option<String> startCommit() {
+ return null;
+ }
+
+ public JavaRDD<WriteStatus> upsert(Option<String> instantTime) throws
Exception {
+ return null;
+ }
+
+ @Override
+ public JavaRDD<WriteStatus> insert(Option<String> instantTime) throws
Exception {
+ return null;
+ }
+
+ @Override
+ public JavaRDD<WriteStatus> insertOverwrite(Option<String> instantTime)
throws Exception {
+ return null;
+ }
+
+ @Override
+ public JavaRDD<WriteStatus> insertOverwriteTable(Option<String> instantTime)
throws Exception {
+ return null;
+ }
+
+ @Override
+ public JavaRDD<WriteStatus> bulkInsert(Option<String> instantTime) throws
Exception {
+ return null;
+ }
+
+ @Override
+ public JavaRDD<WriteStatus> compact(Option<String> instantTime) throws
Exception {
+ return null;
+ }
+
+ @Override
+ public void inlineClustering() {
+ }
+
+ @Override
+ public Option<String> scheduleCompaction(Option<Map<String, String>>
previousCommitExtraMetadata) throws
+ Exception {
+ return Option.empty();
+ }
+
+ @Override
+ public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats>
generatedDataStats,
+ Option<String> instantTime) {
+ }
+
+ @Override
+ public void commitCompaction(JavaRDD<WriteStatus> records,
JavaRDD<DeltaWriteStats> generatedDataStats,
+ Option<String> instantTime) throws IOException {
+ }
+}
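Note: in continuous mode the deltastreamer drives all writes itself, which is why the write methods above are no-ops. A minimal sketch of how this writer gets selected, mirroring the WriterContext change later in this patch (variable names assumed from the diff):

```java
// Sketch: choose the writer flavor based on the new --test-continuous-mode flag.
HoodieTestSuiteWriter writer = (cfg.testContinousMode && cfg.useDeltaStreamer)
    ? new HoodieContinousTestSuiteWriter(jsc, props, cfg, schemaStr)  // deltastreamer runs in the background
    : new HoodieInlineTestSuiteWriter(jsc, props, cfg, schemaStr);    // DAG nodes drive writes inline
```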
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java
similarity index 67%
copy from
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
copy to
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java
index a98c7f2aec..63805e71a5 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java
@@ -38,10 +38,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.HoodieIndex;
import
org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
-import org.apache.hudi.integ.testsuite.dag.nodes.CleanNode;
-import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
-import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
-import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -58,77 +54,26 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.Serializable;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
/**
* A writer abstraction for the Hudi test suite. This class wraps different
implementations of writers used to perform write operations into the target
hudi dataset. Current supported writers are
* {@link HoodieDeltaStreamerWrapper} and {@link SparkRDDWriteClient}.
*/
-public class HoodieTestSuiteWriter implements Serializable {
+public class HoodieInlineTestSuiteWriter extends HoodieTestSuiteWriter {
- private static Logger log =
LoggerFactory.getLogger(HoodieTestSuiteWriter.class);
+ private static Logger log =
LoggerFactory.getLogger(HoodieInlineTestSuiteWriter.class);
- private HoodieDeltaStreamerWrapper deltaStreamerWrapper;
- private HoodieWriteConfig writeConfig;
- private SparkRDDWriteClient writeClient;
- protected HoodieTestSuiteConfig cfg;
- private Option<String> lastCheckpoint;
- private HoodieReadClient hoodieReadClient;
- private Properties props;
- private String schema;
- private transient Configuration configuration;
- private transient JavaSparkContext sparkContext;
- private static Set<String>
VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE = new HashSet<>(
- Arrays.asList(RollbackNode.class.getName(), CleanNode.class.getName(),
ScheduleCompactNode.class.getName()));
private static final String GENERATED_DATA_PATH = "generated.data.path";
- public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props,
HoodieTestSuiteConfig cfg, String schema) throws Exception {
- // We ensure that only 1 instance of HoodieWriteClient is instantiated for
a HoodieTestSuiteWriter
- // This does not instantiate a HoodieWriteClient until a
- // {@link HoodieDeltaStreamer#commit(HoodieWriteClient, JavaRDD, Option)}
is invoked.
- HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
- this.deltaStreamerWrapper = new HoodieDeltaStreamerWrapper(cfg, jsc);
- this.hoodieReadClient = new HoodieReadClient(context, cfg.targetBasePath);
- this.writeConfig = getHoodieClientConfig(cfg, props, schema);
- if (!cfg.useDeltaStreamer) {
- this.writeClient = new SparkRDDWriteClient(context, writeConfig);
- }
- this.cfg = cfg;
- this.configuration = jsc.hadoopConfiguration();
- this.sparkContext = jsc;
- this.props = props;
- this.schema = schema;
- }
-
- public HoodieWriteConfig getWriteConfig() {
- return this.writeConfig;
+ public HoodieInlineTestSuiteWriter(JavaSparkContext jsc, Properties props,
HoodieTestSuiteConfig cfg, String schema) throws Exception {
+ super(jsc, props, cfg, schema);
}
- private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteConfig cfg,
Properties props, String schema) {
- HoodieWriteConfig.Builder builder =
- HoodieWriteConfig.newBuilder().combineInput(true,
true).withPath(cfg.targetBasePath)
- .withAutoCommit(false)
-
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
-
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField)
- .build())
- .forTable(cfg.targetTableName)
-
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
- .withProps(props);
- builder = builder.withSchema(schema);
- return builder.build();
- }
-
- private boolean allowWriteClientAccess(DagNode dagNode) {
- if
(VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE.contains(dagNode.getClass().getName()))
{
- return true;
- }
- return false;
+ public void shutdownResources() {
+    // no-op for the non-continuous mode test suite writer.
}
public RDD<GenericRecord> getNextBatch() throws Exception {
@@ -139,13 +84,6 @@ public class HoodieTestSuiteWriter implements Serializable {
.getInsertValue(new Schema.Parser().parse(schema)).get()).rdd();
}
- public void getNextBatchForDeletes() throws Exception {
- Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch =
fetchSource();
- lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
- JavaRDD<HoodieRecord> inputRDD = nextBatch.getRight().getRight();
- inputRDD.collect();
- }
-
public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>>
fetchSource() throws Exception {
return this.deltaStreamerWrapper.fetchSource();
}
@@ -254,7 +192,7 @@ public class HoodieTestSuiteWriter implements Serializable {
}
public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats>
generatedDataStats,
- Option<String> instantTime) {
+ Option<String> instantTime) {
if (!cfg.useDeltaStreamer) {
Map<String, String> extraMetadata = new HashMap<>();
/** Store the checkpoint in the commit metadata just like
@@ -269,7 +207,7 @@ public class HoodieTestSuiteWriter implements Serializable {
}
public void commitCompaction(JavaRDD<WriteStatus> records,
JavaRDD<DeltaWriteStats> generatedDataStats,
- Option<String> instantTime) throws IOException {
+ Option<String> instantTime) throws IOException {
if (!cfg.useDeltaStreamer) {
Map<String, String> extraMetadata = new HashMap<>();
/** Store the checkpoint in the commit metadata just like
@@ -284,44 +222,4 @@ public class HoodieTestSuiteWriter implements Serializable
{
writeClient.commitCompaction(instantTime.get(), metadata,
Option.of(extraMetadata));
}
}
-
- public SparkRDDWriteClient getWriteClient(DagNode dagNode) throws
IllegalAccessException {
- if (cfg.useDeltaStreamer & !allowWriteClientAccess(dagNode)) {
- throw new IllegalAccessException("cannot access write client when
testing in deltastreamer mode");
- }
- synchronized (this) {
- if (writeClient == null) {
- this.writeClient = new SparkRDDWriteClient(new
HoodieSparkEngineContext(this.sparkContext), getHoodieClientConfig(cfg, props,
schema));
- }
- }
- return writeClient;
- }
-
- public HoodieDeltaStreamerWrapper getDeltaStreamerWrapper() {
- return deltaStreamerWrapper;
- }
-
- public HoodieTestSuiteConfig getCfg() {
- return cfg;
- }
-
- public Configuration getConfiguration() {
- return configuration;
- }
-
- public JavaSparkContext getSparkContext() {
- return sparkContext;
- }
-
- public Option<String> getLastCheckpoint() {
- return lastCheckpoint;
- }
-
- public Properties getProps() {
- return props;
- }
-
- public String getSchema() {
- return schema;
- }
}
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index 2d9f841ae3..5e2f9812ba 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -190,11 +190,12 @@ public class HoodieTestSuiteJob {
}
public void runTestSuite() {
+ WriterContext writerContext = null;
try {
WorkflowDag workflowDag = createWorkflowDag();
log.info("Workflow Dag => " + DagUtils.convertDagToYaml(workflowDag));
long startTime = System.currentTimeMillis();
- WriterContext writerContext = new WriterContext(jsc, props, cfg,
keyGenerator, sparkSession);
+ writerContext = new WriterContext(jsc, props, cfg, keyGenerator,
sparkSession);
writerContext.initContext(jsc);
startOtherServicesIfNeeded(writerContext);
if (this.cfg.saferSchemaEvolution) {
@@ -217,6 +218,9 @@ public class HoodieTestSuiteJob {
log.error("Failed to run Test Suite ", e);
throw new HoodieException("Failed to run Test Suite ", e);
} finally {
+ if (writerContext != null) {
+ writerContext.shutdownResources();
+ }
if (stopJsc) {
stopQuietly();
}
@@ -310,5 +314,8 @@ public class HoodieTestSuiteJob {
@Parameter(names = {"--use-hudi-data-to-generate-updates"}, description =
"Use data from hudi to generate updates for new batches ")
public Boolean useHudiToGenerateUpdates = false;
+
+ @Parameter(names = {"--test-continuous-mode"}, description = "Tests
continuous mode in deltastreamer.")
+ public Boolean testContinousMode = false;
}
}
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
index a98c7f2aec..7a9e122e86 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
@@ -18,37 +18,25 @@
package org.apache.hudi.integ.testsuite;
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.HoodieIndex;
-import
org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
import org.apache.hudi.integ.testsuite.dag.nodes.CleanNode;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
-import org.apache.hudi.table.HoodieSparkTable;
-import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
@@ -57,38 +45,31 @@ import org.apache.spark.rdd.RDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-/**
- * A writer abstraction for the Hudi test suite. This class wraps different
implementations of writers used to perform write operations into the target
hudi dataset. Current supported writers are
- * {@link HoodieDeltaStreamerWrapper} and {@link SparkRDDWriteClient}.
- */
-public class HoodieTestSuiteWriter implements Serializable {
+public abstract class HoodieTestSuiteWriter implements Serializable {
private static Logger log =
LoggerFactory.getLogger(HoodieTestSuiteWriter.class);
- private HoodieDeltaStreamerWrapper deltaStreamerWrapper;
- private HoodieWriteConfig writeConfig;
- private SparkRDDWriteClient writeClient;
- protected HoodieTestSuiteConfig cfg;
- private Option<String> lastCheckpoint;
- private HoodieReadClient hoodieReadClient;
- private Properties props;
- private String schema;
- private transient Configuration configuration;
- private transient JavaSparkContext sparkContext;
- private static Set<String>
VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE = new HashSet<>(
+ protected HoodieDeltaStreamerWrapper deltaStreamerWrapper;
+ protected HoodieWriteConfig writeConfig;
+ protected SparkRDDWriteClient writeClient;
+ protected HoodieTestSuiteJob.HoodieTestSuiteConfig cfg;
+ protected Option<String> lastCheckpoint;
+ protected HoodieReadClient hoodieReadClient;
+ protected Properties props;
+ protected String schema;
+ protected transient Configuration configuration;
+ protected transient JavaSparkContext sparkContext;
+ protected static Set<String>
VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE = new HashSet<>(
Arrays.asList(RollbackNode.class.getName(), CleanNode.class.getName(),
ScheduleCompactNode.class.getName()));
- private static final String GENERATED_DATA_PATH = "generated.data.path";
- public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props,
HoodieTestSuiteConfig cfg, String schema) throws Exception {
+ public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props,
HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, String schema) throws Exception {
// We ensure that only 1 instance of HoodieWriteClient is instantiated for
a HoodieTestSuiteWriter
// This does not instantiate a HoodieWriteClient until a
// {@link HoodieDeltaStreamer#commit(HoodieWriteClient, JavaRDD, Option)}
is invoked.
@@ -110,7 +91,7 @@ public class HoodieTestSuiteWriter implements Serializable {
return this.writeConfig;
}
- private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteConfig cfg,
Properties props, String schema) {
+ private HoodieWriteConfig
getHoodieClientConfig(HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, Properties
props, String schema) {
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder().combineInput(true,
true).withPath(cfg.targetBasePath)
.withAutoCommit(false)
@@ -131,159 +112,35 @@ public class HoodieTestSuiteWriter implements
Serializable {
return false;
}
- public RDD<GenericRecord> getNextBatch() throws Exception {
- Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch =
fetchSource();
- lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
- JavaRDD<HoodieRecord> inputRDD = nextBatch.getRight().getRight();
- return inputRDD.map(r -> (GenericRecord) ((HoodieAvroRecord) r).getData()
- .getInsertValue(new Schema.Parser().parse(schema)).get()).rdd();
- }
+ public abstract void shutdownResources();
- public void getNextBatchForDeletes() throws Exception {
- Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch =
fetchSource();
- lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
- JavaRDD<HoodieRecord> inputRDD = nextBatch.getRight().getRight();
- inputRDD.collect();
- }
+ public abstract RDD<GenericRecord> getNextBatch() throws Exception;
- public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>>
fetchSource() throws Exception {
- return this.deltaStreamerWrapper.fetchSource();
- }
+  public abstract Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception;
- public Option<String> startCommit() {
- if (cfg.useDeltaStreamer) {
- return Option.of(HoodieActiveTimeline.createNewInstantTime());
- } else {
- return Option.of(writeClient.startCommit());
- }
- }
+ public abstract Option<String> startCommit();
- public JavaRDD<WriteStatus> upsert(Option<String> instantTime) throws
Exception {
- if (cfg.useDeltaStreamer) {
- return deltaStreamerWrapper.upsert(WriteOperationType.UPSERT);
- } else {
- Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch =
fetchSource();
- lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
- return writeClient.upsert(nextBatch.getRight().getRight(),
instantTime.get());
- }
- }
+ public abstract JavaRDD<WriteStatus> upsert(Option<String> instantTime)
throws Exception;
- public JavaRDD<WriteStatus> insert(Option<String> instantTime) throws
Exception {
- if (cfg.useDeltaStreamer) {
- return deltaStreamerWrapper.insert();
- } else {
- Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch =
fetchSource();
- lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
- return writeClient.insert(nextBatch.getRight().getRight(),
instantTime.get());
- }
- }
+ public abstract JavaRDD<WriteStatus> insert(Option<String> instantTime)
throws Exception;
- public JavaRDD<WriteStatus> insertOverwrite(Option<String> instantTime)
throws Exception {
- if (cfg.useDeltaStreamer) {
- return deltaStreamerWrapper.insertOverwrite();
- } else {
- Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch =
fetchSource();
- lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
- return writeClient.insertOverwrite(nextBatch.getRight().getRight(),
instantTime.get()).getWriteStatuses();
- }
- }
+ public abstract JavaRDD<WriteStatus> insertOverwrite(Option<String>
instantTime) throws Exception;
- public JavaRDD<WriteStatus> insertOverwriteTable(Option<String> instantTime)
throws Exception {
- if (cfg.useDeltaStreamer) {
- return deltaStreamerWrapper.insertOverwriteTable();
- } else {
- Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch =
fetchSource();
- lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
- return writeClient.insertOverwriteTable(nextBatch.getRight().getRight(),
instantTime.get()).getWriteStatuses();
- }
- }
+ public abstract JavaRDD<WriteStatus> insertOverwriteTable(Option<String>
instantTime) throws Exception;
- public JavaRDD<WriteStatus> bulkInsert(Option<String> instantTime) throws
Exception {
- if (cfg.useDeltaStreamer) {
- return deltaStreamerWrapper.bulkInsert();
- } else {
- Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch =
fetchSource();
- lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
- return writeClient.bulkInsert(nextBatch.getRight().getRight(),
instantTime.get());
- }
- }
+ public abstract JavaRDD<WriteStatus> bulkInsert(Option<String> instantTime)
throws Exception;
- public JavaRDD<WriteStatus> compact(Option<String> instantTime) throws
Exception {
- if (cfg.useDeltaStreamer) {
- return deltaStreamerWrapper.compact();
- } else {
- if (!instantTime.isPresent()) {
- Option<Pair<String, HoodieCompactionPlan>> compactionPlanPair = Option
- .fromJavaOptional(hoodieReadClient.getPendingCompactions()
- .stream().findFirst());
- if (compactionPlanPair.isPresent()) {
- instantTime = Option.of(compactionPlanPair.get().getLeft());
- }
- }
- if (instantTime.isPresent()) {
- HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata =
writeClient.compact(instantTime.get());
- return compactionMetadata.getWriteStatuses();
- } else {
- return null;
- }
- }
- }
+ public abstract JavaRDD<WriteStatus> compact(Option<String> instantTime)
throws Exception;
- public void inlineClustering() {
- if (!cfg.useDeltaStreamer) {
- Option<String> clusteringInstantOpt =
writeClient.scheduleClustering(Option.empty());
- clusteringInstantOpt.ifPresent(clusteringInstant -> {
- // inline cluster should auto commit as the user is never given control
- log.warn("Clustering instant :: " + clusteringInstant);
- writeClient.cluster(clusteringInstant, true);
- });
- } else {
- // TODO: fix clustering to be done async
https://issues.apache.org/jira/browse/HUDI-1590
- throw new IllegalArgumentException("Clustering cannot be triggered with
deltastreamer");
- }
- }
+  public abstract void inlineClustering() throws Exception;
- public Option<String> scheduleCompaction(Option<Map<String, String>>
previousCommitExtraMetadata) throws
- Exception {
- if (cfg.useDeltaStreamer) {
- deltaStreamerWrapper.scheduleCompact();
- return Option.empty();
- } else {
- return writeClient.scheduleCompaction(previousCommitExtraMetadata);
- }
- }
+ public abstract Option<String> scheduleCompaction(Option<Map<String,
String>> previousCommitExtraMetadata) throws Exception;
- public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats>
generatedDataStats,
- Option<String> instantTime) {
- if (!cfg.useDeltaStreamer) {
- Map<String, String> extraMetadata = new HashMap<>();
- /** Store the checkpoint in the commit metadata just like
- * {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD,
Option)} **/
- extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY,
lastCheckpoint.get());
- if (generatedDataStats != null && generatedDataStats.count() > 1) {
- // Just stores the path where this batch of data is generated to
- extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s ->
s.getFilePath()).collect().get(0));
- }
- writeClient.commit(instantTime.get(), records, Option.of(extraMetadata));
- }
- }
+ public abstract void commit(JavaRDD<WriteStatus> records,
JavaRDD<DeltaWriteStats> generatedDataStats,
+ Option<String> instantTime);
- public void commitCompaction(JavaRDD<WriteStatus> records,
JavaRDD<DeltaWriteStats> generatedDataStats,
- Option<String> instantTime) throws IOException {
- if (!cfg.useDeltaStreamer) {
- Map<String, String> extraMetadata = new HashMap<>();
- /** Store the checkpoint in the commit metadata just like
- * {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD,
Option)} **/
- extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY,
lastCheckpoint.get());
- if (generatedDataStats != null && generatedDataStats.count() > 1) {
- // Just stores the path where this batch of data is generated to
- extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s ->
s.getFilePath()).collect().get(0));
- }
- HoodieSparkTable<HoodieRecordPayload> table =
HoodieSparkTable.create(writeClient.getConfig(),
writeClient.getEngineContext());
- HoodieCommitMetadata metadata =
CompactHelpers.getInstance().createCompactionMetadata(table, instantTime.get(),
HoodieJavaRDD.of(records), writeClient.getConfig().getSchema());
- writeClient.commitCompaction(instantTime.get(), metadata,
Option.of(extraMetadata));
- }
- }
+ public abstract void commitCompaction(JavaRDD<WriteStatus> records,
JavaRDD<DeltaWriteStats> generatedDataStats,
+ Option<String> instantTime) throws
Exception;
public SparkRDDWriteClient getWriteClient(DagNode dagNode) throws
IllegalAccessException {
if (cfg.useDeltaStreamer & !allowWriteClientAccess(dagNode)) {
@@ -301,7 +158,7 @@ public class HoodieTestSuiteWriter implements Serializable {
return deltaStreamerWrapper;
}
- public HoodieTestSuiteConfig getCfg() {
+ public HoodieTestSuiteJob.HoodieTestSuiteConfig getCfg() {
return cfg;
}
@@ -325,3 +182,4 @@ public class HoodieTestSuiteWriter implements Serializable {
return schema;
}
}
+
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index 1578e86be4..a781d19cb7 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -103,6 +103,7 @@ public class DeltaConfig implements Serializable {
private static String DELETE_INPUT_DATA_EXCEPT_LATEST =
"delete_input_data_except_latest";
private static String PARTITIONS_TO_DELETE = "partitions_to_delete";
private static String INPUT_PARTITIONS_TO_SKIP_VALIDATE =
"input_partitions_to_skip_validate";
+ private static String MAX_WAIT_TIME_FOR_DELTASTREAMER_TO_CATCH_UP_MS =
"max_wait_time_for_deltastreamer_catch_up_ms";
// Spark SQL Create Table
private static String TABLE_TYPE = "table_type";
@@ -253,6 +254,10 @@ public class DeltaConfig implements Serializable {
return Boolean.valueOf(configsMap.getOrDefault(ENABLE_ROW_WRITING,
false).toString());
}
+ public long maxWaitTimeForDeltastreamerToCatchupMs() {
+ return
Long.valueOf(configsMap.getOrDefault(MAX_WAIT_TIME_FOR_DELTASTREAMER_TO_CATCH_UP_MS,
5 * 60 * 1000).toString());
+ }
+
public Option<String> getTableType() {
return !configsMap.containsKey(TABLE_TYPE) ? Option.empty()
: Option.of(configsMap.get(TABLE_TYPE).toString());
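A minimal sketch of how a validation node consumes this new knob (here `config` is the node's DeltaConfig.Config from this patch; the default works out to 300000 ms):

```java
// Sketch: read the configured wait bound before polling for deltastreamer catch-up.
long maxWaitMs = config.maxWaitTimeForDeltastreamerToCatchupMs();  // 5 * 60 * 1000 unless overridden in yaml
```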
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
index d31ef195ec..83b5751c86 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
@@ -21,7 +21,9 @@ package org.apache.hudi.integ.testsuite.dag;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.integ.testsuite.HoodieContinousTestSuiteWriter;
import
org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
+import org.apache.hudi.integ.testsuite.HoodieInlineTestSuiteWriter;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
@@ -37,6 +39,8 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
* WriterContext wraps the delta writer/data generator related configuration
needed to init/reinit.
@@ -53,6 +57,7 @@ public class WriterContext {
private BuiltinKeyGenerator keyGenerator;
private transient SparkSession sparkSession;
private transient JavaSparkContext jsc;
+ private ExecutorService executorService;
public WriterContext(JavaSparkContext jsc, TypedProperties props,
HoodieTestSuiteConfig cfg,
BuiltinKeyGenerator keyGenerator, SparkSession sparkSession) {
@@ -67,7 +72,8 @@ public class WriterContext {
try {
this.schemaProvider =
UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jsc);
String schemaStr = schemaProvider.getSourceSchema().toString();
- this.hoodieTestSuiteWriter = new HoodieTestSuiteWriter(jsc, props, cfg,
schemaStr);
+ this.hoodieTestSuiteWriter = (cfg.testContinousMode &&
cfg.useDeltaStreamer) ? new HoodieContinousTestSuiteWriter(jsc, props, cfg,
schemaStr)
+ : new HoodieInlineTestSuiteWriter(jsc, props, cfg, schemaStr);
int inputParallelism = cfg.inputParallelism > 0 ? cfg.inputParallelism :
jsc.defaultParallelism();
this.deltaGenerator = new DeltaGenerator(
new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName),
DeltaInputType.valueOf(cfg.inputFormatName),
@@ -75,6 +81,10 @@ public class WriterContext {
schemaStr, cfg.limitFileSize, inputParallelism,
cfg.deleteOldInput, cfg.useHudiToGenerateUpdates),
jsc, sparkSession, schemaStr, keyGenerator);
log.info(String.format("Initialized writerContext with: %s", schemaStr));
+ if (cfg.testContinousMode) {
+ executorService = Executors.newFixedThreadPool(1);
+ executorService.execute(new
TestSuiteWriterRunnable(hoodieTestSuiteWriter));
+ }
} catch (Exception e) {
throw new HoodieException("Failed to reinitialize writerContext", e);
}
@@ -113,4 +123,35 @@ public class WriterContext {
public SparkSession getSparkSession() {
return sparkSession;
}
+
+ public void shutdownResources() {
+ this.hoodieTestSuiteWriter.shutdownResources();
+ if (executorService != null) {
+ executorService.shutdownNow();
+ }
+ }
+
+ /**
+ * TestSuiteWriterRunnable to spin up a thread to execute deltastreamer with
async table services.
+ */
+ class TestSuiteWriterRunnable implements Runnable {
+ private HoodieTestSuiteWriter hoodieTestSuiteWriter;
+
+ TestSuiteWriterRunnable(HoodieTestSuiteWriter hoodieTestSuiteWriter) {
+ this.hoodieTestSuiteWriter = hoodieTestSuiteWriter;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(20000);
+ log.info("Starting continuous sync with deltastreamer ");
+ hoodieTestSuiteWriter.getDeltaStreamerWrapper().sync();
+ log.info("Completed continuous sync with deltastreamer ");
+ } catch (Exception e) {
+ log.error("Deltastreamer failed in continuous mode " + e.getMessage());
+ throw new HoodieException("Shutting down deltastreamer in continuous
mode failed ", e);
+ }
+ }
+ }
}
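Taken together, the lifecycle this change sets up looks roughly like the sketch below (names taken from the diff above, with `writer` being the HoodieTestSuiteWriter built in initContext; error handling and the initial sleep trimmed):

```java
// Sketch: run deltastreamer continuously on a background thread while the DAG executes.
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.execute(() -> {
  try {
    writer.getDeltaStreamerWrapper().sync();  // continuous ingest in the background
  } catch (Exception e) {
    throw new HoodieException("Deltastreamer failed in continuous mode", e);
  }
});
// ... DAG nodes generate data and validate against the table ...
writer.shutdownResources();  // graceful deltastreamer shutdown
pool.shutdownNow();          // stop the background thread
```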
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
index a0ebdc5754..15c209e475 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java
@@ -20,10 +20,17 @@
package org.apache.hudi.integ.testsuite.dag.nodes;
import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -40,6 +47,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
@@ -47,6 +57,8 @@ import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
+import static
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
+
/**
 * This node validates that contents from the input path are intact in Hudi. By default no configs are required for this node, but there is an
 * optional config "delete_input_data" that you can set. If set, once validation completes, contents from inputPath are deleted. This comes in handy for long-running test suites.
@@ -78,6 +90,12 @@ public abstract class BaseValidateDatasetNode extends
DagNode<Boolean> {
int itrCountToExecute = config.getIterationCountToExecute();
if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount) ||
(itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) ==
0))) {
+ FileSystem fs = new
Path(context.getHoodieTestSuiteWriter().getCfg().inputBasePath)
+
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
+ if (context.getHoodieTestSuiteWriter().getCfg().testContinousMode) {
+ awaitUntilDeltaStreamerCaughtUp(context,
context.getHoodieTestSuiteWriter().getCfg().targetBasePath, fs,
+ context.getHoodieTestSuiteWriter().getCfg().inputBasePath);
+ }
SparkSession session =
SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
// todo: Fix partitioning schemes. For now, assumes data based
partitioning.
String inputPath =
context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
@@ -85,8 +103,6 @@ public abstract class BaseValidateDatasetNode extends
DagNode<Boolean> {
// listing batches to be validated
String inputPathStr =
context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
if (log.isDebugEnabled()) {
- FileSystem fs = new Path(inputPathStr)
-
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
log.info("fileStatuses length: " + fileStatuses.length);
for (FileStatus fileStatus : fileStatuses) {
@@ -145,8 +161,6 @@ public abstract class BaseValidateDatasetNode extends
DagNode<Boolean> {
if (config.isDeleteInputData()) {
// clean up input data for current group of writes.
inputPathStr =
context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
- FileSystem fs = new Path(inputPathStr)
-
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
for (FileStatus fileStatus : fileStatuses) {
log.debug("Micro batch to be deleted " +
fileStatus.getPath().toString());
@@ -157,6 +171,50 @@ public abstract class BaseValidateDatasetNode extends
DagNode<Boolean> {
}
}
+ private void awaitUntilDeltaStreamerCaughtUp(ExecutionContext context,
String hudiTablePath, FileSystem fs, String inputPath) throws IOException,
InterruptedException {
+ HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new
Configuration(fs.getConf())).setBasePath(hudiTablePath).build();
+ HoodieTimeline commitTimeline =
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ Option<String> latestCheckpoint = getLatestCheckpoint(commitTimeline);
+ FileStatus[] subDirs = fs.listStatus(new Path(inputPath));
+ List<FileStatus> subDirList = Arrays.asList(subDirs);
+ subDirList.sort(Comparator.comparingLong(entry ->
Long.parseLong(entry.getPath().getName())));
+ String latestSubDir = subDirList.get(subDirList.size()
-1).getPath().getName();
+ log.info("Latest sub directory in input path " + latestSubDir + ", latest
checkpoint from deltastreamer " +
+ (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
+ long maxWaitTime = config.maxWaitTimeForDeltastreamerToCatchupMs();
+ long waitedSoFar = 0;
+ while (!(latestCheckpoint.isPresent() &&
latestCheckpoint.get().equals(latestSubDir))) {
+ log.warn("Sleeping for 20 secs awaiting for deltastreamer to catch up
with ingested data");
+ Thread.sleep(20000);
+ meta.reloadActiveTimeline();
+ commitTimeline =
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ latestCheckpoint = getLatestCheckpoint(commitTimeline);
+ waitedSoFar += 20000;
+ if (waitedSoFar >= maxWaitTime) {
+        throw new AssertionError("DeltaStreamer has not caught up after " + maxWaitTime + " ms of wait time. Last known checkpoint " +
+            (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none") + ", expected checkpoint to have caught up to " + latestSubDir);
+ }
+ log.info("Latest sub directory in input path " + latestSubDir + ",
latest checkpoint from deltastreamer " +
+ (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
+ }
+ }
+
+ private Option<String> getLatestCheckpoint(HoodieTimeline timeline) {
+ return (Option<String>) timeline.getReverseOrderedInstants().map(instant
-> {
+ try {
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+ .fromBytes(timeline.getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
+ if
(!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
+ return Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
+ } else {
+ return Option.empty();
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to parse HoodieCommitMetadata for
" + instant.toString(), e);
+ }
+ }).filter(Option::isPresent).findFirst().orElse(Option.empty());
+ }
+
private Dataset<Row> getInputDf(ExecutionContext context, SparkSession
session, String inputPath) {
String recordKeyField =
context.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key());
String partitionPathField =
context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key());
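The catch-up wait above reduces to a string comparison between the newest input batch directory and the checkpoint the deltastreamer last committed; a simplified sketch of that contract (helper name hypothetical):

```java
// Input batches land under <inputBasePath>/<batchId> with increasing numeric ids;
// the deltastreamer records the last consumed batch id under CHECKPOINT_KEY in commit metadata.
static boolean caughtUp(Option<String> latestCheckpoint, String latestInputBatchId) {
  return latestCheckpoint.isPresent() && latestCheckpoint.get().equals(latestInputBatchId);
}
```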
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
index c30be2a2a5..20e12e9030 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
@@ -216,15 +216,22 @@ public class DeltaGenerator implements Serializable {
adjustedRDD = deltaInputReader.read(config.getNumRecordsDelete());
adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc,
config.getNumRecordsDelete());
} else {
- deltaInputReader =
- new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig)
deltaOutputConfig).getDatasetOutputPath(),
- schemaStr);
- if (config.getFractionUpsertPerFile() > 0) {
- adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(),
config.getNumUpsertFiles(),
- config.getFractionUpsertPerFile());
+ if (((DFSDeltaConfig)
deltaOutputConfig).shouldUseHudiToGenerateUpdates()) {
+ deltaInputReader =
+ new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig)
deltaOutputConfig).getDatasetOutputPath(),
+ schemaStr);
+ if (config.getFractionUpsertPerFile() > 0) {
+ adjustedRDD =
deltaInputReader.read(config.getNumDeletePartitions(),
config.getNumUpsertFiles(),
+ config.getFractionUpsertPerFile());
+ } else {
+ adjustedRDD =
deltaInputReader.read(config.getNumDeletePartitions(),
config.getNumUpsertFiles(), config
+ .getNumRecordsDelete());
+ }
} else {
- adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(),
config.getNumUpsertFiles(), config
- .getNumRecordsDelete());
+ deltaInputReader = new DFSAvroDeltaInputReader(sparkSession,
schemaStr,
+ ((DFSDeltaConfig) deltaOutputConfig).getDeltaBasePath(),
Option.empty(), Option.empty());
+ adjustedRDD = deltaInputReader.read(config.getNumRecordsDelete());
+ adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc,
config.getNumRecordsDelete());
}
}