zhangyue19921010 commented on code in PR #6133:
URL: https://github.com/apache/hudi/pull/6133#discussion_r1028283595
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java:
##########
@@ -150,7 +168,50 @@ public boolean doesMarkerDirExist(String markerDir) {
* @param markerName marker name
* @return the {@code CompletableFuture} instance for the request
*/
-  public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName) {
+  public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName,
+                                                String batchInterval, String period, String maxAllowableHeartbeatIntervalInMs,
+                                                String basePath, String earlyConflictDetectionEnable,
+                                                String earlyConflictDetectionClassName) {
+    // Step1 do early conflict detection if enable
+    if (Boolean.parseBoolean(earlyConflictDetectionEnable)) {
+      try {
+        synchronized (earlyConflictDetectionLock) {
+          if (earlyConflictDetectionStrategy == null) {
+            earlyConflictDetectionStrategy = ReflectionUtils.loadClass(earlyConflictDetectionClassName);
+          }
+
+          if (!markerDir.equalsIgnoreCase(currentMarkerDir)) {
+            this.currentMarkerDir = markerDir;
+            Set<String> actions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION);
+            Set<HoodieInstant> oldInstants = viewManager.getFileSystemView(basePath)
+                .getTimeline()
+                .filterCompletedInstants()
+                .filter(instant -> actions.contains(instant.getAction()))
+                .getInstants()
+                .collect(Collectors.toSet());
+
+            earlyConflictDetectionStrategy.fresh(batchInterval, period, markerDir, basePath, maxAllowableHeartbeatIntervalInMs, fileSystem,
+                this, oldInstants);
+          }
+        }
+
+        if (earlyConflictDetectionStrategy.hasMarkerConflict()) {
+          earlyConflictDetectionStrategy.resolveMarkerConflict(basePath, markerDir, markerName);
Review Comment:
> The timeline server should simply return false for the marker creation request

Totally agree.
Currently the timeline server returns false for the executor's marker creation request, and the executor then does the following:
```
if (success) {
  return Option.of(new Path(FSUtils.getPartitionPath(markerDirPath, partitionPath), markerFileName));
} else {
  // This failure may be due to early conflict detection, so we need to throw.
  throw new HoodieEarlyConflictDetectionException(
      new ConcurrentModificationException("Early conflict detected but cannot resolve conflicts for overlapping writes"));
}
```
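
To illustrate the division of responsibility described above (the server only reports `false`, the executor decides to fail the write), here is a minimal standalone sketch. It is not the PR's code: `FakeTimelineServer`, `createMarkerOnExecutor`, and the "same file ID claimed by a different instant" conflict rule are all hypothetical simplifications, and `IllegalStateException` stands in for `HoodieEarlyConflictDetectionException`.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class EarlyConflictSketch {

  /** Hypothetical stand-in for the timeline server's marker handling. */
  static class FakeTimelineServer {
    // Assumed conflict rule: a file ID may only carry markers from one instant.
    private final Map<String, String> fileIdToInstant = new HashMap<>();

    /** Returns false on conflict instead of throwing; the caller decides what to do. */
    synchronized boolean createMarker(String instantTime, String fileId) {
      // putIfAbsent returns null when no instant had claimed this file ID yet.
      String owner = fileIdToInstant.putIfAbsent(fileId, instantTime);
      return owner == null || owner.equals(instantTime);
    }
  }

  /** Hypothetical executor-side call: a false response is turned into an exception. */
  static Optional<String> createMarkerOnExecutor(FakeTimelineServer server,
                                                 String instantTime, String fileId) {
    boolean success = server.createMarker(instantTime, fileId);
    if (success) {
      return Optional.of(instantTime + "/" + fileId + ".marker");
    }
    // Stand-in for HoodieEarlyConflictDetectionException in the real code.
    throw new IllegalStateException(new ConcurrentModificationException(
        "Early conflict detected but cannot resolve conflicts for overlapping writes"));
  }

  public static void main(String[] args) {
    FakeTimelineServer server = new FakeTimelineServer();
    System.out.println(createMarkerOnExecutor(server, "001", "file-1"));
    try {
      // A second writer touching the same file ID is rejected by the server.
      createMarkerOnExecutor(server, "002", "file-1");
    } catch (IllegalStateException e) {
      System.out.println("write aborted: " + e.getCause().getMessage());
    }
  }
}
```

With this split the server-side handler stays free of exception handling for conflicts, and the failure surfaces on the executor that issued the conflicting write.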