zhangyue19921010 commented on code in PR #6133:
URL: https://github.com/apache/hudi/pull/6133#discussion_r1029444656
##########
hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java:
##########
@@ -208,10 +219,116 @@ public static Set<String> readMarkersFromFile(Path
markersFilePath, Serializable
fsDataInputStream = fs.open(markersFilePath);
markers = new
HashSet<>(FileIOUtils.readAsUTFStringLines(fsDataInputStream));
} catch (IOException e) {
- throw new HoodieIOException("Failed to read MARKERS file " +
markersFilePath, e);
+ if (ignoreException) {
+ LOG.warn("IOException occurs during read MARKERS file, ", e);
+ } else {
+ throw new HoodieIOException("Failed to read MARKERS file " +
markersFilePath, e);
+ }
} finally {
closeQuietly(fsDataInputStream);
}
return markers;
}
+
+ public static List<Path> getAllMarkerDir(Path tempPath, FileSystem fs)
throws IOException {
+ return
Arrays.stream(fs.listStatus(tempPath)).map(FileStatus::getPath).collect(Collectors.toList());
+ }
+
+ public static boolean hasCommitConflict(HoodieActiveTimeline activeTimeline,
Set<String> currentFileIDs, Set<HoodieInstant> completedCommitInstants) {
+
+ Set<HoodieInstant> currentInstants =
activeTimeline.reload().getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toSet());
+
+ currentInstants.removeAll(completedCommitInstants);
+ Set<String> missingFileIDs = currentInstants.stream().flatMap(instant -> {
+ try {
+ return
HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(instant).get(),
HoodieCommitMetadata.class)
+ .getFileIdAndRelativePaths().keySet().stream();
+ } catch (Exception e) {
+ return Stream.empty();
+ }
+ }).collect(Collectors.toSet());
+ currentFileIDs.retainAll(missingFileIDs);
+ return !currentFileIDs.isEmpty();
+ }
+
+ /**
+   * Get the candidate instants for conflict checking:
+ * 1. Skip current writer related instant(currentInstantTime)
+ * 2. Skip all instants after currentInstantTime
+ * 3. Skip dead writers related instants based on heart-beat
+   * 4. Skip pending compaction instants (for now we don't do early conflict
check with the compact action),
+   *    because we don't want to let a pending compaction block the common writer.
+   * @param instants candidate marker directories to filter
+   * @return the instant paths remaining after applying the filters above
+ */
+ public static List<String> getCandidateInstants(HoodieActiveTimeline
activeTimeline, List<Path> instants, String currentInstantTime,
+ long
maxAllowableHeartbeatIntervalInMs, FileSystem fs, String basePath) {
+
+ HoodieActiveTimeline reloadActive = activeTimeline.reload();
+
+ return instants.stream().map(Path::toString).filter(instantPath -> {
+ String instantTime = markerDirToInstantTime(instantPath);
+ return instantTime.compareToIgnoreCase(currentInstantTime) < 0
+ &&
!reloadActive.filterPendingCompactionTimeline().containsInstant(instantTime)
Review Comment:
Skip clustering and compaction instants when doing early conflict
detection,
because we don't want any pending compaction to block the common writer.
For example, there could be a failed compaction in the active timeline while the
common writer continues to write data to the file groups related to that failed
compaction.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]