kbuci commented on code in PR #18012:
URL: https://github.com/apache/hudi/pull/18012#discussion_r2775659973
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -915,6 +915,145 @@ public void
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
client3.close();
}
+ /**
+ * Test that when two writers attempt to execute the same compaction plan
concurrently,
+ * at least one will succeed and create a completed compaction. Verifies
that the timeline
+ * has compaction requested and completed instant files.
+ *
+ * This test uses a MOR table with multi-writer/optimistic concurrency control
enabled.
+ */
+ @Test
+ public void testConcurrentCompactionExecutionOnSamePlan() throws Exception {
+ // Set up MOR table
+ setUpMORTestTable();
+
+ Properties properties = new Properties();
+ properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath +
"/.hoodie/.locks");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
"3000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
"1000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,
"3");
+
+ // Build write config with multiwriter/OCC enabled
+ HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+ .withHeartbeatIntervalInMs(60 * 1000)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withAutoClean(false).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .withAutoArchive(false).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+ .withEmbeddedTimelineServerEnabled(false)
+ .withMarkersType(MarkerType.DIRECT.name())
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withStorageType(FileSystemViewStorageType.MEMORY)
+
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withLockConfig(HoodieLockConfig.newBuilder()
+ .withLockProvider(InProcessLockProvider.class)
+ .withConflictResolutionStrategy(new
SimpleConcurrentFileWritesConflictResolutionStrategy())
+ .build())
+ .withProperties(properties);
+
+ HoodieWriteConfig cfg = writeConfigBuilder.build();
+
+ // Create initial commit with inserts (creates base files)
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+ String firstCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithInserts(cfg, client, "000", firstCommitTime, 200);
+
+ // Create delta commits (upserts create log files which are needed for
compaction)
+ String secondCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithUpserts(cfg, client, firstCommitTime, Option.empty(),
secondCommitTime, 100);
+ String thirdCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithUpserts(cfg, client, secondCommitTime, Option.empty(),
thirdCommitTime, 100);
+
+ // Schedule compaction - this creates the compaction plan (requested
instant)
+ Option<String> compactionInstantOpt =
client.scheduleTableService(Option.empty(), Option.empty(),
TableServiceType.COMPACT);
+ assertTrue(compactionInstantOpt.isPresent(), "Compaction should be
scheduled");
+ String compactionInstantTime = compactionInstantOpt.get();
+
+ // Verify compaction is in requested state before execution
+ HoodieTimeline pendingCompactionTimeline =
metaClient.reloadActiveTimeline().filterPendingCompactionTimeline();
+
assertTrue(pendingCompactionTimeline.containsInstant(compactionInstantTime),
+ "Compaction instant should be in pending state after scheduling");
+
+ // Two clients attempting to execute the same compaction plan concurrently
+ final int threadCount = 2;
+ final ExecutorService executors =
Executors.newFixedThreadPool(threadCount);
+ final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
+ final AtomicBoolean writer1Succeeded = new AtomicBoolean(false);
+ final AtomicBoolean writer2Succeeded = new AtomicBoolean(false);
+
+ SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+ SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+
+ Future<?> future1 = executors.submit(() -> {
+ try {
+ // Wait for both writers to be ready
+ cyclicBarrier.await(60, TimeUnit.SECONDS);
Review Comment:
I removed the 60 sec wait, but my understanding is that this API is
deterministic as well
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]