zhangyue19921010 commented on code in PR #6133:
URL: https://github.com/apache/hudi/pull/6133#discussion_r1064931339


##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java:
##########
@@ -150,7 +166,52 @@ public boolean doesMarkerDirExist(String markerDir) {
    * @param markerName marker name
    * @return the {@code CompletableFuture} instance for the request
    */
-  public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName) {
+  public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName, String basePath) {
+    // Step1 do early conflict detection if enable
+    if (timelineServiceConfig.earlyConflictDetectionEnable) {
+      try {
+        synchronized (earlyConflictDetectionLock) {
+          if (earlyConflictDetectionStrategy == null) {
+            earlyConflictDetectionStrategy = (HoodieTimelineServerBasedEarlyConflictDetectionStrategy) ReflectionUtils.loadClass(timelineServiceConfig.earlyConflictDetectionStrategy,
+                basePath, markerDir, markerName, timelineServiceConfig.checkCommitConflict);
+          }
+
+          // markerDir => $base_path/.hoodie/.temp/$instant_time
+          // If markerDir is changed like move to the next instant action, we need to fresh this earlyConflictDetectionStrategy.
+          // For specific instant related create marker action, we only call this check/fresh once
+          // instead of starting the conflict detector for every request
+          if (!markerDir.equalsIgnoreCase(currentMarkerDir)) {
+            this.currentMarkerDir = markerDir;
+            Set<String> actions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION);
+            Set<HoodieInstant> oldInstants = new HashSet<>(
+                viewManager.getFileSystemView(basePath)
+                    .getTimeline()
+                    .filterCompletedInstants()
+                    .filter(instant -> actions.contains(instant.getAction()))
+                    .getInstants());
+
+            earlyConflictDetectionStrategy.fresh(timelineServiceConfig.asyncConflictDetectorBatchIntervalMs,
+                timelineServiceConfig.asyncConflictDetectorBatchPeriodMs, markerDir, basePath, timelineServiceConfig.maxAllowableHeartbeatIntervalInMs, fileSystem,
+                this, oldInstants);
+          }
+        }
+
+        earlyConflictDetectionStrategy.detectAndResolveConflictIfNecessary();
+
+      } catch (Exception ex) {

Review Comment:
   If any exception is thrown during early conflict detection, maybe we should catch it here and return false as the marker creation result.
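
   A minimal sketch of that idea, not the actual `MarkerHandler` code: `MarkerCreationSketch` and `ConflictCheck` are hypothetical stand-ins for the handler and for `detectAndResolveConflictIfNecessary()`, and the `"true"`/`"false"` strings are an assumed encoding of the marker creation result, since the diff does not show how the future's value is produced.

```java
import java.util.concurrent.CompletableFuture;

class MarkerCreationSketch {

  /** Hypothetical stand-in for the early conflict detection step. */
  @FunctionalInterface
  interface ConflictCheck {
    void run() throws Exception;
  }

  /**
   * Runs the conflict check first. Any exception it throws is caught and
   * reported as a failed marker creation instead of propagating to the caller.
   */
  static CompletableFuture<String> createMarker(ConflictCheck check) {
    try {
      check.run();
    } catch (Exception ex) {
      // Assumed result encoding: "false" means the marker was not created.
      return CompletableFuture.completedFuture("false");
    }
    // Stand-in for the normal marker creation path, which is not shown here.
    return CompletableFuture.completedFuture("true");
  }

  public static void main(String[] args) {
    // No conflict: the marker creation path is reached.
    System.out.println(createMarker(() -> { }).join());
    // Conflict detection throws: the result is "false".
    System.out.println(createMarker(() -> {
      throw new IllegalStateException("conflict detected");
    }).join());
  }
}
```

   Returning an already completed future keeps the method's return type unchanged, so callers handle a detection failure the same way as any other unsuccessful marker creation.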


