This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7058d12e748 [MINOR] Fix zookeeper session expiration bug (#10671)
7058d12e748 is described below
commit 7058d12e74832dc420975269b698782add5e4fff
Author: Lin Liu <[email protected]>
AuthorDate: Thu Feb 15 16:38:29 2024 -0800
[MINOR] Fix zookeeper session expiration bug (#10671)
---
.../TestDFSHoodieTestSuiteWriterAdapter.java | 2 +-
.../integ/testsuite/TestFileDeltaInputWriter.java | 2 +-
.../testsuite/job/TestHoodieTestSuiteJob.java | 3 +-
.../reader/TestDFSAvroDeltaInputReader.java | 2 +-
.../reader/TestDFSHoodieDatasetInputReader.java | 3 +-
.../callback/TestKafkaCallbackProvider.java | 17 +++++++--
.../deltastreamer/HoodieDeltaStreamerTestBase.java | 13 +++----
.../deltastreamer/TestHoodieDeltaStreamer.java | 4 +--
...TestHoodieDeltaStreamerSchemaEvolutionBase.java | 1 -
.../schema/TestFilebasedSchemaProvider.java | 2 +-
.../utilities/sources/BaseTestKafkaSource.java | 14 ++++----
.../utilities/sources/TestAvroKafkaSource.java | 17 +++++----
.../utilities/sources/TestSqlFileBasedSource.java | 40 ++++++++++++++--------
.../hudi/utilities/sources/TestSqlSource.java | 2 +-
.../debezium/TestAbstractDebeziumSource.java | 18 ++++++++--
.../sources/helpers/TestKafkaOffsetGen.java | 14 ++++----
.../utilities/testutils/UtilitiesTestBase.java | 11 +++++-
.../AbstractCloudObjectsSourceTestBase.java | 2 +-
.../transform/TestSqlFileBasedTransformer.java | 36 ++++++++++---------
19 files changed, 129 insertions(+), 74 deletions(-)
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
index 70430328553..f2ec458bf2d 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
@@ -69,7 +69,7 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends
UtilitiesTestBase {
}
@AfterAll
- public static void cleanupClass() {
+ public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
index 4f99292b3fd..d8e54984367 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
@@ -63,7 +63,7 @@ public class TestFileDeltaInputWriter extends
UtilitiesTestBase {
}
@AfterAll
- public static void cleanupClass() {
+ public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
index 087ffb8e400..9a4a2eee619 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
@@ -49,6 +49,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import java.io.IOException;
import java.util.UUID;
import java.util.stream.Stream;
@@ -134,7 +135,7 @@ public class TestHoodieTestSuiteJob extends
UtilitiesTestBase {
}
@AfterAll
- public static void cleanupClass() {
+ public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
index 089a9d9fb55..8f93a82865a 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
@@ -48,7 +48,7 @@ public class TestDFSAvroDeltaInputReader extends
UtilitiesTestBase {
}
@AfterAll
- public static void cleanupClass() {
+ public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
index 3a11de9f0b5..40e1f58698d 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.util.HashSet;
import java.util.List;
@@ -55,7 +56,7 @@ public class TestDFSHoodieDatasetInputReader extends
UtilitiesTestBase {
}
@AfterAll
- public static void cleanupClass() {
+ public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java
index 70897aecf30..e2c3c86cd5b 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java
@@ -30,9 +30,12 @@ import
org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackCo
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.util.List;
import java.util.UUID;
@@ -43,19 +46,27 @@ import static
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
public class TestKafkaCallbackProvider extends UtilitiesTestBase {
private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
- private static KafkaTestUtils testUtils;
+ private KafkaTestUtils testUtils;
@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices();
+ }
+
+ @BeforeEach
+ public void setup() {
testUtils = new KafkaTestUtils();
testUtils.setup();
}
+ @AfterEach
+ public void tearDown() {
+ testUtils.teardown();
+ }
+
@AfterAll
- public static void cleanupClass() {
+ public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
- testUtils.teardown();
}
@Test
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 4f2208c7801..864c3502825 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -49,6 +49,7 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
@@ -127,14 +128,15 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
static final String HOODIE_CONF_PARAM = "--hoodie-conf";
static final String HOODIE_CONF_VALUE1 =
"hoodie.datasource.hive_sync.table=test_table";
static final String HOODIE_CONF_VALUE2 =
"hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";
- public static KafkaTestUtils testUtils;
protected static String topicName;
protected static String defaultSchemaProviderClassName =
FilebasedSchemaProvider.class.getName();
protected static int testNum = 1;
Map<String, String> hudiOpts = new HashMap<>();
+ public KafkaTestUtils testUtils;
- protected static void prepareTestSetup() throws IOException {
+ @BeforeEach
+ protected void prepareTestSetup() throws IOException {
PARQUET_SOURCE_ROOT = basePath + "/parquetFiles";
ORC_SOURCE_ROOT = basePath + "/orcFiles";
JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles";
@@ -242,16 +244,15 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices(false, true, false);
- prepareTestSetup();
}
@AfterAll
- public static void tearDown() {
- cleanupKafkaTestUtils();
+ public static void tearDown() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
- public static void cleanupKafkaTestUtils() {
+ @AfterEach
+ public void cleanupKafkaTestUtils() {
if (testUtils != null) {
testUtils.teardown();
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 449cf2bb98a..4bfbee6e0e0 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1705,11 +1705,11 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
assertEquals(1000, c);
}
- private static void prepareJsonKafkaDFSFiles(int numRecords, boolean
createTopic, String topicName) {
+ private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic,
String topicName) {
prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2);
}
- private static void prepareJsonKafkaDFSFiles(int numRecords, boolean
createTopic, String topicName, int numPartitions) {
+ private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic,
String topicName, int numPartitions) {
if (createTopic) {
try {
testUtils.createTopic(topicName, numPartitions);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index 6168912b003..3670c5b6007 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -136,7 +136,6 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase
extends HoodieDeltaStrea
@AfterAll
static void teardownAll() {
defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
- HoodieDeltaStreamerTestBase.cleanupKafkaTestUtils();
}
protected HoodieStreamer deltaStreamer;
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
index 389282ddcdb..945ce6f774a 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
@@ -51,7 +51,7 @@ public class TestFilebasedSchemaProvider extends
UtilitiesTestBase {
}
@AfterAll
- public static void cleanUpUtilitiesTestServices() {
+ public static void cleanUpUtilitiesTestServices() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index f340120ca8d..b5cbf2738f6 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -38,8 +38,8 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@@ -58,20 +58,20 @@ import static org.mockito.Mockito.mock;
*/
abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
protected static final String TEST_TOPIC_PREFIX = "hoodie_test_";
- protected static KafkaTestUtils testUtils;
protected final HoodieIngestionMetrics metrics =
mock(HoodieIngestionMetrics.class);
protected SchemaProvider schemaProvider;
+ protected KafkaTestUtils testUtils;
- @BeforeAll
- public static void initClass() {
+ @BeforeEach
+ public void initClass() {
testUtils = new KafkaTestUtils();
testUtils.setup();
}
- @AfterAll
- public static void cleanupClass() {
+ @AfterEach
+ public void cleanupClass() {
testUtils.teardown();
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
index 3daa9505538..558181f4258 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -45,8 +45,9 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
-import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -68,8 +69,6 @@ import static org.mockito.Mockito.mock;
public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
protected static final String TEST_TOPIC_PREFIX = "hoodie_avro_test_";
- protected static KafkaTestUtils testUtils;
-
protected static HoodieTestDataGenerator dataGen;
protected static String SCHEMA_PATH = "/tmp/schema_file.avsc";
@@ -78,15 +77,21 @@ public class TestAvroKafkaSource extends
SparkClientFunctionalTestHarness {
protected SchemaProvider schemaProvider;
+ protected KafkaTestUtils testUtils;
+
@BeforeAll
public static void initClass() {
- testUtils = new KafkaTestUtils();
dataGen = new HoodieTestDataGenerator(0xDEED);
+ }
+
+ @BeforeEach
+ public void setup() {
+ testUtils = new KafkaTestUtils();
testUtils.setup();
}
- @AfterAll
- public static void cleanupClass() {
+ @AfterEach
+ public void tearDown() {
testUtils.teardown();
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java
index c718e7a12e8..3f106fce994 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java
@@ -28,7 +28,6 @@ import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.AnalysisException;
@@ -64,17 +63,10 @@ public class TestSqlFileBasedSource extends
UtilitiesTestBase {
@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices(false, true, false);
- FileSystem fs = UtilitiesTestBase.fs;
- UtilitiesTestBase.Helpers.copyToDFS(
- "streamer-config/sql-file-based-source.sql", fs,
- UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
- UtilitiesTestBase.Helpers.copyToDFS(
- "streamer-config/sql-file-based-source-invalid-table.sql", fs,
- UtilitiesTestBase.basePath +
"/sql-file-based-source-invalid-table.sql");
}
@AfterAll
- public static void cleanupClass() {
+ public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
@@ -113,7 +105,11 @@ public class TestSqlFileBasedSource extends
UtilitiesTestBase {
* @throws IOException
*/
@Test
- public void testSqlFileBasedSourceAvroFormat() {
+ public void testSqlFileBasedSourceAvroFormat() throws IOException {
+ UtilitiesTestBase.Helpers.copyToDFS(
+ "streamer-config/sql-file-based-source.sql", fs,
+ UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
+
props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath +
"/sql-file-based-source.sql");
sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession,
schemaProvider);
sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource);
@@ -136,7 +132,11 @@ public class TestSqlFileBasedSource extends
UtilitiesTestBase {
* @throws IOException
*/
@Test
- public void testSqlFileBasedSourceRowFormat() {
+ public void testSqlFileBasedSourceRowFormat() throws IOException {
+ UtilitiesTestBase.Helpers.copyToDFS(
+ "streamer-config/sql-file-based-source.sql", fs,
+ UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
+
props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath +
"/sql-file-based-source.sql");
sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession,
schemaProvider);
sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource);
@@ -154,7 +154,11 @@ public class TestSqlFileBasedSource extends
UtilitiesTestBase {
* @throws IOException
*/
@Test
- public void testSqlFileBasedSourceMoreRecordsThanSourceLimit() {
+ public void testSqlFileBasedSourceMoreRecordsThanSourceLimit() throws
IOException {
+ UtilitiesTestBase.Helpers.copyToDFS(
+ "streamer-config/sql-file-based-source.sql", fs,
+ UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
+
props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath +
"/sql-file-based-source.sql");
sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession,
schemaProvider);
sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource);
@@ -171,7 +175,11 @@ public class TestSqlFileBasedSource extends
UtilitiesTestBase {
* @throws IOException
*/
@Test
- public void testSqlFileBasedSourceInvalidTable() {
+ public void testSqlFileBasedSourceInvalidTable() throws IOException {
+ UtilitiesTestBase.Helpers.copyToDFS(
+ "streamer-config/sql-file-based-source-invalid-table.sql", fs,
+ UtilitiesTestBase.basePath +
"/sql-file-based-source-invalid-table.sql");
+
props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath +
"/sql-file-based-source-invalid-table.sql");
sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession,
schemaProvider);
sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource);
@@ -182,7 +190,11 @@ public class TestSqlFileBasedSource extends
UtilitiesTestBase {
}
@Test
- public void shouldSetCheckpointForSqlFileBasedSourceWithEpochCheckpoint() {
+ public void shouldSetCheckpointForSqlFileBasedSourceWithEpochCheckpoint()
throws IOException {
+ UtilitiesTestBase.Helpers.copyToDFS(
+ "streamer-config/sql-file-based-source.sql", fs,
+ UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
+
props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath +
"/sql-file-based-source.sql");
props.setProperty(sqlFileSourceConfigEmitChkPointConf, "true");
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
index 37ab549ea76..64578f3bae3 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
@@ -64,7 +64,7 @@ public class TestSqlSource extends UtilitiesTestBase {
}
@AfterAll
- public static void cleanupClass() {
+ public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
index e6aa9d8862e..c9f46144e96 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
@@ -39,11 +39,14 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import java.io.IOException;
import java.util.UUID;
import java.util.stream.Stream;
@@ -57,19 +60,28 @@ public abstract class TestAbstractDebeziumSource extends
UtilitiesTestBase {
private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
private final HoodieIngestionMetrics metrics =
mock(HoodieIngestionMetrics.class);
- private static KafkaTestUtils testUtils;
+ private KafkaTestUtils testUtils;
@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices();
+ }
+
+ @BeforeEach
+ public void setUpKafkaTestUtils() {
testUtils = new KafkaTestUtils();
testUtils.setup();
}
+ @AfterEach
+ public void tearDownKafkaTestUtils() {
+ testUtils.teardown();
+ testUtils = null;
+ }
+
@AfterAll
- public static void cleanupClass() {
+ public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
- testUtils.teardown();
}
private TypedProperties createPropsForJsonSource() {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
index e3d2ec5a602..6ad6a4c09db 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -31,8 +31,8 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.apache.spark.streaming.kafka010.OffsetRange;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.UUID;
@@ -49,17 +49,17 @@ import static org.mockito.Mockito.mock;
public class TestKafkaOffsetGen {
private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
- private static KafkaTestUtils testUtils;
private HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
+ private KafkaTestUtils testUtils;
- @BeforeAll
- public static void setup() throws Exception {
+ @BeforeEach
+ public void setup() throws Exception {
testUtils = new KafkaTestUtils();
testUtils.setup();
}
- @AfterAll
- public static void teardown() throws Exception {
+ @AfterEach
+ public void teardown() throws Exception {
testUtils.teardown();
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index c7ca90b9259..89037ff507d 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -163,7 +163,12 @@ public class UtilitiesTestBase {
}
@AfterAll
- public static void cleanUpUtilitiesTestServices() {
+ public static void cleanUpUtilitiesTestServices() throws IOException {
+ if (fs != null) {
+ fs.delete(new Path(basePath), true);
+ fs.close();
+ fs = null;
+ }
if (hdfsTestService != null) {
hdfsTestService.stop();
hdfsTestService = null;
@@ -196,6 +201,10 @@ public class UtilitiesTestBase {
@BeforeEach
public void setup() throws Exception {
TestDataSource.initDataGen();
+ // This prevents test methods from using existing files or folders.
+ if (fs != null) {
+ fs.delete(new Path(basePath), true);
+ }
}
@AfterEach
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
index bdb6c85ce72..11a00ebeb2c 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
@@ -58,7 +58,7 @@ public abstract class AbstractCloudObjectsSourceTestBase
extends UtilitiesTestBa
}
@AfterAll
- public static void cleanupClass() {
+ public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
index b3cbe1d6108..1b0cc7f52a6 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -51,22 +52,10 @@ public class TestSqlFileBasedTransformer extends
UtilitiesTestBase {
@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices();
- UtilitiesTestBase.Helpers.copyToDFS(
- "streamer-config/sql-file-transformer.sql",
- UtilitiesTestBase.fs,
- UtilitiesTestBase.basePath + "/sql-file-transformer.sql");
- UtilitiesTestBase.Helpers.copyToDFS(
- "streamer-config/sql-file-transformer-invalid.sql",
- UtilitiesTestBase.fs,
- UtilitiesTestBase.basePath + "/sql-file-transformer-invalid.sql");
- UtilitiesTestBase.Helpers.copyToDFS(
- "streamer-config/sql-file-transformer-empty.sql",
- UtilitiesTestBase.fs,
- UtilitiesTestBase.basePath + "/sql-file-transformer-empty.sql");
}
@AfterAll
- public static void cleanupClass() {
+ public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
@@ -106,7 +95,12 @@ public class TestSqlFileBasedTransformer extends
UtilitiesTestBase {
}
@Test
- public void testSqlFileBasedTransformerInvalidSQL() {
+ public void testSqlFileBasedTransformerInvalidSQL() throws IOException {
+ UtilitiesTestBase.Helpers.copyToDFS(
+ "streamer-config/sql-file-transformer-invalid.sql",
+ UtilitiesTestBase.fs,
+ UtilitiesTestBase.basePath + "/sql-file-transformer-invalid.sql");
+
// Test if the SQL file based transformer works as expected for the
invalid SQL statements.
props.setProperty(
"hoodie.deltastreamer.transformer.sql.file",
@@ -117,7 +111,12 @@ public class TestSqlFileBasedTransformer extends
UtilitiesTestBase {
}
@Test
- public void testSqlFileBasedTransformerEmptyDataset() {
+ public void testSqlFileBasedTransformerEmptyDataset() throws IOException {
+ UtilitiesTestBase.Helpers.copyToDFS(
+ "streamer-config/sql-file-transformer-empty.sql",
+ UtilitiesTestBase.fs,
+ UtilitiesTestBase.basePath + "/sql-file-transformer-empty.sql");
+
// Test if the SQL file based transformer works as expected for the empty
SQL statements.
props.setProperty(
"hoodie.deltastreamer.transformer.sql.file",
@@ -129,7 +128,12 @@ public class TestSqlFileBasedTransformer extends
UtilitiesTestBase {
}
@Test
- public void testSqlFileBasedTransformer() {
+ public void testSqlFileBasedTransformer() throws IOException {
+ UtilitiesTestBase.Helpers.copyToDFS(
+ "streamer-config/sql-file-transformer.sql",
+ UtilitiesTestBase.fs,
+ UtilitiesTestBase.basePath + "/sql-file-transformer.sql");
+
// Test if the SQL file based transformer works as expected for the
correct input.
props.setProperty(
"hoodie.deltastreamer.transformer.sql.file",