nsivabalan commented on code in PR #11580:
URL: https://github.com/apache/hudi/pull/11580#discussion_r1771725525


##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java:
##########
@@ -163,6 +173,85 @@ public void testWriteDuringCompaction(String payloadClass) throws IOException {
     assertEquals(300, readTableTotalRecordsNum());
   }
 
+  @Test
+  public void testOutOfOrderCompactionSchedules() throws IOException, ExecutionException, InterruptedException {
+    HoodieWriteConfig config = getWriteConfigWithMockCompactionStrategy();
+    HoodieWriteConfig config2 = getWriteConfig();
+
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, new Properties());
+    client = getHoodieWriteClient(config);
+
+    // write data and commit
+    writeData(HoodieActiveTimeline.createNewInstantTime(), 100, true, false);
+
+    // 2nd batch
+    String commit2 = HoodieActiveTimeline.createNewInstantTime();
+    JavaRDD records = jsc().parallelize(dataGen.generateUniqueUpdates(commit2, 
50), 2);
+    client.startCommitWithTime(commit2);
+    List<WriteStatus> writeStatuses = client.upsert(records, commit2).collect();
+    List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+    client.commitStats(commit2, context().parallelize(writeStatuses, 1), writeStats, Option.empty(), metaClient.getCommitActionType());
+
+    // schedule compaction
+    String compactionInstant1 = HoodieActiveTimeline.createNewInstantTime();
+    Thread.sleep(10);
+    String compactionInstant2 = HoodieActiveTimeline.createNewInstantTime();
+
+    final AtomicBoolean writer1Completed = new AtomicBoolean(false);
+    final AtomicBoolean writer2Completed = new AtomicBoolean(false);
+    final ExecutorService executors = Executors.newFixedThreadPool(2);
+    final SparkRDDWriteClient client2 = getHoodieWriteClient(config2);
+    Future future1 = executors.submit(() -> {
+      try {
+        client.scheduleCompactionAtInstant(compactionInstant1, Option.empty());
+        // since compactionInstant1 is earlier than compactionInstant2, and the compaction strategy sleeps for 10s, this is expected to throw.
+        fail("Should not have reached here");
+      } catch (Exception e) {
+        writer1Completed.set(true);
+      }
+    });
+
+    Future future2 = executors.submit(() -> {
+      try {
+        Thread.sleep(10);
+        assertTrue(client2.scheduleCompactionAtInstant(compactionInstant2, Option.empty()));
+        writer2Completed.set(true);
+      } catch (Exception e) {
+        throw new HoodieException("Should not have reached here");
+      }
+    });
+
+    future1.get();
+    future2.get();
+
+    // both should have been completed successfully. I mean, we already assert for conflict for writer2 at L155.

Review Comment:
   Not sure I follow; can you clarify?
   We assert that writer1 fails, and we also assert that writer2 completes successfully. Finally, I just validate here that both writers have completed.
   
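For reference, the pattern the comment describes (record writer1's expected failure in its catch block, record writer2's success on the happy path, then validate both flags after `Future.get()`) can be sketched in isolation. This is a minimal illustrative sketch, not Hudi code: the class name and the simulated exception are made up, and the `scheduleCompactionAtInstant` calls are replaced with stand-in work.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class TwoWriterRaceSketch {
  public static void main(String[] args) throws Exception {
    AtomicBoolean writer1Completed = new AtomicBoolean(false);
    AtomicBoolean writer2Completed = new AtomicBoolean(false);
    ExecutorService executors = Executors.newFixedThreadPool(2);

    // Writer 1: expected to fail (simulated here with a thrown exception),
    // so completion is recorded in the catch block, mirroring the test.
    Future<?> future1 = executors.submit(() -> {
      try {
        throw new IllegalStateException("simulated scheduling conflict");
      } catch (Exception e) {
        writer1Completed.set(true);
      }
    });

    // Writer 2: expected to succeed, so completion is recorded on the happy path.
    Future<?> future2 = executors.submit(() -> writer2Completed.set(true));

    // get() only guarantees each task finished (or rethrows an unexpected
    // error); the flags capture *which* branch each task actually took.
    future1.get();
    future2.get();
    executors.shutdown();

    if (!writer1Completed.get() || !writer2Completed.get()) {
      throw new AssertionError("both writers should have completed");
    }
    System.out.println("both writers completed: true");
  }
}
```

The final flag check is what distinguishes "the task returned" from "the task took the expected branch", which is the distinction the reviewer is pointing at.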



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
