nsivabalan commented on code in PR #18012:
URL: https://github.com/apache/hudi/pull/18012#discussion_r2751676399
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -915,6 +915,145 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
     client3.close();
   }
+  /**
+   * Test that when two writers attempt to execute the same compaction plan concurrently,
+   * at least one will succeed and create a completed compaction. Verifies that the timeline
+   * has compaction requested and completed instant files.
+   *
+   * This test uses a MOR table with multiwriter/optimistic concurrency control enabled.
+   */
+  @Test
+  public void testConcurrentCompactionExecutionOnSamePlan() throws Exception {
+    // Set up MOR table
+    setUpMORTestTable();
+
+    Properties properties = new Properties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
+
+    // Build write config with multiwriter/OCC enabled
+    HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+        .withHeartbeatIntervalInMs(60 * 1000)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .withAutoClean(false).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+            .withAutoArchive(false).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+        .withEmbeddedTimelineServerEnabled(false)
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.MEMORY)
+            .withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(InProcessLockProvider.class)
+            .withConflictResolutionStrategy(new SimpleConcurrentFileWritesConflictResolutionStrategy())
+            .build())
+        .withProperties(properties);
+
+    HoodieWriteConfig cfg = writeConfigBuilder.build();
+
+    // Create initial commit with inserts (creates base files)
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    String firstCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithInserts(cfg, client, "000", firstCommitTime, 200);
+
+    // Create delta commits (upserts create log files, which are needed for compaction)
+    String secondCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithUpserts(cfg, client, firstCommitTime, Option.empty(), secondCommitTime, 100);
+    String thirdCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithUpserts(cfg, client, secondCommitTime, Option.empty(), thirdCommitTime, 100);
+
+    // Schedule compaction - this creates the compaction plan (requested instant)
+    Option<String> compactionInstantOpt = client.scheduleTableService(Option.empty(), Option.empty(), TableServiceType.COMPACT);
+    assertTrue(compactionInstantOpt.isPresent(), "Compaction should be scheduled");
+    String compactionInstantTime = compactionInstantOpt.get();
+
+    // Verify compaction is in requested state before execution
+    HoodieTimeline pendingCompactionTimeline = metaClient.reloadActiveTimeline().filterPendingCompactionTimeline();
+    assertTrue(pendingCompactionTimeline.containsInstant(compactionInstantTime),
+        "Compaction instant should be in pending state after scheduling");
+
+    // Two clients attempting to execute the same compaction plan concurrently
+    final int threadCount = 2;
+    final ExecutorService executors = Executors.newFixedThreadPool(threadCount);
+    final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
+    final AtomicBoolean writer1Succeeded = new AtomicBoolean(false);
+    final AtomicBoolean writer2Succeeded = new AtomicBoolean(false);
+
+    SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+    SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+
+    Future<?> future1 = executors.submit(() -> {
+      try {
+        // Wait for both writers to be ready
+        cyclicBarrier.await(60, TimeUnit.SECONDS);
Review Comment:
   Can we use a CountDownLatch here so the rendezvous is deterministic, rather than adding a 60 sec wait?
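
   For illustration, one possible shape of that change — a rough sketch only, reusing the names from the test above (the latch wiring is the only new part, and `startLatch` is a hypothetical name):

   ```java
   // Both writers block on a single latch and are released together by the
   // main thread, so no timed rendezvous is needed at all.
   final CountDownLatch startLatch = new CountDownLatch(1); // java.util.concurrent.CountDownLatch

   Future<?> future1 = executors.submit(() -> {
     try {
       startLatch.await(); // blocks until countDown(), deterministically
       client1.compact(compactionInstantTime, true);
       writer1Succeeded.set(true);
     } catch (Exception e) {
       writer1Succeeded.set(false);
     }
   });
   // future2 is submitted the same way, then:
   startLatch.countDown(); // release both writers at the same time
   ```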
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1221,7 +1223,28 @@ public void commitLogCompaction(String compactionInstantTime, HoodieWriteMetadat
   public HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete) {
     HoodieTable table = createTable(config);
     preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
-    return tableServiceClient.compact(table, compactionInstantTime, shouldComplete);
+    HoodieInstant compactionPlanInstant = table.getMetaClient().getInstantGenerator().getCompactionRequestedInstant(compactionInstantTime);
+    if (config.getWriteConcurrencyMode().supportsMultiWriter()) {
+      try {
+        // The transaction ensures that only one compact job for this instant will start a heartbeat, and any other
+        // concurrent compact job will abort if it attempts to execute compaction before the heartbeat expires.
+        // Note that as long as all jobs for this table use this API for compaction, this alone should prevent
+        // compact rollbacks from running concurrently with compact commits.
+        txnManager.beginStateChange(Option.of(compactionPlanInstant), txnManager.getLastCompletedTransactionOwner());
+        try {
+          if (!this.heartbeatClient.isHeartbeatExpired(compactionInstantTime)) {
+            throw new HoodieLockException("Cannot compact instant " + compactionInstantTime + " due to heartbeat by existing job");
Review Comment:
minor.
`heartbeat by concurrent table service writer/job`
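
   For context, the guard that the new block implements, reduced to a schematic — hypothetical shape only (the `endStateChange` pairing, the heartbeat-start step, and the elided exception handling are assumptions, not code from this PR):

   ```java
   // Serialize competing compact jobs on the plan instant, refuse if another
   // job's heartbeat is still live, otherwise claim the plan by starting our
   // own heartbeat before releasing the lock. Checked-exception handling elided.
   txnManager.beginStateChange(Option.of(compactionPlanInstant), txnManager.getLastCompletedTransactionOwner());
   try {
     if (!heartbeatClient.isHeartbeatExpired(compactionInstantTime)) {
       // message wording per this review comment
       throw new HoodieLockException("Cannot compact instant " + compactionInstantTime
           + " due to heartbeat by concurrent table service writer/job");
     }
     heartbeatClient.start(compactionInstantTime); // assumed claim step; not shown in the excerpt above
   } finally {
     txnManager.endStateChange(Option.of(compactionPlanInstant)); // assumed counterpart of beginStateChange
   }
   ```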
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -915,6 +915,145 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
     client3.close();
   }
+  /**
+   * Test that when two writers attempt to execute the same compaction plan concurrently,
+   * at least one will succeed and create a completed compaction. Verifies that the timeline
+   * has compaction requested and completed instant files.
+   *
+   * This test uses a MOR table with multiwriter/optimistic concurrency control enabled.
+   */
+  @Test
+  public void testConcurrentCompactionExecutionOnSamePlan() throws Exception {
+    // Set up MOR table
+    setUpMORTestTable();
+
+    Properties properties = new Properties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
+
+    // Build write config with multiwriter/OCC enabled
+    HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+        .withHeartbeatIntervalInMs(60 * 1000)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .withAutoClean(false).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+            .withAutoArchive(false).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+        .withEmbeddedTimelineServerEnabled(false)
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.MEMORY)
+            .withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(InProcessLockProvider.class)
+            .withConflictResolutionStrategy(new SimpleConcurrentFileWritesConflictResolutionStrategy())
+            .build())
+        .withProperties(properties);
+
+    HoodieWriteConfig cfg = writeConfigBuilder.build();
+
+    // Create initial commit with inserts (creates base files)
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    String firstCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithInserts(cfg, client, "000", firstCommitTime, 200);
+
+    // Create delta commits (upserts create log files, which are needed for compaction)
+    String secondCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithUpserts(cfg, client, firstCommitTime, Option.empty(), secondCommitTime, 100);
+    String thirdCommitTime = WriteClientTestUtils.createNewInstantTime();
+    createCommitWithUpserts(cfg, client, secondCommitTime, Option.empty(), thirdCommitTime, 100);
+
+    // Schedule compaction - this creates the compaction plan (requested instant)
+    Option<String> compactionInstantOpt = client.scheduleTableService(Option.empty(), Option.empty(), TableServiceType.COMPACT);
+    assertTrue(compactionInstantOpt.isPresent(), "Compaction should be scheduled");
+    String compactionInstantTime = compactionInstantOpt.get();
+
+    // Verify compaction is in requested state before execution
+    HoodieTimeline pendingCompactionTimeline = metaClient.reloadActiveTimeline().filterPendingCompactionTimeline();
+    assertTrue(pendingCompactionTimeline.containsInstant(compactionInstantTime),
+        "Compaction instant should be in pending state after scheduling");
+
+    // Two clients attempting to execute the same compaction plan concurrently
+    final int threadCount = 2;
+    final ExecutorService executors = Executors.newFixedThreadPool(threadCount);
+    final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
+    final AtomicBoolean writer1Succeeded = new AtomicBoolean(false);
+    final AtomicBoolean writer2Succeeded = new AtomicBoolean(false);
+
+    SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+    SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+
+    Future<?> future1 = executors.submit(() -> {
+      try {
+        // Wait for both writers to be ready
+        cyclicBarrier.await(60, TimeUnit.SECONDS);
+
+        // Attempt to execute compaction with auto-complete
+        client1.compact(compactionInstantTime, true);
+        writer1Succeeded.set(true);
+      } catch (Exception e) {
+        // Expected - one writer may fail due to concurrent execution
+        LOG.info("Writer 1 failed with exception: " + e.getMessage());
+        writer1Succeeded.set(false);
+      }
+    });
+
+    Future<?> future2 = executors.submit(() -> {
+      try {
+        // Wait for both writers to be ready
+        cyclicBarrier.await(60, TimeUnit.SECONDS);
+
+        // Attempt to execute compaction with auto-complete
+        client2.compact(compactionInstantTime, true);
+        writer2Succeeded.set(true);
+      } catch (Exception e) {
+        // Expected - one writer may fail due to concurrent execution
+        LOG.info("Writer 2 failed with exception: " + e.getMessage());
+        writer2Succeeded.set(false);
+      }
+    });
+
+    // Wait for both futures to complete
+    future1.get();
+    future2.get();
+
+    // Verify at least one writer succeeded
+    assertTrue(writer1Succeeded.get() || writer2Succeeded.get(),
+        "At least one writer should succeed in executing the compaction");
+
+    // Reload timeline and verify compaction is completed
+    HoodieTimeline reloadedTimeline = metaClient.reloadActiveTimeline();
+
+    // Verify compaction instant is completed
+    HoodieTimeline compactionTimeline = reloadedTimeline.getTimelineOfActions(
+        Collections.singleton(HoodieTimeline.COMPACTION_ACTION));
+    List<HoodieInstant> compactionInstants = compactionTimeline.getInstants();
+
+    // Verify there is a completed compaction instant
+    boolean hasCompletedCompaction = compactionInstants.stream()
+        .anyMatch(instant -> instant.requestedTime().equals(compactionInstantTime) && instant.isCompleted());
+    assertTrue(hasCompletedCompaction,
+        "There should be a completed compaction instant in the timeline");
+
+    // Verify no pending compaction exists for this instant (it should be completed)
+    HoodieTimeline finalPendingCompactionTimeline = reloadedTimeline.filterPendingCompactionTimeline();
+    assertFalse(finalPendingCompactionTimeline.containsInstant(compactionInstantTime),
+        "Compaction instant should no longer be pending after execution");
+
+    // Clean up
+    executors.shutdown();
+    client.close();
+    client1.close();
+    client2.close();
+    FileIOUtils.deleteDirectory(new File(basePath));
+  }
Review Comment:
   Can we also test the case where writer1 starts compaction execution and fails mid-way, and after some time writer2 starts and is able to roll back and re-execute the failed compaction (after the heartbeat has expired)?
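
   Something with roughly this shape, perhaps — a sketch only (the short heartbeat interval, the induced mid-way failure, and the sleep are all illustrative, not code from this PR):

   ```java
   // writer1 takes the plan and is made to fail mid-execution (e.g. via failure
   // injection in the write path), leaving the compaction pending with a
   // heartbeat that expires quickly under a short heartbeat interval.
   HoodieWriteConfig shortHeartbeatCfg = writeConfigBuilder.withHeartbeatIntervalInMs(1000).build();
   SparkRDDWriteClient writer1 = getHoodieWriteClient(shortHeartbeatCfg);
   try {
     writer1.compact(compactionInstantTime, true); // induce a mid-way failure here in the real test
   } catch (Exception expected) {
     // compaction attempt left partially executed and still pending
   }

   // Wait until writer1's heartbeat has expired so writer2 is allowed in.
   Thread.sleep(3 * 1000);

   // writer2 should roll back the failed attempt and re-execute the same plan.
   SparkRDDWriteClient writer2 = getHoodieWriteClient(shortHeartbeatCfg);
   writer2.compact(compactionInstantTime, true);
   assertTrue(metaClient.reloadActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime),
       "writer2 should complete the compaction after the heartbeat expired");
   ```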
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]