This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c48a2a1  [HUDI-2527] Multi writer test with conflicting async table 
services (#4046)
c48a2a1 is described below

commit c48a2a125a9d41f4fb4bf7d6a1d01bf75ba1877a
Author: Manoj Govindassamy <[email protected]>
AuthorDate: Fri Dec 10 17:01:19 2021 -0800

    [HUDI-2527] Multi writer test with conflicting async table services (#4046)
---
 .../hudi/client/TestHoodieClientMultiWriter.java   | 144 +++++++++++++--------
 1 file changed, 90 insertions(+), 54 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 27f83bd..6dd9ff1 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -37,11 +38,8 @@ import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.testutils.HoodieClientTestBase;
-
-import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -53,12 +51,16 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -132,7 +134,7 @@ public class TestHoodieClientMultiWriter extends 
HoodieClientTestBase {
     }
   }
 
-  @Disabled
+  @Test
   public void testMultiWriterWithAsyncTableServicesWithConflictCOW() throws 
Exception {
     
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.COPY_ON_WRITE);
   }
@@ -202,6 +204,21 @@ public class TestHoodieClientMultiWriter extends 
HoodieClientTestBase {
     });
   }
 
+  /**
+   * Counts down the latch once, then blocks until it reaches zero or the timeout elapses.
+   * The outcome of the wait (released, timed out or interrupted) is not reported to the caller.
+   * @param latch          - latch to count down and then wait on
+   * @param waitTimeMillis - maximum time to wait, in milliseconds
+   */
+  private void latchCountDownAndWait(CountDownLatch latch, long 
waitTimeMillis) {
+    latch.countDown();
+    try {
+      latch.await(waitTimeMillis, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      // Interruption is swallowed here (interrupt flag not restored); the method simply returns.
+    }
+  }
+
   private void 
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType) 
throws Exception {
     // create inserts X 1
     if (tableType == HoodieTableType.MERGE_ON_READ) {
@@ -238,82 +255,101 @@ public class TestHoodieClientMultiWriter extends 
HoodieClientTestBase {
     createCommitWithUpserts(cfg, client, "002", "000", "003", 100);
     validInstants.add("002");
     validInstants.add("003");
-    ExecutorService executors = Executors.newFixedThreadPool(2);
-    // write config with clustering enabled
-    HoodieWriteConfig cfg2 = writeConfigBuilder
-        
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
+
+    // Three clients running actions in parallel
+    final int threadCount = 3;
+    final CountDownLatch scheduleCountDownLatch = new 
CountDownLatch(threadCount);
+    final ExecutorService executors = 
Executors.newFixedThreadPool(threadCount);
+
+    // Write config with clustering enabled
+    final HoodieWriteConfig cfg2 = writeConfigBuilder
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withInlineClustering(true)
+            .withInlineClusteringNumCommits(1)
+            .build())
         .build();
-    SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
-    SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+    final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
+    final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+    final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
+
     // Create upserts, schedule cleaning, schedule compaction in parallel
     Future future1 = executors.submit(() -> {
-      String newCommitTime = "004";
-      int numRecords = 100;
-      String commitTimeBetweenPrevAndNew = "002";
-      try {
-        createCommitWithUpserts(cfg2, client1, "003", 
commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
-        if (tableType == HoodieTableType.MERGE_ON_READ) {
-          fail("Conflicts not handled correctly");
-        }
-        validInstants.add("004");
-      } catch (Exception e1) {
-        if (tableType == HoodieTableType.MERGE_ON_READ) {
-          assertTrue(e1 instanceof HoodieWriteConflictException);
-        }
+      final String newCommitTime = "004";
+      final int numRecords = 100;
+      final String commitTimeBetweenPrevAndNew = "002";
+
+      // We want the upsert to go through only after the compaction
+      // and cleaning schedule completion. So, waiting on latch here.
+      latchCountDownAndWait(scheduleCountDownLatch, 30000);
+      if (tableType == HoodieTableType.MERGE_ON_READ) {
+        // Since the compaction already went in, this upsert has
+        // to fail
+        assertThrows(IllegalArgumentException.class, () -> {
+          createCommitWithUpserts(cfg, client1, "003", 
commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
+        });
+      } else {
+        // We don't have the compaction for COW and so this upsert
+        // has to pass
+        assertDoesNotThrow(() -> {
+          createCommitWithUpserts(cfg, client1, "003", 
commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
+        });
+        validInstants.add(newCommitTime);
       }
     });
+
     Future future2 = executors.submit(() -> {
-      try {
-        client2.scheduleTableService("005", Option.empty(), 
TableServiceType.COMPACT);
-      } catch (Exception e2) {
-        if (tableType == HoodieTableType.MERGE_ON_READ) {
-          throw new RuntimeException(e2);
-        }
+      if (tableType == HoodieTableType.MERGE_ON_READ) {
+        assertDoesNotThrow(() -> {
+          client2.scheduleTableService("005", Option.empty(), 
TableServiceType.COMPACT);
+        });
       }
+      latchCountDownAndWait(scheduleCountDownLatch, 30000);
     });
+
     Future future3 = executors.submit(() -> {
-      try {
-        client2.scheduleTableService("006", Option.empty(), 
TableServiceType.CLEAN);
-      } catch (Exception e2) {
-        throw new RuntimeException(e2);
-      }
+      assertDoesNotThrow(() -> {
+        latchCountDownAndWait(scheduleCountDownLatch, 30000);
+        client3.scheduleTableService("006", Option.empty(), 
TableServiceType.CLEAN);
+      });
     });
     future1.get();
     future2.get();
     future3.get();
+
+    CountDownLatch runCountDownLatch = new CountDownLatch(threadCount);
     // Create inserts, run cleaning, run compaction in parallel
     future1 = executors.submit(() -> {
-      String newCommitTime = "007";
-      int numRecords = 100;
-      try {
-        createCommitWithInserts(cfg2, client1, "003", newCommitTime, 
numRecords);
+      final String newCommitTime = "007";
+      final int numRecords = 100;
+      latchCountDownAndWait(runCountDownLatch, 30000);
+      assertDoesNotThrow(() -> {
+        createCommitWithInserts(cfg, client1, "003", newCommitTime, 
numRecords);
         validInstants.add("007");
-      } catch (Exception e1) {
-        throw new RuntimeException(e1);
-      }
+      });
     });
+
     future2 = executors.submit(() -> {
-      try {
-        JavaRDD<WriteStatus> writeStatusJavaRDD = (JavaRDD<WriteStatus>) 
client2.compact("005");
-        client2.commitCompaction("005", writeStatusJavaRDD, Option.empty());
-        validInstants.add("005");
-      } catch (Exception e2) {
-        if (tableType == HoodieTableType.MERGE_ON_READ) {
-          throw new RuntimeException(e2);
-        }
+      latchCountDownAndWait(runCountDownLatch, 30000);
+      if (tableType == HoodieTableType.MERGE_ON_READ) {
+        assertDoesNotThrow(() -> {
+          JavaRDD<WriteStatus> writeStatusJavaRDD = (JavaRDD<WriteStatus>) 
client2.compact("005");
+          client2.commitCompaction("005", writeStatusJavaRDD, Option.empty());
+          validInstants.add("005");
+        });
       }
     });
+
     future3 = executors.submit(() -> {
-      try {
-        client2.clean("006", false);
+      latchCountDownAndWait(runCountDownLatch, 30000);
+      assertDoesNotThrow(() -> {
+        client3.clean("006", false);
         validInstants.add("006");
-      } catch (Exception e2) {
-        throw new RuntimeException(e2);
-      }
+      });
     });
     future1.get();
     future2.get();
     future3.get();
+
     validInstants.addAll(
         metaClient.reloadActiveTimeline().getCompletedReplaceTimeline()
             
.filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()));

Reply via email to