nsivabalan commented on code in PR #18012:
URL: https://github.com/apache/hudi/pull/18012#discussion_r2830923072
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -306,21 +308,47 @@ protected HoodieWriteMetadata<O> compact(String
compactionInstantTime, boolean s
* @return Collection of Write Status
*/
protected HoodieWriteMetadata<O> compact(HoodieTable<?, I, ?, T> table,
String compactionInstantTime, boolean shouldComplete) {
- HoodieTimeline pendingCompactionTimeline =
table.getActiveTimeline().filterPendingCompactionTimeline();
InstantGenerator instantGenerator =
table.getMetaClient().getInstantGenerator();
HoodieInstant inflightInstant =
instantGenerator.getCompactionInflightInstant(compactionInstantTime);
- if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
- table.rollbackInflightCompaction(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false),
txnManager);
- table.getMetaClient().reloadActiveTimeline();
+ try {
+ // Transaction serves to ensure only one compact job for this instant
will start heartbeat, and any other concurrent
+ // compact job will abort if they attempt to execute compact before
heartbeat expires
+ // Note that as long as all jobs for this table use this API for compact
with auto-commit, then this alone should prevent
+ // compact rollbacks from running concurrently to compact commits.
+ txnManager.beginStateChange(Option.of(inflightInstant),
txnManager.getLastCompletedTransactionOwner());
+ try {
+ if (!this.heartbeatClient.isHeartbeatExpired(compactionInstantTime)) {
+ throw new HoodieLockException("Cannot compact instant " +
compactionInstantTime + " due to heartbeat by concurrent writer/job");
Review Comment:
Can we move this to a private method called `handleHeartBeatExpiry(..)`?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -357,32 +385,38 @@ public void commitCompaction(String
compactionInstantTime, HoodieWriteMetadata<O
* Commit Compaction and track metrics.
*/
protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable
table, String compactionCommitTime, List<HoodieWriteStat>
partialMetadataWriteStats) {
- this.context.setJobStatus(this.getClass().getSimpleName(), "Collect
compaction write status and commit compaction: " + config.getTableName());
- List<HoodieWriteStat> writeStats = metadata.getWriteStats();
- handleWriteErrors(writeStats, TableServiceType.COMPACT);
- InstantGenerator instantGenerator =
table.getMetaClient().getInstantGenerator();
- final HoodieInstant compactionInstant =
instantGenerator.getCompactionInflightInstant(compactionCommitTime);
try {
- this.txnManager.beginStateChange(Option.of(compactionInstant),
Option.empty());
- finalizeWrite(table, compactionCommitTime, writeStats);
- // commit to data table after committing to metadata table.
- writeToMetadataTable(table, compactionCommitTime, metadata,
partialMetadataWriteStats);
- log.info("Committing Compaction {}", compactionCommitTime);
- CompactHelpers.getInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
- log.debug("Compaction {} finished with result: {}",
compactionCommitTime, metadata);
+ this.context.setJobStatus(this.getClass().getSimpleName(), "Collect
compaction write status and commit compaction: " + config.getTableName());
+ List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+ handleWriteErrors(writeStats, TableServiceType.COMPACT);
+ InstantGenerator instantGenerator =
table.getMetaClient().getInstantGenerator();
+ final HoodieInstant compactionInstant =
instantGenerator.getCompactionInflightInstant(compactionCommitTime);
+ try {
+ this.txnManager.beginStateChange(Option.of(compactionInstant),
Option.empty());
+ finalizeWrite(table, compactionCommitTime, writeStats);
+ // commit to data table after committing to metadata table.
+ writeToMetadataTable(table, compactionCommitTime, metadata,
partialMetadataWriteStats);
+ log.info("Committing Compaction {}", compactionCommitTime);
+ CompactHelpers.getInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
+ log.debug("Compaction {} finished with result: {}",
compactionCommitTime, metadata);
+ } finally {
+ this.txnManager.endStateChange(Option.of(compactionInstant));
+ releaseResources(compactionCommitTime);
+ }
+ WriteMarkersFactory.get(config.getMarkersType(), table,
compactionCommitTime)
+ .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+ if (compactionTimer != null) {
+ long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+
TimelineUtils.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant
->
+ metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs,
metadata, COMPACTION_ACTION)
+ );
+ }
+ log.info("Compacted successfully on commit {}", compactionCommitTime);
+ } catch (Exception e) {
+ throw e;
Review Comment:
same comment as above
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -306,21 +308,47 @@ protected HoodieWriteMetadata<O> compact(String
compactionInstantTime, boolean s
* @return Collection of Write Status
*/
protected HoodieWriteMetadata<O> compact(HoodieTable<?, I, ?, T> table,
String compactionInstantTime, boolean shouldComplete) {
- HoodieTimeline pendingCompactionTimeline =
table.getActiveTimeline().filterPendingCompactionTimeline();
InstantGenerator instantGenerator =
table.getMetaClient().getInstantGenerator();
HoodieInstant inflightInstant =
instantGenerator.getCompactionInflightInstant(compactionInstantTime);
- if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
- table.rollbackInflightCompaction(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false),
txnManager);
- table.getMetaClient().reloadActiveTimeline();
+ try {
+ // Transaction serves to ensure only one compact job for this instant
will start heartbeat, and any other concurrent
+ // compact job will abort if they attempt to execute compact before
heartbeat expires
+ // Note that as long as all jobs for this table use this API for compact
with auto-commit, then this alone should prevent
+ // compact rollbacks from running concurrently to compact commits.
+ txnManager.beginStateChange(Option.of(inflightInstant),
txnManager.getLastCompletedTransactionOwner());
+ try {
+ if (!this.heartbeatClient.isHeartbeatExpired(compactionInstantTime)) {
+ throw new HoodieLockException("Cannot compact instant " +
compactionInstantTime + " due to heartbeat by concurrent writer/job");
Review Comment:
Also, thinking about whether we really need to throw here if the heartbeat has
not expired — can we just return right away without throwing?
Because we have `runAnyPendingCompactions` in `BaseHoodieTableServiceClient`,
which will go through any pending compactions and call `compact`.
So, if some other table service task (task1) is executing compaction instant,
say t10, task2 can ignore that and move on to executing compaction instant
t15.
But as of now, task2 will see an exception and may not be able to move
ahead.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -915,6 +916,373 @@ public void
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
client3.close();
}
+ /**
+ * Test that when two writers attempt to execute the same compaction plan
concurrently,
+ * at least one will succeed and create a completed compaction. Verifies
that the timeline
+ * has compaction requested and completed instant files.
+ *
+ * This test uses a MOR table with multiwriter/optimistic concurrent control
enabled.
+ */
+ @Test
+ public void testConcurrentCompactionExecutionOnSamePlan() throws Exception {
+ // Set up MOR table
+ setUpMORTestTable();
+
+ Properties properties = new Properties();
+ properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath +
"/.hoodie/.locks");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
"3000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
"1000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,
"3");
+
+ // Build write config with multiwriter/OCC enabled
+ HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+ .withHeartbeatIntervalInMs(60 * 1000)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withAutoClean(false).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .withAutoArchive(false).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+ .withEmbeddedTimelineServerEnabled(false)
+ .withMarkersType(MarkerType.DIRECT.name())
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withStorageType(FileSystemViewStorageType.MEMORY)
+
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withLockConfig(HoodieLockConfig.newBuilder()
+ .withLockProvider(InProcessLockProvider.class)
+ .withConflictResolutionStrategy(new
SimpleConcurrentFileWritesConflictResolutionStrategy())
+ .build())
+ .withProperties(properties);
+
+ HoodieWriteConfig cfg = writeConfigBuilder.build();
+
+ // Create initial commit with inserts (creates base files)
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+ String firstCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithInserts(cfg, client, "000", firstCommitTime, 200);
+
+ // Create delta commits (upserts create log files which are needed for
compaction)
+ String secondCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithUpserts(cfg, client, firstCommitTime, Option.empty(),
secondCommitTime, 100);
+ String thirdCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithUpserts(cfg, client, secondCommitTime, Option.empty(),
thirdCommitTime, 100);
+
+ // Schedule compaction - this creates the compaction plan (requested
instant)
+ Option<String> compactionInstantOpt =
client.scheduleTableService(Option.empty(), Option.empty(),
TableServiceType.COMPACT);
+ assertTrue(compactionInstantOpt.isPresent(), "Compaction should be
scheduled");
+ String compactionInstantTime = compactionInstantOpt.get();
+
+ // Verify compaction is in requested state before execution
+ HoodieTimeline pendingCompactionTimeline =
metaClient.reloadActiveTimeline().filterPendingCompactionTimeline();
+
assertTrue(pendingCompactionTimeline.containsInstant(compactionInstantTime),
+ "Compaction instant should be in pending state after scheduling");
+
+ // Two clients attempting to execute the same compaction plan concurrently
+ final int threadCount = 2;
+ final ExecutorService executors =
Executors.newFixedThreadPool(threadCount);
+ final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
+ final AtomicBoolean writer1Succeeded = new AtomicBoolean(false);
+ final AtomicBoolean writer2Succeeded = new AtomicBoolean(false);
+
+ SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+ SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+
+ Future<?> future1 = executors.submit(() -> {
+ try {
+ // Wait for both writers to be ready
+ cyclicBarrier.await();
+
+ // Attempt to execute compaction with auto-complete
+ client1.compact(compactionInstantTime, true);
+ writer1Succeeded.set(true);
+ } catch (Exception e) {
+ // Expected - one writer may fail due to concurrent execution
+ LOG.info("Writer 1 failed with exception: " + e.getMessage());
+ writer1Succeeded.set(false);
+ }
+ });
+
+ Future<?> future2 = executors.submit(() -> {
+ try {
+ // Wait for both writers to be ready
+ cyclicBarrier.await();
+
+ // Attempt to execute compaction with auto-complete
+ client2.compact(compactionInstantTime, true);
+ writer2Succeeded.set(true);
+ } catch (Exception e) {
+ // Expected - one writer may fail due to concurrent execution
+ LOG.info("Writer 2 failed with exception: " + e.getMessage());
+ writer2Succeeded.set(false);
+ }
+ });
+
+ // Wait for both futures to complete
+ future1.get();
+ future2.get();
+
+ // Verify at least one writer succeeded
+ assertTrue(writer1Succeeded.get() || writer2Succeeded.get(),
Review Comment:
Let's also validate that both are not true.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -306,21 +308,47 @@ protected HoodieWriteMetadata<O> compact(String
compactionInstantTime, boolean s
* @return Collection of Write Status
*/
protected HoodieWriteMetadata<O> compact(HoodieTable<?, I, ?, T> table,
String compactionInstantTime, boolean shouldComplete) {
- HoodieTimeline pendingCompactionTimeline =
table.getActiveTimeline().filterPendingCompactionTimeline();
InstantGenerator instantGenerator =
table.getMetaClient().getInstantGenerator();
HoodieInstant inflightInstant =
instantGenerator.getCompactionInflightInstant(compactionInstantTime);
- if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
- table.rollbackInflightCompaction(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false),
txnManager);
- table.getMetaClient().reloadActiveTimeline();
+ try {
+ // Transaction serves to ensure only one compact job for this instant
will start heartbeat, and any other concurrent
+ // compact job will abort if they attempt to execute compact before
heartbeat expires
+ // Note that as long as all jobs for this table use this API for compact
with auto-commit, then this alone should prevent
+ // compact rollbacks from running concurrently to compact commits.
+ txnManager.beginStateChange(Option.of(inflightInstant),
txnManager.getLastCompletedTransactionOwner());
+ try {
+ if (!this.heartbeatClient.isHeartbeatExpired(compactionInstantTime)) {
+ throw new HoodieLockException("Cannot compact instant " +
compactionInstantTime + " due to heartbeat by concurrent writer/job");
+ }
+ } catch (IOException e) {
+ throw new HoodieHeartbeatException("Error accessing heartbeat of
instant to compact " + compactionInstantTime, e);
+ }
+ if
(!table.getMetaClient().reloadActiveTimeline().filterPendingCompactionTimeline().containsInstant(compactionInstantTime))
{
+ throw new HoodieException("Requested compaction instant " +
compactionInstantTime + " is not present as pending or already completed in the
active timeline.");
+ }
+ this.heartbeatClient.start(compactionInstantTime);
+ } finally {
+ txnManager.endStateChange(Option.of(inflightInstant));
}
- compactionTimer = metrics.getCompactionCtx();
- HoodieWriteMetadata<T> writeMetadata = table.compact(context,
compactionInstantTime);
- HoodieWriteMetadata<T> updatedWriteMetadata =
partialUpdateTableMetadata(table, writeMetadata, compactionInstantTime,
WriteOperationType.COMPACT);
- HoodieWriteMetadata<O> compactionWriteMetadata =
convertToOutputMetadata(updatedWriteMetadata);
- if (shouldComplete) {
- commitCompaction(compactionInstantTime, compactionWriteMetadata,
Option.of(table));
+ try {
+ HoodieTimeline pendingCompactionTimeline =
table.getActiveTimeline().filterPendingCompactionTimeline();
+ if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
+ table.rollbackInflightCompaction(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false),
txnManager);
+ table.getMetaClient().reloadActiveTimeline();
+ }
+ compactionTimer = metrics.getCompactionCtx();
+ HoodieWriteMetadata<T> writeMetadata = table.compact(context,
compactionInstantTime);
+ HoodieWriteMetadata<T> updatedWriteMetadata =
partialUpdateTableMetadata(table, writeMetadata, compactionInstantTime,
WriteOperationType.COMPACT);
+ HoodieWriteMetadata<O> compactionWriteMetadata =
convertToOutputMetadata(updatedWriteMetadata);
+ if (shouldComplete) {
+ commitCompaction(compactionInstantTime, compactionWriteMetadata,
Option.of(table));
+ }
+ return compactionWriteMetadata;
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ this.heartbeatClient.stop(compactionInstantTime);
Review Comment:
Don't we need to stop this only when an exception happens?
For the happy path, the finally block within `commitCompaction` will take care
of stopping the heartbeat, right?
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java:
##########
@@ -70,35 +70,41 @@ protected TableWriteStats
triggerWritesAndFetchWriteStats(HoodieWriteMetadata<Li
@Override
protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable
table, String compactionCommitTime, List<HoodieWriteStat>
partialMetadataWriteStats) {
- this.context.setJobStatus(this.getClass().getSimpleName(), "Collect
compaction write status and commit compaction: " + config.getTableName());
- List<HoodieWriteStat> writeStats = metadata.getWriteStats();
- final HoodieInstant compactionInstant =
table.getInstantGenerator().getCompactionInflightInstant(compactionCommitTime);
try {
- this.txnManager.beginStateChange(Option.of(compactionInstant),
Option.empty());
- finalizeWrite(table, compactionCommitTime, writeStats);
- // commit to data table after committing to metadata table.
- // Do not do any conflict resolution here as we do with regular writes.
We take the lock here to ensure all writes to metadata table happens within a
- // single lock (single writer). Because more than one write to metadata
table will result in conflicts since all of them updates the same partition.
- writeTableMetadata(table, compactionCommitTime, metadata);
- log.info("Committing Compaction {} finished with result {}.",
compactionCommitTime, metadata);
- CompactHelpers.getInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
- } finally {
- this.txnManager.endStateChange(Option.of(compactionInstant));
- }
- WriteMarkersFactory
- .get(config.getMarkersType(), table, compactionCommitTime)
- .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
- if (compactionTimer != null) {
- long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+ this.context.setJobStatus(this.getClass().getSimpleName(), "Collect
compaction write status and commit compaction: " + config.getTableName());
+ List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+ final HoodieInstant compactionInstant =
table.getInstantGenerator().getCompactionInflightInstant(compactionCommitTime);
try {
-
metrics.updateCommitMetrics(TimelineUtils.parseDateFromInstantTime(compactionCommitTime).getTime(),
- durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
- } catch (ParseException e) {
- throw new HoodieCommitException("Commit time is not of valid format.
Failed to commit compaction "
- + config.getBasePath() + " at time " + compactionCommitTime, e);
+ this.txnManager.beginStateChange(Option.of(compactionInstant),
Option.empty());
+ finalizeWrite(table, compactionCommitTime, writeStats);
+ // commit to data table after committing to metadata table.
+ // Do not do any conflict resolution here as we do with regular
writes. We take the lock here to ensure all writes to metadata table happens
within a
+ // single lock (single writer). Because more than one write to
metadata table will result in conflicts since all of them updates the same
partition.
+ writeTableMetadata(table, compactionCommitTime, metadata);
+ log.info("Committing Compaction {} finished with result {}.",
compactionCommitTime, metadata);
+ CompactHelpers.getInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
+ } finally {
+ this.txnManager.endStateChange(Option.of(compactionInstant));
}
+ WriteMarkersFactory
+ .get(config.getMarkersType(), table, compactionCommitTime)
+ .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+ if (compactionTimer != null) {
+ long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+ try {
+
metrics.updateCommitMetrics(TimelineUtils.parseDateFromInstantTime(compactionCommitTime).getTime(),
+ durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
+ } catch (ParseException e) {
+ throw new HoodieCommitException("Commit time is not of valid format.
Failed to commit compaction "
+ + config.getBasePath() + " at time " + compactionCommitTime, e);
+ }
+ }
+ log.info("Compacted successfully on commit " + compactionCommitTime);
+ } catch (Exception e) {
+ throw e;
Review Comment:
same here
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -306,21 +308,47 @@ protected HoodieWriteMetadata<O> compact(String
compactionInstantTime, boolean s
* @return Collection of Write Status
*/
protected HoodieWriteMetadata<O> compact(HoodieTable<?, I, ?, T> table,
String compactionInstantTime, boolean shouldComplete) {
- HoodieTimeline pendingCompactionTimeline =
table.getActiveTimeline().filterPendingCompactionTimeline();
InstantGenerator instantGenerator =
table.getMetaClient().getInstantGenerator();
HoodieInstant inflightInstant =
instantGenerator.getCompactionInflightInstant(compactionInstantTime);
- if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
- table.rollbackInflightCompaction(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false),
txnManager);
- table.getMetaClient().reloadActiveTimeline();
+ try {
+ // Transaction serves to ensure only one compact job for this instant
will start heartbeat, and any other concurrent
+ // compact job will abort if they attempt to execute compact before
heartbeat expires
+ // Note that as long as all jobs for this table use this API for compact
with auto-commit, then this alone should prevent
+ // compact rollbacks from running concurrently to compact commits.
+ txnManager.beginStateChange(Option.of(inflightInstant),
txnManager.getLastCompletedTransactionOwner());
+ try {
+ if (!this.heartbeatClient.isHeartbeatExpired(compactionInstantTime)) {
+ throw new HoodieLockException("Cannot compact instant " +
compactionInstantTime + " due to heartbeat by concurrent writer/job");
+ }
+ } catch (IOException e) {
+ throw new HoodieHeartbeatException("Error accessing heartbeat of
instant to compact " + compactionInstantTime, e);
+ }
+ if
(!table.getMetaClient().reloadActiveTimeline().filterPendingCompactionTimeline().containsInstant(compactionInstantTime))
{
Review Comment:
Why do we need to reload the timeline here?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -915,6 +916,373 @@ public void
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
client3.close();
}
+ /**
+ * Test that when two writers attempt to execute the same compaction plan
concurrently,
+ * at least one will succeed and create a completed compaction. Verifies
that the timeline
+ * has compaction requested and completed instant files.
+ *
+ * This test uses a MOR table with multiwriter/optimistic concurrent control
enabled.
+ */
+ @Test
+ public void testConcurrentCompactionExecutionOnSamePlan() throws Exception {
+ // Set up MOR table
+ setUpMORTestTable();
+
+ Properties properties = new Properties();
+ properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath +
"/.hoodie/.locks");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
"3000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
"1000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,
"3");
+
+ // Build write config with multiwriter/OCC enabled
+ HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+ .withHeartbeatIntervalInMs(60 * 1000)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withAutoClean(false).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .withAutoArchive(false).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+ .withEmbeddedTimelineServerEnabled(false)
+ .withMarkersType(MarkerType.DIRECT.name())
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withStorageType(FileSystemViewStorageType.MEMORY)
+
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withLockConfig(HoodieLockConfig.newBuilder()
+ .withLockProvider(InProcessLockProvider.class)
+ .withConflictResolutionStrategy(new
SimpleConcurrentFileWritesConflictResolutionStrategy())
+ .build())
+ .withProperties(properties);
+
+ HoodieWriteConfig cfg = writeConfigBuilder.build();
+
+ // Create initial commit with inserts (creates base files)
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+ String firstCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithInserts(cfg, client, "000", firstCommitTime, 200);
+
+ // Create delta commits (upserts create log files which are needed for
compaction)
+ String secondCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithUpserts(cfg, client, firstCommitTime, Option.empty(),
secondCommitTime, 100);
+ String thirdCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithUpserts(cfg, client, secondCommitTime, Option.empty(),
thirdCommitTime, 100);
+
+ // Schedule compaction - this creates the compaction plan (requested
instant)
+ Option<String> compactionInstantOpt =
client.scheduleTableService(Option.empty(), Option.empty(),
TableServiceType.COMPACT);
+ assertTrue(compactionInstantOpt.isPresent(), "Compaction should be
scheduled");
+ String compactionInstantTime = compactionInstantOpt.get();
+
+ // Verify compaction is in requested state before execution
+ HoodieTimeline pendingCompactionTimeline =
metaClient.reloadActiveTimeline().filterPendingCompactionTimeline();
+
assertTrue(pendingCompactionTimeline.containsInstant(compactionInstantTime),
+ "Compaction instant should be in pending state after scheduling");
+
+ // Two clients attempting to execute the same compaction plan concurrently
+ final int threadCount = 2;
+ final ExecutorService executors =
Executors.newFixedThreadPool(threadCount);
+ final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
+ final AtomicBoolean writer1Succeeded = new AtomicBoolean(false);
+ final AtomicBoolean writer2Succeeded = new AtomicBoolean(false);
+
+ SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+ SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+
+ Future<?> future1 = executors.submit(() -> {
+ try {
+ // Wait for both writers to be ready
+ cyclicBarrier.await();
+
+ // Attempt to execute compaction with auto-complete
+ client1.compact(compactionInstantTime, true);
+ writer1Succeeded.set(true);
+ } catch (Exception e) {
+ // Expected - one writer may fail due to concurrent execution
+ LOG.info("Writer 1 failed with exception: " + e.getMessage());
+ writer1Succeeded.set(false);
+ }
+ });
+
+ Future<?> future2 = executors.submit(() -> {
+ try {
+ // Wait for both writers to be ready
+ cyclicBarrier.await();
+
+ // Attempt to execute compaction with auto-complete
+ client2.compact(compactionInstantTime, true);
+ writer2Succeeded.set(true);
+ } catch (Exception e) {
+ // Expected - one writer may fail due to concurrent execution
+ LOG.info("Writer 2 failed with exception: " + e.getMessage());
+ writer2Succeeded.set(false);
+ }
+ });
+
+ // Wait for both futures to complete
+ future1.get();
+ future2.get();
+
+ // Verify at least one writer succeeded
+ assertTrue(writer1Succeeded.get() || writer2Succeeded.get(),
+ "At least one writer should succeed in executing the compaction");
+
+ // Reload timeline and verify compaction is completed
+ HoodieTimeline reloadedTimeline = metaClient.reloadActiveTimeline();
+
+ // Verify compaction instant is completed
+ HoodieTimeline completedCompactionTimeline =
reloadedTimeline.filterCompletedInstants().getCommitTimeline();
+
+ // Verify there is a completed compaction instant
+ boolean hasCompletedCompaction =
completedCompactionTimeline.getInstantsAsStream()
+ .anyMatch(instant ->
instant.requestedTime().equals(compactionInstantTime));
+ assertTrue(hasCompletedCompaction,
+ "The completed compaction instant should be in the timeline");
+
+ // Verify no pending compaction exists for this instant (it should be
completed)
+ HoodieTimeline finalPendingCompactionTimeline =
reloadedTimeline.filterPendingCompactionTimeline();
+
assertFalse(finalPendingCompactionTimeline.containsInstant(compactionInstantTime),
+ "Compaction instant should no longer be pending after execution");
+
+ // Clean up
+ executors.shutdown();
+ client.close();
+ client1.close();
+ client2.close();
+ FileIOUtils.deleteDirectory(new File(basePath));
+ }
+
+ /**
+ * Test that when writer1 starts compaction (with auto commit enabled)
execution and fails mid-way,
+ * and after sometime writer2 starts and is able to rollback and execute the
failed compaction
+ * (after heartbeat expired).
+ *
+ * This test uses a MOR table with multiwriter/optimistic concurrent control
enabled.
+ */
+ @Test
+ public void testCompactionRecoveryAfterWriterFailureWithHeartbeatExpiry()
throws Exception {
+ // Set up MOR table
+ setUpMORTestTable();
+
+ // Use short heartbeat interval so we can wait for expiry in test
+ int heartbeatIntervalMs = 2000;
+ int numTolerableHeartbeatMisses = 1;
+
+ Properties properties = new Properties();
+ properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath +
"/.hoodie/.locks");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
"3000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
"1000");
+
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,
"3");
+
+ // Build write config with multiwriter/OCC enabled and short heartbeat
+ HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
+ .withHeartbeatIntervalInMs(heartbeatIntervalMs)
+ .withHeartbeatTolerableMisses(numTolerableHeartbeatMisses)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withAutoClean(false).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .withAutoArchive(false).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+ .withEmbeddedTimelineServerEnabled(false)
+ .withMarkersType(MarkerType.DIRECT.name())
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withStorageType(FileSystemViewStorageType.MEMORY)
+
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withLockConfig(HoodieLockConfig.newBuilder()
+ .withLockProvider(InProcessLockProvider.class)
+ .withConflictResolutionStrategy(new
SimpleConcurrentFileWritesConflictResolutionStrategy())
+ .build())
+ .withProperties(properties);
+
+ HoodieWriteConfig cfg = writeConfigBuilder.build();
+
+ // Create initial commit with inserts (creates base files)
+ SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+ String firstCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithInserts(cfg, client, "000", firstCommitTime, 200);
+
+ // Create delta commits (upserts create log files which are needed for
compaction)
+ String secondCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithUpserts(cfg, client, firstCommitTime, Option.empty(),
secondCommitTime, 100);
+ String thirdCommitTime = WriteClientTestUtils.createNewInstantTime();
+ createCommitWithUpserts(cfg, client, secondCommitTime, Option.empty(),
thirdCommitTime, 100);
+
+ // Schedule compaction - this creates the compaction plan (requested
instant)
+ Option<String> compactionInstantOpt =
client.scheduleTableService(Option.empty(), Option.empty(),
TableServiceType.COMPACT);
+ assertTrue(compactionInstantOpt.isPresent(), "Compaction should be
scheduled");
+ String compactionInstantTime = compactionInstantOpt.get();
+ client.close();
+
+ // Verify compaction is in requested state before execution
+ HoodieTimeline pendingCompactionTimeline =
metaClient.reloadActiveTimeline().filterPendingCompactionTimeline();
+
assertTrue(pendingCompactionTimeline.containsInstant(compactionInstantTime),
+ "Compaction instant should be in pending state after scheduling");
+
+ // Writer1: Start compaction with auto-commit, but simulate failure by
starting heartbeat
+ // and then stopping the writer without completing the compaction
+ SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+
+ // Simulate writer1 starting compaction but failing mid-way:
+ // 1. Start heartbeat for the compaction instant
+ // 2. Transition the compaction to inflight state (simulating that
execution started)
+ // 3. Then "fail" by stopping heartbeat and closing the client without
completing
+
+ // Start heartbeat for this compaction instant (simulating writer1 started
execution)
+ client1.getHeartbeatClient().start(compactionInstantTime);
+
+ // Transition compaction from requested to inflight (simulating execution
started)
+ HoodieInstant requestedInstant = metaClient.reloadActiveTimeline()
+ .filterPendingCompactionTimeline()
+ .getInstantsAsStream()
+ .filter(i -> i.requestedTime().equals(compactionInstantTime))
+ .findFirst()
+ .get();
+
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(requestedInstant);
+
+ // Verify compaction is now inflight
+ HoodieTimeline reloadedTimeline = metaClient.reloadActiveTimeline();
+ HoodieInstant inflightInstant =
INSTANT_GENERATOR.getCompactionInflightInstant(compactionInstantTime);
+
assertTrue(reloadedTimeline.filterPendingCompactionTimeline().containsInstant(inflightInstant),
+ "Compaction instant should be in inflight state");
+
+ // Simulate writer1 failure by stopping heartbeat (deletes heartbeat file
from DFS, so it is immediately
+ // considered expired when writer2 checks)
+ client1.getHeartbeatClient().stop(compactionInstantTime);
+ client1.close();
+
+ // Writer2: Now comes in after heartbeat expired, should be able to
rollback and execute compaction
+ SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+
+ // Writer2 attempts to execute the compaction with auto-commit
+ // This should detect expired heartbeat, rollback the inflight compaction,
and re-execute it
+ assertDoesNotThrow(() -> {
+ client2.compact(compactionInstantTime, true);
+ }, "Writer2 should be able to rollback and execute the failed compaction
after heartbeat expires");
+
+ // Reload timeline and verify compaction is completed
+ HoodieTimeline finalTimeline = metaClient.reloadActiveTimeline();
+
+ // Verify compaction instant is completed
+ HoodieTimeline completedCompactionTimeline =
finalTimeline.filterCompletedInstants().getCommitTimeline();
+ boolean hasCompletedCompaction =
completedCompactionTimeline.getInstantsAsStream()
+ .anyMatch(instant ->
instant.requestedTime().equals(compactionInstantTime));
+ assertTrue(hasCompletedCompaction,
+ "The compaction instant should be completed after writer2 recovery");
 Review Comment:
 can we also validate that a rollback is visible in the timeline?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -306,21 +308,47 @@ protected HoodieWriteMetadata<O> compact(String
compactionInstantTime, boolean s
* @return Collection of Write Status
*/
protected HoodieWriteMetadata<O> compact(HoodieTable<?, I, ?, T> table,
String compactionInstantTime, boolean shouldComplete) {
- HoodieTimeline pendingCompactionTimeline =
table.getActiveTimeline().filterPendingCompactionTimeline();
InstantGenerator instantGenerator =
table.getMetaClient().getInstantGenerator();
HoodieInstant inflightInstant =
instantGenerator.getCompactionInflightInstant(compactionInstantTime);
- if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
- table.rollbackInflightCompaction(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false),
txnManager);
- table.getMetaClient().reloadActiveTimeline();
+ try {
+ // Transaction serves to ensure only one compact job for this instant
will start heartbeat, and any other concurrent
+ // compact job will abort if they attempt to execute compact before
heartbeat expires
+ // Note that as long as all jobs for this table use this API for compact
with auto-commit, then this alone should prevent
+ // compact rollbacks from running concurrently to compact commits.
+ txnManager.beginStateChange(Option.of(inflightInstant),
txnManager.getLastCompletedTransactionOwner());
+ try {
+ if (!this.heartbeatClient.isHeartbeatExpired(compactionInstantTime)) {
+ throw new HoodieLockException("Cannot compact instant " +
compactionInstantTime + " due to heartbeat by concurrent writer/job");
+ }
+ } catch (IOException e) {
+ throw new HoodieHeartbeatException("Error accessing heartbeat of
instant to compact " + compactionInstantTime, e);
+ }
+ if
(!table.getMetaClient().reloadActiveTimeline().filterPendingCompactionTimeline().containsInstant(compactionInstantTime))
{
+ throw new HoodieException("Requested compaction instant " +
compactionInstantTime + " is not present as pending or already completed in the
active timeline.");
+ }
+ this.heartbeatClient.start(compactionInstantTime);
+ } finally {
+ txnManager.endStateChange(Option.of(inflightInstant));
}
- compactionTimer = metrics.getCompactionCtx();
- HoodieWriteMetadata<T> writeMetadata = table.compact(context,
compactionInstantTime);
- HoodieWriteMetadata<T> updatedWriteMetadata =
partialUpdateTableMetadata(table, writeMetadata, compactionInstantTime,
WriteOperationType.COMPACT);
- HoodieWriteMetadata<O> compactionWriteMetadata =
convertToOutputMetadata(updatedWriteMetadata);
- if (shouldComplete) {
- commitCompaction(compactionInstantTime, compactionWriteMetadata,
Option.of(table));
+ try {
+ HoodieTimeline pendingCompactionTimeline =
table.getActiveTimeline().filterPendingCompactionTimeline();
+ if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
+ table.rollbackInflightCompaction(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false),
txnManager);
+ table.getMetaClient().reloadActiveTimeline();
+ }
+ compactionTimer = metrics.getCompactionCtx();
+ HoodieWriteMetadata<T> writeMetadata = table.compact(context,
compactionInstantTime);
+ HoodieWriteMetadata<T> updatedWriteMetadata =
partialUpdateTableMetadata(table, writeMetadata, compactionInstantTime,
WriteOperationType.COMPACT);
+ HoodieWriteMetadata<O> compactionWriteMetadata =
convertToOutputMetadata(updatedWriteMetadata);
+ if (shouldComplete) {
+ commitCompaction(compactionInstantTime, compactionWriteMetadata,
Option.of(table));
+ }
+ return compactionWriteMetadata;
+ } catch (Exception e) {
+ throw e;
 Review Comment:
 we can remove the catch block — a try with just a finally is totally valid.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]