manojpec commented on a change in pull request #4046:
URL: https://github.com/apache/hudi/pull/4046#discussion_r767073882



##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
##########
@@ -238,82 +255,101 @@ private void 
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
     createCommitWithUpserts(cfg, client, "002", "000", "003", 100);
     validInstants.add("002");
     validInstants.add("003");
-    ExecutorService executors = Executors.newFixedThreadPool(2);
-    // write config with clustering enabled
-    HoodieWriteConfig cfg2 = writeConfigBuilder
-        
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
+
+    // Three clients running actions in parallel
+    final int threadCount = 3;
+    final CountDownLatch scheduleCountDownLatch = new 
CountDownLatch(threadCount);
+    final ExecutorService executors = 
Executors.newFixedThreadPool(threadCount);
+
+    // Write config with clustering enabled
+    final HoodieWriteConfig cfg2 = writeConfigBuilder
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withInlineClustering(true)
+            .withInlineClusteringNumCommits(1)
+            .build())
         .build();
-    SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
-    SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+    final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
+    final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+    final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
+
     // Create upserts, schedule cleaning, schedule compaction in parallel
     Future future1 = executors.submit(() -> {
-      String newCommitTime = "004";
-      int numRecords = 100;
-      String commitTimeBetweenPrevAndNew = "002";
-      try {
-        createCommitWithUpserts(cfg2, client1, "003", 
commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
-        if (tableType == HoodieTableType.MERGE_ON_READ) {
-          fail("Conflicts not handled correctly");
-        }
-        validInstants.add("004");
-      } catch (Exception e1) {
-        if (tableType == HoodieTableType.MERGE_ON_READ) {
-          assertTrue(e1 instanceof HoodieWriteConflictException);
-        }
+      final String newCommitTime = "004";
+      final int numRecords = 100;
+      final String commitTimeBetweenPrevAndNew = "002";
+
+      // We want the upsert to go through only after the compaction
+      // and cleaning schedule completion. So, waiting on latch here.
+      latchCountDownAndWait(scheduleCountDownLatch, 30000);

Review comment:
       By having the count as 3, we can make sure all the clients have been 
successfully scheduled and are ready to start the next set of actions in parallel. 
Your suggested approach of 1 will also work here (but without them doing the 
countdown). Having the count at 3 gives us more controlled parallelism. 

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
##########
@@ -238,82 +255,101 @@ private void 
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
     createCommitWithUpserts(cfg, client, "002", "000", "003", 100);
     validInstants.add("002");
     validInstants.add("003");
-    ExecutorService executors = Executors.newFixedThreadPool(2);
-    // write config with clustering enabled
-    HoodieWriteConfig cfg2 = writeConfigBuilder
-        
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
+
+    // Three clients running actions in parallel
+    final int threadCount = 3;
+    final CountDownLatch scheduleCountDownLatch = new 
CountDownLatch(threadCount);
+    final ExecutorService executors = 
Executors.newFixedThreadPool(threadCount);
+
+    // Write config with clustering enabled
+    final HoodieWriteConfig cfg2 = writeConfigBuilder
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withInlineClustering(true)
+            .withInlineClusteringNumCommits(1)
+            .build())
         .build();
-    SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
-    SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+    final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
+    final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+    final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
+
     // Create upserts, schedule cleaning, schedule compaction in parallel
     Future future1 = executors.submit(() -> {
-      String newCommitTime = "004";
-      int numRecords = 100;
-      String commitTimeBetweenPrevAndNew = "002";
-      try {
-        createCommitWithUpserts(cfg2, client1, "003", 
commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
-        if (tableType == HoodieTableType.MERGE_ON_READ) {
-          fail("Conflicts not handled correctly");
-        }
-        validInstants.add("004");
-      } catch (Exception e1) {
-        if (tableType == HoodieTableType.MERGE_ON_READ) {
-          assertTrue(e1 instanceof HoodieWriteConflictException);
-        }
+      final String newCommitTime = "004";
+      final int numRecords = 100;
+      final String commitTimeBetweenPrevAndNew = "002";
+
+      // We want the upsert to go through only after the compaction
+      // and cleaning schedule completion. So, waiting on latch here.
+      latchCountDownAndWait(scheduleCountDownLatch, 30000);
+      if (tableType == HoodieTableType.MERGE_ON_READ) {
+        // Since the compaction already went in, this upsert has
+        // to fail
+        assertThrows(IllegalArgumentException.class, () -> {
+          createCommitWithUpserts(cfg, client1, "003", 
commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
+        });
+      } else {
+        // We don't have the compaction for COW and so this upsert
+        // has to pass
+        assertDoesNotThrow(() -> {
+          createCommitWithUpserts(cfg, client1, "003", 
commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
+        });
+        validInstants.add(newCommitTime);
       }
     });
+
     Future future2 = executors.submit(() -> {
-      try {
-        client2.scheduleTableService("005", Option.empty(), 
TableServiceType.COMPACT);
-      } catch (Exception e2) {
-        if (tableType == HoodieTableType.MERGE_ON_READ) {
-          throw new RuntimeException(e2);
-        }
+      if (tableType == HoodieTableType.MERGE_ON_READ) {
+        assertDoesNotThrow(() -> {
+          client2.scheduleTableService("005", Option.empty(), 
TableServiceType.COMPACT);
+        });
       }
+      latchCountDownAndWait(scheduleCountDownLatch, 30000);
     });
+
     Future future3 = executors.submit(() -> {
-      try {
-        client2.scheduleTableService("006", Option.empty(), 
TableServiceType.CLEAN);
-      } catch (Exception e2) {
-        throw new RuntimeException(e2);
-      }
+      assertDoesNotThrow(() -> {
+        latchCountDownAndWait(scheduleCountDownLatch, 30000);
+        client3.scheduleTableService("006", Option.empty(), 
TableServiceType.CLEAN);
+      });
     });
     future1.get();
     future2.get();
     future3.get();
+
+    CountDownLatch runCountDownLatch = new CountDownLatch(threadCount);
     // Create inserts, run cleaning, run compaction in parallel
     future1 = executors.submit(() -> {
-      String newCommitTime = "007";
-      int numRecords = 100;
-      try {
-        createCommitWithInserts(cfg2, client1, "003", newCommitTime, 
numRecords);
+      final String newCommitTime = "007";
+      final int numRecords = 100;
+      latchCountDownAndWait(runCountDownLatch, 30000);
+      assertDoesNotThrow(() -> {
+        createCommitWithInserts(cfg, client1, "003", newCommitTime, 
numRecords);
         validInstants.add("007");
-      } catch (Exception e1) {
-        throw new RuntimeException(e1);
-      }
+      });
     });
+
     future2 = executors.submit(() -> {
-      try {
-        JavaRDD<WriteStatus> writeStatusJavaRDD = (JavaRDD<WriteStatus>) 
client2.compact("005");
-        client2.commitCompaction("005", writeStatusJavaRDD, Option.empty());
-        validInstants.add("005");
-      } catch (Exception e2) {
-        if (tableType == HoodieTableType.MERGE_ON_READ) {
-          throw new RuntimeException(e2);
-        }
+      latchCountDownAndWait(runCountDownLatch, 30000);
+      if (tableType == HoodieTableType.MERGE_ON_READ) {

Review comment:
       Executor submit is not the same as the thread actually starting to run the 
operation. By having the countdown latch inside the block, we are sure all three 
threads are ready for the operations and are running in parallel. This is a 
stricter case than going without the latch. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to