This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.14.1-spark35-scala213 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 8aebd827d2614cb8d874ce29d77bb7764fb31dab Author: Lin Liu <[email protected]> AuthorDate: Thu Feb 15 16:38:29 2024 -0800 [MINOR] Fix zookeeper session expiration bug (#10671) --- .../TestDFSHoodieTestSuiteWriterAdapter.java | 2 +- .../integ/testsuite/TestFileDeltaInputWriter.java | 2 +- .../testsuite/job/TestHoodieTestSuiteJob.java | 3 +- .../reader/TestDFSAvroDeltaInputReader.java | 2 +- .../reader/TestDFSHoodieDatasetInputReader.java | 3 +- .../callback/TestKafkaCallbackProvider.java | 17 +++++++-- .../deltastreamer/HoodieDeltaStreamerTestBase.java | 14 +++++--- .../deltastreamer/TestHoodieDeltaStreamer.java | 4 +-- ...TestHoodieDeltaStreamerSchemaEvolutionBase.java | 1 - .../schema/TestFilebasedSchemaProvider.java | 2 +- .../utilities/sources/BaseTestKafkaSource.java | 14 ++++---- .../utilities/sources/TestAvroKafkaSource.java | 17 +++++---- .../utilities/sources/TestSqlFileBasedSource.java | 40 ++++++++++++++-------- .../hudi/utilities/sources/TestSqlSource.java | 2 +- .../debezium/TestAbstractDebeziumSource.java | 18 ++++++++-- .../sources/helpers/TestKafkaOffsetGen.java | 14 ++++---- .../utilities/testutils/UtilitiesTestBase.java | 11 +++++- .../AbstractCloudObjectsSourceTestBase.java | 2 +- .../transform/TestSqlFileBasedTransformer.java | 36 ++++++++++--------- 19 files changed, 132 insertions(+), 72 deletions(-) diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java index 0c0e920305d..bac3be2e956 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java @@ -69,7 +69,7 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends UtilitiesTestBase { } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java index f2d582ca806..fe0167dae8d 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java @@ -62,7 +62,7 @@ public class TestFileDeltaInputWriter extends UtilitiesTestBase { } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java index 087ffb8e400..9a4a2eee619 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java @@ -49,6 +49,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import java.io.IOException; import java.util.UUID; import java.util.stream.Stream; @@ -134,7 +135,7 @@ public class TestHoodieTestSuiteJob extends UtilitiesTestBase { } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java index 0bc1044fd4c..4e739da4188 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java @@ -47,7 +47,7 @@ public class TestDFSAvroDeltaInputReader extends UtilitiesTestBase { } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java index 3a11de9f0b5..40e1f58698d 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java @@ -38,6 +38,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.util.HashSet; import java.util.List; @@ -55,7 +56,7 @@ public class TestDFSHoodieDatasetInputReader extends UtilitiesTestBase { } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java index 70897aecf30..e2c3c86cd5b 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java @@ -30,9 +30,12 @@ import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackCo import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.spark.streaming.kafka010.KafkaTestUtils; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.util.List; import java.util.UUID; @@ -43,19 +46,27 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; public class TestKafkaCallbackProvider extends UtilitiesTestBase { private final String testTopicName = "hoodie_test_" + UUID.randomUUID(); - private static KafkaTestUtils testUtils; + private KafkaTestUtils testUtils; @BeforeAll public static void initClass() throws Exception { UtilitiesTestBase.initTestServices(); + } + + @BeforeEach + public void setup() { testUtils = new KafkaTestUtils(); testUtils.setup(); } + @AfterEach + public void tearDown() { + testUtils.teardown(); + } + @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); - testUtils.teardown(); } @Test diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java index 87f875642be..bc92481f966 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java @@ -51,6 +51,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.streaming.kafka010.KafkaTestUtils; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.slf4j.Logger; @@ -129,14 +130,15 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { static final String HOODIE_CONF_PARAM = "--hoodie-conf"; static final String HOODIE_CONF_VALUE1 = "hoodie.datasource.hive_sync.table=test_table"; static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3"; - public static KafkaTestUtils testUtils; protected static String topicName; protected static String defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName(); protected static int testNum = 1; Map<String, String> hudiOpts = new HashMap<>(); + public KafkaTestUtils testUtils; - protected static void prepareTestSetup() throws IOException { + @BeforeEach + protected void prepareTestSetup() throws IOException { PARQUET_SOURCE_ROOT = basePath + "/parquetFiles"; ORC_SOURCE_ROOT = basePath + "/orcFiles"; JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles"; @@ -244,11 +246,15 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { @BeforeAll public static void initClass() throws Exception { UtilitiesTestBase.initTestServices(false, true, false); - prepareTestSetup(); } @AfterAll - public static void cleanupKafkaTestUtils() { + public static void tearDown() throws IOException { + UtilitiesTestBase.cleanUpUtilitiesTestServices(); + } + + @AfterEach + public void cleanupKafkaTestUtils() { if (testUtils != null) { testUtils.teardown(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 60ed1b6732a..024a08910bf 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -1716,11 +1716,11 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { assertEquals(1000, c); } - private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) { + private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) { prepareJsonKafkaDFSFiles(numRecords, createTopic, topicName, 2); } - private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName, int numPartitions) { + private void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName, int numPartitions) { if (createTopic) { try { testUtils.createTopic(topicName, numPartitions); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java index 87dc5b89da0..9b0acfb6c9f 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java @@ -114,7 +114,6 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase extends HoodieDeltaStrea @AfterAll static void teardownAll() { defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName(); - HoodieDeltaStreamerTestBase.cleanupKafkaTestUtils(); } protected HoodieStreamer deltaStreamer; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java index 389282ddcdb..945ce6f774a 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java @@ -51,7 +51,7 @@ public class TestFilebasedSchemaProvider extends UtilitiesTestBase { } @AfterAll - public static void cleanUpUtilitiesTestServices() { + public static void cleanUpUtilitiesTestServices() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java index f340120ca8d..b5cbf2738f6 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java @@ -38,8 +38,8 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.streaming.kafka010.KafkaTestUtils; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -58,20 +58,20 @@ import static org.mockito.Mockito.mock; */ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness { protected static final String TEST_TOPIC_PREFIX = "hoodie_test_"; - protected static KafkaTestUtils testUtils; protected final HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class); protected SchemaProvider schemaProvider; + protected KafkaTestUtils testUtils; - @BeforeAll - public static void initClass() { + @BeforeEach + public void initClass() { testUtils = new KafkaTestUtils(); testUtils.setup(); } - @AfterAll - public static void cleanupClass() { + @AfterEach + public void cleanupClass() { testUtils.teardown(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java index 3daa9505538..558181f4258 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java @@ -45,8 +45,9 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.streaming.kafka010.KafkaTestUtils; -import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -68,8 +69,6 @@ import static org.mockito.Mockito.mock; public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness { protected static final String TEST_TOPIC_PREFIX = "hoodie_avro_test_"; - protected static KafkaTestUtils testUtils; - protected static HoodieTestDataGenerator dataGen; protected static String SCHEMA_PATH = "/tmp/schema_file.avsc"; @@ -78,15 +77,21 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness { protected SchemaProvider schemaProvider; + protected KafkaTestUtils testUtils; + @BeforeAll public static void initClass() { - testUtils = new KafkaTestUtils(); dataGen = new HoodieTestDataGenerator(0xDEED); + } + + @BeforeEach + public void setup() { + testUtils = new KafkaTestUtils(); testUtils.setup(); } - @AfterAll - public static void cleanupClass() { + @AfterEach + public void tearDown() { testUtils.teardown(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java index c718e7a12e8..3f106fce994 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java @@ -28,7 +28,6 @@ import org.apache.hudi.utilities.streamer.SourceFormatAdapter; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.AnalysisException; @@ -64,17 +63,10 @@ public class TestSqlFileBasedSource extends UtilitiesTestBase { @BeforeAll public static void initClass() throws Exception { UtilitiesTestBase.initTestServices(false, true, false); - FileSystem fs = UtilitiesTestBase.fs; - UtilitiesTestBase.Helpers.copyToDFS( - "streamer-config/sql-file-based-source.sql", fs, - UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); - UtilitiesTestBase.Helpers.copyToDFS( - "streamer-config/sql-file-based-source-invalid-table.sql", fs, - UtilitiesTestBase.basePath + "/sql-file-based-source-invalid-table.sql"); } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } @@ -113,7 +105,11 @@ public class TestSqlFileBasedSource extends UtilitiesTestBase { * @throws IOException */ @Test - public void testSqlFileBasedSourceAvroFormat() { + public void testSqlFileBasedSourceAvroFormat() throws IOException { + UtilitiesTestBase.Helpers.copyToDFS( + "streamer-config/sql-file-based-source.sql", fs, + UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); + props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession, schemaProvider); sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource); @@ -136,7 +132,11 @@ public class TestSqlFileBasedSource extends UtilitiesTestBase { * @throws IOException */ @Test - public void testSqlFileBasedSourceRowFormat() { + public void testSqlFileBasedSourceRowFormat() throws IOException { + UtilitiesTestBase.Helpers.copyToDFS( + "streamer-config/sql-file-based-source.sql", fs, + UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); + props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession, schemaProvider); sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource); @@ -154,7 +154,11 @@ public class TestSqlFileBasedSource extends UtilitiesTestBase { * @throws IOException */ @Test - public void testSqlFileBasedSourceMoreRecordsThanSourceLimit() { + public void testSqlFileBasedSourceMoreRecordsThanSourceLimit() throws IOException { + UtilitiesTestBase.Helpers.copyToDFS( + "streamer-config/sql-file-based-source.sql", fs, + UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); + props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession, schemaProvider); sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource); @@ -171,7 +175,11 @@ public class TestSqlFileBasedSource extends UtilitiesTestBase { * @throws IOException */ @Test - public void testSqlFileBasedSourceInvalidTable() { + public void testSqlFileBasedSourceInvalidTable() throws IOException { + UtilitiesTestBase.Helpers.copyToDFS( + "streamer-config/sql-file-based-source-invalid-table.sql", fs, + UtilitiesTestBase.basePath + "/sql-file-based-source-invalid-table.sql"); + props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source-invalid-table.sql"); sqlFileSource = new SqlFileBasedSource(props, jsc, sparkSession, schemaProvider); sourceFormatAdapter = new SourceFormatAdapter(sqlFileSource); @@ -182,7 +190,11 @@ public class TestSqlFileBasedSource extends UtilitiesTestBase { } @Test - public void shouldSetCheckpointForSqlFileBasedSourceWithEpochCheckpoint() { + public void shouldSetCheckpointForSqlFileBasedSourceWithEpochCheckpoint() throws IOException { + UtilitiesTestBase.Helpers.copyToDFS( + "streamer-config/sql-file-based-source.sql", fs, + UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); + props.setProperty(sqlFileSourceConfig, UtilitiesTestBase.basePath + "/sql-file-based-source.sql"); props.setProperty(sqlFileSourceConfigEmitChkPointConf, "true"); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java index 37ab549ea76..64578f3bae3 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java @@ -64,7 +64,7 @@ public class TestSqlSource extends UtilitiesTestBase { } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java index e6aa9d8862e..c9f46144e96 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java @@ -39,11 +39,14 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.streaming.kafka010.KafkaTestUtils; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import java.io.IOException; import java.util.UUID; import java.util.stream.Stream; @@ -57,19 +60,28 @@ public abstract class TestAbstractDebeziumSource extends UtilitiesTestBase { private final String testTopicName = "hoodie_test_" + UUID.randomUUID(); private final HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class); - private static KafkaTestUtils testUtils; + private KafkaTestUtils testUtils; @BeforeAll public static void initClass() throws Exception { UtilitiesTestBase.initTestServices(); + } + + @BeforeEach + public void setUpKafkaTestUtils() { testUtils = new KafkaTestUtils(); testUtils.setup(); } + @AfterEach + public void tearDownKafkaTestUtils() { + testUtils.teardown(); + testUtils = null; + } + @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); - testUtils.teardown(); } private TypedProperties createPropsForJsonSource() { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java index e3d2ec5a602..6ad6a4c09db 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java @@ -31,8 +31,8 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.spark.streaming.kafka010.KafkaTestUtils; import org.apache.spark.streaming.kafka010.OffsetRange; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.UUID; @@ -49,17 +49,17 @@ import static org.mockito.Mockito.mock; public class TestKafkaOffsetGen { private final String testTopicName = "hoodie_test_" + UUID.randomUUID(); - private static KafkaTestUtils testUtils; private HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class); + private KafkaTestUtils testUtils; - @BeforeAll - public static void setup() throws Exception { + @BeforeEach + public void setup() throws Exception { testUtils = new KafkaTestUtils(); testUtils.setup(); } - @AfterAll - public static void teardown() throws Exception { + @AfterEach + public void teardown() throws Exception { testUtils.teardown(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java index 24f645c404a..0406ccddc4a 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java @@ -164,7 +164,12 @@ public class UtilitiesTestBase { } @AfterAll - public static void cleanUpUtilitiesTestServices() { + public static void cleanUpUtilitiesTestServices() throws IOException { + if (fs != null) { + fs.delete(new Path(basePath), true); + fs.close(); + fs = null; + } if (hdfsTestService != null) { hdfsTestService.stop(); hdfsTestService = null; @@ -197,6 +202,10 @@ public class UtilitiesTestBase { @BeforeEach public void setup() throws Exception { TestDataSource.initDataGen(); + // This prevents test methods from using existing files or folders. + if (fs != null) { + fs.delete(new Path(basePath), true); + } } @AfterEach diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java index bdb6c85ce72..11a00ebeb2c 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java @@ -58,7 +58,7 @@ public abstract class AbstractCloudObjectsSourceTestBase extends UtilitiesTestBa } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java index b3cbe1d6108..1b0cc7f52a6 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java @@ -36,6 +36,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -51,22 +52,10 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase { @BeforeAll public static void initClass() throws Exception { UtilitiesTestBase.initTestServices(); - UtilitiesTestBase.Helpers.copyToDFS( - "streamer-config/sql-file-transformer.sql", - UtilitiesTestBase.fs, - UtilitiesTestBase.basePath + "/sql-file-transformer.sql"); - UtilitiesTestBase.Helpers.copyToDFS( - "streamer-config/sql-file-transformer-invalid.sql", - UtilitiesTestBase.fs, - UtilitiesTestBase.basePath + "/sql-file-transformer-invalid.sql"); - UtilitiesTestBase.Helpers.copyToDFS( - "streamer-config/sql-file-transformer-empty.sql", - UtilitiesTestBase.fs, - UtilitiesTestBase.basePath + "/sql-file-transformer-empty.sql"); } @AfterAll - public static void cleanupClass() { + public static void cleanupClass() throws IOException { UtilitiesTestBase.cleanUpUtilitiesTestServices(); } @@ -106,7 +95,12 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase { } @Test - public void testSqlFileBasedTransformerInvalidSQL() { + public void testSqlFileBasedTransformerInvalidSQL() throws IOException { + UtilitiesTestBase.Helpers.copyToDFS( + "streamer-config/sql-file-transformer-invalid.sql", + UtilitiesTestBase.fs, + UtilitiesTestBase.basePath + "/sql-file-transformer-invalid.sql"); + // Test if the SQL file based transformer works as expected for the invalid SQL statements. props.setProperty( "hoodie.deltastreamer.transformer.sql.file", @@ -117,7 +111,12 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase { } @Test - public void testSqlFileBasedTransformerEmptyDataset() { + public void testSqlFileBasedTransformerEmptyDataset() throws IOException { + UtilitiesTestBase.Helpers.copyToDFS( + "streamer-config/sql-file-transformer-empty.sql", + UtilitiesTestBase.fs, + UtilitiesTestBase.basePath + "/sql-file-transformer-empty.sql"); + // Test if the SQL file based transformer works as expected for the empty SQL statements. props.setProperty( "hoodie.deltastreamer.transformer.sql.file", @@ -129,7 +128,12 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase { } @Test - public void testSqlFileBasedTransformer() { + public void testSqlFileBasedTransformer() throws IOException { + UtilitiesTestBase.Helpers.copyToDFS( + "streamer-config/sql-file-transformer.sql", + UtilitiesTestBase.fs, + UtilitiesTestBase.basePath + "/sql-file-transformer.sql"); + // Test if the SQL file based transformer works as expected for the correct input. props.setProperty( "hoodie.deltastreamer.transformer.sql.file",
