This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8130415b457 [HUDI-5476] Fix utilities test failure with zookeeper
(#7560)
8130415b457 is described below
commit 8130415b457c9a43ad8555664d24140e5d50aef6
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Dec 27 22:18:40 2022 +0800
[HUDI-5476] Fix utilities test failure with zookeeper (#7560)
---
.../TestZookeeperBasedLockProvider.java | 19 ++++++-
.../TestDFSHoodieTestSuiteWriterAdapter.java | 2 +-
.../integ/testsuite/TestFileDeltaInputWriter.java | 2 +-
.../testsuite/job/TestHoodieTestSuiteJob.java | 2 +-
.../reader/TestDFSAvroDeltaInputReader.java | 2 +-
.../reader/TestDFSHoodieDatasetInputReader.java | 2 +-
.../apache/hudi/hive/testutils/HiveTestUtil.java | 10 +---
hudi-utilities/pom.xml | 4 ++
.../utilities/HoodieMetadataTableValidator.java | 8 ++-
.../hudi/utilities/TestHiveIncrementalPuller.java | 38 +++++++------
.../HoodieDeltaStreamerTestBase.java | 39 ++++++-------
.../TestHoodieDeltaStreamer.java | 66 +++++++---------------
.../TestHoodieDeltaStreamerWithMultiWriter.java | 15 ++---
.../TestHoodieMultiTableDeltaStreamer.java | 19 +++----
.../multisync/TestMultipleMetaSync.java | 25 +-------
.../hudi/utilities/sources/TestSqlSource.java | 2 +-
.../debezium/TestAbstractDebeziumSource.java | 2 +-
.../utilities/testutils/UtilitiesTestBase.java | 2 +-
.../AbstractCloudObjectsSourceTestBase.java | 2 +-
.../transform/TestSqlFileBasedTransformer.java | 2 +-
.../short_trip_uber_config.properties | 4 +-
21 files changed, 110 insertions(+), 157 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
index e9ab49a2966..c80ade210d7 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
@@ -18,19 +18,22 @@
package org.apache.hudi.client.transaction;
+import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.exception.HoodieLockException;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
-import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
-import org.apache.hudi.common.config.LockConfiguration;
-import org.apache.hudi.exception.HoodieLockException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -75,6 +78,16 @@ public class TestZookeeperBasedLockProvider {
lockConfiguration = new LockConfiguration(properties);
}
+ @AfterAll
+ public static void tearDown() throws IOException {
+ if (server != null) {
+ server.close();
+ }
+ if (client != null) {
+ client.close();
+ }
+ }
+
@Test
public void testAcquireLock() {
ZookeeperBasedLockProvider zookeeperBasedLockProvider = new
ZookeeperBasedLockProvider(lockConfiguration, client);
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
index 9c21ee6bd44..0c0e920305d 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
@@ -70,7 +70,7 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends
UtilitiesTestBase {
@AfterAll
public static void cleanupClass() {
- UtilitiesTestBase.cleanupClass();
+ UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
@BeforeEach
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
index 36c88d02428..f2d582ca806 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
@@ -63,7 +63,7 @@ public class TestFileDeltaInputWriter extends
UtilitiesTestBase {
@AfterAll
public static void cleanupClass() {
- UtilitiesTestBase.cleanupClass();
+ UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
@BeforeEach
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
index 2bc86f2f8d0..9dd6d9fb6f7 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
@@ -137,7 +137,7 @@ public class TestHoodieTestSuiteJob extends
UtilitiesTestBase {
@AfterAll
public static void cleanupClass() {
- UtilitiesTestBase.cleanupClass();
+ UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
@BeforeEach
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
index 631601fb2cd..0bc1044fd4c 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
@@ -48,7 +48,7 @@ public class TestDFSAvroDeltaInputReader extends
UtilitiesTestBase {
@AfterAll
public static void cleanupClass() {
- UtilitiesTestBase.cleanupClass();
+ UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
@BeforeEach
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
index cf2921d41ee..3a11de9f0b5 100644
---
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
+++
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
@@ -56,7 +56,7 @@ public class TestDFSHoodieDatasetInputReader extends
UtilitiesTestBase {
@AfterAll
public static void cleanupClass() {
- UtilitiesTestBase.cleanupClass();
+ UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
@BeforeEach
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index 131fdab6db2..33c530d67ea 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -24,11 +24,11 @@ import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
@@ -154,14 +154,6 @@ public class HiveTestUtil {
clear();
}
- public static void clearIncrementalPullSetup(String path1, String path2)
throws IOException, HiveException, MetaException {
- fileSystem.delete(new Path(path1), true);
- if (path2 != null) {
- fileSystem.delete(new Path(path2), true);
- }
- clear();
- }
-
public static void clear() throws IOException, HiveException, MetaException {
fileSystem.delete(new Path(basePath), true);
HoodieTableMetaClient.withPropertyBuilder()
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index ac49ba6f9fd..3811dedfeff 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -240,6 +240,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
</exclusions>
</dependency>
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 69e50cbdeda..17c46d14bc9 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -62,7 +62,6 @@ import org.apache.hudi.utilities.util.BloomFilterData;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
-import jline.internal.Log;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -940,6 +939,9 @@ public class HoodieMetadataTableValidator implements
Serializable {
* verified in the {@link HoodieMetadataTableValidator}.
*/
private static class HoodieMetadataValidationContext implements Serializable
{
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieMetadataValidationContext.class);
+
private final HoodieTableMetaClient metaClient;
private final HoodieTableFileSystemView fileSystemView;
private final HoodieTableMetadata tableMetadata;
@@ -1050,11 +1052,11 @@ public class HoodieMetadataTableValidator implements
Serializable {
try (HoodieFileReader fileReader =
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(metaClient.getHadoopConf(),
path)) {
bloomFilter = fileReader.readBloomFilter();
if (bloomFilter == null) {
- Log.error("Failed to read bloom filter for " + path);
+ LOG.error("Failed to read bloom filter for " + path);
return Option.empty();
}
} catch (IOException e) {
- Log.error("Failed to get file reader for " + path + " " +
e.getMessage());
+ LOG.error("Failed to get file reader for " + path + " " +
e.getMessage());
return Option.empty();
}
return Option.of(BloomFilterData.builder()
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java
index 1dcca13a825..d2506effd09 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java
@@ -25,11 +25,11 @@ import org.apache.hudi.hive.HoodieHiveSyncClient;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.utilities.exception.HoodieIncrementalPullSQLException;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.FileWriter;
@@ -37,7 +37,6 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.time.Instant;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;
@@ -53,34 +52,37 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHiveIncrementalPuller {
- private HiveIncrementalPuller.Config config;
- private String targetBasePath = null;
+ private final HiveIncrementalPuller.Config config = new
HiveIncrementalPuller.Config();
+ @TempDir
+ java.nio.file.Path tempDir;
+
+ @AfterAll
+ public static void cleanUpClass() throws Exception {
+ HiveTestUtil.shutdown();
+ }
@BeforeEach
- public void setup() throws HiveException, IOException, InterruptedException,
MetaException {
- config = new HiveIncrementalPuller.Config();
+ public void setUp() throws Exception {
HiveTestUtil.setUp();
}
@AfterEach
- public void teardown() throws Exception {
- HiveTestUtil.clearIncrementalPullSetup(config.hoodieTmpDir,
targetBasePath);
+ public void tearDown() throws Exception {
+ HiveTestUtil.clear();
}
@Test
public void testInitHiveIncrementalPuller() {
-
assertDoesNotThrow(() -> {
new HiveIncrementalPuller(config);
}, "Unexpected exception while initing HiveIncrementalPuller.");
-
}
private HiveIncrementalPuller.Config getHivePullerConfig(String
incrementalSql) throws IOException {
config.hiveJDBCUrl = hiveSyncProps.getString(HIVE_URL.key());
config.hiveUsername = hiveSyncProps.getString(HIVE_USER.key());
config.hivePassword = hiveSyncProps.getString(HIVE_PASS.key());
- config.hoodieTmpDir =
Files.createTempDirectory("hivePullerTest").toUri().toString();
+ config.hoodieTmpDir =
tempDir.resolve("hivePullerTest").toAbsolutePath().toString();
config.sourceDb = hiveSyncProps.getString(META_SYNC_DATABASE_NAME.key());
config.sourceTable = hiveSyncProps.getString(META_SYNC_TABLE_NAME.key());
config.targetDb = "tgtdb";
@@ -113,9 +115,9 @@ public class TestHiveIncrementalPuller {
private void createTargetTable() throws IOException, URISyntaxException {
String instantTime = "100";
- targetBasePath = Files.createTempDirectory("hivesynctest1" +
Instant.now().toEpochMilli()).toUri().toString();
+ String targetBasePath =
tempDir.resolve("target_table").toAbsolutePath().toString();
HiveTestUtil.createCOWTable(instantTime, 5, true,
- targetBasePath, "tgtdb", "test2");
+ targetBasePath, "tgtdb", "test2");
HiveSyncTool tool = new
HiveSyncTool(getTargetHiveSyncConfig(targetBasePath),
HiveTestUtil.getHiveConf());
tool.syncHoodieTable();
}
@@ -145,9 +147,9 @@ public class TestHiveIncrementalPuller {
public void testPullerWithoutIncrementalClause() throws IOException,
URISyntaxException {
createTables();
HiveIncrementalPuller puller = new
HiveIncrementalPuller(getHivePullerConfig(
- "select name from testdb.test1"));
+ "select name from testdb.test1"));
Exception e = assertThrows(HoodieIncrementalPullSQLException.class,
puller::saveDelta,
- "Should fail when incremental clause not provided!");
+ "Should fail when incremental clause not provided!");
assertTrue(e.getMessage().contains("Incremental SQL does not have clause
`_hoodie_commit_time` > '%s', which means its not pulling incrementally"));
}
@@ -155,9 +157,9 @@ public class TestHiveIncrementalPuller {
public void testPullerWithoutSourceInSql() throws IOException,
URISyntaxException {
createTables();
HiveIncrementalPuller puller = new
HiveIncrementalPuller(getHivePullerConfig(
- "select name from tgtdb.test2 where `_hoodie_commit_time` >
'%s'"));
+ "select name from tgtdb.test2 where `_hoodie_commit_time` > '%s'"));
Exception e = assertThrows(HoodieIncrementalPullSQLException.class,
puller::saveDelta,
- "Should fail when source db and table names not provided!");
+ "Should fail when source db and table names not provided!");
assertTrue(e.getMessage().contains("Incremental SQL does not have
testdb.test1"));
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
similarity index 96%
rename from
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
rename to
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index d7bf48053fe..3527a95781a 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -7,16 +7,17 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.utilities.functional;
+package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -37,7 +38,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -57,7 +57,6 @@ import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
-
static final Random RANDOM = new Random();
static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
static final String PROPS_FILENAME_TEST_SOURCE1 = "test-source1.properties";
@@ -102,9 +101,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
protected static String defaultSchemaProviderClassName =
FilebasedSchemaProvider.class.getName();
protected static int testNum = 1;
- @BeforeAll
- public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(false, true, true);
+ protected static void prepareTestSetup() throws IOException {
PARQUET_SOURCE_ROOT = basePath + "/parquetFiles";
ORC_SOURCE_ROOT = basePath + "/orcFiles";
JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles";
@@ -112,7 +109,6 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
testUtils.setup();
topicName = "topic" + testNum;
prepareInitialConfigs(fs, basePath, testUtils.brokerAddress());
-
prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
}
@@ -197,23 +193,22 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" +
PROPS_FILENAME_TEST_SOURCE);
}
- @BeforeEach
- public void setup() throws Exception {
- super.setup();
- TestDataSource.returnEmptyBatch = false;
+ @BeforeAll
+ public static void initClass() throws Exception {
+ UtilitiesTestBase.initTestServices(false, true, false);
+ prepareTestSetup();
}
@AfterAll
- public static void cleanupClass() {
- UtilitiesTestBase.cleanupClass();
+ public static void cleanupKafkaTestUtils() {
if (testUtils != null) {
testUtils.teardown();
}
}
- @AfterEach
- public void teardown() throws Exception {
- super.teardown();
+ @BeforeEach
+ public void resetTestDataSource() {
+ TestDataSource.returnEmptyBatch = false;
}
protected static void
populateInvalidTableConfigFilePathProps(TypedProperties props, String
dfsBasePath) {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
similarity index 99%
rename from
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
rename to
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index d911e8bacb0..60b870b6549 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -7,30 +7,22 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.utilities.functional;
+package org.apache.hudi.utilities.deltastreamer;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.HoodieSparkUtils$;
import org.apache.hudi.HoodieSparkRecordMerger;
+import org.apache.hudi.HoodieSparkUtils$;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
@@ -73,9 +65,6 @@ import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.HoodieIndexer;
-import org.apache.hudi.utilities.deltastreamer.DeltaSync;
-import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
-import org.apache.hudi.utilities.deltastreamer.NoNewDataTerminationStrategy;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
@@ -95,6 +84,16 @@ import
org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -109,12 +108,8 @@ import org.apache.spark.sql.api.java.UDF4;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -162,7 +157,6 @@ import static
org.junit.jupiter.params.provider.Arguments.arguments;
/**
* Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts,
upserts, inserts. Check counts at the end.
*/
-@Tag("functional")
public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
private static final Logger LOG =
LogManager.getLogger(TestHoodieDeltaStreamer.class);
@@ -192,26 +186,6 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
return new HoodieClusteringJob(jsc, scheduleClusteringConfig);
}
- @AfterAll
- public static void cleanupClass() {
- UtilitiesTestBase.cleanupClass();
- if (testUtils != null) {
- testUtils.teardown();
- }
- }
-
- @Override
- @BeforeEach
- public void setup() throws Exception {
- super.setup();
- }
-
- @Override
- @AfterEach
- public void teardown() throws Exception {
- super.teardown();
- }
-
static class TestHelpers {
static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath,
WriteOperationType op) {
@@ -468,7 +442,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
new DFSPropertiesConfiguration(fs.getConf(), new Path(basePath + "/" +
PROPS_FILENAME_TEST_SOURCE)).getProps();
assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
assertEquals("_row_key",
props.getString("hoodie.datasource.write.recordkey.field"));
-
assertEquals("org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestGenerator",
+
assertEquals("org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestGenerator",
props.getString("hoodie.datasource.write.keygenerator.class"));
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
similarity index 97%
rename from
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
rename to
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
index 389c687619d..fd684e95b2d 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.hudi.utilities.functional;
+package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
@@ -29,7 +29,6 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
-import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
@@ -39,7 +38,6 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -61,13 +59,12 @@ import static
org.apache.hudi.config.HoodieWriteConfig.FINALIZE_WRITE_PARALLELIS
import static
org.apache.hudi.config.HoodieWriteConfig.INSERT_PARALLELISM_VALUE;
import static
org.apache.hudi.config.HoodieWriteConfig.UPSERT_PARALLELISM_VALUE;
import static
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
-import static
org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.PROPS_FILENAME_TEST_MULTI_WRITER;
-import static
org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.addCommitToTimeline;
-import static
org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName;
-import static
org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.prepareInitialConfigs;
-import static
org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.deltaStreamerTestRunner;
+import static
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.PROPS_FILENAME_TEST_MULTI_WRITER;
+import static
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.addCommitToTimeline;
+import static
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName;
+import static
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.prepareInitialConfigs;
+import static
org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer.deltaStreamerTestRunner;
-@Tag("functional")
public class TestHoodieDeltaStreamerWithMultiWriter extends
SparkClientFunctionalTestHarness {
private static final Logger LOG =
LogManager.getLogger(TestHoodieDeltaStreamerWithMultiWriter.class);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
similarity index 96%
rename from
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
rename to
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
index 8ac8b616b5c..a2b480ac1ba 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
@@ -7,23 +7,22 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.utilities.functional;
+package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer;
-import org.apache.hudi.utilities.deltastreamer.TableExecutionContext;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
@@ -34,7 +33,6 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -45,7 +43,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-@Tag("functional")
public class TestHoodieMultiTableDeltaStreamer extends
HoodieDeltaStreamerTestBase {
private static final Logger LOG =
LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
index 3db11be49da..e68aae3c53a 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
@@ -21,15 +21,12 @@ package org.apache.hudi.utilities.deltastreamer.multisync;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
-import org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -45,26 +42,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestMultipleMetaSync extends HoodieDeltaStreamerTestBase {
- @AfterAll
- public static void cleanupClass() {
- UtilitiesTestBase.cleanupClass();
- if (testUtils != null) {
- testUtils.teardown();
- }
- }
-
- @Override
- @BeforeEach
- public void setup() throws Exception {
- super.setup();
- }
-
- @Override
- @AfterEach
- public void teardown() throws Exception {
- super.teardown();
- }
-
@Test
void testMultipleMetaStore() throws Exception {
String tableBasePath = basePath + "/test_multiple_metastore";
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
index 35bc7884d77..814cee24c00 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
@@ -65,7 +65,7 @@ public class TestSqlSource extends UtilitiesTestBase {
@AfterAll
public static void cleanupClass() {
- UtilitiesTestBase.cleanupClass();
+ UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
@BeforeEach
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
index 113805d24db..9139765841b 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
@@ -69,7 +69,7 @@ public abstract class TestAbstractDebeziumSource extends
UtilitiesTestBase {
@AfterAll
public static void cleanupClass() {
- UtilitiesTestBase.cleanupClass();
+ UtilitiesTestBase.cleanUpUtilitiesTestServices();
testUtils.teardown();
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 493953e8949..a074d22b7eb 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -163,7 +163,7 @@ public class UtilitiesTestBase {
}
@AfterAll
- public static void cleanupClass() {
+ public static void cleanUpUtilitiesTestServices() {
if (hdfsTestService != null) {
hdfsTestService.stop();
hdfsTestService = null;
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
index d4593f64fd2..3fced8d031c 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
@@ -59,7 +59,7 @@ public abstract class AbstractCloudObjectsSourceTestBase
extends UtilitiesTestBa
@AfterAll
public static void cleanupClass() {
- UtilitiesTestBase.cleanupClass();
+ UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
@BeforeEach
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
index 74dafbd2ffc..1c9152755ca 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
@@ -67,7 +67,7 @@ public class TestSqlFileBasedTransformer extends
UtilitiesTestBase {
@AfterAll
public static void cleanupClass() {
- UtilitiesTestBase.cleanupClass();
+ UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
@Override
diff --git
a/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
b/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
index 75d74d6bc89..0e76676a763 100644
---
a/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
+++
b/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
@@ -22,6 +22,6 @@ hoodie.deltastreamer.source.kafka.topic=topic2
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table
-hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestTableLevelGenerator
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestTableLevelGenerator
hoodie.deltastreamer.schemaprovider.registry.baseUrl=http://localhost:8081/subjects/
-hoodie.deltastreamer.schemaprovider.registry.urlSuffix=-value/versions/latest
\ No newline at end of file
+hoodie.deltastreamer.schemaprovider.registry.urlSuffix=-value/versions/latest