yihua commented on code in PR #6133:
URL: https://github.com/apache/hudi/pull/6133#discussion_r1064249219


##########
hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java:
##########
@@ -208,10 +220,89 @@ public static Set<String> readMarkersFromFile(Path markersFilePath, Serializable
       fsDataInputStream = fs.open(markersFilePath);
       markers = new HashSet<>(FileIOUtils.readAsUTFStringLines(fsDataInputStream));
     } catch (IOException e) {
-      throw new HoodieIOException("Failed to read MARKERS file " + markersFilePath, e);
+      if (ignoreException) {
+        LOG.warn("IOException occurred while reading MARKERS file " + markersFilePath, e);
+      } else {
+        throw new HoodieIOException("Failed to read MARKERS file " + markersFilePath, e);
+      }
     } finally {
       closeQuietly(fsDataInputStream);
     }
     return markers;
   }
+
+  public static List<Path> getAllMarkerDir(Path tempPath, FileSystem fs) throws IOException {
+    return Arrays.stream(fs.listStatus(tempPath)).map(FileStatus::getPath).collect(Collectors.toList());
+  }
+
+  public static boolean hasCommitConflict(HoodieActiveTimeline activeTimeline, Set<String> currentFileIDs, Set<HoodieInstant> completedCommitInstants) {
+
+    Set<HoodieInstant> currentInstants = new HashSet<>(
+        activeTimeline.reload().getCommitsTimeline().filterCompletedInstants().getInstants());
+
+    currentInstants.removeAll(completedCommitInstants);
+    Set<String> missingFileIDs = currentInstants.stream().flatMap(instant -> {
+      try {
+        return HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class)
+            .getFileIdAndRelativePaths().keySet().stream();
+      } catch (Exception e) {
+        return Stream.empty();
+      }
+    }).collect(Collectors.toSet());
+    currentFileIDs.retainAll(missingFileIDs);
+    return !currentFileIDs.isEmpty();
+  }
+
+  /**
+   * Get candidate instants for conflict checking:
+   * 1. Skip the instant of the current writer (currentInstantTime)
+   * 2. Skip all instants after currentInstantTime
+   * 3. Skip instants of dead writers, based on heartbeats
+   * 4. Skip pending compaction instants (for now we don't do early conflict checks against the compact action),
+   *      because we don't want a pending compaction to block regular writers.
+   * @param instants
+   * @return
+   */
+  public static List<String> getCandidateInstants(HoodieActiveTimeline activeTimeline, List<Path> instants, String currentInstantTime,
+                                                  long maxAllowableHeartbeatIntervalInMs, FileSystem fs, String basePath) {
+
+    HoodieActiveTimeline reloadActive = activeTimeline.reload();

Review Comment:
   Do we need to reload the active timeline when creating a marker file? This can introduce non-trivial latency overhead.
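   One way to bound that overhead is to reuse a recently reloaded timeline instead of calling reload() on every marker request. The sketch below is a hypothetical helper, not a Hudi class: `CachedReloader` and its `maxStalenessMs` parameter are assumptions for illustration, and a plain `Supplier<String>` stands in for `activeTimeline.reload()`. The trade-off is that a commit completed within the staleness window may be missed by the early check, so a later pre-commit conflict check would still be needed.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class CachedReloaderDemo {

  // Hypothetical helper (not a Hudi class): reuses the result of an expensive reload,
  // such as activeTimeline.reload(), until maxStalenessMs has elapsed.
  static final class CachedReloader<T> {
    private final Supplier<T> reloader;
    private final long maxStalenessMs;
    private final LongSupplier clockMs;
    private T cached;
    private long lastReloadMs;

    CachedReloader(Supplier<T> reloader, long maxStalenessMs, LongSupplier clockMs) {
      this.reloader = reloader;
      this.maxStalenessMs = maxStalenessMs;
      this.clockMs = clockMs;
    }

    // synchronized so concurrent marker requests trigger at most one reload at a time
    synchronized T get() {
      long now = clockMs.getAsLong();
      if (cached == null || now - lastReloadMs >= maxStalenessMs) {
        cached = reloader.get();
        lastReloadMs = now;
      }
      return cached;
    }
  }

  public static void main(String[] args) {
    AtomicInteger reloads = new AtomicInteger();
    long[] nowMs = {0L}; // fake clock so the example is deterministic
    CachedReloader<String> timeline =
        new CachedReloader<>(() -> "timeline-v" + reloads.incrementAndGet(), 1000L, () -> nowMs[0]);

    System.out.println(timeline.get()); // timeline-v1 (first call reloads)
    nowMs[0] = 500L;
    System.out.println(timeline.get()); // timeline-v1 (served from cache)
    nowMs[0] = 1500L;
    System.out.println(timeline.get()); // timeline-v2 (stale, reloaded)
  }
}
```

   Injecting the clock keeps the staleness logic testable; in the real handler it would presumably be System::currentTimeMillis.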



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -155,6 +160,20 @@ protected Option<Path> create(String partitionPath, String dataFileName, IOType
     return create(getMarkerPath(partitionPath, dataFileName, type), checkIfExists);
   }
 
+  @Override
+  public Option<Path> createWithEarlyConflictDetection(String partitionPath, String dataFileName, IOType type, boolean checkIfExists, Set<HoodieInstant> completedCommitInstants,
+                                                       HoodieWriteConfig config, String fileId, HoodieActiveTimeline activeTimeline) {
+
+    long maxAllowableHeartbeatIntervalInMs = config.getHoodieClientHeartbeatIntervalInMs() * config.getHoodieClientHeartbeatTolerableMisses();
+
+    HoodieDirectMarkerBasedEarlyConflictDetectionStrategy strategy =
+        (HoodieDirectMarkerBasedEarlyConflictDetectionStrategy) ReflectionUtils.loadClass(config.getEarlyConflictDetectionStrategyClassName(),

Review Comment:
   nit: we can think about loading the strategy class through reflection in a common place for reuse, instead of loading it for every marker creation.
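   A minimal sketch of that idea: keep one reflectively created instance per class name in a `ConcurrentHashMap` and create it with `computeIfAbsent`, so reflection runs once rather than per marker. `ConflictDetectionStrategy`, `NoOpStrategy` and `getOrLoad` are hypothetical names, and the sketch assumes a no-arg constructor. The current strategy takes per-marker arguments at construction, so sharing one instance would presumably mean passing those as method arguments instead and keeping the strategy thread-safe.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StrategyCacheDemo {

  // Hypothetical interface standing in for the early-conflict-detection strategy type.
  public interface ConflictDetectionStrategy {
    boolean hasConflict(String markerName);
  }

  // Hypothetical implementation with a public no-arg constructor so it can be loaded reflectively.
  public static final class NoOpStrategy implements ConflictDetectionStrategy {
    public NoOpStrategy() {}

    @Override
    public boolean hasConflict(String markerName) {
      return false;
    }
  }

  // One shared instance per strategy class name, created lazily on first use.
  private static final Map<String, ConflictDetectionStrategy> CACHE = new ConcurrentHashMap<>();

  static ConflictDetectionStrategy getOrLoad(String className) {
    return CACHE.computeIfAbsent(className, name -> {
      try {
        return (ConflictDetectionStrategy) Class.forName(name).getDeclaredConstructor().newInstance();
      } catch (ReflectiveOperationException e) {
        // Nothing is cached when loading fails, so a later call will retry.
        throw new IllegalStateException("Cannot load strategy " + name, e);
      }
    });
  }

  public static void main(String[] args) {
    String name = NoOpStrategy.class.getName();
    ConflictDetectionStrategy first = getOrLoad(name);
    ConflictDetectionStrategy second = getOrLoad(name);
    System.out.println(first == second); // true: reflection ran only once
  }
}
```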



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java:
##########
@@ -150,7 +166,52 @@ public boolean doesMarkerDirExist(String markerDir) {
    * @param markerName marker name
    * @return the {@code CompletableFuture} instance for the request
    */
-  public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName) {
+  public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName, String basePath) {
+    // Step 1: do early conflict detection if enabled
+    if (timelineServiceConfig.earlyConflictDetectionEnable) {
+      try {
+        synchronized (earlyConflictDetectionLock) {
+          if (earlyConflictDetectionStrategy == null) {
+            earlyConflictDetectionStrategy = (HoodieTimelineServerBasedEarlyConflictDetectionStrategy) ReflectionUtils.loadClass(timelineServiceConfig.earlyConflictDetectionStrategy,
+                basePath, markerDir, markerName, timelineServiceConfig.checkCommitConflict);
+          }
+
+          // markerDir => $base_path/.hoodie/.temp/$instant_time
+          // If markerDir changes (e.g. the writer moves on to the next instant), we need to refresh this earlyConflictDetectionStrategy.
+          // For marker creation within a given instant, we only run this check/refresh once
+          // instead of starting the conflict detector for every request
+          if (!markerDir.equalsIgnoreCase(currentMarkerDir)) {
+            this.currentMarkerDir = markerDir;
+            Set<String> actions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION);
+            Set<HoodieInstant> oldInstants = new HashSet<>(
+                viewManager.getFileSystemView(basePath)
+                    .getTimeline()
+                    .filterCompletedInstants()
+                    .filter(instant -> actions.contains(instant.getAction()))
+                    .getInstants());
+
+            earlyConflictDetectionStrategy.fresh(timelineServiceConfig.asyncConflictDetectorBatchIntervalMs,
+                timelineServiceConfig.asyncConflictDetectorBatchPeriodMs, markerDir, basePath, timelineServiceConfig.maxAllowableHeartbeatIntervalInMs, fileSystem,
+                this, oldInstants);
+          }
+        }
+
+        earlyConflictDetectionStrategy.detectAndResolveConflictIfNecessary();
+
+      } catch (Exception ex) {

Review Comment:
   Let's catch `HoodieEarlyConflictDetectionException` separately for a detected conflict and then `Exception` for other errors.
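   A sketch of the suggested shape, with the more specific catch first (Java rejects a subclass catch placed after its superclass). `HoodieEarlyConflictDetectionException` here is a local stand-in for the class named above, assumed to be unchecked; `Detector` is a hypothetical hook standing in for `detectAndResolveConflictIfNecessary()`. Whether non-conflict errors should fail the request or only be logged is a design decision for this PR; this sketch logs them and still creates the marker.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class ConflictCatchDemo {

  // Local stand-in for the exception named in the review (assumed unchecked).
  static class HoodieEarlyConflictDetectionException extends RuntimeException {
    HoodieEarlyConflictDetectionException(String msg) {
      super(msg);
    }
  }

  // Hypothetical hook standing in for detectAndResolveConflictIfNecessary().
  interface Detector {
    void detectAndResolveConflictIfNecessary() throws Exception;
  }

  static CompletableFuture<String> createMarker(Detector detector, String markerName) {
    try {
      detector.detectAndResolveConflictIfNecessary();
    } catch (HoodieEarlyConflictDetectionException conflict) {
      // A real write conflict: fail this request so the writer can abort early.
      CompletableFuture<String> failed = new CompletableFuture<>();
      failed.completeExceptionally(conflict);
      return failed;
    } catch (Exception other) {
      // Detection itself failed (I/O, reflection, ...): log and continue, in this sketch.
      System.err.println("Early conflict detection failed, continuing: " + other);
    }
    return CompletableFuture.completedFuture(markerName);
  }

  public static void main(String[] args) {
    System.out.println(createMarker(() -> { }, "m1").join()); // m1
    System.out.println(createMarker(() -> {
      throw new HoodieEarlyConflictDetectionException("conflict on file group");
    }, "m2").isCompletedExceptionally()); // true
    System.out.println(createMarker(() -> {
      throw new IOException("transient read failure");
    }, "m3").join()); // m3, after a warning on stderr
  }
}
```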



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.