codope commented on a change in pull request #5138:
URL: https://github.com/apache/hudi/pull/5138#discussion_r838933864



##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
##########
@@ -404,6 +423,79 @@ public void 
testLoadArchiveTimelineWithDamagedPlanFile(boolean enableArchiveMerg
     assertThrows(HoodieException.class, () -> 
metaClient.getArchivedTimeline().reload());
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testArchivalWithMultiWriters(boolean enableMetadata) throws 
Exception {
+    HoodieWriteConfig writeConfig = 
initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 5, 2,
+        HoodieTableType.COPY_ON_WRITE, false, 10, 209715200,
+        HoodieFailedWritesCleaningPolicy.LAZY, 
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL);
+
+    final ExecutorService executors = Executors.newFixedThreadPool(2);
+    List<CompletableFuture<Boolean>> completableFutureList = new ArrayList<>();
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    IntStream.range(0, 2).forEach(index -> {
+      completableFutureList.add(CompletableFuture.supplyAsync(() -> {
+        HoodieTable table = HoodieSparkTable.create(writeConfig, context, 
metaClient);
+        try {
+          countDownLatch.await(30, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        metaClient.reloadActiveTimeline();
+        while 
(!metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get().getTimestamp().endsWith("29")
+            || 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants()
 > 4) {
+          try {
+            //System.out.println("Archiving " + index + ", total completed 
instants " + 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants());
+            //System.out.println("Last active instant " + 
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get().toString());
+            HoodieTimelineArchiver archiver = new 
HoodieTimelineArchiver(writeConfig, table);
+            archiver.archiveIfRequired(context, true);
+            // if not for below sleep, both archiving threads acquires lock in 
quick succession and does not give space for main thread
+            // to complete the write operation when metadata table is enabled.
+            if (enableMetadata) {
+              Thread.sleep(2);

Review comment:
       got it




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to