This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c48a2a1 [HUDI-2527] Multi writer test with conflicting async table
services (#4046)
c48a2a1 is described below
commit c48a2a125a9d41f4fb4bf7d6a1d01bf75ba1877a
Author: Manoj Govindassamy <[email protected]>
AuthorDate: Fri Dec 10 17:01:19 2021 -0800
[HUDI-2527] Multi writer test with conflicting async table services (#4046)
---
.../hudi/client/TestHoodieClientMultiWriter.java | 144 +++++++++++++--------
1 file changed, 90 insertions(+), 54 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 27f83bd..6dd9ff1 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -18,6 +18,7 @@
package org.apache.hudi.client;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -37,11 +38,8 @@ import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.testutils.HoodieClientTestBase;
-
-import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -53,12 +51,16 @@ import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -132,7 +134,7 @@ public class TestHoodieClientMultiWriter extends
HoodieClientTestBase {
}
}
- @Disabled
+ @Test
public void testMultiWriterWithAsyncTableServicesWithConflictCOW() throws
Exception {
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.COPY_ON_WRITE);
}
@@ -202,6 +204,21 @@ public class TestHoodieClientMultiWriter extends
HoodieClientTestBase {
});
}
+ /**
+ * Count down the latch and wait for all the required threads to reach it.
+ *
+ * @param latch - Count down latch
+ * @param waitTimeMillis - Maximum time to wait, in milliseconds
+ */
+ private void latchCountDownAndWait(CountDownLatch latch, long
waitTimeMillis) {
+ latch.countDown();
+ try {
+ latch.await(waitTimeMillis, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ //
+ }
+ }
+
private void
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType)
throws Exception {
// create inserts X 1
if (tableType == HoodieTableType.MERGE_ON_READ) {
@@ -238,82 +255,101 @@ public class TestHoodieClientMultiWriter extends
HoodieClientTestBase {
createCommitWithUpserts(cfg, client, "002", "000", "003", 100);
validInstants.add("002");
validInstants.add("003");
- ExecutorService executors = Executors.newFixedThreadPool(2);
- // write config with clustering enabled
- HoodieWriteConfig cfg2 = writeConfigBuilder
-
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
+
+ // Three clients running actions in parallel
+ final int threadCount = 3;
+ final CountDownLatch scheduleCountDownLatch = new
CountDownLatch(threadCount);
+ final ExecutorService executors =
Executors.newFixedThreadPool(threadCount);
+
+ // Write config with clustering enabled
+ final HoodieWriteConfig cfg2 = writeConfigBuilder
+ .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+ .withInlineClustering(true)
+ .withInlineClusteringNumCommits(1)
+ .build())
.build();
- SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
- SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+ final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
+ final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+ final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
+
// Create upserts, schedule cleaning, schedule compaction in parallel
Future future1 = executors.submit(() -> {
- String newCommitTime = "004";
- int numRecords = 100;
- String commitTimeBetweenPrevAndNew = "002";
- try {
- createCommitWithUpserts(cfg2, client1, "003",
commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
- if (tableType == HoodieTableType.MERGE_ON_READ) {
- fail("Conflicts not handled correctly");
- }
- validInstants.add("004");
- } catch (Exception e1) {
- if (tableType == HoodieTableType.MERGE_ON_READ) {
- assertTrue(e1 instanceof HoodieWriteConflictException);
- }
+ final String newCommitTime = "004";
+ final int numRecords = 100;
+ final String commitTimeBetweenPrevAndNew = "002";
+
+ // We want the upsert to go through only after the compaction
+ // and cleaning schedule completion. So, waiting on latch here.
+ latchCountDownAndWait(scheduleCountDownLatch, 30000);
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ // Since the compaction has already been scheduled, this upsert
+ // has to fail
+ assertThrows(IllegalArgumentException.class, () -> {
+ createCommitWithUpserts(cfg, client1, "003",
commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
+ });
+ } else {
+ // We don't have the compaction for COW and so this upsert
+ // has to pass
+ assertDoesNotThrow(() -> {
+ createCommitWithUpserts(cfg, client1, "003",
commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
+ });
+ validInstants.add(newCommitTime);
}
});
+
Future future2 = executors.submit(() -> {
- try {
- client2.scheduleTableService("005", Option.empty(),
TableServiceType.COMPACT);
- } catch (Exception e2) {
- if (tableType == HoodieTableType.MERGE_ON_READ) {
- throw new RuntimeException(e2);
- }
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ assertDoesNotThrow(() -> {
+ client2.scheduleTableService("005", Option.empty(),
TableServiceType.COMPACT);
+ });
}
+ latchCountDownAndWait(scheduleCountDownLatch, 30000);
});
+
Future future3 = executors.submit(() -> {
- try {
- client2.scheduleTableService("006", Option.empty(),
TableServiceType.CLEAN);
- } catch (Exception e2) {
- throw new RuntimeException(e2);
- }
+ assertDoesNotThrow(() -> {
+ latchCountDownAndWait(scheduleCountDownLatch, 30000);
+ client3.scheduleTableService("006", Option.empty(),
TableServiceType.CLEAN);
+ });
});
future1.get();
future2.get();
future3.get();
+
+ CountDownLatch runCountDownLatch = new CountDownLatch(threadCount);
// Create inserts, run cleaning, run compaction in parallel
future1 = executors.submit(() -> {
- String newCommitTime = "007";
- int numRecords = 100;
- try {
- createCommitWithInserts(cfg2, client1, "003", newCommitTime,
numRecords);
+ final String newCommitTime = "007";
+ final int numRecords = 100;
+ latchCountDownAndWait(runCountDownLatch, 30000);
+ assertDoesNotThrow(() -> {
+ createCommitWithInserts(cfg, client1, "003", newCommitTime,
numRecords);
validInstants.add("007");
- } catch (Exception e1) {
- throw new RuntimeException(e1);
- }
+ });
});
+
future2 = executors.submit(() -> {
- try {
- JavaRDD<WriteStatus> writeStatusJavaRDD = (JavaRDD<WriteStatus>)
client2.compact("005");
- client2.commitCompaction("005", writeStatusJavaRDD, Option.empty());
- validInstants.add("005");
- } catch (Exception e2) {
- if (tableType == HoodieTableType.MERGE_ON_READ) {
- throw new RuntimeException(e2);
- }
+ latchCountDownAndWait(runCountDownLatch, 30000);
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ assertDoesNotThrow(() -> {
+ JavaRDD<WriteStatus> writeStatusJavaRDD = (JavaRDD<WriteStatus>)
client2.compact("005");
+ client2.commitCompaction("005", writeStatusJavaRDD, Option.empty());
+ validInstants.add("005");
+ });
}
});
+
future3 = executors.submit(() -> {
- try {
- client2.clean("006", false);
+ latchCountDownAndWait(runCountDownLatch, 30000);
+ assertDoesNotThrow(() -> {
+ client3.clean("006", false);
validInstants.add("006");
- } catch (Exception e2) {
- throw new RuntimeException(e2);
- }
+ });
});
future1.get();
future2.get();
future3.get();
+
validInstants.addAll(
metaClient.reloadActiveTimeline().getCompletedReplaceTimeline()
.filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()));