This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f763da2bc197 feat(conflict-resolution): Allow 
PreferWriterConflictResolutionStrategy to abort clustering if there is an 
ongoing write that is in requested state. (#18280)
f763da2bc197 is described below

commit f763da2bc19750f96aa0a1cf57a322726bf05d51
Author: Krishen <[email protected]>
AuthorDate: Fri Mar 13 14:39:52 2026 -0700

    feat(conflict-resolution): Allow PreferWriterConflictResolutionStrategy to 
abort clustering if there is an ongoing write that is in requested state. 
(#18280)
    
    #17907
    
    Summary and Changelog
    Summary: When PreferWriterConflictResolutionStrategy is used for write 
conflict resolution, clustering commits can now be configured to self-abort if 
there are any pending ingestion instants (in .requested state) that have an 
active heartbeat but have not yet transitioned to .inflight. This prevents 
clustering from committing and potentially causing a conflict with an ongoing 
ingestion write, since ingestion should have precedence over clustering (with 
this strategy). This is only rec [...]
    
    The heartbeat timeout is inferred from the write config's heartbeat interval and tolerable misses: a pending instant's heartbeat is treated as expired once (tolerable misses + 1) heartbeat intervals elapse without a refresh.
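The timeout inference above can be sketched as a standalone helper (hypothetical names; the real computation lives inline in PreferWriterConflictResolutionStrategy using the HoodieWriteConfig heartbeat getters):

```java
public class HeartbeatTimeoutSketch {
    // A pending instant's heartbeat is considered expired once
    // (tolerableMisses + 1) intervals elapse without a refresh.
    static long maxHeartbeatIntervalMs(long heartbeatIntervalMs, int tolerableMisses) {
        return heartbeatIntervalMs * (tolerableMisses + 1);
    }

    public static void main(String[] args) {
        // 60s interval with 2 tolerable misses -> 180s before expiry
        System.out.println(maxHeartbeatIntervalMs(60_000L, 2));
    }
}
```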
    
    The conflict is surfaced through the existing HoodieWriteConflictException, which now carries an optional conflict category (TABLE_SERVICE_VS_INGESTION in this scenario), and a dedicated per-category metric (conflict_resolution.table_service_vs_ingestion.counter) is emitted when this scenario occurs, allowing users to distinguish this specific failure mode from general write conflicts.
    
    Changelog:
    
    hudi-common
    HoodieWriteConflictException: Added a ConflictCategory enum (INGESTION_VS_INGESTION, INGESTION_VS_TABLE_SERVICE, TABLE_SERVICE_VS_INGESTION, TABLE_SERVICE_VS_TABLE_SERVICE) and an optional category carried by the exception, set to TABLE_SERVICE_VS_INGESTION when clustering detects a pending ingestion instant with an active heartbeat.
    hudi-client-common
    ConflictResolutionStrategy: Added a new default overload 
getCandidateInstants(metaClient, currentInstant, lastSuccessfulInstant, 
Option<HoodieWriteConfig>) that delegates to the existing method. 
Implementations can override to use the write config for additional behavior.
    ConcurrentOperation: For commit/delta-commit instants whose plan file is empty (ingestion .requested instants), the deserialized commit metadata can be null; in that case the mutated partition/file IDs and operation type are left unset, since the write has not started yet.
    PreferWriterConflictResolutionStrategy: Overrides the new getCandidateInstants overload, plus hasConflict and resolveConflict. When the current operation is clustering and CLUSTERING_BLOCK_FOR_PENDING_INGESTION is enabled, ingestion .requested instants are checked for active heartbeats; an instant with an active heartbeat is treated as a conflict, and resolveConflict throws HoodieWriteConflictException with the TABLE_SERVICE_VS_INGESTION category. If the heartbeat is expired, the instant is ignored (assumed to be a failed write).
    HoodieWriteConfig: Added CLUSTERING_BLOCK_FOR_PENDING_INGESTION config 
property (default false), getter isClusteringBlockForPendingIngestion(), and 
builder method withClusteringBlockForPendingIngestion(boolean).
    TransactionUtils: Updated resolveWriteConflictIfAny to pass 
Option.of(config) to the new getCandidateInstants overload, threading the write 
config through.
    BaseHoodieClient: Updated resolveWriteConflict to emit a per-category conflict metric (from the exception's conflict category, when present) before re-throwing HoodieWriteConflictException.
    HoodieMetrics: Added emitConflictResolutionByCategory(category), incrementing a dedicated counter per conflict category (e.g. conflict_resolution.table_service_vs_ingestion.counter).
    TestPreferWriterConflictResolutionStrategy: Added 3 tests covering: (1) 
clustering self-aborts with active heartbeat ingestion .requested instant, (2) 
no blocking when config is disabled, (3) old method signature delegates with 
defaults (blocking disabled).
    
    Impact
    User-facing: New opt-in config hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution (default false). No change to existing behavior unless explicitly enabled.
    Behavior: When enabled, clustering writes using PreferWriterConflictResolutionStrategy will fail with HoodieWriteConflictException (category TABLE_SERVICE_VS_INGESTION) if they detect an ingestion .requested instant with an active heartbeat, preventing clustering from committing and risking a conflict with ongoing ingestion.
    Performance: Negligible. When enabled, adds heartbeat file existence checks 
for .requested ingestion instants during clustering commit conflict resolution.
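For reference, enabling the feature would look roughly like the following write options. The first two keys and their values are assumptions based on a standard Hudi multi-writer (OCC) setup and the strategy's usual fully-qualified class name; only the last key is introduced by this change:

```properties
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.conflict.resolution.strategy=org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy
hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution=true
```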
    
    Risk Level
    Low. Config defaults to false, so existing behavior is unchanged. The conflict is reported via the existing HoodieWriteConflictException (now carrying an optional category), so existing catch blocks continue to work. All existing tests pass; 3 new tests validate the new behavior.
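The catch-block compatibility claim can be illustrated with a minimal self-contained sketch. The WriteConflict type below is a hypothetical stand-in mimicking HoodieWriteConflictException and its conflict category; the real classes live in hudi-common and use Hudi's own Option type rather than java.util.Optional:

```java
import java.util.Optional;

// Stand-in for HoodieWriteConflictException with an optional category.
class WriteConflict extends RuntimeException {
    enum Category { INGESTION_VS_INGESTION, TABLE_SERVICE_VS_INGESTION }
    private final Category category;
    WriteConflict(Category category, String msg) { super(msg); this.category = category; }
    Optional<Category> getCategory() { return Optional.ofNullable(category); }
}

public class CatchPatternSketch {
    // Pre-existing handlers that catch the base conflict type keep working;
    // callers may additionally branch on the category to tell this failure
    // mode apart from general write conflicts.
    static String classify(WriteConflict e) {
        return e.getCategory().map(Enum::name).orElse("GENERAL_CONFLICT");
    }

    public static void main(String[] args) {
        try {
            throw new WriteConflict(WriteConflict.Category.TABLE_SERVICE_VS_INGESTION,
                "clustering aborted: pending ingestion with active heartbeat");
        } catch (WriteConflict e) { // existing catch block still matches
            System.out.println(classify(e));
        }
    }
}
```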
    
    
    
    
    ---------
    
    Co-authored-by: Krishen Bhan <[email protected]>
---
 .../org/apache/hudi/client/BaseHoodieClient.java   |   1 +
 .../client/transaction/ConcurrentOperation.java    |  12 +-
 .../transaction/ConflictResolutionStrategy.java    |  13 +
 .../PreferWriterConflictResolutionStrategy.java    | 114 ++++++--
 .../apache/hudi/client/utils/TransactionUtils.java |   2 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  19 ++
 .../org/apache/hudi/metrics/HoodieMetrics.java     |  42 +++
 ...TestPreferWriterConflictResolutionStrategy.java | 309 +++++++++++++++++++++
 .../org/apache/hudi/metrics/TestHoodieMetrics.java |  35 +++
 .../exception/HoodieWriteConflictException.java    |  31 +++
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  62 +++++
 11 files changed, 619 insertions(+), 21 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index 6fe7771d7252..b0875672b5e9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -231,6 +231,7 @@ public abstract class BaseHoodieClient implements 
Serializable, AutoCloseable {
       metrics.emitConflictResolutionSuccessful();
     } catch (HoodieWriteConflictException e) {
       metrics.emitConflictResolutionFailed();
+      e.getCategory().ifPresent(metrics::emitConflictResolutionByCategory);
       throw e;
     } finally {
       if (conflictResolutionTimer != null) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
index dc3bd59c6211..177119a789ab 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
@@ -118,9 +118,15 @@ public class ConcurrentOperation {
           break;
         case COMMIT_ACTION:
         case DELTA_COMMIT_ACTION:
-          this.mutatedPartitionAndFileIds = 
getPartitionAndFileIdWithoutSuffixFromSpecificRecord(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata()
-              .getPartitionToWriteStats());
-          this.operationType = 
WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType());
+          // Ingestion .requested instants may have empty plan files, so the 
deserialized
+          // commit metadata will be null. In that case we leave 
mutatedPartitionAndFileIds
+          // empty and operationType unset, since the write has not started 
yet.
+          org.apache.hudi.avro.model.HoodieCommitMetadata avroCommitMeta =
+              
this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata();
+          if (avroCommitMeta != null) {
+            this.mutatedPartitionAndFileIds = 
getPartitionAndFileIdWithoutSuffixFromSpecificRecord(avroCommitMeta.getPartitionToWriteStats());
+            this.operationType = 
WriteOperationType.fromValue(avroCommitMeta.getOperationType());
+          }
           break;
         case REPLACE_COMMIT_ACTION:
         case CLUSTERING_ACTION:
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConflictResolutionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConflictResolutionStrategy.java
index 1deba86fe1df..629cde07ec05 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConflictResolutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConflictResolutionStrategy.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.table.HoodieTable;
 
@@ -42,6 +43,18 @@ public interface ConflictResolutionStrategy {
    */
   Stream<HoodieInstant> getCandidateInstants(HoodieTableMetaClient metaClient, 
HoodieInstant currentInstant, Option<HoodieInstant> lastSuccessfulInstant);
 
+  /**
+   * Overload of {@link #getCandidateInstants(HoodieTableMetaClient, 
HoodieInstant, Option)} that also accepts an
+   * optional {@link HoodieWriteConfig}. Implementations may use the write 
config to infer additional behavior
+   * (e.g., heartbeat-based blocking of clustering when pending ingestion 
instants exist).
+   *
+   * <p>The default implementation simply delegates to the existing method, 
ignoring the write config.</p>
+   */
+  default Stream<HoodieInstant> getCandidateInstants(HoodieTableMetaClient 
metaClient, HoodieInstant currentInstant,
+                                                     Option<HoodieInstant> 
lastSuccessfulInstant, Option<HoodieWriteConfig> writeConfigOpt) {
+    return getCandidateInstants(metaClient, currentInstant, 
lastSuccessfulInstant);
+  }
+
   /**
    * Implementations of this method will determine whether a conflict exists 
between 2 commits.
    * @param thisOperation
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
index 9b02e26aecc0..2c3942f7169e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
@@ -18,15 +18,22 @@
 
 package org.apache.hudi.client.transaction;
 
+import org.apache.hudi.common.heartbeat.HoodieHeartbeatUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.table.HoodieTable;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -45,16 +52,27 @@ import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMI
 public class PreferWriterConflictResolutionStrategy
     extends SimpleConcurrentFileWritesConflictResolutionStrategy {
 
+  private boolean isClusteringBlockForPendingIngestion;
+
   /**
    * For tableservices like replacecommit and compaction commits this method 
also returns ingestion inflight commits.
    */
   @Override
   public Stream<HoodieInstant> getCandidateInstants(HoodieTableMetaClient 
metaClient, HoodieInstant currentInstant,
                                                     Option<HoodieInstant> 
lastSuccessfulInstant) {
+    return getCandidateInstants(metaClient, currentInstant, 
lastSuccessfulInstant, Option.empty());
+  }
+
+  @Override
+  public Stream<HoodieInstant> getCandidateInstants(HoodieTableMetaClient 
metaClient, HoodieInstant currentInstant,
+                                                    Option<HoodieInstant> 
lastSuccessfulInstant, Option<HoodieWriteConfig> writeConfigOpt) {
     HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline();
-    if (ClusteringUtils.isClusteringInstant(activeTimeline, currentInstant, 
metaClient.getInstantGenerator())
-        || COMPACTION_ACTION.equals(currentInstant.getAction())) {
-      return getCandidateInstantsForTableServicesCommits(activeTimeline, 
currentInstant);
+    boolean isCurrentOperationClustering = 
ClusteringUtils.isClusteringInstant(activeTimeline, currentInstant, 
metaClient.getInstantGenerator());
+    this.isClusteringBlockForPendingIngestion = isCurrentOperationClustering
+        && writeConfigOpt.isPresent() && 
writeConfigOpt.get().isClusteringBlockForPendingIngestion();
+
+    if (isCurrentOperationClustering || 
COMPACTION_ACTION.equals(currentInstant.getAction())) {
+      return getCandidateInstantsForTableServicesCommits(activeTimeline, 
currentInstant, isCurrentOperationClustering, metaClient, writeConfigOpt);
     } else {
       return getCandidateInstantsForNonTableServicesCommits(activeTimeline, 
currentInstant);
     }
@@ -78,13 +96,20 @@ public class PreferWriterConflictResolutionStrategy
   }
 
   /**
-   * To find which instants are conflicting, we apply the following logic
-   * Get both completed instants and ingestion inflight commits that have 
happened since the last successful write.
-   * We need to check for write conflicts since they may have mutated the same 
files
-   * that are being newly created by the current write.
+   * Returns candidate instants for table service commits (clustering or 
compaction).
+   * Includes both completed instants and ingestion inflight commits that have 
happened
+   * since the current write started.
+   *
+   * <p>If the current write is clustering and
+   * {@code 
hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution} is 
enabled,
+   * also includes ingestion {@code .requested} instants (filtering out those 
with expired heartbeats)
+   * so they can be evaluated by {@link #hasConflict} and {@link 
#resolveConflict}.</p>
    */
-  private Stream<HoodieInstant> 
getCandidateInstantsForTableServicesCommits(HoodieActiveTimeline 
activeTimeline, HoodieInstant currentInstant) {
-    // Fetch list of completed commits.
+  private Stream<HoodieInstant> getCandidateInstantsForTableServicesCommits(
+      HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant,
+      boolean isCurrentOperationClustering, HoodieTableMetaClient metaClient,
+      Option<HoodieWriteConfig> writeConfigOpt) {
+
     Stream<HoodieInstant> completedCommitsStream =
         activeTimeline
             .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, 
REPLACE_COMMIT_ACTION, COMPACTION_ACTION, DELTA_COMMIT_ACTION))
@@ -92,21 +117,76 @@ public class PreferWriterConflictResolutionStrategy
             
.findInstantsModifiedAfterByCompletionTime(currentInstant.requestedTime())
             .getInstantsAsStream();
 
-    // Fetch list of ingestion inflight commits.
-    Stream<HoodieInstant> inflightIngestionCommitsStream =
-        activeTimeline
-            .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, 
DELTA_COMMIT_ACTION))
-            .filterInflights()
-            .getInstantsAsStream();
+    Stream<HoodieInstant> inflightIngestionCommitsStream;
+    if (isClusteringBlockForPendingIngestion) {
+      HoodieWriteConfig writeConfig = writeConfigOpt.get();
+      long maxHeartbeatIntervalMs = 
writeConfig.getHoodieClientHeartbeatIntervalInMs()
+          * (writeConfig.getHoodieClientHeartbeatTolerableMisses() + 1);
+      inflightIngestionCommitsStream = activeTimeline
+          .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, 
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION))
+          .filterInflightsAndRequested()
+          .getInstantsAsStream()
+          .filter(i -> !ClusteringUtils.isClusteringInstant(activeTimeline, i, 
metaClient.getInstantGenerator()))
+          .filter(i -> {
+            if (i.isRequested()) {
+              try {
+                return 
!HoodieHeartbeatUtils.isHeartbeatExpired(i.requestedTime(), 
maxHeartbeatIntervalMs,
+                    metaClient.getStorage(), 
metaClient.getBasePath().toString());
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            }
+            return i.isInflight();
+          });
+    } else {
+      inflightIngestionCommitsStream = activeTimeline
+          .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, 
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION))
+          .filterInflights()
+          .getInstantsAsStream();
+    }
 
-    // Merge and sort the instants and return.
     List<HoodieInstant> instantsToConsider = 
Stream.concat(completedCommitsStream, inflightIngestionCommitsStream)
-        .sorted(Comparator.comparing(o -> o.getCompletionTime()))
+        .sorted(Comparator.comparing(HoodieInstant::getCompletionTime, 
Comparator.nullsLast(Comparator.naturalOrder())))
         .collect(Collectors.toList());
     log.info("Instants that may have conflict with {} are {}", currentInstant, 
instantsToConsider);
     return instantsToConsider.stream();
   }
 
+  @Override
+  public boolean hasConflict(ConcurrentOperation thisOperation, 
ConcurrentOperation otherOperation) {
+    if (isClusteringBlockForPendingIngestion
+        && WriteOperationType.CLUSTER.equals(thisOperation.getOperationType())
+        && isRequestedIngestionInstant(otherOperation)) {
+      log.info("Clustering operation {} conflicts with pending ingestion 
instant {} "
+          + "that has an active heartbeat", thisOperation, otherOperation);
+      return true;
+    }
+    return super.hasConflict(thisOperation, otherOperation);
+  }
+
+  @Override
+  public Option<HoodieCommitMetadata> resolveConflict(HoodieTable table,
+      ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) {
+    if (isClusteringBlockForPendingIngestion
+        && WriteOperationType.CLUSTER.equals(thisOperation.getOperationType())
+        && isRequestedIngestionInstant(otherOperation)) {
+      throw new HoodieWriteConflictException(
+          
HoodieWriteConflictException.ConflictCategory.TABLE_SERVICE_VS_INGESTION,
+          String.format("Pending ingestion instant %s with active heartbeat 
has not transitioned to "
+              + "inflight yet but may potentially conflict with current 
clustering operation %s",
+              otherOperation, thisOperation));
+    }
+    return super.resolveConflict(table, thisOperation, otherOperation);
+  }
+
+  private boolean isRequestedIngestionInstant(ConcurrentOperation operation) {
+    String state = operation.getInstantActionState();
+    String actionType = operation.getInstantActionType();
+    return HoodieInstant.State.REQUESTED.name().equals(state)
+        && (COMMIT_ACTION.equals(actionType) || 
DELTA_COMMIT_ACTION.equals(actionType)
+            || (REPLACE_COMMIT_ACTION.equals(actionType) && 
!WriteOperationType.CLUSTER.equals(operation.getOperationType())));
+  }
+
   @Override
   public boolean isPreCommitRequired() {
     return true;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
index e8cf8e7f9d28..6b5ac8c575aa 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
@@ -81,7 +81,7 @@ public class TransactionUtils {
       Option<HoodieSchema> newTableSchema = 
resolveSchemaConflictIfNeeded(table, config, lastCompletedTxnOwnerInstant, 
currentTxnOwnerInstant);
 
       Stream<HoodieInstant> instantStream = 
Stream.concat(resolutionStrategy.getCandidateInstants(
-              table.getMetaClient(), currentTxnOwnerInstant.get(), 
lastCompletedTxnOwnerInstant),
+              table.getMetaClient(), currentTxnOwnerInstant.get(), 
lastCompletedTxnOwnerInstant, Option.of(config)),
           completedInstantsDuringCurrentWriteOperation);
 
       final ConcurrentOperation thisOperation = new 
ConcurrentOperation(currentTxnOwnerInstant.get(), 
thisCommitMetadata.orElseGet(HoodieCommitMetadata::new));
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d68e3e6a94a7..2486a398292a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -675,6 +675,16 @@ public class HoodieWriteConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Number of heartbeat misses, before a writer is 
deemed not alive and all pending writes are aborted.");
 
+  public static final ConfigProperty<Boolean> 
CLUSTERING_BLOCK_FOR_PENDING_INGESTION = ConfigProperty
+      
.key("hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution")
+      .defaultValue(false)
+      .markAdvanced()
+      .withDocumentation("Only applicable when 
\"hoodie.write.concurrency.mode\" is set to OCC or NBCC and the conflict "
+          + "resolution strategy 
(\"hoodie.write.conflict.resolution.strategy\") is set to "
+          + "PreferWriterConflictResolutionStrategy. When enabled, proactively 
prevents clustering from committing if "
+          + "there are any ongoing ingestion writes that have not transitioned 
from requested to inflight yet and have "
+          + "an active heartbeat, since ingestion may be targeting the same 
files and should have precedence.");
+
   public static final ConfigProperty<String> WRITE_CONCURRENCY_MODE = 
ConfigProperty
       .key("hoodie.write.concurrency.mode")
       .defaultValue(WriteConcurrencyMode.SINGLE_WRITER.name())
@@ -2665,6 +2675,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getInt(CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES);
   }
 
+  public boolean isClusteringBlockForPendingIngestion() {
+    return getBooleanOrDefault(CLUSTERING_BLOCK_FOR_PENDING_INGESTION);
+  }
+
   /**
    * File listing metadata configs.
    */
@@ -3455,6 +3469,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withClusteringBlockForPendingIngestion(boolean enable) {
+      writeConfig.setValue(CLUSTERING_BLOCK_FOR_PENDING_INGESTION, 
String.valueOf(enable));
+      return this;
+    }
+
     public Builder withWriteConcurrencyMode(WriteConcurrencyMode 
concurrencyMode) {
       writeConfig.setValue(WRITE_CONCURRENCY_MODE, concurrencyMode.name());
       return this;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index 3fa85d5c7878..0decbe290c3d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.storage.HoodieStorage;
 
 import com.codahale.metrics.Counter;
@@ -116,6 +117,10 @@ public class HoodieMetrics {
   private String conflictResolutionTimerName = null;
   private String conflictResolutionSuccessCounterName = null;
   private String conflictResolutionFailureCounterName = null;
+  private String conflictResolutionIngestionVsIngestionCounterName = null;
+  private String conflictResolutionIngestionVsTableServiceCounterName = null;
+  private String conflictResolutionTableServiceVsIngestionCounterName = null;
+  private String conflictResolutionTableServiceVsTableServiceCounterName = 
null;
   private String compactionRequestedCounterName = null;
   private String compactionCompletedCounterName = null;
   private String rollbackFailureCounterName = null;
@@ -135,6 +140,10 @@ public class HoodieMetrics {
   private Timer conflictResolutionTimer = null;
   private Counter conflictResolutionSuccessCounter = null;
   private Counter conflictResolutionFailureCounter = null;
+  private Counter conflictResolutionIngestionVsIngestionCounter = null;
+  private Counter conflictResolutionIngestionVsTableServiceCounter = null;
+  private Counter conflictResolutionTableServiceVsIngestionCounter = null;
+  private Counter conflictResolutionTableServiceVsTableServiceCounter = null;
   private Counter compactionRequestedCounter = null;
   private Counter compactionCompletedCounter = null;
   private Counter rollbackFailureCounter = null;
@@ -158,6 +167,10 @@ public class HoodieMetrics {
       this.conflictResolutionTimerName = 
getMetricsName(CONFLICT_RESOLUTION_STR, TIMER_METRIC);
       this.conflictResolutionSuccessCounterName = 
getMetricsName(CONFLICT_RESOLUTION_STR, SUCCESS_COUNTER);
       this.conflictResolutionFailureCounterName = 
getMetricsName(CONFLICT_RESOLUTION_STR, FAILURE_COUNTER);
+      this.conflictResolutionIngestionVsIngestionCounterName = 
getMetricsName(CONFLICT_RESOLUTION_STR, "ingestion_vs_ingestion" + 
COUNTER_METRIC_EXTENSION);
+      this.conflictResolutionIngestionVsTableServiceCounterName = 
getMetricsName(CONFLICT_RESOLUTION_STR, "ingestion_vs_table_service" + 
COUNTER_METRIC_EXTENSION);
+      this.conflictResolutionTableServiceVsIngestionCounterName = 
getMetricsName(CONFLICT_RESOLUTION_STR, "table_service_vs_ingestion" + 
COUNTER_METRIC_EXTENSION);
+      this.conflictResolutionTableServiceVsTableServiceCounterName = 
getMetricsName(CONFLICT_RESOLUTION_STR, "table_service_vs_table_service" + 
COUNTER_METRIC_EXTENSION);
       this.compactionRequestedCounterName = 
getMetricsName(HoodieTimeline.COMPACTION_ACTION, 
HoodieTimeline.REQUESTED_COMPACTION_SUFFIX + COUNTER_METRIC_EXTENSION);
       this.compactionCompletedCounterName = 
getMetricsName(HoodieTimeline.COMPACTION_ACTION, 
HoodieTimeline.COMPLETED_COMPACTION_SUFFIX + COUNTER_METRIC_EXTENSION);
       this.rollbackFailureCounterName = getMetricsName("rollback", 
FAILURE_COUNTER);
@@ -548,6 +561,35 @@ public class HoodieMetrics {
     }
   }
 
+  public void 
emitConflictResolutionByCategory(HoodieWriteConflictException.ConflictCategory 
category) {
+    if (config.isLockingMetricsEnabled()) {
+      switch (category) {
+        case INGESTION_VS_INGESTION:
+          conflictResolutionIngestionVsIngestionCounter = getCounter(
+              conflictResolutionIngestionVsIngestionCounter, 
conflictResolutionIngestionVsIngestionCounterName);
+          conflictResolutionIngestionVsIngestionCounter.inc();
+          break;
+        case INGESTION_VS_TABLE_SERVICE:
+          conflictResolutionIngestionVsTableServiceCounter = getCounter(
+              conflictResolutionIngestionVsTableServiceCounter, 
conflictResolutionIngestionVsTableServiceCounterName);
+          conflictResolutionIngestionVsTableServiceCounter.inc();
+          break;
+        case TABLE_SERVICE_VS_INGESTION:
+          conflictResolutionTableServiceVsIngestionCounter = getCounter(
+              conflictResolutionTableServiceVsIngestionCounter, 
conflictResolutionTableServiceVsIngestionCounterName);
+          conflictResolutionTableServiceVsIngestionCounter.inc();
+          break;
+        case TABLE_SERVICE_VS_TABLE_SERVICE:
+          conflictResolutionTableServiceVsTableServiceCounter = getCounter(
+              conflictResolutionTableServiceVsTableServiceCounter, 
conflictResolutionTableServiceVsTableServiceCounterName);
+          conflictResolutionTableServiceVsTableServiceCounter.inc();
+          break;
+        default:
+          break;
+      }
+    }
+  }
+
   public void emitCompactionRequested() {
     if (config.isMetricsOn()) {
       compactionRequestedCounter = getCounter(compactionRequestedCounter, 
compactionRequestedCounterName);
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
index 80561f74e14d..f59490c77aef 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
@@ -19,13 +19,17 @@
 package org.apache.hudi.client.transaction;
 
 import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieWriteConflictException;
 
 import org.junit.jupiter.api.Assertions;
@@ -251,4 +255,309 @@ public class TestPreferWriterConflictResolutionStrategy 
extends HoodieCommonTest
       // expected
     }
   }
+
+  /**
+   * Confirms that when {@code hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution}
+   * is enabled, clustering will detect a conflict with an ingestion .requested instant
+   * that has an active heartbeat, via hasConflict/resolveConflict.
+   */
+  @Test
+  public void testClusterConflictingWithIngestionRequestedInstantWithActiveHeartbeat() throws Exception {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withClusteringBlockForPendingIngestion(true)
+        .withHeartbeatIntervalInMs(60 * 1000)
+        .withHeartbeatTolerableMisses(2)
+        .build();
+
+    createCommit(WriteClientTestUtils.createNewInstantTime(), metaClient);
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    Option<HoodieInstant> lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+    // clustering gets scheduled and goes inflight
+    String currentWriterInstant = WriteClientTestUtils.createNewInstantTime();
+    createClusterRequested(currentWriterInstant, metaClient);
+    createClusterInflight(currentWriterInstant, metaClient);
+
+    // ingestion writer creates a .requested instant with active heartbeat
+    String activeIngestionInstantTime = WriteClientTestUtils.createNewInstantTime();
+    HoodieTestTable.of(metaClient).addRequestedCommit(activeIngestionInstantTime);
+    HoodieHeartbeatClient heartbeatClient = new HoodieHeartbeatClient(
+        metaClient.getStorage(), metaClient.getBasePath().toString(),
+        (long) (1000 * 60), 5);
+    heartbeatClient.start(activeIngestionInstantTime);
+
+    Option<HoodieInstant> currentInstant = Option.of(
+        INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.CLUSTERING_ACTION, currentWriterInstant));
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
+
+    try {
+      List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(
+          metaClient, currentInstant.get(), lastSuccessfulInstant, Option.of(writeConfig))
+          .collect(Collectors.toList());
+      // The .requested instant with active heartbeat should be returned as a candidate
+      Assertions.assertEquals(1, candidateInstants.size());
+      Assertions.assertEquals(activeIngestionInstantTime, candidateInstants.get(0).requestedTime());
+
+      HoodieReplaceCommitMetadata clusteringMetadata = new HoodieReplaceCommitMetadata();
+      clusteringMetadata.setOperationType(WriteOperationType.CLUSTER);
+      ConcurrentOperation thisOperation = new ConcurrentOperation(currentInstant.get(), clusteringMetadata);
+      ConcurrentOperation otherOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient);
+
+      // hasConflict should detect the conflict
+      Assertions.assertTrue(strategy.hasConflict(thisOperation, otherOperation));
+
+      // resolveConflict should throw with TABLE_SERVICE_VS_INGESTION category
+      HoodieWriteConflictException thrown = Assertions.assertThrows(
+          HoodieWriteConflictException.class,
+          () -> strategy.resolveConflict(null, thisOperation, otherOperation));
+      Assertions.assertTrue(thrown.getCategory().isPresent());
+      Assertions.assertEquals(HoodieWriteConflictException.ConflictCategory.TABLE_SERVICE_VS_INGESTION,
+          thrown.getCategory().get());
+    } finally {
+      heartbeatClient.stop(activeIngestionInstantTime);
+      heartbeatClient.close();
+    }
+  }
+
+  /**
+   * Confirms that clustering does NOT fail for pending ingestion .requested instants
+   * when {@code hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution}
+   * is disabled (default behavior).
+   */
+  @Test
+  public void testClusterDoesNotBlockWithoutConfigEnabled() throws Exception {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withClusteringBlockForPendingIngestion(false)
+        .build();
+
+    createCommit(WriteClientTestUtils.createNewInstantTime(), metaClient);
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    Option<HoodieInstant> lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+    // clustering gets scheduled and goes inflight
+    String currentWriterInstant = WriteClientTestUtils.createNewInstantTime();
+    createClusterRequested(currentWriterInstant, metaClient);
+    createClusterInflight(currentWriterInstant, metaClient);
+
+    // ingestion writer creates a .requested instant with active heartbeat
+    String ingestionInstantTime = WriteClientTestUtils.createNewInstantTime();
+    HoodieTestTable.of(metaClient).addRequestedCommit(ingestionInstantTime);
+    HoodieHeartbeatClient heartbeatClient = new HoodieHeartbeatClient(
+        metaClient.getStorage(), metaClient.getBasePath().toString(),
+        (long) (1000 * 60), 5);
+    heartbeatClient.start(ingestionInstantTime);
+
+    Option<HoodieInstant> currentInstant = Option.of(
+        INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.CLUSTERING_ACTION, currentWriterInstant));
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
+
+    // With hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution disabled,
+    // clustering should NOT fail even though there's an active heartbeat
+    List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(
+        metaClient, currentInstant.get(), lastSuccessfulInstant, Option.of(writeConfig))
+        .collect(Collectors.toList());
+    Assertions.assertEquals(0, candidateInstants.size());
+
+    heartbeatClient.stop(ingestionInstantTime);
+    heartbeatClient.close();
+  }
+
+  /**
+   * Confirms that when getCandidateInstants is called without a write config,
+   * it delegates properly and uses defaults
+   * ({@code hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution}
+   * disabled by default).
+   */
+  @Test
+  public void testClusterOldMethodDoesNotBlockByDefault() throws Exception {
+    createCommit(WriteClientTestUtils.createNewInstantTime(), metaClient);
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    Option<HoodieInstant> lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+    // clustering gets scheduled and goes inflight
+    String currentWriterInstant = WriteClientTestUtils.createNewInstantTime();
+    createClusterRequested(currentWriterInstant, metaClient);
+    createClusterInflight(currentWriterInstant, metaClient);
+
+    // ingestion writer creates a .requested instant with active heartbeat
+    String ingestionInstantTime = WriteClientTestUtils.createNewInstantTime();
+    HoodieTestTable.of(metaClient).addRequestedCommit(ingestionInstantTime);
+    HoodieHeartbeatClient heartbeatClient = new HoodieHeartbeatClient(
+        metaClient.getStorage(), metaClient.getBasePath().toString(),
+        (long) (1000 * 60), 5);
+    heartbeatClient.start(ingestionInstantTime);
+
+    Option<HoodieInstant> currentInstant = Option.of(
+        INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.CLUSTERING_ACTION, currentWriterInstant));
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
+
+    // Without write config, should NOT throw since the default is
+    // hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution = false
+    List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(
+        metaClient, currentInstant.get(), lastSuccessfulInstant)
+        .collect(Collectors.toList());
+    Assertions.assertEquals(0, candidateInstants.size());
+
+    heartbeatClient.stop(ingestionInstantTime);
+    heartbeatClient.close();
+  }
+
+  /**
+   * Confirms that when {@code hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution}
+   * is enabled and there is an inflight ingestion instant, it is returned as a candidate
+   * (exercises the i.isInflight() return path in the filter).
+   */
+  @Test
+  public void testClusterWithBlockingEnabledAndInflightIngestion() throws Exception {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withClusteringBlockForPendingIngestion(true)
+        .withHeartbeatIntervalInMs(60 * 1000)
+        .withHeartbeatTolerableMisses(2)
+        .build();
+
+    createCommit(WriteClientTestUtils.createNewInstantTime(), metaClient);
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    Option<HoodieInstant> lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+    // clustering gets scheduled and goes inflight
+    String currentWriterInstant = WriteClientTestUtils.createNewInstantTime();
+    createClusterRequested(currentWriterInstant, metaClient);
+    createClusterInflight(currentWriterInstant, metaClient);
+
+    // ingestion writer creates an inflight commit
+    String ingestionInstantTime = WriteClientTestUtils.createNewInstantTime();
+    createInflightCommit(ingestionInstantTime, metaClient);
+
+    Option<HoodieInstant> currentInstant = Option.of(
+        INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.CLUSTERING_ACTION, currentWriterInstant));
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
+
+    List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(
+        metaClient, currentInstant.get(), lastSuccessfulInstant, Option.of(writeConfig))
+        .collect(Collectors.toList());
+    // The inflight ingestion instant should be returned as a candidate
+    Assertions.assertEquals(1, candidateInstants.size());
+    Assertions.assertEquals(ingestionInstantTime, candidateInstants.get(0).requestedTime());
+  }
+
+  /**
+   * Confirms that when {@code hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution}
+   * is enabled and there is both an inflight and an expired-heartbeat requested ingestion instant,
+   * only the inflight is returned as a candidate.
+   */
+  @Test
+  public void testClusterWithBlockingEnabledInflightAndExpiredRequested() throws Exception {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withClusteringBlockForPendingIngestion(true)
+        .withHeartbeatIntervalInMs(60 * 1000)
+        .withHeartbeatTolerableMisses(2)
+        .build();
+
+    createCommit(WriteClientTestUtils.createNewInstantTime(), metaClient);
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    Option<HoodieInstant> lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+    // clustering gets scheduled and goes inflight
+    String currentWriterInstant = WriteClientTestUtils.createNewInstantTime();
+    createClusterRequested(currentWriterInstant, metaClient);
+    createClusterInflight(currentWriterInstant, metaClient);
+
+    // expired requested ingestion instant (no heartbeat)
+    String expiredInstantTime = WriteClientTestUtils.createNewInstantTime();
+    HoodieTestTable.of(metaClient).addRequestedCommit(expiredInstantTime);
+
+    // active inflight ingestion instant
+    String inflightInstantTime = WriteClientTestUtils.createNewInstantTime();
+    createInflightCommit(inflightInstantTime, metaClient);
+
+    Option<HoodieInstant> currentInstant = Option.of(
+        INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.CLUSTERING_ACTION, currentWriterInstant));
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
+
+    List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(
+        metaClient, currentInstant.get(), lastSuccessfulInstant, Option.of(writeConfig))
+        .collect(Collectors.toList());
+    // Only the inflight should be returned; expired requested should be filtered out
+    Assertions.assertEquals(1, candidateInstants.size());
+    Assertions.assertEquals(inflightInstantTime, candidateInstants.get(0).requestedTime());
+  }
+
+  /**
+   * Confirms that when the .requested instant has an expired heartbeat (no heartbeat file),
+   * clustering does NOT treat it as a conflict even when
+   * {@code hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution} is enabled.
+   */
+  @Test
+  public void testClusterWithBlockingEnabledAndExpiredHeartbeatRequested() throws Exception {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withClusteringBlockForPendingIngestion(true)
+        .withHeartbeatIntervalInMs(60 * 1000)
+        .withHeartbeatTolerableMisses(2)
+        .build();
+
+    createCommit(WriteClientTestUtils.createNewInstantTime(), metaClient);
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    Option<HoodieInstant> lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+    // clustering gets scheduled and goes inflight
+    String currentWriterInstant = WriteClientTestUtils.createNewInstantTime();
+    createClusterRequested(currentWriterInstant, metaClient);
+    createClusterInflight(currentWriterInstant, metaClient);
+
+    // ingestion writer creates a .requested instant but never starts a heartbeat (simulates expired/dead writer)
+    String expiredIngestionInstantTime = WriteClientTestUtils.createNewInstantTime();
+    HoodieTestTable.of(metaClient).addRequestedCommit(expiredIngestionInstantTime);
+
+    Option<HoodieInstant> currentInstant = Option.of(
+        INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.CLUSTERING_ACTION, currentWriterInstant));
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
+
+    // The .requested instant with expired heartbeat should be filtered out
+    List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(
+        metaClient, currentInstant.get(), lastSuccessfulInstant, Option.of(writeConfig))
+        .collect(Collectors.toList());
+    Assertions.assertEquals(0, candidateInstants.size());
+  }
+
+  /**
+   * Confirms that compaction (non-clustering table service) when write config is provided
+   * still picks up inflight ingestion instants as candidates.
+   */
+  @Test
+  public void testCompactionWithInflightIngestionViaNewOverload() throws Exception {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withClusteringBlockForPendingIngestion(true)
+        .build();
+
+    createCommit(WriteClientTestUtils.createNewInstantTime(), metaClient);
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    Option<HoodieInstant> lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+    // writer 1 starts (inflight ingestion)
+    String currentWriterInstant = WriteClientTestUtils.createNewInstantTime();
+    createInflightCommit(currentWriterInstant, metaClient);
+
+    // compaction gets scheduled
+    String compactionInstantTime = WriteClientTestUtils.createNewInstantTime();
+    createCompactionRequested(compactionInstantTime, metaClient);
+
+    Option<HoodieInstant> currentInstant = Option.of(
+        INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime));
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
+
+    // Compaction is not clustering, so .requested instants are not included even when
+    // hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution is enabled
+    List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(
+        metaClient, currentInstant.get(), lastSuccessfulInstant, Option.of(writeConfig))
+        .collect(Collectors.toList());
+    Assertions.assertEquals(1, candidateInstants.size());
+    Assertions.assertEquals(currentWriterInstant, candidateInstants.get(0).requestedTime());
+  }
+
 }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
index f28f80672e1b..b785d9025288 100755
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
+import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.index.HoodieIndex;
 
 import com.codahale.metrics.Timer;
@@ -354,4 +355,38 @@ public class TestHoodieMetrics {
     // Verify the original exception type counter is unchanged
     assertEquals(2, metrics.getRegistry().getCounters().get(exceptionMetricName).getCount());
   }
+
+  @Test
+  public void testConflictResolutionByCategoryMetrics() {
+    when(writeConfig.isLockingMetricsEnabled()).thenReturn(true);
+
+    String tableServiceVsIngestion = hoodieMetrics.getMetricsName(
+        HoodieMetrics.CONFLICT_RESOLUTION_STR, "table_service_vs_ingestion" + COUNTER_METRIC_EXTENSION);
+    String ingestionVsIngestion = hoodieMetrics.getMetricsName(
+        HoodieMetrics.CONFLICT_RESOLUTION_STR, "ingestion_vs_ingestion" + COUNTER_METRIC_EXTENSION);
+    String ingestionVsTableService = hoodieMetrics.getMetricsName(
+        HoodieMetrics.CONFLICT_RESOLUTION_STR, "ingestion_vs_table_service" + COUNTER_METRIC_EXTENSION);
+    String tableServiceVsTableService = hoodieMetrics.getMetricsName(
+        HoodieMetrics.CONFLICT_RESOLUTION_STR, "table_service_vs_table_service" + COUNTER_METRIC_EXTENSION);
+
+    hoodieMetrics.emitConflictResolutionByCategory(
+        HoodieWriteConflictException.ConflictCategory.TABLE_SERVICE_VS_INGESTION);
+    assertEquals(1, metrics.getRegistry().getCounters().get(tableServiceVsIngestion).getCount());
+
+    hoodieMetrics.emitConflictResolutionByCategory(
+        HoodieWriteConflictException.ConflictCategory.TABLE_SERVICE_VS_INGESTION);
+    assertEquals(2, metrics.getRegistry().getCounters().get(tableServiceVsIngestion).getCount());
+
+    hoodieMetrics.emitConflictResolutionByCategory(
+        HoodieWriteConflictException.ConflictCategory.INGESTION_VS_INGESTION);
+    assertEquals(1, metrics.getRegistry().getCounters().get(ingestionVsIngestion).getCount());
+
+    hoodieMetrics.emitConflictResolutionByCategory(
+        HoodieWriteConflictException.ConflictCategory.INGESTION_VS_TABLE_SERVICE);
+    assertEquals(1, metrics.getRegistry().getCounters().get(ingestionVsTableService).getCount());
+
+    hoodieMetrics.emitConflictResolutionByCategory(
+        HoodieWriteConflictException.ConflictCategory.TABLE_SERVICE_VS_TABLE_SERVICE);
+    assertEquals(1, metrics.getRegistry().getCounters().get(tableServiceVsTableService).getCount());
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieWriteConflictException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieWriteConflictException.java
index f0f6dcbf0ab1..f2685e7854dc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieWriteConflictException.java
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieWriteConflictException.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.exception;
 
+import org.apache.hudi.common.util.Option;
+
 /**
  * <p>
  * Exception thrown for Hoodie failures. The root of the exception hierarchy.
@@ -29,15 +31,44 @@ package org.apache.hudi.exception;
  */
 public class HoodieWriteConflictException extends HoodieException {
 
+  /**
+   * Categorizes the two sides of a write conflict for metrics and diagnostics.
+   */
+  public enum ConflictCategory {
+    INGESTION_VS_INGESTION,
+    INGESTION_VS_TABLE_SERVICE,
+    TABLE_SERVICE_VS_INGESTION,
+    TABLE_SERVICE_VS_TABLE_SERVICE
+  }
+
+  private final Option<ConflictCategory> category;
+
   public HoodieWriteConflictException(String msg) {
     super(msg);
+    this.category = Option.empty();
   }
 
   public HoodieWriteConflictException(Throwable e) {
     super(e);
+    this.category = Option.empty();
   }
 
   public HoodieWriteConflictException(String msg, Throwable e) {
     super(msg, e);
+    this.category = Option.empty();
+  }
+
+  public HoodieWriteConflictException(ConflictCategory category, String msg) {
+    super(msg);
+    this.category = Option.of(category);
+  }
+
+  public HoodieWriteConflictException(ConflictCategory category, String msg, Throwable e) {
+    super(msg, e);
+    this.category = Option.of(category);
+  }
+
+  public Option<ConflictCategory> getCategory() {
+    return category;
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 83ccf54b5079..fcb08a2e2706 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
 import org.apache.hudi.client.WriteClientTestUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy;
@@ -62,6 +63,7 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.testutils.FileCreateUtilsLegacy;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
@@ -1893,6 +1895,66 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     return clusteringInstant;
   }
 
+  /**
+   * Verifies that clustering fails with {@link HoodieWriteConflictException} when
+   * {@code hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution} is enabled
+   * and an ingestion .requested instant with an active heartbeat exists.
+   */
+  @Test
+  public void testClusteringFailsOnPendingIngestionRequestedInstant() throws Exception {
+    Properties properties = getDisabledRowWriterProperties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
+    HoodieCleanConfig cleanConfig = createCleanConfig(HoodieFailedWritesCleaningPolicy.LAZY, false);
+
+    // Insert base data with a regular ingestion writer
+    HoodieWriteConfig insertWriteConfig = getConfigBuilder()
+        .withCleanConfig(cleanConfig)
+        .withLockConfig(createLockConfig(new PreferWriterConflictResolutionStrategy()))
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withProperties(properties)
+        .build();
+    SparkRDDWriteClient client = getHoodieWriteClient(insertWriteConfig);
+
+    int numRecords = 200;
+    String firstCommit = WriteClientTestUtils.createNewInstantTime();
+    String partitionStr = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionStr});
+    writeBatch(client, firstCommit, "000", Option.of(Arrays.asList("000")), "000",
+        numRecords, dataGenerator::generateInserts, SparkRDDWriteClient::insert, true, numRecords, numRecords,
+        1, INSTANT_GENERATOR);
+
+    // Simulate an ingestion writer that has created a .requested commit with an active heartbeat
+    String ingestionRequestedTime = WriteClientTestUtils.createNewInstantTime();
+    HoodieTestTable.of(metaClient).addRequestedCommit(ingestionRequestedTime);
+    HoodieHeartbeatClient heartbeatClient = new HoodieHeartbeatClient(
+        metaClient.getStorage(), metaClient.getBasePath().toString(),
+        (long) (1000 * 60), 5);
+    heartbeatClient.start(ingestionRequestedTime);
+
+    try {
+      // Schedule and execute clustering with blocking enabled; should fail during conflict
+      // resolution because it detects the active ingestion .requested instant
+      HoodieWriteConfig clusteringWriteConfig = getConfigBuilder()
+          .withCleanConfig(cleanConfig)
+          .withClusteringConfig(createClusteringBuilder(true, 1).build())
+          .withPreCommitValidatorConfig(createPreCommitValidatorConfig(numRecords))
+          .withLockConfig(createLockConfig(new PreferWriterConflictResolutionStrategy()))
+          .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+          .withClusteringBlockForPendingIngestion(true)
+          .withProperties(properties)
+          .build();
+
+      SparkRDDWriteClient<?> clusteringWriteClient = getHoodieWriteClient(clusteringWriteConfig);
+      String clusteringCommitTime = clusteringWriteClient.scheduleClustering(Option.empty()).get();
+      HoodieClusteringException exception = assertThrows(HoodieClusteringException.class,
+          () -> clusteringWriteClient.cluster(clusteringCommitTime, true));
+      assertTrue(exception.getCause() instanceof HoodieWriteConflictException);
+    } finally {
+      heartbeatClient.stop(ingestionRequestedTime);
+      heartbeatClient.close();
+    }
+  }
+
  public static class FailingPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>> extends SparkPreCommitValidator<T, I, K, O> {

    public FailingPreCommitValidator(HoodieSparkTable table, HoodieEngineContext context, HoodieWriteConfig config) {
