This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 35d9093921 [HUDI-5198] Reduce test run time in hudi-utilities and
locking related tests (#7180)
35d9093921 is described below
commit 35d90939211c2c846d7c7441fc7ea20bbd788a4c
Author: Tim Brown <[email protected]>
AuthorDate: Sat Nov 12 00:25:12 2022 -0800
[HUDI-5198] Reduce test run time in hudi-utilities and locking related
tests (#7180)
---
.../client/transaction/TestTransactionManager.java | 5 +++
.../hudi/testutils/HoodieClientTestUtils.java | 4 +-
.../SparkClientFunctionalTestHarness.java | 13 +++---
.../hudi/utilities/TestSchemaPostProcessor.java | 12 ++++--
.../hudi/utilities/sources/TestJdbcSource.java | 6 +++
.../debezium/TestAbstractDebeziumSource.java | 26 ++++--------
.../sources/helpers/TestKafkaOffsetGen.java | 48 +++++++++++-----------
.../utilities/testutils/UtilitiesTestBase.java | 37 +++++++++--------
.../sources/AbstractDFSSourceTestBase.java | 12 ------
9 files changed, 80 insertions(+), 83 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
index afbedc0de3..4222754a19 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
@@ -63,6 +63,11 @@ public class TestTransactionManager extends
HoodieCommonTestHarness {
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withLockConfig(HoodieLockConfig.newBuilder()
.withLockProvider(InProcessLockProvider.class)
+ .withLockWaitTimeInMillis(50L)
+ .withNumRetries(2)
+ .withRetryWaitTimeInMillis(10L)
+ .withClientNumRetries(2)
+ .withClientRetryWaitTimeInMillis(10L)
.build())
.build();
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 458af3ad9e..6cc936df98 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -92,7 +92,9 @@ public class HoodieClientTestUtils {
*/
public static SparkConf getSparkConfForTest(String appName) {
SparkConf sparkConf = new SparkConf().setAppName(appName)
- .set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer").setMaster("local[8]");
+ .set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer").setMaster("local[4]")
+ .set("spark.sql.shuffle.partitions", "4")
+ .set("spark.default.parallelism", "4");
String evlogDir = System.getProperty("SPARK_EVLOG_DIR");
if (evlogDir != null) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index cb7b2e6b3c..4684747161 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -19,12 +19,6 @@
package org.apache.hudi.testutils;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
@@ -59,6 +53,13 @@ import
org.apache.hudi.testutils.providers.HoodieMetaClientProvider;
import org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hudi.timeline.service.TimelineService;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
index 52413ce938..81217ce904 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
@@ -20,13 +20,13 @@ package org.apache.hudi.utilities;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
-import
org.apache.hudi.utilities.schema.postprocessor.add.AddPrimitiveColumnSchemaPostProcessor;
-import
org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor;
-import
org.apache.hudi.utilities.schema.postprocessor.DropColumnSchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
+import
org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor;
+import
org.apache.hudi.utilities.schema.postprocessor.DropColumnSchemaPostProcessor;
+import
org.apache.hudi.utilities.schema.postprocessor.add.AddPrimitiveColumnSchemaPostProcessor;
import
org.apache.hudi.utilities.schema.postprocessor.add.BaseSchemaPostProcessorConfig;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.transform.FlatteningTransformer;
@@ -34,6 +34,7 @@ import
org.apache.hudi.utilities.transform.FlatteningTransformer;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -67,6 +68,11 @@ public class TestSchemaPostProcessor extends
UtilitiesTestBase {
return Stream.of(types).map(Arguments::of);
}
+ @BeforeAll
+ public static void setupOnce() throws Exception {
+ initTestServices();
+ }
+
@Test
public void testPostProcessor() throws IOException {
properties.put(Config.SCHEMA_POST_PROCESSOR_PROP,
DummySchemaPostProcessor.class.getName());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
index 62aebe3d3e..4c8b264fe1 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
@@ -36,6 +36,7 @@ import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.storage.StorageLevel;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -64,6 +65,11 @@ public class TestJdbcSource extends UtilitiesTestBase {
private static final HoodieTestDataGenerator DATA_GENERATOR = new
HoodieTestDataGenerator();
private static Connection connection;
+ @BeforeAll
+ public static void beforeAll() throws Exception {
+ UtilitiesTestBase.initTestServices(false, false, false);
+ }
+
@BeforeEach
public void setup() throws Exception {
super.setup();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
index 2ac1b8b0bf..113805d24d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
@@ -39,9 +39,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -57,37 +55,27 @@ import static org.mockito.Mockito.mock;
public abstract class TestAbstractDebeziumSource extends UtilitiesTestBase {
- private static final String TEST_TOPIC_NAME = "hoodie_test";
+ private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
private final HoodieDeltaStreamerMetrics metrics =
mock(HoodieDeltaStreamerMetrics.class);
- private KafkaTestUtils testUtils;
+ private static KafkaTestUtils testUtils;
@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices();
+ testUtils = new KafkaTestUtils();
+ testUtils.setup();
}
@AfterAll
public static void cleanupClass() {
UtilitiesTestBase.cleanupClass();
- }
-
- @BeforeEach
- public void setup() throws Exception {
- super.setup();
- testUtils = new KafkaTestUtils();
- testUtils.setup();
- }
-
- @AfterEach
- public void teardown() throws Exception {
- super.teardown();
testUtils.teardown();
}
private TypedProperties createPropsForJsonSource() {
TypedProperties props = new TypedProperties();
- props.setProperty("hoodie.deltastreamer.source.kafka.topic",
TEST_TOPIC_NAME);
+ props.setProperty("hoodie.deltastreamer.source.kafka.topic",
testTopicName);
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty("auto.offset.reset", "earliest");
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -115,13 +103,13 @@ public abstract class TestAbstractDebeziumSource extends
UtilitiesTestBase {
String sourceClass = getSourceClass();
// topic setup.
- testUtils.createTopic(TEST_TOPIC_NAME, 2);
+ testUtils.createTopic(testTopicName, 2);
TypedProperties props = createPropsForJsonSource();
SchemaProvider schemaProvider = new MockSchemaRegistryProvider(props, jsc,
this);
SourceFormatAdapter debeziumSource = new
SourceFormatAdapter(UtilHelpers.createSource(sourceClass, props, jsc,
sparkSession, schemaProvider, metrics));
- testUtils.sendMessages(TEST_TOPIC_NAME, new String[]
{generateDebeziumEvent(operation).toString()});
+ testUtils.sendMessages(testTopicName, new String[]
{generateDebeziumEvent(operation).toString()});
InputBatch<Dataset<Row>> fetch =
debeziumSource.fetchNewDataInRowFormat(Option.empty(), 10);
assertEquals(1, fetch.getBatch().get().count());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
index eff9b24b2b..c3018bb7ba 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -30,8 +30,8 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.apache.spark.streaming.kafka010.OffsetRange;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.UUID;
@@ -47,18 +47,18 @@ import static org.mockito.Mockito.mock;
*/
public class TestKafkaOffsetGen {
- private static String TEST_TOPIC_NAME = "hoodie_test";
- private KafkaTestUtils testUtils;
+ private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
+ private static KafkaTestUtils testUtils;
private HoodieDeltaStreamerMetrics metrics =
mock(HoodieDeltaStreamerMetrics.class);
- @BeforeEach
- public void setup() throws Exception {
+ @BeforeAll
+ public static void setup() throws Exception {
testUtils = new KafkaTestUtils();
testUtils.setup();
}
- @AfterEach
- public void teardown() throws Exception {
+ @AfterAll
+ public static void teardown() throws Exception {
testUtils.teardown();
}
@@ -66,7 +66,7 @@ public class TestKafkaOffsetGen {
TypedProperties props = new TypedProperties();
props.put("hoodie.deltastreamer.source.kafka.checkpoint.type",
kafkaCheckpointType);
props.put("auto.offset.reset", autoOffsetReset);
- props.put("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
+ props.put("hoodie.deltastreamer.source.kafka.topic", testTopicName);
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty("key.deserializer", StringDeserializer.class.getName());
props.setProperty("value.deserializer",
StringDeserializer.class.getName());
@@ -77,8 +77,8 @@ public class TestKafkaOffsetGen {
@Test
public void testGetNextOffsetRangesFromEarliest() {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
- testUtils.createTopic(TEST_TOPIC_NAME, 1);
- testUtils.sendMessages(TEST_TOPIC_NAME,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ testUtils.createTopic(testTopicName, 1);
+ testUtils.sendMessages(testTopicName,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
OffsetRange[] nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
@@ -95,8 +95,8 @@ public class TestKafkaOffsetGen {
@Test
public void testGetNextOffsetRangesFromLatest() {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
- testUtils.createTopic(TEST_TOPIC_NAME, 1);
- testUtils.sendMessages(TEST_TOPIC_NAME,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ testUtils.createTopic(testTopicName, 1);
+ testUtils.sendMessages(testTopicName,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("latest", "string"));
OffsetRange[] nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, metrics);
assertEquals(1, nextOffsetRanges.length);
@@ -106,10 +106,10 @@ public class TestKafkaOffsetGen {
@Test
public void testGetNextOffsetRangesFromCheckpoint() {
- String lastCheckpointString = TEST_TOPIC_NAME + ",0:250";
+ String lastCheckpointString = testTopicName + ",0:250";
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
- testUtils.createTopic(TEST_TOPIC_NAME, 1);
- testUtils.sendMessages(TEST_TOPIC_NAME,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ testUtils.createTopic(testTopicName, 1);
+ testUtils.sendMessages(testTopicName,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("latest", "string"));
OffsetRange[] nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.of(lastCheckpointString), 500,
metrics);
@@ -121,8 +121,8 @@ public class TestKafkaOffsetGen {
@Test
public void testGetNextOffsetRangesFromTimestampCheckpointType() {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
- testUtils.createTopic(TEST_TOPIC_NAME, 1);
- testUtils.sendMessages(TEST_TOPIC_NAME,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ testUtils.createTopic(testTopicName, 1);
+ testUtils.sendMessages(testTopicName,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("latest", "timestamp"));
@@ -135,8 +135,8 @@ public class TestKafkaOffsetGen {
@Test
public void testGetNextOffsetRangesFromMultiplePartitions() {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
- testUtils.createTopic(TEST_TOPIC_NAME, 2);
- testUtils.sendMessages(TEST_TOPIC_NAME,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ testUtils.createTopic(testTopicName, 2);
+ testUtils.sendMessages(testTopicName,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("earliest", "string"));
OffsetRange[] nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 499, metrics);
assertEquals(2, nextOffsetRanges.length);
@@ -149,10 +149,10 @@ public class TestKafkaOffsetGen {
@Test
public void testGetNextOffsetRangesFromGroup() {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
- testUtils.createTopic(TEST_TOPIC_NAME, 2);
- testUtils.sendMessages(TEST_TOPIC_NAME,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+ testUtils.createTopic(testTopicName, 2);
+ testUtils.sendMessages(testTopicName,
Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
KafkaOffsetGen kafkaOffsetGen = new
KafkaOffsetGen(getConsumerConfigs("group", "string"));
- String lastCheckpointString = TEST_TOPIC_NAME + ",0:250,1:249";
+ String lastCheckpointString = testTopicName + ",0:250,1:249";
kafkaOffsetGen.commitOffsetToKafka(lastCheckpointString);
// don't pass lastCheckpointString as we want to read from group committed
offset
OffsetRange[] nextOffsetRanges =
kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
@@ -174,7 +174,7 @@ public class TestKafkaOffsetGen {
public void testCheckTopicExists() {
TypedProperties props = getConsumerConfigs("latest", "string");
KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
- testUtils.createTopic(TEST_TOPIC_NAME, 1);
+ testUtils.createTopic(testTopicName, 1);
boolean topicExists = kafkaOffsetGen.checkTopicExists(new
KafkaConsumer(props));
assertTrue(topicExists);
props.put("hoodie.deltastreamer.source.kafka.topic", "random");
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 534e14cc73..cc61a35886 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -114,10 +114,10 @@ public class UtilitiesTestBase {
protected static ZookeeperTestService zookeeperTestService;
private static final ObjectMapper MAPPER = new ObjectMapper();
- protected transient JavaSparkContext jsc;
- protected transient HoodieSparkEngineContext context;
- protected transient SparkSession sparkSession;
- protected transient SQLContext sqlContext;
+ protected static JavaSparkContext jsc;
+ protected static HoodieSparkEngineContext context;
+ protected static SparkSession sparkSession;
+ protected static SQLContext sqlContext;
@BeforeAll
public static void setLogLevel() {
@@ -155,6 +155,11 @@ public class UtilitiesTestBase {
zookeeperTestService = new ZookeeperTestService(hadoopConf);
zookeeperTestService.start();
}
+
+ jsc = UtilHelpers.buildSparkContext(UtilitiesTestBase.class.getName() +
"-hoodie", "local[4]");
+ context = new HoodieSparkEngineContext(jsc);
+ sqlContext = new SQLContext(jsc);
+ sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
}
@AfterAll
@@ -175,20 +180,6 @@ public class UtilitiesTestBase {
zookeeperTestService.stop();
zookeeperTestService = null;
}
- }
-
- @BeforeEach
- public void setup() throws Exception {
- TestDataSource.initDataGen();
- jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie",
"local[2]");
- context = new HoodieSparkEngineContext(jsc);
- sqlContext = new SQLContext(jsc);
- sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
- }
-
- @AfterEach
- public void teardown() throws Exception {
- TestDataSource.resetDataGen();
if (jsc != null) {
jsc.stop();
jsc = null;
@@ -202,6 +193,16 @@ public class UtilitiesTestBase {
}
}
+ @BeforeEach
+ public void setup() throws Exception {
+ TestDataSource.initDataGen();
+ }
+
+ @AfterEach
+ public void teardown() throws Exception {
+ TestDataSource.resetDataGen();
+ }
+
/**
* Helper to get hive sync config.
*
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
index d49d197fd5..e7843221e0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
@@ -37,8 +37,6 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -65,22 +63,12 @@ public abstract class AbstractDFSSourceTestBase extends
UtilitiesTestBase {
UtilitiesTestBase.initTestServices(true, false, false);
}
- @AfterAll
- public static void cleanupClass() {
- UtilitiesTestBase.cleanupClass();
- }
-
@BeforeEach
public void setup() throws Exception {
super.setup();
schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(),
jsc);
}
- @AfterEach
- public void teardown() throws Exception {
- super.teardown();
- }
-
/**
* Prepares the specific {@link Source} to test, by passing in necessary
configurations.
*