This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7058d12e748 [MINOR] Fix zookeeper session expiration bug (#10671)
7058d12e748 is described below

commit 7058d12e74832dc420975269b698782add5e4fff
Author: Lin Liu <[email protected]>
AuthorDate: Thu Feb 15 16:38:29 2024 -0800

    [MINOR] Fix zookeeper session expiration bug (#10671)
---
 .../TestDFSHoodieTestSuiteWriterAdapter.java       |  2 +-
 .../integ/testsuite/TestFileDeltaInputWriter.java  |  2 +-
 .../testsuite/job/TestHoodieTestSuiteJob.java      |  3 +-
 .../reader/TestDFSAvroDeltaInputReader.java        |  2 +-
 .../reader/TestDFSHoodieDatasetInputReader.java    |  3 +-
 .../callback/TestKafkaCallbackProvider.java        | 17 +++++++--
 .../deltastreamer/HoodieDeltaStreamerTestBase.java | 13 +++----
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  4 +--
 ...TestHoodieDeltaStreamerSchemaEvolutionBase.java |  1 -
 .../schema/TestFilebasedSchemaProvider.java        |  2 +-
 .../utilities/sources/BaseTestKafkaSource.java     | 14 ++++----
 .../utilities/sources/TestAvroKafkaSource.java     | 17 +++++----
 .../utilities/sources/TestSqlFileBasedSource.java  | 40 ++++++++++++++--------
 .../hudi/utilities/sources/TestSqlSource.java      |  2 +-
 .../debezium/TestAbstractDebeziumSource.java       | 18 ++++++++--
 .../sources/helpers/TestKafkaOffsetGen.java        | 14 ++++----
 .../utilities/testutils/UtilitiesTestBase.java     | 11 +++++-
 .../AbstractCloudObjectsSourceTestBase.java        |  2 +-
 .../transform/TestSqlFileBasedTransformer.java     | 36 ++++++++++---------
 19 files changed, 129 insertions(+), 74 deletions(-)

diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
index 70430328553..f2ec458bf2d 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
@@ -69,7 +69,7 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends UtilitiesTestBase {
   }
 
   @AfterAll
-  public static void cleanupClass() {
+  public static void cleanupClass() throws IOException {
     UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
index 4f99292b3fd..d8e54984367 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
@@ -63,7 +63,7 @@ public class TestFileDeltaInputWriter extends UtilitiesTestBase {
   }
 
   @AfterAll
-  public static void cleanupClass() {
+  public static void cleanupClass() throws IOException {
     UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
index 087ffb8e400..9a4a2eee619 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
@@ -49,6 +49,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.io.IOException;
 import java.util.UUID;
 import java.util.stream.Stream;
 
@@ -134,7 +135,7 @@ public class TestHoodieTestSuiteJob extends UtilitiesTestBase {
   }
 
   @AfterAll
-  public static void cleanupClass() {
+  public static void cleanupClass() throws IOException {
     UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
index 089a9d9fb55..8f93a82865a 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
@@ -48,7 +48,7 @@ public class TestDFSAvroDeltaInputReader extends UtilitiesTestBase {
   }
 
   @AfterAll
-  public static void cleanupClass() {
+  public static void cleanupClass() throws IOException {
     UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
index 3a11de9f0b5..40e1f58698d 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 
@@ -55,7 +56,7 @@ public class TestDFSHoodieDatasetInputReader extends UtilitiesTestBase {
   }
 
   @AfterAll
-  public static void cleanupClass() {
+  public static void cleanupClass() throws IOException {
     UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java
index 70897aecf30..e2c3c86cd5b 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java
@@ -30,9 +30,12 @@ import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackCo
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.spark.streaming.kafka010.KafkaTestUtils;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 
@@ -43,19 +46,27 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 public class TestKafkaCallbackProvider extends UtilitiesTestBase {
   private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
 
-  private static KafkaTestUtils testUtils;
+  private KafkaTestUtils testUtils;
 
   @BeforeAll
   public static void initClass() throws Exception {
     UtilitiesTestBase.initTestServices();
+  }
+
+  @BeforeEach
+  public void setup() {
     testUtils = new KafkaTestUtils();
     testUtils.setup();
   }
 
+  @AfterEach
+  public void tearDown() {
+    testUtils.teardown();
+  }
+
   @AfterAll
-  public static void cleanupClass() {
+  public static void cleanupClass() throws IOException {
     UtilitiesTestBase.cleanUpUtilitiesTestServices();
-    testUtils.teardown();
   }
 
   @Test
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 4f2208c7801..864c3502825 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -49,6 +49,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.streaming.kafka010.KafkaTestUtils;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.slf4j.Logger;
@@ -127,14 +128,15 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
   static final String HOODIE_CONF_PARAM = "--hoodie-conf";
   static final String HOODIE_CONF_VALUE1 = "hoodie.datasource.hive_sync.table=test_table";
   static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";
-  public static KafkaTestUtils testUtils;
   protected static String topicName;
   protected static String defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
   protected static int testNum = 1;
 
   Map<String, String> hudiOpts = new HashMap<>();
+  public KafkaTestUtils testUtils;
 
-  protected static void prepareTestSetup() throws IOException {
+  @BeforeEach
+  protected void prepareTestSetup() throws IOException {
     PARQUET_SOURCE_ROOT = basePath + "/parquetFiles";
     ORC_SOURCE_ROOT = basePath + "/orcFiles";
     JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles";
@@ -242,16 +244,15 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
   @BeforeAll
   public static void initClass() throws Exception {
     UtilitiesTestBase.initTestServices(false, true, false);
-    prepareTestSetup();
   }
 
   @AfterAll
-  public static void tearDown() {
-    cleanupKafkaTestUtils();
+  public static void tearDown() throws IOException {
     UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
-  public static void cleanupKafkaTestUtils() {
+  @AfterEach
+  public void cleanupKafkaTestUtils() {
     if (testUtils != null) {
       testUtils.teardown();
     }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 449cf2bb98a..4bfbee6e0e0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1705,11 +1705,11 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertEquals(1000, c);
   }
 
-  private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) {
+  private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) {
     prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2);
   }
 
-  private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName, int numPartitions) {
+  private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName, int numPartitions) {
     if (createTopic) {
       try {
         testUtils.createTopic(topicName, numPartitions);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index 6168912b003..3670c5b6007 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -136,7 +136,6 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase extends HoodieDeltaStrea
   @AfterAll
   static void teardownAll() {
     defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
-    HoodieDeltaStreamerTestBase.cleanupKafkaTestUtils();
   }
 
   protected HoodieStreamer deltaStreamer;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
index 389282ddcdb..945ce6f774a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
@@ -51,7 +51,7 @@ public class TestFilebasedSchemaProvider extends UtilitiesTestBase {
   }
 
   @AfterAll
-  public static void cleanUpUtilitiesTestServices() {
+  public static void cleanUpUtilitiesTestServices() throws IOException {
     UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index f340120ca8d..b5cbf2738f6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -38,8 +38,8 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.streaming.kafka010.KafkaTestUtils;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
@@ -58,20 +58,20 @@ import static org.mockito.Mockito.mock;
  */
 abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
   protected static final String TEST_TOPIC_PREFIX = "hoodie_test_";
-  protected static KafkaTestUtils testUtils;
 
   protected final HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
 
   protected SchemaProvider schemaProvider;
+  protected KafkaTestUtils testUtils;
 
-  @BeforeAll
-  public static void initClass() {
+  @BeforeEach
+  public void initClass() {
     testUtils = new KafkaTestUtils();
     testUtils.setup();
   }
 
-  @AfterAll
-  public static void cleanupClass() {
+  @AfterEach
+  public void cleanupClass() {
     testUtils.teardown();
   }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
index 3daa9505538..558181f4258 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -45,8 +45,9 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.streaming.kafka010.KafkaTestUtils;
-import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -68,8 +69,6 @@ import static org.mockito.Mockito.mock;
 public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
   protected static final String TEST_TOPIC_PREFIX = "hoodie_avro_test_";
 
-  protected static KafkaTestUtils testUtils;
-
   protected static HoodieTestDataGenerator dataGen;
 
   protected static String SCHEMA_PATH = "/tmp/schema_file.avsc";
@@ -78,15 +77,21 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
 
   protected SchemaProvider schemaProvider;
 
+  protected KafkaTestUtils testUtils;
+
   @BeforeAll
   public static void initClass() {
-    testUtils = new KafkaTestUtils();
     dataGen = new HoodieTestDataGenerator(0xDEED);
+  }
+
+  @BeforeEach
+  public void setup() {
+    testUtils = new KafkaTestUtils();
     testUtils.setup();
   }
 
-  @AfterAll
-  public static void cleanupClass() {
+  @AfterEach
+  public void tearDown() {
     testUtils.teardown();
   }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java
index c718e7a12e8..3f106fce994 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java
@@ -28,7 +28,6 @@ import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.AnalysisException;
@@ -64,17 +63,10 @@ public class TestSqlFileBasedSource extends UtilitiesTestBase {
   @BeforeAll
   public static void initClass() throws Exception {
     UtilitiesTestBase.initTestServices(false, true, false);
-    FileSystem fs = UtilitiesTestBase.fs;
-    UtilitiesTestBase.Helpers.copyToDFS(
-        "streamer-config/sql-file-based-source.sql", fs,
-        UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
-    UtilitiesTestBase.Helpers.copyToDFS(
-        "streamer-config/sql-file-based-source-invalid-table.sql", fs,
-        UtilitiesTestBase.basePath + "/sql-file-based-source-invalid-table.sql");
   }
 
   @AfterAll
-  public static void cleanupClass() {
+  public static void cleanupClass() throws IOException {
     UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
@@ -113,7 +105,11 @@ public class TestSqlFileBasedSource extends UtilitiesTestBase {
    * @throws IOException
    */
   @Test
-  public void testSqlFileBasedSourceAvroFormat() {
+  public void testSqlFileBasedSourceAvroFormat() throws IOException {
+    UtilitiesTestBase.Helpers.copyToDFS(
+        "streamer-config/sql-file-based-source.sql", fs,
+        UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
+
     props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
     sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession, schemaProvider);
     sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource);
@@ -136,7 +132,11 @@ public class TestSqlFileBasedSource extends UtilitiesTestBase {
    * @throws IOException
    */
   @Test
-  public void testSqlFileBasedSourceRowFormat() {
+  public void testSqlFileBasedSourceRowFormat() throws IOException {
+    UtilitiesTestBase.Helpers.copyToDFS(
+        "streamer-config/sql-file-based-source.sql", fs,
+        UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
+
     props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
     sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession, schemaProvider);
     sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource);
@@ -154,7 +154,11 @@ public class TestSqlFileBasedSource extends UtilitiesTestBase {
    * @throws IOException
    */
   @Test
-  public void testSqlFileBasedSourceMoreRecordsThanSourceLimit() {
+  public void testSqlFileBasedSourceMoreRecordsThanSourceLimit() throws IOException {
+    UtilitiesTestBase.Helpers.copyToDFS(
+        "streamer-config/sql-file-based-source.sql", fs,
+        UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
+
     props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
     sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession, schemaProvider);
     sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource);
@@ -171,7 +175,11 @@ public class TestSqlFileBasedSource extends UtilitiesTestBase {
    * @throws IOException
    */
   @Test
-  public void testSqlFileBasedSourceInvalidTable() {
+  public void testSqlFileBasedSourceInvalidTable() throws IOException {
+    UtilitiesTestBase.Helpers.copyToDFS(
+        "streamer-config/sql-file-based-source-invalid-table.sql", fs,
+        UtilitiesTestBase.basePath + "/sql-file-based-source-invalid-table.sql");
+
     props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source-invalid-table.sql");
     sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession, schemaProvider);
     sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource);
@@ -182,7 +190,11 @@ public class TestSqlFileBasedSource extends UtilitiesTestBase {
   }
 
   @Test
-  public void shouldSetCheckpointForSqlFileBasedSourceWithEpochCheckpoint() {
+  public void shouldSetCheckpointForSqlFileBasedSourceWithEpochCheckpoint() throws IOException {
+    UtilitiesTestBase.Helpers.copyToDFS(
+        "streamer-config/sql-file-based-source.sql", fs,
+        UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
+
     props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source.sql");
     props.setProperty(sqlFileSourceConfigEmitChkPointConf, "true");
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
index 37ab549ea76..64578f3bae3 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
@@ -64,7 +64,7 @@ public class TestSqlSource extends UtilitiesTestBase {
   }
 
   @AfterAll
-  public static void cleanupClass() {
+  public static void cleanupClass() throws IOException {
     UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
index e6aa9d8862e..c9f46144e96 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
@@ -39,11 +39,14 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.streaming.kafka010.KafkaTestUtils;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.io.IOException;
 import java.util.UUID;
 import java.util.stream.Stream;
 
@@ -57,19 +60,28 @@ public abstract class TestAbstractDebeziumSource extends UtilitiesTestBase {
   private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
 
   private final HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
-  private static KafkaTestUtils testUtils;
+  private KafkaTestUtils testUtils;
 
   @BeforeAll
   public static void initClass() throws Exception {
     UtilitiesTestBase.initTestServices();
+  }
+
+  @BeforeEach
+  public void setUpKafkaTestUtils() {
     testUtils = new KafkaTestUtils();
     testUtils.setup();
   }
 
+  @AfterEach
+  public void tearDownKafkaTestUtils() {
+    testUtils.teardown();
+    testUtils = null;
+  }
+
   @AfterAll
-  public static void cleanupClass() {
+  public static void cleanupClass() throws IOException {
     UtilitiesTestBase.cleanUpUtilitiesTestServices();
-    testUtils.teardown();
   }
 
   private TypedProperties createPropsForJsonSource() {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
index e3d2ec5a602..6ad6a4c09db 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -31,8 +31,8 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.spark.streaming.kafka010.KafkaTestUtils;
 import org.apache.spark.streaming.kafka010.OffsetRange;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.UUID;
@@ -49,17 +49,17 @@ import static org.mockito.Mockito.mock;
 public class TestKafkaOffsetGen {
 
   private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
-  private static KafkaTestUtils testUtils;
   private HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
+  private KafkaTestUtils testUtils;
 
-  @BeforeAll
-  public static void setup() throws Exception {
+  @BeforeEach
+  public void setup() throws Exception {
     testUtils = new KafkaTestUtils();
     testUtils.setup();
   }
 
-  @AfterAll
-  public static void teardown() throws Exception {
+  @AfterEach
+  public void teardown() throws Exception {
     testUtils.teardown();
   }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index c7ca90b9259..89037ff507d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -163,7 +163,12 @@ public class UtilitiesTestBase {
   }
 
   @AfterAll
-  public static void cleanUpUtilitiesTestServices() {
+  public static void cleanUpUtilitiesTestServices() throws IOException {
+    if (fs != null) {
+      fs.delete(new Path(basePath), true);
+      fs.close();
+      fs = null;
+    }
     if (hdfsTestService != null) {
       hdfsTestService.stop();
       hdfsTestService = null;
@@ -196,6 +201,10 @@ public class UtilitiesTestBase {
   @BeforeEach
   public void setup() throws Exception {
     TestDataSource.initDataGen();
+    // This prevents test methods from using existing files or folders.
+    if (fs != null) {
+      fs.delete(new Path(basePath), true);
+    }
   }
 
   @AfterEach
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
index bdb6c85ce72..11a00ebeb2c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
@@ -58,7 +58,7 @@ public abstract class AbstractCloudObjectsSourceTestBase extends UtilitiesTestBa
   }
 
   @AfterAll
-  public static void cleanupClass() {
+  public static void cleanupClass() throws IOException {
     UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
index b3cbe1d6108..1b0cc7f52a6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -51,22 +52,10 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase {
   @BeforeAll
   public static void initClass() throws Exception {
     UtilitiesTestBase.initTestServices();
-    UtilitiesTestBase.Helpers.copyToDFS(
-        "streamer-config/sql-file-transformer.sql",
-        UtilitiesTestBase.fs,
-        UtilitiesTestBase.basePath + "/sql-file-transformer.sql");
-    UtilitiesTestBase.Helpers.copyToDFS(
-        "streamer-config/sql-file-transformer-invalid.sql",
-        UtilitiesTestBase.fs,
-        UtilitiesTestBase.basePath + "/sql-file-transformer-invalid.sql");
-    UtilitiesTestBase.Helpers.copyToDFS(
-        "streamer-config/sql-file-transformer-empty.sql",
-        UtilitiesTestBase.fs,
-        UtilitiesTestBase.basePath + "/sql-file-transformer-empty.sql");
   }
 
   @AfterAll
-  public static void cleanupClass() {
+  public static void cleanupClass() throws IOException {
     UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
@@ -106,7 +95,12 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase {
   }
 
   @Test
-  public void testSqlFileBasedTransformerInvalidSQL() {
+  public void testSqlFileBasedTransformerInvalidSQL() throws IOException {
+    UtilitiesTestBase.Helpers.copyToDFS(
+        "streamer-config/sql-file-transformer-invalid.sql",
+        UtilitiesTestBase.fs,
+        UtilitiesTestBase.basePath + "/sql-file-transformer-invalid.sql");
+
     // Test if the SQL file based transformer works as expected for the invalid SQL statements.
     props.setProperty(
         "hoodie.deltastreamer.transformer.sql.file",
@@ -117,7 +111,12 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase {
   }
 
   @Test
-  public void testSqlFileBasedTransformerEmptyDataset() {
+  public void testSqlFileBasedTransformerEmptyDataset() throws IOException {
+    UtilitiesTestBase.Helpers.copyToDFS(
+        "streamer-config/sql-file-transformer-empty.sql",
+        UtilitiesTestBase.fs,
+        UtilitiesTestBase.basePath + "/sql-file-transformer-empty.sql");
+
     // Test if the SQL file based transformer works as expected for the empty SQL statements.
     props.setProperty(
         "hoodie.deltastreamer.transformer.sql.file",
@@ -129,7 +128,12 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase {
   }
 
   @Test
-  public void testSqlFileBasedTransformer() {
+  public void testSqlFileBasedTransformer() throws IOException {
+    UtilitiesTestBase.Helpers.copyToDFS(
+        "streamer-config/sql-file-transformer.sql",
+        UtilitiesTestBase.fs,
+        UtilitiesTestBase.basePath + "/sql-file-transformer.sql");
+
     // Test if the SQL file based transformer works as expected for the correct input.
     props.setProperty(
         "hoodie.deltastreamer.transformer.sql.file",

Reply via email to