This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e969a4c7848 [HUDI-5592] Fixing some of the flaky tests in CI (#7720)
e969a4c7848 is described below
commit e969a4c7848fcbe6e0b4960c688e3f48b9e4a6f9
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Jan 26 16:40:04 2023 -0800
[HUDI-5592] Fixing some of the flaky tests in CI (#7720)
Recently we have seen more flakiness in our CI runs, so this takes a stab at
fixing some of the most frequently flaky tests.
Tests that are fixed:
TestHoodieClientOnMergeOnReadStorage (testReadingMORTableWithoutBaseFile,
testCompactionOnMORTable, testLogCompactionOnMORTable,
testLogCompactionOnMORTableWithoutBaseFile)
Reason for flakiness:
we generate only 10 inserts in these tests, which does not guarantee that
records land in all 3 partitions (HoodieTestDataGenerator).
Fixes:
HoodieTestDataGenerator was choosing a random partition from the list of
partitions when generating insert records; changed it to assign partitions
round robin. Also bumped up the number of records inserted in some of the
flaky tests from 10 to 100, and fixed the respective MOR tests to disable
small file handling.
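For context, here is a minimal sketch contrasting the old random partition pick
with the new round-robin pick. The class name and partition values below are
illustrative only; the real change lives in
HoodieTestDataGenerator#generateInsertsStream, and only the supplier bodies
mirror it.

    // Illustrative sketch; class name and partition values are hypothetical.
    import java.util.Random;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Supplier;

    class PartitionPickerSketch {
      private final String[] partitionPaths = {"p1", "p2", "p3"};
      private final Random rand = new Random(42);
      private final AtomicInteger partitionIndex = new AtomicInteger(0);

      // Old behavior: with only 10 inserts across 3 partitions, a random pick
      // can leave a partition empty, which made the test assertions flaky.
      Supplier<String> randomPartition() {
        return () -> partitionPaths[rand.nextInt(partitionPaths.length)];
      }

      // New behavior: cycle through the partitions so every partition receives
      // records once the batch size reaches partitionPaths.length.
      Supplier<String> roundRobinPartition() {
        return () -> {
          String partitionToUse = partitionPaths[partitionIndex.get()];
          partitionIndex.set((partitionIndex.get() + 1) % partitionPaths.length);
          return partitionToUse;
        };
      }
    }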
---
.../TestHoodieClientOnMergeOnReadStorage.java | 69 +++++++++++-----------
.../common/testutils/HoodieTestDataGenerator.java | 9 ++-
2 files changed, 44 insertions(+), 34 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index c623e13bddb..6e2257dcd91 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -85,27 +85,27 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
// Do insert and updates thrice one after the other.
// Insert
String commitTime = HoodieActiveTimeline.createNewInstantTime();
- insertBatch(config, client, commitTime, "000", 10, SparkRDDWriteClient::insert,
- false, false, 10, 10, 1, Option.empty());
+ insertBatch(config, client, commitTime, "000", 100, SparkRDDWriteClient::insert,
+ false, false, 100, 100, 1, Option.empty());
// Update
String commitTimeBetweenPrevAndNew = commitTime;
commitTime = HoodieActiveTimeline.createNewInstantTime();
updateBatch(config, client, commitTime, commitTimeBetweenPrevAndNew,
- Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", 5, SparkRDDWriteClient::upsert,
- false, false, 5, 10, 2, config.populateMetaFields());
+ Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", 50, SparkRDDWriteClient::upsert,
+ false, false, 50, 100, 2, config.populateMetaFields());
// Delete 5 records
String prevCommitTime = commitTime;
commitTime = HoodieActiveTimeline.createNewInstantTime();
deleteBatch(config, client, commitTime, prevCommitTime,
- "000", 2, SparkRDDWriteClient::delete, false, false,
- 0, 150);
+ "000", 25, SparkRDDWriteClient::delete, false, false,
+ 0, 100);
// Verify all the records.
metaClient.reloadActiveTimeline();
Map<String, GenericRecord> recordMap = GenericRecordValidationTestUtils.getRecordsMap(config, hadoopConf, dataGen);
- assertEquals(8, recordMap.size());
+ assertEquals(75, recordMap.size());
}
@Test
@@ -119,15 +119,15 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
// Do insert and updates thrice one after the other.
// Insert
String commitTime = HoodieActiveTimeline.createNewInstantTime();
- insertBatch(config, client, commitTime, "000", 10, SparkRDDWriteClient::insert,
- false, false, 10, 10, 1, Option.empty());
+ insertBatch(config, client, commitTime, "000", 100, SparkRDDWriteClient::insert,
+ false, false, 100, 100, 1, Option.empty());
// Update
String commitTimeBetweenPrevAndNew = commitTime;
commitTime = HoodieActiveTimeline.createNewInstantTime();
updateBatch(config, client, commitTime, commitTimeBetweenPrevAndNew,
- Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", 5, SparkRDDWriteClient::upsert,
- false, false, 5, 10, 2, config.populateMetaFields());
+ Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", 50, SparkRDDWriteClient::upsert,
+ false, false, 5, 100, 2, config.populateMetaFields());
// Schedule and execute compaction.
Option<String> timeStamp = client.scheduleCompaction(Option.empty());
@@ -149,18 +149,20 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
SparkRDDWriteClient client = getHoodieWriteClient(config);
// First insert
+ int expectedTotalRecs = 100;
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- insertBatch(config, client, newCommitTime, "000", 10,
- SparkRDDWriteClient::insert, false, false, 10, 100,
+ insertBatch(config, client, newCommitTime, "000", expectedTotalRecs,
+ SparkRDDWriteClient::insert, false, false, expectedTotalRecs, expectedTotalRecs,
1, Option.empty());
String prevCommitTime = newCommitTime;
for (int i = 0; i < 5; i++) {
// Upsert
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ expectedTotalRecs += 50;
updateBatch(config, client, newCommitTime, prevCommitTime,
- Option.of(Arrays.asList(prevCommitTime)), "000", 2, SparkRDDWriteClient::upsert,
- false, false, 50, 10, i + 2, config.populateMetaFields());
+ Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert,
+ false, false, 50, expectedTotalRecs, i + 2, config.populateMetaFields());
prevCommitTime = newCommitTime;
}
@@ -173,9 +175,10 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
for (int i = 0; i < 2; i++) {
// Upsert
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ expectedTotalRecs += 50;
updateBatch(config, client, newCommitTime, prevCommitTime,
- Option.of(Arrays.asList(prevCommitTime)), "000", 2, SparkRDDWriteClient::upsert,
- false, false, 50, 10, i + 8, config.populateMetaFields());
+ Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert,
+ false, false, 50, expectedTotalRecs, i + 8, config.populateMetaFields());
prevCommitTime = newCommitTime;
}
String lastCommitBeforeLogCompaction = prevCommitTime;
@@ -205,23 +208,23 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
// First insert 10 records
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
- insertBatch(config, client, newCommitTime, "000", 10,
- SparkRDDWriteClient::insert, false, false, 10, 10,
+ insertBatch(config, client, newCommitTime, "000", 100,
+ SparkRDDWriteClient::insert, false, false, 100, 100,
1, Option.of(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH));
// Upsert 5 records
String prevCommitTime = newCommitTime;
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
updateBatch(config, client, newCommitTime, prevCommitTime,
- Option.of(Arrays.asList(prevCommitTime)), "000", 5, SparkRDDWriteClient::upsert,
- false, false, 5, 10, 2, config.populateMetaFields());
+ Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert,
+ false, false, 50, 100, 2, config.populateMetaFields());
prevCommitTime = newCommitTime;
// Delete 3 records
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
deleteBatch(config, client, newCommitTime, prevCommitTime,
- "000", 3, SparkRDDWriteClient::delete, false, false,
- 0, 10);
+ "000", 30, SparkRDDWriteClient::delete, false, false,
+ 0, 70);
String lastCommitBeforeLogCompaction = newCommitTime;
// Schedule and execute compaction.
@@ -249,7 +252,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
// First insert
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
insertBatch(config, client, newCommitTime, "000", 100,
- SparkRDDWriteClient::insert, false, false, 10, 100,
+ SparkRDDWriteClient::insert, false, false, 100, 100,
1, Option.empty());
String prevCommitTime = newCommitTime;
@@ -257,7 +260,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
updateBatch(config, client, newCommitTime, prevCommitTime,
Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert,
- false, false, 50, 10, 2, config.populateMetaFields());
+ false, false, 50, 100, 2, config.populateMetaFields());
// Schedule compaction
Option<String> compactionTimeStamp = client.scheduleCompaction(Option.empty());
@@ -294,7 +297,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
// First insert
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
insertBatch(config, client, newCommitTime, "000", 100,
- SparkRDDWriteClient::insert, false, false, 10, 100,
+ SparkRDDWriteClient::insert, false, false, 100, 100,
1, Option.empty());
String prevCommitTime = newCommitTime;
@@ -302,7 +305,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
updateBatch(config, client, newCommitTime, prevCommitTime,
Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert,
- false, false, 50, 10, 2, config.populateMetaFields());
+ false, false, 50, 100, 2, config.populateMetaFields());
// Schedule log compaction
Option<String> logCompactionTimeStamp = client.scheduleLogCompaction(Option.empty());
@@ -329,7 +332,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
// First insert. Here first file slice gets added to file group.
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
insertBatch(config, client, newCommitTime, "000", 100,
- SparkRDDWriteClient::insert, false, false, 10, 100,
+ SparkRDDWriteClient::insert, false, false, 100, 100,
1, Option.empty());
// Schedule and execute compaction. Here, second file slice gets added.
@@ -342,7 +345,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
updateBatch(config, client, newCommitTime, prevCommitTime,
Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert,
- false, false, 50, 10, 2, config.populateMetaFields());
+ false, false, 50, 100, 2, config.populateMetaFields());
prevCommitTime = newCommitTime;
// Schedule compaction. Third file slice gets added, compaction is not complete so base file is not created yet.
@@ -355,7 +358,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
updateBatch(config, client, newCommitTime, prevCommitTime,
Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert,
- false, false, 50, 10, 2, config.populateMetaFields());
+ false, false, 50, 100, 2, config.populateMetaFields());
prevCommitTime = newCommitTime;
if (i == 2) {
// Since retain commits is 4 exactly after 6th completed commit there will be some files to be cleaned,
@@ -384,7 +387,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
// First insert
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
insertBatch(config, client, newCommitTime, "000", 100,
- SparkRDDWriteClient::insert, false, false, 10, 100,
+ SparkRDDWriteClient::insert, false, false, 100, 100,
1, Option.empty());
String prevCommitTime = newCommitTime;
@@ -392,7 +395,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
updateBatch(config, client, newCommitTime, prevCommitTime,
Option.of(Arrays.asList(prevCommitTime)), "000", 10, SparkRDDWriteClient::upsert,
- false, false, 10, 10, 4, config.populateMetaFields());
+ false, false, 10, 100, 4, config.populateMetaFields());
prevCommitTime = newCommitTime;
// Schedule and execute log-compaction but do not commit.
@@ -418,7 +421,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
updateBatch(config, client, newCommitTime, prevCommitTime,
Option.of(Arrays.asList(prevCommitTime)), "000", 10, SparkRDDWriteClient::upsert,
- false, false, 10, 10, 4, config.populateMetaFields());
+ false, false, 10, 100, 4, config.populateMetaFields());
prevCommitTime = newCommitTime;
// Complete log-compaction now.
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index c6ab11a310f..28fb77c57ba 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -73,6 +73,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -607,8 +608,14 @@ public class HoodieTestDataGenerator implements AutoCloseable {
}
public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer n, boolean isFlattened, String schemaStr, boolean containsAllPartitions) {
+ AtomicInteger partitionIndex = new AtomicInteger(0);
return generateInsertsStream(commitTime, n, isFlattened, schemaStr, containsAllPartitions,
- () -> partitionPaths[rand.nextInt(partitionPaths.length)],
+ () -> {
+ // round robin to ensure we generate inserts for all partition paths
+ String partitionToUse = partitionPaths[partitionIndex.get()];
+ partitionIndex.set((partitionIndex.get() + 1) % partitionPaths.length);
+ return partitionToUse;
+ },
() -> genPseudoRandomUUID(rand).toString());
}
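Design note on the round-robin supplier above: the cursor lives in an
AtomicInteger so the lambda only captures an effectively final reference, which
is what Java requires of variables captured by a closure. The get()/set() pair
is not an atomic read-modify-write, which is fine for the single-threaded test
data generation here. Assuming that single-threaded usage, an equivalent and
slightly more compact form (not what the commit uses verbatim, and ignoring int
overflow, which is irrelevant at test scale) would be:

    // hypothetical variant, same effect for single-threaded generation
    () -> partitionPaths[partitionIndex.getAndIncrement() % partitionPaths.length]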