This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8130415b457 [HUDI-5476] Fix utilities test failure with zookeeper 
(#7560)
8130415b457 is described below

commit 8130415b457c9a43ad8555664d24140e5d50aef6
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Dec 27 22:18:40 2022 +0800

    [HUDI-5476] Fix utilities test failure with zookeeper (#7560)
---
 .../TestZookeeperBasedLockProvider.java            | 19 ++++++-
 .../TestDFSHoodieTestSuiteWriterAdapter.java       |  2 +-
 .../integ/testsuite/TestFileDeltaInputWriter.java  |  2 +-
 .../testsuite/job/TestHoodieTestSuiteJob.java      |  2 +-
 .../reader/TestDFSAvroDeltaInputReader.java        |  2 +-
 .../reader/TestDFSHoodieDatasetInputReader.java    |  2 +-
 .../apache/hudi/hive/testutils/HiveTestUtil.java   | 10 +---
 hudi-utilities/pom.xml                             |  4 ++
 .../utilities/HoodieMetadataTableValidator.java    |  8 ++-
 .../hudi/utilities/TestHiveIncrementalPuller.java  | 38 +++++++------
 .../HoodieDeltaStreamerTestBase.java               | 39 ++++++-------
 .../TestHoodieDeltaStreamer.java                   | 66 +++++++---------------
 .../TestHoodieDeltaStreamerWithMultiWriter.java    | 15 ++---
 .../TestHoodieMultiTableDeltaStreamer.java         | 19 +++----
 .../multisync/TestMultipleMetaSync.java            | 25 +-------
 .../hudi/utilities/sources/TestSqlSource.java      |  2 +-
 .../debezium/TestAbstractDebeziumSource.java       |  2 +-
 .../utilities/testutils/UtilitiesTestBase.java     |  2 +-
 .../AbstractCloudObjectsSourceTestBase.java        |  2 +-
 .../transform/TestSqlFileBasedTransformer.java     |  2 +-
 .../short_trip_uber_config.properties              |  4 +-
 21 files changed, 110 insertions(+), 157 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
index e9ab49a2966..c80ade210d7 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
@@ -18,19 +18,22 @@
 
 package org.apache.hudi.client.transaction;
 
+import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.exception.HoodieLockException;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.TestingServer;
-import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
-import org.apache.hudi.common.config.LockConfiguration;
-import org.apache.hudi.exception.HoodieLockException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -75,6 +78,16 @@ public class TestZookeeperBasedLockProvider {
     lockConfiguration = new LockConfiguration(properties);
   }
 
+  @AfterAll
+  public static void tearDown() throws IOException {
+    if (server != null) {
+      server.close();
+    }
+    if (client != null) {
+      client.close();
+    }
+  }
+
   @Test
   public void testAcquireLock() {
     ZookeeperBasedLockProvider zookeeperBasedLockProvider = new 
ZookeeperBasedLockProvider(lockConfiguration, client);
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
index 9c21ee6bd44..0c0e920305d 100644
--- 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
+++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
@@ -70,7 +70,7 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends 
UtilitiesTestBase {
 
   @AfterAll
   public static void cleanupClass() {
-    UtilitiesTestBase.cleanupClass();
+    UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
   @BeforeEach
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
index 36c88d02428..f2d582ca806 100644
--- 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
+++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
@@ -63,7 +63,7 @@ public class TestFileDeltaInputWriter extends 
UtilitiesTestBase {
 
   @AfterAll
   public static void cleanupClass() {
-    UtilitiesTestBase.cleanupClass();
+    UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
   @BeforeEach
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
index 2bc86f2f8d0..9dd6d9fb6f7 100644
--- 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
+++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
@@ -137,7 +137,7 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
 
   @AfterAll
   public static void cleanupClass() {
-    UtilitiesTestBase.cleanupClass();
+    UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
   @BeforeEach
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
index 631601fb2cd..0bc1044fd4c 100644
--- 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
+++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
@@ -48,7 +48,7 @@ public class TestDFSAvroDeltaInputReader extends 
UtilitiesTestBase {
 
   @AfterAll
   public static void cleanupClass() {
-    UtilitiesTestBase.cleanupClass();
+    UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
   @BeforeEach
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
index cf2921d41ee..3a11de9f0b5 100644
--- 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
+++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
@@ -56,7 +56,7 @@ public class TestDFSHoodieDatasetInputReader extends 
UtilitiesTestBase {
 
   @AfterAll
   public static void cleanupClass() {
-    UtilitiesTestBase.cleanupClass();
+    UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
   @BeforeEach
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index 131fdab6db2..33c530d67ea 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -24,11 +24,11 @@ import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieDeltaWriteStat;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
@@ -154,14 +154,6 @@ public class HiveTestUtil {
     clear();
   }
 
-  public static void clearIncrementalPullSetup(String path1, String path2) 
throws IOException, HiveException, MetaException {
-    fileSystem.delete(new Path(path1), true);
-    if (path2 != null) {
-      fileSystem.delete(new Path(path2), true);
-    }
-    clear();
-  }
-
   public static void clear() throws IOException, HiveException, MetaException {
     fileSystem.delete(new Path(basePath), true);
     HoodieTableMetaClient.withPropertyBuilder()
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index ac49ba6f9fd..3811dedfeff 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -240,6 +240,10 @@
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-api</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 69e50cbdeda..17c46d14bc9 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -62,7 +62,6 @@ import org.apache.hudi.utilities.util.BloomFilterData;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
-import jline.internal.Log;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -940,6 +939,9 @@ public class HoodieMetadataTableValidator implements 
Serializable {
    * verified in the {@link HoodieMetadataTableValidator}.
    */
   private static class HoodieMetadataValidationContext implements Serializable 
{
+
+    private static final Logger LOG = 
LogManager.getLogger(HoodieMetadataValidationContext.class);
+
     private final HoodieTableMetaClient metaClient;
     private final HoodieTableFileSystemView fileSystemView;
     private final HoodieTableMetadata tableMetadata;
@@ -1050,11 +1052,11 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       try (HoodieFileReader fileReader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(metaClient.getHadoopConf(),
 path)) {
         bloomFilter = fileReader.readBloomFilter();
         if (bloomFilter == null) {
-          Log.error("Failed to read bloom filter for " + path);
+          LOG.error("Failed to read bloom filter for " + path);
           return Option.empty();
         }
       } catch (IOException e) {
-        Log.error("Failed to get file reader for " + path + " " + 
e.getMessage());
+        LOG.error("Failed to get file reader for " + path + " " + 
e.getMessage());
         return Option.empty();
       }
       return Option.of(BloomFilterData.builder()
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java
index 1dcca13a825..d2506effd09 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java
@@ -25,11 +25,11 @@ import org.apache.hudi.hive.HoodieHiveSyncClient;
 import org.apache.hudi.hive.testutils.HiveTestUtil;
 import org.apache.hudi.utilities.exception.HoodieIncrementalPullSQLException;
 
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.FileWriter;
@@ -37,7 +37,6 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.time.Instant;
 
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;
@@ -53,34 +52,37 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHiveIncrementalPuller {
 
-  private HiveIncrementalPuller.Config config;
-  private String targetBasePath = null;
+  private final HiveIncrementalPuller.Config config = new 
HiveIncrementalPuller.Config();
+  @TempDir
+  java.nio.file.Path tempDir;
+
+  @AfterAll
+  public static void cleanUpClass() throws Exception {
+    HiveTestUtil.shutdown();
+  }
 
   @BeforeEach
-  public void setup() throws HiveException, IOException, InterruptedException, 
MetaException {
-    config = new HiveIncrementalPuller.Config();
+  public void setUp() throws Exception {
     HiveTestUtil.setUp();
   }
 
   @AfterEach
-  public void teardown() throws Exception {
-    HiveTestUtil.clearIncrementalPullSetup(config.hoodieTmpDir, 
targetBasePath);
+  public void tearDown() throws Exception {
+    HiveTestUtil.clear();
   }
 
   @Test
   public void testInitHiveIncrementalPuller() {
-
     assertDoesNotThrow(() -> {
       new HiveIncrementalPuller(config);
     }, "Unexpected exception while initing HiveIncrementalPuller.");
-
   }
 
   private HiveIncrementalPuller.Config getHivePullerConfig(String 
incrementalSql) throws IOException {
     config.hiveJDBCUrl = hiveSyncProps.getString(HIVE_URL.key());
     config.hiveUsername = hiveSyncProps.getString(HIVE_USER.key());
     config.hivePassword = hiveSyncProps.getString(HIVE_PASS.key());
-    config.hoodieTmpDir = 
Files.createTempDirectory("hivePullerTest").toUri().toString();
+    config.hoodieTmpDir = 
tempDir.resolve("hivePullerTest").toAbsolutePath().toString();
     config.sourceDb = hiveSyncProps.getString(META_SYNC_DATABASE_NAME.key());
     config.sourceTable = hiveSyncProps.getString(META_SYNC_TABLE_NAME.key());
     config.targetDb = "tgtdb";
@@ -113,9 +115,9 @@ public class TestHiveIncrementalPuller {
 
   private void createTargetTable() throws IOException, URISyntaxException {
     String instantTime = "100";
-    targetBasePath = Files.createTempDirectory("hivesynctest1" + 
Instant.now().toEpochMilli()).toUri().toString();
+    String targetBasePath = 
tempDir.resolve("target_table").toAbsolutePath().toString();
     HiveTestUtil.createCOWTable(instantTime, 5, true,
-            targetBasePath, "tgtdb", "test2");
+        targetBasePath, "tgtdb", "test2");
     HiveSyncTool tool = new 
HiveSyncTool(getTargetHiveSyncConfig(targetBasePath), 
HiveTestUtil.getHiveConf());
     tool.syncHoodieTable();
   }
@@ -145,9 +147,9 @@ public class TestHiveIncrementalPuller {
   public void testPullerWithoutIncrementalClause() throws IOException, 
URISyntaxException {
     createTables();
     HiveIncrementalPuller puller = new 
HiveIncrementalPuller(getHivePullerConfig(
-            "select name from testdb.test1"));
+        "select name from testdb.test1"));
     Exception e = assertThrows(HoodieIncrementalPullSQLException.class, 
puller::saveDelta,
-            "Should fail when incremental clause not provided!");
+        "Should fail when incremental clause not provided!");
     assertTrue(e.getMessage().contains("Incremental SQL does not have clause 
`_hoodie_commit_time` > '%s', which means its not pulling incrementally"));
   }
 
@@ -155,9 +157,9 @@ public class TestHiveIncrementalPuller {
   public void testPullerWithoutSourceInSql() throws IOException, 
URISyntaxException {
     createTables();
     HiveIncrementalPuller puller = new 
HiveIncrementalPuller(getHivePullerConfig(
-            "select name from tgtdb.test2 where `_hoodie_commit_time` > 
'%s'"));
+        "select name from tgtdb.test2 where `_hoodie_commit_time` > '%s'"));
     Exception e = assertThrows(HoodieIncrementalPullSQLException.class, 
puller::saveDelta,
-            "Should fail when source db and table names not provided!");
+        "Should fail when source db and table names not provided!");
     assertTrue(e.getMessage().contains("Incremental SQL does not have 
testdb.test1"));
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
similarity index 96%
rename from 
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
rename to 
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index d7bf48053fe..3527a95781a 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -7,16 +7,17 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.utilities.functional;
+package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -37,7 +38,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.streaming.kafka010.KafkaTestUtils;
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 
@@ -57,7 +57,6 @@ import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
 
 public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 
-
   static final Random RANDOM = new Random();
   static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
   static final String PROPS_FILENAME_TEST_SOURCE1 = "test-source1.properties";
@@ -102,9 +101,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
   protected static String defaultSchemaProviderClassName = 
FilebasedSchemaProvider.class.getName();
   protected static int testNum = 1;
 
-  @BeforeAll
-  public static void initClass() throws Exception {
-    UtilitiesTestBase.initTestServices(false, true, true);
+  protected static void prepareTestSetup() throws IOException {
     PARQUET_SOURCE_ROOT = basePath + "/parquetFiles";
     ORC_SOURCE_ROOT = basePath + "/orcFiles";
     JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles";
@@ -112,7 +109,6 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
     testUtils.setup();
     topicName = "topic" + testNum;
     prepareInitialConfigs(fs, basePath, testUtils.brokerAddress());
-
     prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
     prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
   }
@@ -197,23 +193,22 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
     UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + 
PROPS_FILENAME_TEST_SOURCE);
   }
 
-  @BeforeEach
-  public void setup() throws Exception {
-    super.setup();
-    TestDataSource.returnEmptyBatch = false;
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initTestServices(false, true, false);
+    prepareTestSetup();
   }
 
   @AfterAll
-  public static void cleanupClass() {
-    UtilitiesTestBase.cleanupClass();
+  public static void cleanupKafkaTestUtils() {
     if (testUtils != null) {
       testUtils.teardown();
     }
   }
 
-  @AfterEach
-  public void teardown() throws Exception {
-    super.teardown();
+  @BeforeEach
+  public void resetTestDataSource() {
+    TestDataSource.returnEmptyBatch = false;
   }
 
   protected static void 
populateInvalidTableConfigFilePathProps(TypedProperties props, String 
dfsBasePath) {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
similarity index 99%
rename from 
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
rename to 
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index d911e8bacb0..60b870b6549 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -7,30 +7,22 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.utilities.functional;
+package org.apache.hudi.utilities.deltastreamer;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hudi.DataSourceReadOptions;
 import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.HoodieSparkUtils$;
 import org.apache.hudi.HoodieSparkRecordMerger;
+import org.apache.hudi.HoodieSparkUtils$;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
@@ -73,9 +65,6 @@ import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.utilities.DummySchemaProvider;
 import org.apache.hudi.utilities.HoodieClusteringJob;
 import org.apache.hudi.utilities.HoodieIndexer;
-import org.apache.hudi.utilities.deltastreamer.DeltaSync;
-import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
-import org.apache.hudi.utilities.deltastreamer.NoNewDataTerminationStrategy;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
@@ -95,6 +84,16 @@ import 
org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
 import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
 import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -109,12 +108,8 @@ import org.apache.spark.sql.api.java.UDF4;
 import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -162,7 +157,6 @@ import static 
org.junit.jupiter.params.provider.Arguments.arguments;
 /**
  * Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts, 
upserts, inserts. Check counts at the end.
  */
-@Tag("functional")
 public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 
   private static final Logger LOG = 
LogManager.getLogger(TestHoodieDeltaStreamer.class);
@@ -192,26 +186,6 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     return new HoodieClusteringJob(jsc, scheduleClusteringConfig);
   }
 
-  @AfterAll
-  public static void cleanupClass() {
-    UtilitiesTestBase.cleanupClass();
-    if (testUtils != null) {
-      testUtils.teardown();
-    }
-  }
-
-  @Override
-  @BeforeEach
-  public void setup() throws Exception {
-    super.setup();
-  }
-
-  @Override
-  @AfterEach
-  public void teardown() throws Exception {
-    super.teardown();
-  }
-
   static class TestHelpers {
 
     static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, 
WriteOperationType op) {
@@ -468,7 +442,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         new DFSPropertiesConfiguration(fs.getConf(), new Path(basePath + "/" + 
PROPS_FILENAME_TEST_SOURCE)).getProps();
     assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
     assertEquals("_row_key", 
props.getString("hoodie.datasource.write.recordkey.field"));
-    
assertEquals("org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestGenerator",
+    
assertEquals("org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestGenerator",
         props.getString("hoodie.datasource.write.keygenerator.class"));
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
similarity index 97%
rename from 
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
rename to 
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
index 389c687619d..fd684e95b2d 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hudi.utilities.functional;
+package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
@@ -29,7 +29,6 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
-import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
@@ -39,7 +38,6 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
@@ -61,13 +59,12 @@ import static 
org.apache.hudi.config.HoodieWriteConfig.FINALIZE_WRITE_PARALLELIS
 import static 
org.apache.hudi.config.HoodieWriteConfig.INSERT_PARALLELISM_VALUE;
 import static 
org.apache.hudi.config.HoodieWriteConfig.UPSERT_PARALLELISM_VALUE;
 import static 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
-import static 
org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.PROPS_FILENAME_TEST_MULTI_WRITER;
-import static 
org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.addCommitToTimeline;
-import static 
org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName;
-import static 
org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.prepareInitialConfigs;
-import static 
org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.deltaStreamerTestRunner;
+import static 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.PROPS_FILENAME_TEST_MULTI_WRITER;
+import static 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.addCommitToTimeline;
+import static 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName;
+import static 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.prepareInitialConfigs;
+import static 
org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer.deltaStreamerTestRunner;
 
-@Tag("functional")
 public class TestHoodieDeltaStreamerWithMultiWriter extends 
SparkClientFunctionalTestHarness {
 
   private static final Logger LOG = 
LogManager.getLogger(TestHoodieDeltaStreamerWithMultiWriter.class);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
similarity index 96%
rename from 
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
rename to 
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
index 8ac8b616b5c..a2b480ac1ba 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
@@ -7,23 +7,22 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.utilities.functional;
+package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer;
-import org.apache.hudi.utilities.deltastreamer.TableExecutionContext;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
 import org.apache.hudi.utilities.sources.JsonKafkaSource;
@@ -34,7 +33,6 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -45,7 +43,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@Tag("functional")
 public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   private static final Logger LOG = 
LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
index 3db11be49da..e68aae3c53a 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
@@ -21,15 +21,12 @@ package org.apache.hudi.utilities.deltastreamer.multisync;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
-import org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
 
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -45,26 +42,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestMultipleMetaSync extends HoodieDeltaStreamerTestBase {
 
-  @AfterAll
-  public static void cleanupClass() {
-    UtilitiesTestBase.cleanupClass();
-    if (testUtils != null) {
-      testUtils.teardown();
-    }
-  }
-
-  @Override
-  @BeforeEach
-  public void setup() throws Exception {
-    super.setup();
-  }
-
-  @Override
-  @AfterEach
-  public void teardown() throws Exception {
-    super.teardown();
-  }
-
   @Test
   void testMultipleMetaStore() throws Exception {
     String tableBasePath = basePath + "/test_multiple_metastore";
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
index 35bc7884d77..814cee24c00 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
@@ -65,7 +65,7 @@ public class TestSqlSource extends UtilitiesTestBase {
 
   @AfterAll
   public static void cleanupClass() {
-    UtilitiesTestBase.cleanupClass();
+    UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
   @BeforeEach
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
index 113805d24db..9139765841b 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
@@ -69,7 +69,7 @@ public abstract class TestAbstractDebeziumSource extends 
UtilitiesTestBase {
 
   @AfterAll
   public static void cleanupClass() {
-    UtilitiesTestBase.cleanupClass();
+    UtilitiesTestBase.cleanUpUtilitiesTestServices();
     testUtils.teardown();
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 493953e8949..a074d22b7eb 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -163,7 +163,7 @@ public class UtilitiesTestBase {
   }
 
   @AfterAll
-  public static void cleanupClass() {
+  public static void cleanUpUtilitiesTestServices() {
     if (hdfsTestService != null) {
       hdfsTestService.stop();
       hdfsTestService = null;
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
index d4593f64fd2..3fced8d031c 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
@@ -59,7 +59,7 @@ public abstract class AbstractCloudObjectsSourceTestBase 
extends UtilitiesTestBa
 
   @AfterAll
   public static void cleanupClass() {
-    UtilitiesTestBase.cleanupClass();
+    UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
   @BeforeEach
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
index 74dafbd2ffc..1c9152755ca 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
@@ -67,7 +67,7 @@ public class TestSqlFileBasedTransformer extends 
UtilitiesTestBase {
 
   @AfterAll
   public static void cleanupClass() {
-    UtilitiesTestBase.cleanupClass();
+    UtilitiesTestBase.cleanUpUtilitiesTestServices();
   }
 
   @Override
diff --git 
a/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
 
b/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
index 75d74d6bc89..0e76676a763 100644
--- 
a/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
+++ 
b/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
@@ -22,6 +22,6 @@ hoodie.deltastreamer.source.kafka.topic=topic2
 hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
 hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
 hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table
-hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestTableLevelGenerator
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestTableLevelGenerator
 
hoodie.deltastreamer.schemaprovider.registry.baseUrl=http://localhost:8081/subjects/
-hoodie.deltastreamer.schemaprovider.registry.urlSuffix=-value/versions/latest
\ No newline at end of file
+hoodie.deltastreamer.schemaprovider.registry.urlSuffix=-value/versions/latest


Reply via email to