kbuci commented on code in PR #18280:
URL: https://github.com/apache/hudi/pull/18280#discussion_r2921918569


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java:
##########
@@ -78,35 +92,102 @@ private Stream<HoodieInstant> 
getCandidateInstantsForNonTableServicesCommits(Hoo
   }
 
   /**
-   * To find which instants are conflicting, we apply the following logic
-   * Get both completed instants and ingestion inflight commits that have 
happened since the last successful write.
-   * We need to check for write conflicts since they may have mutated the same 
files
-   * that are being newly created by the current write.
+   * Returns candidate instants for table service commits (clustering or 
compaction).
+   * Includes both completed instants and ingestion inflight commits that have 
happened
+   * since the current write started.
+   *
+   * <p>If the current write is clustering and
+   * {@code 
hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution} is 
enabled,
+   * also includes ingestion {@code .requested} instants (filtering out those 
with expired heartbeats)
+   * so they can be evaluated by {@link #hasConflict} and {@link 
#resolveConflict}.</p>
    */
-  private Stream<HoodieInstant> 
getCandidateInstantsForTableServicesCommits(HoodieActiveTimeline 
activeTimeline, HoodieInstant currentInstant) {
-    // Fetch list of completed commits.
+  private Stream<HoodieInstant> getCandidateInstantsForTableServicesCommits(
+      HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant,
+      boolean isCurrentOperationClustering, HoodieTableMetaClient metaClient,
+      Option<HoodieWriteConfig> writeConfigOpt) {
+
     Stream<HoodieInstant> completedCommitsStream =
         activeTimeline
             .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, 
REPLACE_COMMIT_ACTION, COMPACTION_ACTION, DELTA_COMMIT_ACTION))
             .filterCompletedInstants()
             
.findInstantsModifiedAfterByCompletionTime(currentInstant.requestedTime())
             .getInstantsAsStream();
 
-    // Fetch list of ingestion inflight commits.
-    Stream<HoodieInstant> inflightIngestionCommitsStream =
-        activeTimeline
-            .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, 
DELTA_COMMIT_ACTION))
-            .filterInflights()
-            .getInstantsAsStream();
+    boolean blockForPendingIngestion = writeConfigOpt.isPresent()
+        && writeConfigOpt.get().isClusteringBlockForPendingIngestion();
+
+    Stream<HoodieInstant> inflightIngestionCommitsStream;
+    if (isCurrentOperationClustering && blockForPendingIngestion) {
+      HoodieWriteConfig writeConfig = writeConfigOpt.get();
+      long maxHeartbeatIntervalMs = 
writeConfig.getHoodieClientHeartbeatIntervalInMs()
+          * (writeConfig.getHoodieClientHeartbeatTolerableMisses() + 1);
+      inflightIngestionCommitsStream = activeTimeline
+          .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, 
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION))
+          .filterInflightsAndRequested()
+          .getInstantsAsStream()
+          .filter(i -> !ClusteringUtils.isClusteringInstant(activeTimeline, i, 
metaClient.getInstantGenerator()))
+          .filter(i -> {
+            if (i.isRequested()) {
+              try {
+                return 
!HoodieHeartbeatUtils.isHeartbeatExpired(i.requestedTime(), 
maxHeartbeatIntervalMs,
+                    metaClient.getStorage(), 
metaClient.getBasePath().toString());
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            }
+            return i.isInflight();
+          });
+    } else {
+      inflightIngestionCommitsStream = activeTimeline
+          .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, 
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION))
+          .filterInflights()
+          .getInstantsAsStream();
+    }
 
-    // Merge and sort the instants and return.
     List<HoodieInstant> instantsToConsider = 
Stream.concat(completedCommitsStream, inflightIngestionCommitsStream)
-        .sorted(Comparator.comparing(o -> o.getCompletionTime()))
+        .sorted(Comparator.comparing(HoodieInstant::getCompletionTime, 
Comparator.nullsLast(Comparator.naturalOrder())))
         .collect(Collectors.toList());
     log.info("Instants that may have conflict with {} are {}", currentInstant, 
instantsToConsider);
     return instantsToConsider.stream();
   }
 
+  @Override
+  public boolean hasConflict(ConcurrentOperation thisOperation, 
ConcurrentOperation otherOperation) {
+    // A .requested ingestion instant only appears as a candidate when
+    // getCandidateInstantsForTableServicesCommits determined that blocking is
+    // enabled and the heartbeat is still active, so no additional config check
+    // is needed here.
+    if (thisOperation.getOperationType() == WriteOperationType.CLUSTER
+        && isRequestedIngestionInstant(otherOperation)) {
+      log.info("Clustering operation {} conflicts with pending ingestion 
instant {} "
+          + "that has an active heartbeat", thisOperation, otherOperation);
+      return true;
+    }
+    return super.hasConflict(thisOperation, otherOperation);
+  }
+
+  @Override
+  public Option<HoodieCommitMetadata> resolveConflict(HoodieTable table,
+      ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) {
+    if (thisOperation.getOperationType() == WriteOperationType.CLUSTER
+        && isRequestedIngestionInstant(otherOperation)) {
+      throw new HoodieWriteConflictException(
+          
HoodieWriteConflictException.ConflictCategory.TABLE_SERVICE_VS_INGESTION,
+          String.format("Pending ingestion instant %s with active heartbeat 
has not transitioned to "
+              + "inflight yet but may potentially conflict with current 
clustering operation %s",
+              otherOperation, thisOperation));
+    }
+    return super.resolveConflict(table, thisOperation, otherOperation);
+  }
+
+  private boolean isRequestedIngestionInstant(ConcurrentOperation operation) {
+    String state = operation.getInstantActionState();
+    String actionType = operation.getInstantActionType();
+    return HoodieInstant.State.REQUESTED.name().equals(state)
+        && (COMMIT_ACTION.equals(actionType) || 
DELTA_COMMIT_ACTION.equals(actionType)
+            || (REPLACE_COMMIT_ACTION.equals(actionType) && 
operation.getOperationType() != WriteOperationType.CLUSTER));

Review Comment:
    I wanted to avoid making changes outside of the PreferWriterConflictResolutionStrategy
    class, but sure, that works too. Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to