zhangyue19921010 commented on code in PR #6133:
URL: https://github.com/apache/hudi/pull/6133#discussion_r1029454364


##########
hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java:
##########
@@ -208,10 +219,116 @@ public static Set<String> readMarkersFromFile(Path 
markersFilePath, Serializable
       fsDataInputStream = fs.open(markersFilePath);
       markers = new 
HashSet<>(FileIOUtils.readAsUTFStringLines(fsDataInputStream));
     } catch (IOException e) {
-      throw new HoodieIOException("Failed to read MARKERS file " + 
markersFilePath, e);
+      if (ignoreException) {
+        LOG.warn("IOException occurs during read MARKERS file, ", e);
+      } else {
+        throw new HoodieIOException("Failed to read MARKERS file " + 
markersFilePath, e);
+      }
     } finally {
       closeQuietly(fsDataInputStream);
     }
     return markers;
   }
+
+  public static List<Path> getAllMarkerDir(Path tempPath, FileSystem fs) 
throws IOException {
+    return 
Arrays.stream(fs.listStatus(tempPath)).map(FileStatus::getPath).collect(Collectors.toList());
+  }
+
+  public static boolean hasCommitConflict(HoodieActiveTimeline activeTimeline, 
Set<String> currentFileIDs, Set<HoodieInstant> completedCommitInstants) {
+
+    Set<HoodieInstant> currentInstants = 
activeTimeline.reload().getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toSet());
+
+    currentInstants.removeAll(completedCommitInstants);
+    Set<String> missingFileIDs = currentInstants.stream().flatMap(instant -> {
+      try {
+        return 
HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(instant).get(), 
HoodieCommitMetadata.class)
+            .getFileIdAndRelativePaths().keySet().stream();
+      } catch (Exception e) {
+        return Stream.empty();
+      }
+    }).collect(Collectors.toSet());
+    currentFileIDs.retainAll(missingFileIDs);
+    return !currentFileIDs.isEmpty();
+  }
+
+  /**
+   * Get Candidate Instant to do conflict checking:
+   * 1. Skip current writer related instant(currentInstantTime)
+   * 2. Skip all instants after currentInstantTime
+   * 3. Skip dead writers related instants based on heart-beat
+   * 4. Skip pending compaction instant (For now we don' do early conflict 
check with compact action)
+   *      Because we don't want to let pending compaction block common writer.
+   * @param instants
+   * @return
+   */
+  public static List<String> getCandidateInstants(HoodieActiveTimeline 
activeTimeline, List<Path> instants, String currentInstantTime,
+                                                  long 
maxAllowableHeartbeatIntervalInMs, FileSystem fs, String basePath) {
+
+    HoodieActiveTimeline reloadActive = activeTimeline.reload();
+
+    return instants.stream().map(Path::toString).filter(instantPath -> {
+      String instantTime = markerDirToInstantTime(instantPath);
+      return instantTime.compareToIgnoreCase(currentInstantTime) < 0
+          && 
!reloadActive.filterPendingCompactionTimeline().containsInstant(instantTime)
+          && 
!reloadActive.filterPendingReplaceTimeline().containsInstant(instantTime);
+    }).filter(instantPath -> {
+      try {
+        return !isHeartbeatExpired(markerDirToInstantTime(instantPath), 
maxAllowableHeartbeatIntervalInMs, fs, basePath);
+      } catch (IOException e) {
+        return false;
+      }
+    }).collect(Collectors.toList());
+  }
+
+  /**
+   * Get fileID from full marker path, for example:
+   * 
20210623/0/20210825/932a86d9-5c1d-44c7-ac99-cb88b8ef8478-0_85-15-1390_20220620181735781.parquet.marker.MERGE
+   *    ==> get 20210623/0/20210825/932a86d9-5c1d-44c7-ac99-cb88b8ef8478-0
+   * @param marker
+   * @return
+   */
+  public static String makerToPartitionAndFileID(String marker) {
+    String[] ele = marker.split("_");
+    return ele[0];
+  }
+
+  /**
+   * Get instantTime from full marker path, for example:
+   * 
/var/folders/t3/th1dw75d0yz2x2k2qt6ys9zh0000gp/T/junit6502909693741900820/dataset/.hoodie/.temp/003
+   *    ==> 003
+   * @param marker
+   * @return
+   */
+  public static String markerDirToInstantTime(String marker) {
+    String[] ele = marker.split("/");
+    return ele[ele.length - 1];
+  }
+
+  /**
+   * Use modification time as last heart beat time
+   * @param fs
+   * @param basePath
+   * @param instantTime
+   * @return
+   * @throws IOException
+   */
+  public static Long getLastHeartbeatTime(FileSystem fs, String basePath, 
String instantTime) throws IOException {
+    Path heartbeatFilePath = new 
Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + Path.SEPARATOR + 
instantTime);
+    if (fs.exists(heartbeatFilePath)) {
+      return fs.getFileStatus(heartbeatFilePath).getModificationTime();
+    } else {
+      // NOTE : This can happen when a writer is upgraded to use lazy cleaning 
and the last write had failed
+      return 0L;
+    }
+  }
+
+  public static boolean isHeartbeatExpired(String instantTime, long 
maxAllowableHeartbeatIntervalInMs, FileSystem fs, String basePath) throws 
IOException {
+    Long currentTime = System.currentTimeMillis();
+    Long lastHeartbeatTime = getLastHeartbeatTime(fs, basePath, instantTime);
+    if (currentTime - lastHeartbeatTime > maxAllowableHeartbeatIntervalInMs) {
+      LOG.warn("Heartbeat expired, for instant: " + instantTime);
+      return true;
+    }
+    return false;
+  }

Review Comment:
   These methods in hudi-common duplicate the ones in org.apache.hudi.client.heartbeat.* under hudi-client-common.
   
   We cannot add hudi-client-common as a dependency of hudi-common, so we copy only the necessary methods here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to