This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 7f67e6abd82fb532163cde33c62ba642f3d0b65a
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Jan 26 16:40:04 2023 -0800

    [HUDI-5592] Fixing some of the flaky tests in CI (#7720)

    Recently we have seen more flakiness in our CI runs, so this takes a stab
    at fixing some of the most frequently failing tests.

    Tests fixed in TestHoodieClientOnMergeOnReadStorage:
    testReadingMORTableWithoutBaseFile, testCompactionOnMORTable,
    testLogCompactionOnMORTable, testLogCompactionOnMORTableWithoutBaseFile.

    Reason for the flakiness: the tests generate only 10 inserts, which does
    not guarantee that records land in all 3 partitions
    (HoodieTestDataGenerator picks a partition at random).

    Fixes: HoodieTestDataGenerator was choosing a random partition from the
    list of partitions while generating insert records; it now assigns
    partitions round robin. Also bumped the number of records inserted in
    some of the flaky tests from 10 to 100, and updated the respective MOR
    tests to disable small file handling.
---
 .../TestHoodieClientOnMergeOnReadStorage.java      | 69 +++++++++++-----------
 .../common/testutils/HoodieTestDataGenerator.java  |  9 ++-
 2 files changed, 44 insertions(+), 34 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index c623e13bddb..6e2257dcd91 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -85,27 +85,27 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
     // Do insert and updates thrice one after the other.
     // Insert
     String commitTime = HoodieActiveTimeline.createNewInstantTime();
-    insertBatch(config, client, commitTime, "000", 10, SparkRDDWriteClient::insert,
-        false, false, 10, 10, 1, Option.empty());
+    insertBatch(config, client, commitTime, "000", 100, SparkRDDWriteClient::insert,
+        false, false, 100, 100, 1, Option.empty());

     // Update
     String commitTimeBetweenPrevAndNew = commitTime;
     commitTime = HoodieActiveTimeline.createNewInstantTime();
     updateBatch(config, client, commitTime, commitTimeBetweenPrevAndNew,
-        Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", 5, SparkRDDWriteClient::upsert,
-        false, false, 5, 10, 2, config.populateMetaFields());
+        Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", 50, SparkRDDWriteClient::upsert,
+        false, false, 50, 100, 2, config.populateMetaFields());

     // Delete 5 records
     String prevCommitTime = commitTime;
     commitTime = HoodieActiveTimeline.createNewInstantTime();
     deleteBatch(config, client, commitTime, prevCommitTime,
-        "000", 2, SparkRDDWriteClient::delete, false, false,
-        0, 150);
+        "000", 25, SparkRDDWriteClient::delete, false, false,
+        0, 100);

     // Verify all the records.
     metaClient.reloadActiveTimeline();
     Map<String, GenericRecord> recordMap = GenericRecordValidationTestUtils.getRecordsMap(config, hadoopConf, dataGen);
-    assertEquals(8, recordMap.size());
+    assertEquals(75, recordMap.size());
   }

   @Test
@@ -119,15 +119,15 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
     // Do insert and updates thrice one after the other.
     // Insert
     String commitTime = HoodieActiveTimeline.createNewInstantTime();
-    insertBatch(config, client, commitTime, "000", 10, SparkRDDWriteClient::insert,
-        false, false, 10, 10, 1, Option.empty());
+    insertBatch(config, client, commitTime, "000", 100, SparkRDDWriteClient::insert,
+        false, false, 100, 100, 1, Option.empty());

     // Update
     String commitTimeBetweenPrevAndNew = commitTime;
     commitTime = HoodieActiveTimeline.createNewInstantTime();
     updateBatch(config, client, commitTime, commitTimeBetweenPrevAndNew,
-        Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", 5, SparkRDDWriteClient::upsert,
-        false, false, 5, 10, 2, config.populateMetaFields());
+        Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", 50, SparkRDDWriteClient::upsert,
+        false, false, 5, 100, 2, config.populateMetaFields());

     // Schedule and execute compaction.
     Option<String> timeStamp = client.scheduleCompaction(Option.empty());
@@ -149,18 +149,20 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
     SparkRDDWriteClient client = getHoodieWriteClient(config);

     // First insert
+    int expectedTotalRecs = 100;
     String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-    insertBatch(config, client, newCommitTime, "000", 10,
-        SparkRDDWriteClient::insert, false, false, 10, 100,
+    insertBatch(config, client, newCommitTime, "000", expectedTotalRecs,
+        SparkRDDWriteClient::insert, false, false, expectedTotalRecs, expectedTotalRecs,
         1, Option.empty());

     String prevCommitTime = newCommitTime;
     for (int i = 0; i < 5; i++) {
       // Upsert
       newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      expectedTotalRecs += 50;
       updateBatch(config, client, newCommitTime, prevCommitTime,
-          Option.of(Arrays.asList(prevCommitTime)), "000", 2, SparkRDDWriteClient::upsert,
-          false, false, 50, 10, i + 2, config.populateMetaFields());
+          Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert,
+          false, false, 50, expectedTotalRecs, i + 2, config.populateMetaFields());

       prevCommitTime = newCommitTime;
     }
@@ -173,9 +175,10 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
     for (int i = 0; i < 2; i++) {
       // Upsert
       newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      expectedTotalRecs += 50;
       updateBatch(config, client, newCommitTime, prevCommitTime,
-          Option.of(Arrays.asList(prevCommitTime)), "000", 2, SparkRDDWriteClient::upsert,
-          false, false, 50, 10, i + 8, config.populateMetaFields());
+          Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert,
+          false, false, 50, expectedTotalRecs, i + 8, config.populateMetaFields());
       prevCommitTime = newCommitTime;
     }
     String lastCommitBeforeLogCompaction = prevCommitTime;
@@ -205,23 +208,23 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {

     // First insert 10 records
     String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
-    insertBatch(config, client, newCommitTime, "000", 10,
-        SparkRDDWriteClient::insert, false, false, 10, 10,
+    insertBatch(config, client, newCommitTime, "000", 100,
+        SparkRDDWriteClient::insert, false, false, 100, 100,
         1, Option.of(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH));

     // Upsert 5 records
     String prevCommitTime = newCommitTime;
     newCommitTime = HoodieActiveTimeline.createNewInstantTime();
     updateBatch(config, client, newCommitTime, prevCommitTime,
-        Option.of(Arrays.asList(prevCommitTime)), "000", 5, SparkRDDWriteClient::upsert,
-        false, false, 5, 10, 2, config.populateMetaFields());
Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert, + false, false, 50, 100, 2, config.populateMetaFields()); prevCommitTime = newCommitTime; // Delete 3 records newCommitTime = HoodieActiveTimeline.createNewInstantTime(); deleteBatch(config, client, newCommitTime, prevCommitTime, - "000", 3, SparkRDDWriteClient::delete, false, false, - 0, 10); + "000", 30, SparkRDDWriteClient::delete, false, false, + 0, 70); String lastCommitBeforeLogCompaction = newCommitTime; // Schedule and execute compaction. @@ -249,7 +252,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase { // First insert String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); insertBatch(config, client, newCommitTime, "000", 100, - SparkRDDWriteClient::insert, false, false, 10, 100, + SparkRDDWriteClient::insert, false, false, 100, 100, 1, Option.empty()); String prevCommitTime = newCommitTime; @@ -257,7 +260,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase { newCommitTime = HoodieActiveTimeline.createNewInstantTime(); updateBatch(config, client, newCommitTime, prevCommitTime, Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert, - false, false, 50, 10, 2, config.populateMetaFields()); + false, false, 50, 100, 2, config.populateMetaFields()); // Schedule compaction Option<String> compactionTimeStamp = client.scheduleCompaction(Option.empty()); @@ -294,7 +297,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase { // First insert String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); insertBatch(config, client, newCommitTime, "000", 100, - SparkRDDWriteClient::insert, false, false, 10, 100, + SparkRDDWriteClient::insert, false, false, 100, 100, 1, Option.empty()); String prevCommitTime = newCommitTime; @@ -302,7 +305,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase { newCommitTime = HoodieActiveTimeline.createNewInstantTime(); updateBatch(config, client, newCommitTime, prevCommitTime, Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert, - false, false, 50, 10, 2, config.populateMetaFields()); + false, false, 50, 100, 2, config.populateMetaFields()); // Schedule log compaction Option<String> logCompactionTimeStamp = client.scheduleLogCompaction(Option.empty()); @@ -329,7 +332,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase { // First insert. Here first file slice gets added to file group. String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); insertBatch(config, client, newCommitTime, "000", 100, - SparkRDDWriteClient::insert, false, false, 10, 100, + SparkRDDWriteClient::insert, false, false, 100, 100, 1, Option.empty()); // Schedule and execute compaction. Here, second file slice gets added. @@ -342,7 +345,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase { newCommitTime = HoodieActiveTimeline.createNewInstantTime(); updateBatch(config, client, newCommitTime, prevCommitTime, Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert, - false, false, 50, 10, 2, config.populateMetaFields()); + false, false, 50, 100, 2, config.populateMetaFields()); prevCommitTime = newCommitTime; // Schedule compaction. Third file slice gets added, compaction is not complete so base file is not created yet. 
@@ -355,7 +358,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
       newCommitTime = HoodieActiveTimeline.createNewInstantTime();
       updateBatch(config, client, newCommitTime, prevCommitTime,
           Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert,
-          false, false, 50, 10, 2, config.populateMetaFields());
+          false, false, 50, 100, 2, config.populateMetaFields());
       prevCommitTime = newCommitTime;
       if (i == 2) {
         // Since retain commits is 4 exactly after 6th completed commit there will be some files to be cleaned,
@@ -384,7 +387,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
     // First insert
     String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
     insertBatch(config, client, newCommitTime, "000", 100,
-        SparkRDDWriteClient::insert, false, false, 10, 100,
+        SparkRDDWriteClient::insert, false, false, 100, 100,
         1, Option.empty());

     String prevCommitTime = newCommitTime;
@@ -392,7 +395,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
     newCommitTime = HoodieActiveTimeline.createNewInstantTime();
     updateBatch(config, client, newCommitTime, prevCommitTime,
         Option.of(Arrays.asList(prevCommitTime)), "000", 10, SparkRDDWriteClient::upsert,
-        false, false, 10, 10, 4, config.populateMetaFields());
+        false, false, 10, 100, 4, config.populateMetaFields());
     prevCommitTime = newCommitTime;

     // Schedule and execute log-compaction but do not commit.
@@ -418,7 +421,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
     newCommitTime = HoodieActiveTimeline.createNewInstantTime();
     updateBatch(config, client, newCommitTime, prevCommitTime,
         Option.of(Arrays.asList(prevCommitTime)), "000", 10, SparkRDDWriteClient::upsert,
-        false, false, 10, 10, 4, config.populateMetaFields());
+        false, false, 10, 100, 4, config.populateMetaFields());
     prevCommitTime = newCommitTime;

     // Complete log-compaction now.
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index c6ab11a310f..28fb77c57ba 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -73,6 +73,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -607,8 +608,14 @@ public class HoodieTestDataGenerator implements AutoCloseable {
   }

   public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer n, boolean isFlattened, String schemaStr, boolean containsAllPartitions) {
+    AtomicInteger partitionIndex = new AtomicInteger(0);
     return generateInsertsStream(commitTime, n, isFlattened, schemaStr, containsAllPartitions,
-        () -> partitionPaths[rand.nextInt(partitionPaths.length)],
+        () -> {
+          // round robin to ensure we generate inserts for all partition paths
+          String partitionToUse = partitionPaths[partitionIndex.get()];
+          partitionIndex.set((partitionIndex.get() + 1) % partitionPaths.length);
+          return partitionToUse;
+        },
         () -> genPseudoRandomUUID(rand).toString());
   }
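
For context on why the round-robin change removes the flakiness, below is a minimal, self-contained sketch (not part of the commit) contrasting the old and new partition pickers. The class name and partition path values are illustrative placeholders, not Hudi APIs:

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class PartitionPickerSketch {
  // Illustrative partition paths; the Hudi test data generator uses 3 partitions.
  private static final String[] PARTITION_PATHS = {"p0", "p1", "p2"};

  public static void main(String[] args) {
    Random rand = new Random();

    // Old behavior: every insert picks a partition at random, so a batch of
    // only 10 inserts can leave one of the 3 partitions with zero records.
    Supplier<String> randomPicker =
        () -> PARTITION_PATHS[rand.nextInt(PARTITION_PATHS.length)];

    // New behavior, mirroring the patch: round robin. Every window of
    // PARTITION_PATHS.length consecutive inserts touches all partitions,
    // so any batch of 3 or more inserts is guaranteed to cover all 3.
    AtomicInteger partitionIndex = new AtomicInteger(0);
    Supplier<String> roundRobinPicker = () -> {
      String partitionToUse = PARTITION_PATHS[partitionIndex.get()];
      partitionIndex.set((partitionIndex.get() + 1) % PARTITION_PATHS.length);
      return partitionToUse;
    };

    for (int i = 0; i < 10; i++) {
      System.out.printf("insert %2d: random=%s roundRobin=%s%n",
          i, randomPicker.get(), roundRobinPicker.get());
    }
  }
}

Note that the patch reads and then sets the AtomicInteger in two steps rather than using getAndIncrement(); that is fine for the single-threaded test data generator, though it would not be atomic if the supplier were ever invoked concurrently.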
