Zouxxyy commented on code in PR #9233:
URL: https://github.com/apache/hudi/pull/9233#discussion_r1271265718


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java:
##########
@@ -18,65 +18,144 @@
 
 package org.apache.hudi.client.transaction;
 
-import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieWriteConflictException;
-import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.bucket.BucketIdentifier;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.model.WriteOperationType.CLUSTER;
+import static org.apache.hudi.common.model.WriteOperationType.LOG_COMPACT;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
 
 /**
- * This class is a basic implementation of a conflict resolution strategy for 
concurrent writes {@link ConflictResolutionStrategy}.
+ * This class is a basic implementation of a conflict resolution strategy for 
concurrent writes {@link ConflictResolutionStrategy},
+ * based on start_timestamp; it needs to record the pending instants before the 
write.
  */
 public class SimpleConcurrentFileWritesConflictResolutionStrategy
     implements ConflictResolutionStrategy {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class);
 
+  private final HoodieWriteConfig writeConfig;
+
+  public 
SimpleConcurrentFileWritesConflictResolutionStrategy(HoodieWriteConfig 
writeConfig) {
+    this.writeConfig = writeConfig;
+  }
+
   @Override
   public Stream<HoodieInstant> getCandidateInstants(HoodieTableMetaClient 
metaClient, HoodieInstant currentInstant,
-                                                    Option<HoodieInstant> 
lastSuccessfulInstant) {
-    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    // To find which instants are conflicting, we apply the following logic
-    // 1. Get completed instants timeline only for commits that have happened 
since the last successful write.
-    // 2. Get any scheduled or completed compaction or clustering operations 
that have started and/or finished
-    // after the current instant. We need to check for write conflicts since 
they may have mutated the same files
-    // that are being newly created by the current write.
-    Stream<HoodieInstant> completedCommitsInstantStream = activeTimeline
-        .getCommitsTimeline()
+                                                    Option<Set<String>> 
pendingInstantsBeforeWritten) {
+    // Divide the process of obtaining candidate instants into two steps:
+    //   1) get completed instants after current instant starts
+    //   2) pick from instants that are still pending
+    return Stream.concat(getCompletedInstantsAfterCurrent(metaClient, 
currentInstant, pendingInstantsBeforeWritten),
+        pickFromCurrentPendingInstants(metaClient, currentInstant));
+  }
+
+  /**
+   * Get completed instants after current instant starts based on 
start_timestamp:
+   *   1) instants whose start_timestamp > current start_timestamp and that have 
completed
+   *   2) pending instants that were recorded before the write and have since 
been converted to completed
+   *
+   * @param metaClient meta client
+   * @param currentInstant current instant
+   * @param pendingInstantsBeforeWrite pending instants recorded before the write
+   * @return selected {@link HoodieInstant} stream
+   */
+  public Stream<HoodieInstant> 
getCompletedInstantsAfterCurrent(HoodieTableMetaClient metaClient, 
HoodieInstant currentInstant,
+                                                                
Option<Set<String>> pendingInstantsBeforeWrite) {
+    return metaClient.getActiveTimeline()
+        .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, 
REPLACE_COMMIT_ACTION, COMPACTION_ACTION, DELTA_COMMIT_ACTION))
         .filterCompletedInstants()
-        .findInstantsAfter(lastSuccessfulInstant.isPresent() ? 
lastSuccessfulInstant.get().getTimestamp() : HoodieTimeline.INIT_INSTANT_TS)
+        .filter(i -> compareTimestamps(i.getTimestamp(), GREATER_THAN, 
currentInstant.getTimestamp())
+            || (pendingInstantsBeforeWrite.isPresent() && 
pendingInstantsBeforeWrite.get().contains(i.getTimestamp())))
         .getInstantsAsStream();
+  }
+
+  /**
+   * Pick from instants that are still pending; we need to compare the priority 
+   * of the two operations.
+   *
+   * @param metaClient meta client
+   * @param currentInstant current instant
+   * @return selected {@link HoodieInstant} stream
+   */
+  public Stream<HoodieInstant> 
pickFromCurrentPendingInstants(HoodieTableMetaClient metaClient, HoodieInstant 
currentInstant) {
+
+    // Whether to pick the pending instant to candidate instants:
+    // 
+-----------------+----------------------------------------+---------------------------------------+------------+
+    // | current\pending | ingestion                              |  
clustering                           | compaction |
+    // 
|-----------------+----------------------------------------+---------------------------------------+------------+
+    // | ingestion       | no                                     | no if 
#isIngestionPrimaryClustering() | yes        |
+    // 
|-----------------+----------------------------------------+---------------------------------------+------------+
+    // | clustering      | yes if #isIngestionPrimaryClustering() | no         
                           | no         |
+    // 
+-----------------+----------------------------------------+---------------------------------------+------------+
+
+    Set<String> actionsToPick = new HashSet<>();
+
+    if (REPLACE_COMMIT_ACTION.equals(currentInstant.getAction())
+        && ClusteringUtils.isPendingClusteringInstant(metaClient, 
currentInstant)) {
+      if (isIngestionPrimaryClustering()) {
+        actionsToPick.add(COMMIT_ACTION);
+        actionsToPick.add(DELTA_COMMIT_ACTION);
+      }
+    } else {
+      if (!isIngestionPrimaryClustering()) {
+        actionsToPick.add(REPLACE_COMMIT_ACTION);
+      }
+      actionsToPick.add(COMPACTION_ACTION);

Review Comment:
   Here I just keep the current implementation, because log compaction now has a 
pre-commit step. I hope to get more opinions. @suryaprasanna



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to