kbuci commented on code in PR #18012:
URL: https://github.com/apache/hudi/pull/18012#discussion_r2772961605


##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -915,6 +915,145 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
     client3.close();
   }
 
+  /**
+   * Test that when two writers attempt to execute the same compaction plan concurrently,
+   * at least one will succeed and create a completed compaction. Verifies that the timeline
+   * has compaction requested and completed instant files.
+   *
+   * This test uses a MOR table with multi-writer/optimistic concurrency control enabled.
+   */
+  @Test
+  public void testConcurrentCompactionExecutionOnSamePlan() throws Exception {
+    // Set up MOR table
+    setUpMORTestTable();
+
+    Properties properties = new Properties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
+
+    // Build write config with multiwriter/OCC enabled
+    HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+        .withHeartbeatIntervalInMs(60 * 1000)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .withAutoClean(false).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+            .withAutoArchive(false).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+        .withEmbeddedTimelineServerEnabled(false)
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.MEMORY)
+            .withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(InProcessLockProvider.class)
+            .withConflictResolutionStrategy(new SimpleConcurrentFileWritesConflictResolutionStrategy())
+            .build())
+        .withProperties(properties);
+
+    HoodieWriteConfig cfg = writeConfigBuilder.build();
+
+    // Create initial commit with inserts (creates base files)
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    String firstCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithInserts(cfg, client, "000", firstCommitTime, 200);
+
+    // Create delta commits (upserts create log files which are needed for compaction)
+    String secondCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithUpserts(cfg, client, firstCommitTime, Option.empty(), secondCommitTime, 100);
+    String thirdCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithUpserts(cfg, client, secondCommitTime, Option.empty(), thirdCommitTime, 100);
+
+    // Schedule compaction - this creates the compaction plan (requested instant)
+    Option<String> compactionInstantOpt = client.scheduleTableService(Option.empty(), Option.empty(), TableServiceType.COMPACT);
+    assertTrue(compactionInstantOpt.isPresent(), "Compaction should be scheduled");
+    String compactionInstantTime = compactionInstantOpt.get();
+
+    // Verify compaction is in requested state before execution
+    HoodieTimeline pendingCompactionTimeline = metaClient.reloadActiveTimeline().filterPendingCompactionTimeline();
+    assertTrue(pendingCompactionTimeline.containsInstant(compactionInstantTime),
+        "Compaction instant should be in pending state after scheduling");
+
+    // Two clients attempting to execute the same compaction plan concurrently
+    final int threadCount = 2;
+    final ExecutorService executors = Executors.newFixedThreadPool(threadCount);
+    final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
+    final AtomicBoolean writer1Succeeded = new AtomicBoolean(false);
+    final AtomicBoolean writer2Succeeded = new AtomicBoolean(false);
+
+    SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+    SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+
+    Future<?> future1 = executors.submit(() -> {
+      try {
+        // Wait for both writers to be ready
+        cyclicBarrier.await(60, TimeUnit.SECONDS);
+
+        // Attempt to execute compaction with auto-complete
+        client1.compact(compactionInstantTime, true);
+        writer1Succeeded.set(true);
+      } catch (Exception e) {
+        // Expected - one writer may fail due to concurrent execution
+        LOG.info("Writer 1 failed with exception: " + e.getMessage());
+        writer1Succeeded.set(false);
+      }
+    });
+
+    Future<?> future2 = executors.submit(() -> {
+      try {
+        // Wait for both writers to be ready
+        cyclicBarrier.await(60, TimeUnit.SECONDS);
+
+        // Attempt to execute compaction with auto-complete
+        client2.compact(compactionInstantTime, true);
+        writer2Succeeded.set(true);
+      } catch (Exception e) {
+        // Expected - one writer may fail due to concurrent execution
+        LOG.info("Writer 2 failed with exception: " + e.getMessage());
+        writer2Succeeded.set(false);
+      }
+    });
+
+    // Wait for both futures to complete
+    future1.get();
+    future2.get();
+
+    // Verify at least one writer succeeded
+    assertTrue(writer1Succeeded.get() || writer2Succeeded.get(),
+        "At least one writer should succeed in executing the compaction");
+
+    // Reload timeline and verify compaction is completed
+    HoodieTimeline reloadedTimeline = metaClient.reloadActiveTimeline();
+
+    // Verify compaction instant is completed
+    HoodieTimeline compactionTimeline = reloadedTimeline.getTimelineOfActions(
+        Collections.singleton(HoodieTimeline.COMPACTION_ACTION));
+    List<HoodieInstant> compactionInstants = compactionTimeline.getInstants();
+
+    // Verify there is a completed compaction instant
+    boolean hasCompletedCompaction = compactionInstants.stream()
+        .anyMatch(instant -> instant.requestedTime().equals(compactionInstantTime) && instant.isCompleted());
+    assertTrue(hasCompletedCompaction,
+        "There should be a completed compaction instant in the timeline");
+
+    // Verify no pending compaction exists for this instant (it should be completed)
+    HoodieTimeline finalPendingCompactionTimeline = reloadedTimeline.filterPendingCompactionTimeline();
+    assertFalse(finalPendingCompactionTimeline.containsInstant(compactionInstantTime),
+        "Compaction instant should no longer be pending after execution");
+
+    // Clean up
+    executors.shutdown();
+    client.close();
+    client1.close();
+    client2.close();
+    FileIOUtils.deleteDirectory(new File(basePath));
+  }

Review Comment:
   Sure, added another test case.
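   
   For anyone skimming the new test, the race it sets up boils down to the pattern below. This is a self-contained illustration only, not part of the patch; the AtomicBoolean stands in for whatever conflict detection the real compaction writers perform, which is an assumption on my part.
   
   ```java
   import java.util.concurrent.CyclicBarrier;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.atomic.AtomicInteger;
   
   public class SamePlanRaceSketch {
     public static void main(String[] args) throws Exception {
       ExecutorService pool = Executors.newFixedThreadPool(2);
       CyclicBarrier barrier = new CyclicBarrier(2);          // lines both "writers" up
       AtomicBoolean planExecuted = new AtomicBoolean(false); // the one shared compaction plan
       AtomicInteger winners = new AtomicInteger();
   
       Runnable attempt = () -> {
         try {
           barrier.await(60, TimeUnit.SECONDS);
           // Only one attempt can flip the flag: two clients, one plan.
           if (planExecuted.compareAndSet(false, true)) {
             winners.incrementAndGet();
           } else {
             throw new IllegalStateException("plan already executed by the other writer");
           }
         } catch (Exception e) {
           // Losing writer: failing here is the expected outcome, not a bug.
         }
       };
   
       Future<?> f1 = pool.submit(attempt);
       Future<?> f2 = pool.submit(attempt);
       f1.get();
       f2.get();
       pool.shutdown();
   
       // Same invariant the test asserts: at least one writer must win.
       if (winners.get() < 1) {
         throw new AssertionError("at least one writer should complete the plan");
       }
     }
   }
   ```
   
   The CyclicBarrier is what makes the overlap deterministic: without it, one compact() call could finish before the other even starts, and the conflict path would never be exercised.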



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
