the-other-tim-brown commented on code in PR #18073:
URL: https://github.com/apache/hudi/pull/18073#discussion_r2756996956


##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -211,30 +220,42 @@ public void processMarkerCreationRequests(
     synchronized (markerCreationProcessingLock) {
       for (MarkerCreationFuture future : pendingMarkerCreationFutures) {
         String markerName = future.getMarkerName();
-        boolean exists = allMarkers.contains(markerName);
-        if (!exists) {
-          if (conflictDetectionStrategy.isPresent()) {
-            try {
-              
conflictDetectionStrategy.get().detectAndResolveConflictIfNecessary();
-            } catch (HoodieEarlyConflictDetectionException he) {
-              log.error("Detected the write conflict due to a concurrent 
writer, "
-                  + "failing the marker creation as the early conflict 
detection is enabled", he);
-              future.setIsSuccessful(false);
-              continue;
-            } catch (Exception e) {
-              log.warn("Failed to execute early conflict detection. Marker 
creation will continue.", e);
-              // When early conflict detection fails to execute, we still 
allow the marker creation
-              // to continue
-              addMarkerToMap(fileIndex, markerName);
-              future.setIsSuccessful(true);
-              shouldFlushMarkers = true;
-              continue;
-            }
+        String requestId = future.getRequestId();
+        // Idempotent retry: marker already created with same requestId
+        if (markerToRequestIdMap.containsKey(markerName)) {
+          String existingRequestId = markerToRequestIdMap.get(markerName);
+          String normalizedRequestId = requestId == null ? NULL_REQUEST_ID : 
requestId;

Review Comment:
   If the `requestId` is null, we should just skip this check.



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -211,30 +220,42 @@ public void processMarkerCreationRequests(
     synchronized (markerCreationProcessingLock) {
       for (MarkerCreationFuture future : pendingMarkerCreationFutures) {
         String markerName = future.getMarkerName();
-        boolean exists = allMarkers.contains(markerName);
-        if (!exists) {
-          if (conflictDetectionStrategy.isPresent()) {
-            try {
-              
conflictDetectionStrategy.get().detectAndResolveConflictIfNecessary();
-            } catch (HoodieEarlyConflictDetectionException he) {
-              log.error("Detected the write conflict due to a concurrent 
writer, "
-                  + "failing the marker creation as the early conflict 
detection is enabled", he);
-              future.setIsSuccessful(false);
-              continue;
-            } catch (Exception e) {
-              log.warn("Failed to execute early conflict detection. Marker 
creation will continue.", e);
-              // When early conflict detection fails to execute, we still 
allow the marker creation
-              // to continue
-              addMarkerToMap(fileIndex, markerName);
-              future.setIsSuccessful(true);
-              shouldFlushMarkers = true;
-              continue;
-            }
+        String requestId = future.getRequestId();
+        // Idempotent retry: marker already created with same requestId
+        if (markerToRequestIdMap.containsKey(markerName)) {
+          String existingRequestId = markerToRequestIdMap.get(markerName);
+          String normalizedRequestId = requestId == null ? NULL_REQUEST_ID : 
requestId;
+          boolean idempotentMatch = 
normalizedRequestId.equals(existingRequestId)
+              || NULL_REQUEST_ID.equals(existingRequestId)
+              || NULL_REQUEST_ID.equals(normalizedRequestId);
+          if (idempotentMatch) {
+            future.setIsSuccessful(true);
+          } else {
+            future.setIsSuccessful(false);
           }
-          addMarkerToMap(fileIndex, markerName);
-          shouldFlushMarkers = true;
+          continue;
         }
-        future.setIsSuccessful(!exists);
+        if (conflictDetectionStrategy.isPresent()) {

Review Comment:
   Just simplify this to `else if`?



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java:
##########
@@ -237,18 +240,18 @@ public CompletableFuture<String> createMarker(Context 
context, String markerDir,
         log.warn("Failed to execute early conflict detection. Marker creation 
will continue.", e);
         // When early conflict detection fails to execute, we still allow the 
marker creation
         // to continue
-        return addMarkerCreationRequestForAsyncProcessing(context, markerDir, 
markerName);
+        return addMarkerCreationRequestForAsyncProcessing(context, markerDir, 
markerName, requestId);
       }
     }
 
     // Step 2 create marker
-    return addMarkerCreationRequestForAsyncProcessing(context, markerDir, 
markerName);
+    return addMarkerCreationRequestForAsyncProcessing(context, markerDir, 
markerName, requestId);
   }
 
   private MarkerCreationFuture addMarkerCreationRequestForAsyncProcessing(
-      Context context, String markerDir, String markerName) {
+      Context context, String markerDir, String markerName, String requestId) {
     log.debug("Request: Create marker: {}", markerName);
-    MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir, 
markerName);
+    MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir, 
markerName, requestId);

Review Comment:
   When we receive the request, should we check whether there are any existing 
`MarkerCreationFuture`s with this request ID and return that future instead?



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -265,13 +286,14 @@ public void processMarkerCreationRequests(
    */
   public boolean deleteAllMarkers() {
     boolean result = FSUtils.deleteDir(hoodieEngineContext, storage, 
markerDirPath, parallelism);
-    allMarkers.clear();
+    markerToRequestIdMap.clear();
     fileMarkersMap.clear();
     return result;
   }
 
   /**
    * Syncs all markers maintained in the underlying files under the marker 
directory in the file system.
+   * When TLS recovers from crash, we have no request IDs so store sentinel 
value for each marker.

Review Comment:
   Since we cannot know what the original `requestId` was, we can avoid backfilling 
this map with `NULL_REQUEST_ID` and, to be safe, let the requests fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to