zhangyue19921010 commented on code in PR #6133:
URL: https://github.com/apache/hudi/pull/6133#discussion_r1028283595


##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java:
##########
@@ -150,7 +168,50 @@ public boolean doesMarkerDirExist(String markerDir) {
    * @param markerName marker name
    * @return the {@code CompletableFuture} instance for the request
    */
-  public CompletableFuture<String> createMarker(Context context, String 
markerDir, String markerName) {
+  public CompletableFuture<String> createMarker(Context context, String 
markerDir, String markerName,
+                                                String batchInterval, String 
period, String maxAllowableHeartbeatIntervalInMs,
+                                                String basePath, String 
earlyConflictDetectionEnable,
+                                                String 
earlyConflictDetectionClassName) {
+    // Step1 do early conflict detection if enable
+    if (Boolean.parseBoolean(earlyConflictDetectionEnable)) {
+      try {
+        synchronized (earlyConflictDetectionLock) {
+          if (earlyConflictDetectionStrategy == null) {
+            earlyConflictDetectionStrategy = 
ReflectionUtils.loadClass(earlyConflictDetectionClassName);
+          }
+
+          if (!markerDir.equalsIgnoreCase(currentMarkerDir)) {
+            this.currentMarkerDir = markerDir;
+            Set<String> actions = CollectionUtils.createSet(COMMIT_ACTION, 
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION);
+            Set<HoodieInstant> oldInstants = 
viewManager.getFileSystemView(basePath)
+                .getTimeline()
+                .filterCompletedInstants()
+                .filter(instant -> actions.contains(instant.getAction()))
+                .getInstants()
+                .collect(Collectors.toSet());
+
+            earlyConflictDetectionStrategy.fresh(batchInterval, period, 
markerDir, basePath, maxAllowableHeartbeatIntervalInMs, fileSystem,
+                this, oldInstants);
+          }
+        }
+
+        if (earlyConflictDetectionStrategy.hasMarkerConflict()) {
+          earlyConflictDetectionStrategy.resolveMarkerConflict(basePath, 
markerDir, markerName);

Review Comment:
   > The timeline server should simply return false for the marker creation 
request
   
   Totally agree with it. 
   For now timeline server will return false for executor request and and 
executor will 
   ```
       if (success) {
         return Option.of(new Path(FSUtils.getPartitionPath(markerDirPath, 
partitionPath), markerFileName));
       } else {
         // this failed may due to early conflict detection, so we need to 
throw out.
         throw new HoodieEarlyConflictDetectionException(new 
ConcurrentModificationException("Early conflict detected but cannot resolve 
conflicts for overlapping writes"));
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to