the-other-tim-brown commented on code in PR #18073:
URL: https://github.com/apache/hudi/pull/18073#discussion_r2756996956
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -211,30 +220,42 @@ public void processMarkerCreationRequests(
synchronized (markerCreationProcessingLock) {
for (MarkerCreationFuture future : pendingMarkerCreationFutures) {
String markerName = future.getMarkerName();
- boolean exists = allMarkers.contains(markerName);
- if (!exists) {
- if (conflictDetectionStrategy.isPresent()) {
- try {
-
conflictDetectionStrategy.get().detectAndResolveConflictIfNecessary();
- } catch (HoodieEarlyConflictDetectionException he) {
- log.error("Detected the write conflict due to a concurrent
writer, "
- + "failing the marker creation as the early conflict
detection is enabled", he);
- future.setIsSuccessful(false);
- continue;
- } catch (Exception e) {
- log.warn("Failed to execute early conflict detection. Marker
creation will continue.", e);
- // When early conflict detection fails to execute, we still
allow the marker creation
- // to continue
- addMarkerToMap(fileIndex, markerName);
- future.setIsSuccessful(true);
- shouldFlushMarkers = true;
- continue;
- }
+ String requestId = future.getRequestId();
+ // Idempotent retry: marker already created with same requestId
+ if (markerToRequestIdMap.containsKey(markerName)) {
+ String existingRequestId = markerToRequestIdMap.get(markerName);
+ String normalizedRequestId = requestId == null ? NULL_REQUEST_ID :
requestId;
Review Comment:
If the requestId is null, we should simply skip this check.
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -211,30 +220,42 @@ public void processMarkerCreationRequests(
synchronized (markerCreationProcessingLock) {
for (MarkerCreationFuture future : pendingMarkerCreationFutures) {
String markerName = future.getMarkerName();
- boolean exists = allMarkers.contains(markerName);
- if (!exists) {
- if (conflictDetectionStrategy.isPresent()) {
- try {
-
conflictDetectionStrategy.get().detectAndResolveConflictIfNecessary();
- } catch (HoodieEarlyConflictDetectionException he) {
- log.error("Detected the write conflict due to a concurrent
writer, "
- + "failing the marker creation as the early conflict
detection is enabled", he);
- future.setIsSuccessful(false);
- continue;
- } catch (Exception e) {
- log.warn("Failed to execute early conflict detection. Marker
creation will continue.", e);
- // When early conflict detection fails to execute, we still
allow the marker creation
- // to continue
- addMarkerToMap(fileIndex, markerName);
- future.setIsSuccessful(true);
- shouldFlushMarkers = true;
- continue;
- }
+ String requestId = future.getRequestId();
+ // Idempotent retry: marker already created with same requestId
+ if (markerToRequestIdMap.containsKey(markerName)) {
+ String existingRequestId = markerToRequestIdMap.get(markerName);
+ String normalizedRequestId = requestId == null ? NULL_REQUEST_ID :
requestId;
+ boolean idempotentMatch =
normalizedRequestId.equals(existingRequestId)
+ || NULL_REQUEST_ID.equals(existingRequestId)
+ || NULL_REQUEST_ID.equals(normalizedRequestId);
+ if (idempotentMatch) {
+ future.setIsSuccessful(true);
+ } else {
+ future.setIsSuccessful(false);
}
- addMarkerToMap(fileIndex, markerName);
- shouldFlushMarkers = true;
+ continue;
}
- future.setIsSuccessful(!exists);
+ if (conflictDetectionStrategy.isPresent()) {
Review Comment:
Could this be simplified to an `else if`?
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java:
##########
@@ -237,18 +240,18 @@ public CompletableFuture<String> createMarker(Context
context, String markerDir,
log.warn("Failed to execute early conflict detection. Marker creation
will continue.", e);
// When early conflict detection fails to execute, we still allow the
marker creation
// to continue
- return addMarkerCreationRequestForAsyncProcessing(context, markerDir,
markerName);
+ return addMarkerCreationRequestForAsyncProcessing(context, markerDir,
markerName, requestId);
}
}
// Step 2 create marker
- return addMarkerCreationRequestForAsyncProcessing(context, markerDir,
markerName);
+ return addMarkerCreationRequestForAsyncProcessing(context, markerDir,
markerName, requestId);
}
private MarkerCreationFuture addMarkerCreationRequestForAsyncProcessing(
- Context context, String markerDir, String markerName) {
+ Context context, String markerDir, String markerName, String requestId) {
log.debug("Request: Create marker: {}", markerName);
- MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir,
markerName);
+ MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir,
markerName, requestId);
Review Comment:
When we receive the request, should we check whether there are any existing
MarkerCreationFutures with this request ID and, if so, return that future instead?
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -265,13 +286,14 @@ public void processMarkerCreationRequests(
*/
public boolean deleteAllMarkers() {
boolean result = FSUtils.deleteDir(hoodieEngineContext, storage,
markerDirPath, parallelism);
- allMarkers.clear();
+ markerToRequestIdMap.clear();
fileMarkersMap.clear();
return result;
}
/**
* Syncs all markers maintained in the underlying files under the marker
directory in the file system.
+ * When TLS recovers from crash, we have no request IDs so store sentinel
value for each marker.
Review Comment:
Since we cannot know what the requestId was, we can avoid backfilling this
map with `NULL_REQUEST_ID` and, to be safe, let those requests fail.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]