This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 686da41 [HUDI-3689] Fix UT failures in TestHoodieDeltaStreamer (#5120)
686da41 is described below
commit 686da41696e17daa182f84796cd2721b945d71b1
Author: Raymond Xu <[email protected]>
AuthorDate: Thu Mar 24 09:10:33 2022 -0700
[HUDI-3689] Fix UT failures in TestHoodieDeltaStreamer (#5120)
---
.../testutils/TestHiveSyncGlobalCommitTool.java | 133 ---------------------
.../org/apache/hudi/utilities/TestUtilHelpers.java | 97 ---------------
.../functional/TestHoodieDeltaStreamer.java | 30 ++---
.../TestHoodieMultiTableDeltaStreamer.java | 8 +-
pom.xml | 4 +-
5 files changed, 21 insertions(+), 251 deletions(-)
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestHiveSyncGlobalCommitTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestHiveSyncGlobalCommitTool.java
deleted file mode 100644
index 980374e..0000000
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestHiveSyncGlobalCommitTool.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.hive.testutils;
-
-import org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig;
-import org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool;
-
-import org.apache.hadoop.fs.Path;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.Collections;
-
-import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
-import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_BASE_PATH;
-import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SERVER_JDBC_URLS;
-import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SITE_URI;
-import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_BASE_PATH;
-import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SERVER_JDBC_URLS;
-import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SITE_URI;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class TestHiveSyncGlobalCommitTool {
-
- TestCluster localCluster;
- TestCluster remoteCluster;
-
- private static String DB_NAME = "foo";
- private static String TBL_NAME = "bar";
-
- private HiveSyncGlobalCommitConfig getGlobalCommitConfig(
- String commitTime, String dbName, String tblName) throws Exception {
- HiveSyncGlobalCommitConfig config = new HiveSyncGlobalCommitConfig();
- config.properties.setProperty(LOCAL_HIVE_SITE_URI, localCluster.getHiveSiteXmlLocation());
- config.properties.setProperty(REMOTE_HIVE_SITE_URI, remoteCluster.getHiveSiteXmlLocation());
- config.properties.setProperty(LOCAL_HIVE_SERVER_JDBC_URLS, localCluster.getHiveJdBcUrl());
- config.properties.setProperty(REMOTE_HIVE_SERVER_JDBC_URLS, remoteCluster.getHiveJdBcUrl());
- config.properties.setProperty(LOCAL_BASE_PATH, localCluster.tablePath(dbName, tblName));
- config.properties.setProperty(REMOTE_BASE_PATH, remoteCluster.tablePath(dbName, tblName));
- config.globallyReplicatedTimeStamp = commitTime;
- config.hiveUser = System.getProperty("user.name");
- config.hivePass = "";
- config.databaseName = dbName;
- config.tableName = tblName;
- config.basePath = localCluster.tablePath(dbName, tblName);
- config.assumeDatePartitioning = true;
- config.usePreApacheInputFormat = false;
- config.partitionFields = Collections.singletonList("datestr");
- return config;
- }
-
- private void compareEqualLastReplicatedTimeStamp(HiveSyncGlobalCommitConfig config) throws Exception {
- assertEquals(localCluster.getHMSClient().getTable(config.databaseName, config.tableName).getParameters().get(GLOBALLY_CONSISTENT_READ_TIMESTAMP),
- remoteCluster.getHMSClient().getTable(config.databaseName, config.tableName).getParameters().get(GLOBALLY_CONSISTENT_READ_TIMESTAMP),
- "compare replicated timestamps");
- }
-
- @BeforeEach
- public void setUp() throws Exception {
- localCluster = new TestCluster();
- localCluster.setup();
- remoteCluster = new TestCluster();
- remoteCluster.setup();
- localCluster.forceCreateDb(DB_NAME);
- remoteCluster.forceCreateDb(DB_NAME);
- localCluster.dfsCluster.getFileSystem().delete(new Path(localCluster.tablePath(DB_NAME, TBL_NAME)), true);
- remoteCluster.dfsCluster.getFileSystem().delete(new Path(remoteCluster.tablePath(DB_NAME, TBL_NAME)), true);
- }
-
- @AfterEach
- public void clear() throws Exception {
- localCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
- remoteCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
- localCluster.shutDown();
- remoteCluster.shutDown();
- }
-
- @Test
- public void testBasicGlobalCommit() throws Exception {
- String commitTime = "100";
- localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
- // simulate drs
- remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
- HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, DB_NAME, TBL_NAME);
- HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
- assertTrue(tool.commit());
- compareEqualLastReplicatedTimeStamp(config);
- }
-
- @Test
- public void testBasicRollback() throws Exception {
- String commitTime = "100";
- localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
- // simulate drs
- remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
- HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, DB_NAME, TBL_NAME);
- HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
- assertFalse(localCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
- assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
- // stop the remote cluster hive server to simulate cluster going down
- remoteCluster.stopHiveServer2();
- assertFalse(tool.commit());
- assertEquals(commitTime, localCluster.getHMSClient()
- .getTable(config.databaseName, config.tableName).getParameters()
- .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
- assertTrue(tool.rollback()); // do a rollback
- assertNotEquals(commitTime, localCluster.getHMSClient()
- .getTable(config.databaseName, config.tableName).getParameters()
- .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
- assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
- remoteCluster.startHiveServer2();
- }
-}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
deleted file mode 100644
index 45ffa1f..0000000
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestUtilHelpers.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.utilities;
-
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.utilities.transform.ChainedTransformer;
-import org.apache.hudi.utilities.transform.Transformer;
-
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.Nested;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class TestUtilHelpers {
-
- public static class TransformerFoo implements Transformer {
-
- @Override
- public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
- return null;
- }
- }
-
- public static class TransformerBar implements Transformer {
-
- @Override
- public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
- return null;
- }
- }
-
- @Nested
- public class TestCreateTransformer {
-
- @Test
- public void testCreateTransformerNotPresent() throws IOException {
- assertFalse(UtilHelpers.createTransformer(null).isPresent());
- }
-
- @Test
- public void testCreateTransformerLoadOneClass() throws IOException {
- Transformer transformer = UtilHelpers.createTransformer(Collections.singletonList(TransformerFoo.class.getName())).get();
- assertTrue(transformer instanceof ChainedTransformer);
- List<String> transformerNames = ((ChainedTransformer) transformer).getTransformersNames();
- assertEquals(1, transformerNames.size());
- assertEquals(TransformerFoo.class.getName(), transformerNames.get(0));
- }
-
- @Test
- public void testCreateTransformerLoadMultipleClasses() throws IOException {
- List<String> classNames = Arrays.asList(TransformerFoo.class.getName(), TransformerBar.class.getName());
- Transformer transformer = UtilHelpers.createTransformer(classNames).get();
- assertTrue(transformer instanceof ChainedTransformer);
- List<String> transformerNames = ((ChainedTransformer) transformer).getTransformersNames();
- assertEquals(2, transformerNames.size());
- assertEquals(TransformerFoo.class.getName(), transformerNames.get(0));
- assertEquals(TransformerBar.class.getName(), transformerNames.get(1));
- }
-
- @Test
- public void testCreateTransformerThrowsException() throws IOException {
- Exception e = assertThrows(IOException.class, () -> {
- UtilHelpers.createTransformer(Arrays.asList("foo", "bar"));
- });
- assertEquals("Could not load transformer class(es) [foo, bar]",
e.getMessage());
- }
- }
-}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 202d0b7..2a57716 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -1378,7 +1378,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
assertEquals(1000, c);
}
- private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) throws IOException {
+ private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) {
if (createTopic) {
try {
testUtils.createTopic(topicName, 2);
@@ -1491,7 +1491,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
props.setProperty("include", "base.properties");
props.setProperty("hoodie.embed.timeline.server", "false");
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
- props.setProperty("hoodie.datasource.write.partitionpath.field",
"partition_path");
+ props.setProperty("hoodie.datasource.write.partitionpath.field", "");
props.setProperty("hoodie.deltastreamer.source.dfs.root",
JSON_KAFKA_SOURCE_ROOT);
props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type",
kafkaCheckpointType);
@@ -1515,15 +1515,15 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
prepareParquetDFSSource(true, false, "source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
- PARQUET_SOURCE_ROOT, false);
+ PARQUET_SOURCE_ROOT, false, "");
// delta streamer w/ parquet source
String tableBasePath = dfsBasePath + "/test_dfs_to_kafka" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
- Collections.EMPTY_LIST, PROPS_FILENAME_TEST_PARQUET, false,
+ Collections.emptyList(), PROPS_FILENAME_TEST_PARQUET, false,
false, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();
- TestHelpers.assertRecordCount(parquetRecords, tableBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertRecordCount(parquetRecords, tableBasePath, sqlContext);
deltaStreamer.shutdownGracefully();
// prep json kafka source
@@ -1533,18 +1533,18 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
// delta streamer w/ json kafka source
deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
- Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+ Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();
// if auto reset value is set to LATEST, this all kafka records so far may not be synced.
int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 : JSON_KAFKA_NUM_RECORDS);
- TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
// verify 2nd batch to test LATEST auto reset value.
prepareJsonKafkaDFSFiles(20, false, topicName);
totalExpectedRecords += 20;
deltaStreamer.sync();
- TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
testNum++;
}
@@ -1556,17 +1556,17 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
- Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+ Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();
- TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext);
int totalRecords = JSON_KAFKA_NUM_RECORDS;
int records = 10;
totalRecords += records;
prepareJsonKafkaDFSFiles(records, false, topicName);
deltaStreamer.sync();
- TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext);
}
@Test
@@ -1578,20 +1578,20 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
- Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+ Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null,
null, "timestamp", String.valueOf(System.currentTimeMillis())),
jsc);
deltaStreamer.sync();
- TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext);
prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, false, topicName);
deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
- Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
+ Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null, null,
"timestamp", String.valueOf(System.currentTimeMillis())), jsc);
deltaStreamer.sync();
- TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath, sqlContext);
}
@Test
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
index 416f2c5..f80d38a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
@@ -161,8 +161,8 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
String targetBasePath2 = executionContexts.get(1).getConfig().targetBasePath;
streamer.sync();
- TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext);
- TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext);
+ TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1, sqlContext);
+ TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2, sqlContext);
//insert updates for already existing records in kafka topics
testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
@@ -177,8 +177,8 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
assertTrue(streamer.getFailedTables().isEmpty());
//assert the record count matches now
- TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext);
- TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext);
+ TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1, sqlContext);
+ TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2, sqlContext);
testNum++;
}
diff --git a/pom.xml b/pom.xml
index c61d5ef..ebd2a76 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,8 +73,8 @@
<properties>
<maven-jar-plugin.version>3.2.0</maven-jar-plugin.version>
- <maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version>
- <maven-failsafe-plugin.version>3.0.0-M4</maven-failsafe-plugin.version>
+ <maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
+ <maven-failsafe-plugin.version>3.0.0-M5</maven-failsafe-plugin.version>
<maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
<maven-javadoc-plugin.version>3.1.1</maven-javadoc-plugin.version>
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>