stream2000 commented on code in PR #7826:
URL: https://github.com/apache/hudi/pull/7826#discussion_r1196009711


##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -545,6 +546,120 @@ public void 
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
     assertTrue(validInstants.containsAll(completedInstants));
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class, names = {"MERGE_ON_READ", 
"COPY_ON_WRITE"})
+  public void testMultiWriterWithAsyncLazyCleanRollback(HoodieTableType 
tableType) throws Exception {
+    // create inserts X 1
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      setUpMORTestTable();
+    }
+    // Disabling embedded timeline server, as it doesn't work with multi-writer
+    HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+            .withCleanConfig(HoodieCleanConfig.newBuilder()
+                    .withAutoClean(false)
+                    .withAsyncClean(true)
+                    
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                    .withInlineCompaction(false)
+                    .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+            .withEmbeddedTimelineServerEnabled(false)
+            // Timeline-server-based markers are not used for multi-writer 
tests
+            .withMarkersType(MarkerType.DIRECT.name())
+            
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
+                    FileSystemViewStorageType.MEMORY).build())
+            
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+            // Set the config so that heartbeat will expire in 1 second 
without update
+            
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
+                    
.build()).withAutoCommit(false).withProperties(lockProperties);
+    Set<String> validInstants = new HashSet<>();
+    // Create the first commit with inserts
+    HoodieWriteConfig cfg = writeConfigBuilder.build();
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    createCommitWithInserts(cfg, client, "000", "001", 200, true);
+    validInstants.add("001");
+
+    // Three clients running actions in parallel
+    final int threadCount = 3;
+    final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+    final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+    final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+    final String commitTime2 = "002";
+    final String commitTime3 = "003";
+    AtomicReference<Object> writeStatus1 = new AtomicReference<>(null);
+    AtomicReference<Object> writeStatus2 = new AtomicReference<>(null);
+
+    Future future1 = executor.submit(() -> {
+      final int numRecords = 100;
+      assertDoesNotThrow(() -> {
+        writeStatus1.set(createCommitWithInserts(cfg, client1, "001", 
commitTime2, numRecords, false));
+      });
+    });
+    Future future2 = executor.submit(() -> {
+      final int numRecords = 100;
+      assertDoesNotThrow(() -> {
+        writeStatus2.set(createCommitWithInserts(cfg, client2, "001", 
commitTime3, numRecords, false));
+      });
+    });
+
+    future1.get();
+    future2.get();
+
+    final CountDownLatch commitCountDownLatch = new CountDownLatch(2);
+    HoodieTableMetaClient tableMetaClient = 
client.getTableServiceClient().createMetaClient(true);
+
+    // Get inflight instant stream before commit
+    List<HoodieInstant> inflightInstants = client
+            .getTableServiceClient()
+            .getInflightTimelineExcludeCompactionAndClustering(tableMetaClient)
+            .getReverseOrderedInstants()
+            .collect(Collectors.toList());
+
+    // Commit the instants and get instants to rollback in parallel
+    future1 = executor.submit(() -> {
+      long start = System.currentTimeMillis();
+      LOG.info(String.format("Start to commit instant %s", commitTime2));
+      client1.commit(commitTime2, writeStatus1.get());
+      commitCountDownLatch.countDown();
+      validInstants.add(commitTime2);
+      LOG.info(String.format("commit the instant cost %d ms", 
System.currentTimeMillis() - start));
+    });
+
+    future2 = executor.submit(() -> {
+      long start = System.currentTimeMillis();
+      LOG.info(String.format("Start to commit instant %s", commitTime3));
+      client2.commit(commitTime3, writeStatus2.get());
+      commitCountDownLatch.countDown();
+      validInstants.add(commitTime3);
+      LOG.info(String.format("commit the instant %s cost %d ms", commitTime3, 
System.currentTimeMillis() - start));
+    });
+
+    Future future3 = executor.submit(() -> {
+      try {
+        commitCountDownLatch.await(30000, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        //
+      }
+      LOG.info("Start to get instants to rollback");
+      List<String> instantsToRollback =
+              
client.getTableServiceClient().getInstantsToRollbackForLazyCleanPolicy(tableMetaClient,
 inflightInstants.stream());
+      // No instants will be rolled back, though some instants may be detected as 
`expired`
+      assertTrue(instantsToRollback.isEmpty());

Review Comment:
   Nice suggestions! Will modify it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to