PavithranRick commented on code in PR #18073:
URL: https://github.com/apache/hudi/pull/18073#discussion_r2806294277


##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -119,15 +130,27 @@ public boolean exists() {
   }
 
   /**
-   * Adds a {@code MarkerCreationCompletableFuture} instance from a marker
-   * creation request to the queue.
+   * Checks if an identical in-flight request exists and returns it, or creates a new future.
+   * This deduplicates concurrent requests with the same marker and requestId.
    *
-   * @param future  {@code MarkerCreationCompletableFuture} instance.
+   * @param context    Javalin context
+   * @param markerName Marker name
+   * @param requestId  Request ID (non-null from clients, can be null only during recovery)
+   * @return Existing future if found, otherwise a new future
    */
-  public void addMarkerCreationFuture(MarkerCreationFuture future) {
-    synchronized (markerCreationFutures) {
-      markerCreationFutures.add(future);
+  public MarkerCreationFuture getOrCreateMarkerCreationFuture(Context context, String markerName, String requestId) {
+    String dedupKey = markerName + "|" + (requestId != null ? requestId : NULL_REQUEST_ID);
+
+    MarkerCreationFuture existingFuture = inflightRequestMap.get(dedupKey);

Review Comment:
   Used `computeIfAbsent` and moved this inside a `synchronized` block.
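A minimal sketch of the get-or-create pattern described above, assuming the dedup key format shown in the diff (`markerName + "|" + requestId`). `SketchMarkerFuture`, `DedupSketch`, and the `NULL_REQUEST_ID` value are hypothetical stand-ins, not the actual Hudi classes; the Javalin `Context` parameter is omitted.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for MarkerCreationFuture (the real one also holds the Javalin Context).
class SketchMarkerFuture extends CompletableFuture<String> {
  final String markerName;
  final String requestId;

  SketchMarkerFuture(String markerName, String requestId) {
    this.markerName = markerName;
    this.requestId = requestId;
  }
}

class DedupSketch {
  // Assumed placeholder for the null requestId that recovery may pass.
  static final String NULL_REQUEST_ID = "__null__";

  private final Map<String, SketchMarkerFuture> inflightRequestMap = new ConcurrentHashMap<>();

  SketchMarkerFuture getOrCreate(String markerName, String requestId) {
    String dedupKey = markerName + "|" + (requestId != null ? requestId : NULL_REQUEST_ID);
    // computeIfAbsent is atomic per key on ConcurrentHashMap. The synchronized block
    // additionally serializes this call against any code that snapshots and clears
    // the whole map while holding the same lock.
    synchronized (inflightRequestMap) {
      return inflightRequestMap.computeIfAbsent(
          dedupKey, k -> new SketchMarkerFuture(markerName, requestId));
    }
  }

  int inflightCount() {
    return inflightRequestMap.size();
  }
}
```

Two concurrent calls with the same marker and requestId receive the same future, so the marker is created once and both HTTP responses complete from one result.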



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -174,23 +197,35 @@ public List<MarkerCreationFuture> fetchPendingMarkerCreationRequests() {
   }
 
   /**
-   * @param shouldClear Should clear the internal request list or not.
+   * @param shouldClear Should clear the internal request map or not.
    * @return futures of pending marker creation requests.
    */
   public List<MarkerCreationFuture> getPendingMarkerCreationRequests(boolean shouldClear) {
-    List<MarkerCreationFuture> pendingFutures;
-    synchronized (markerCreationFutures) {
-      if (markerCreationFutures.isEmpty()) {
-        return new ArrayList<>();
-      }
-      pendingFutures = new ArrayList<>(markerCreationFutures);
-      if (shouldClear) {
-        markerCreationFutures.clear();

Review Comment:
   Thanks for catching this. Wrapped it in a `synchronized` block.
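A minimal sketch of the snapshot-and-clear step, assuming writers add to the map while holding the same lock. `PendingRequestsSketch` and its methods are hypothetical names, not the Hudi API; the point is that copying `values()` and calling `clear()` inside one `synchronized` block leaves no window in which a newly added request is cleared without being returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PendingRequestsSketch<F> {
  private final Map<String, F> inflightRequestMap = new ConcurrentHashMap<>();

  // Writers take the same lock as the drain below.
  void put(String key, F future) {
    synchronized (inflightRequestMap) {
      inflightRequestMap.put(key, future);
    }
  }

  // Copy (and optionally clear) atomically with respect to put(), so a request
  // cannot be added between the copy and the clear and then silently dropped.
  List<F> getPending(boolean shouldClear) {
    synchronized (inflightRequestMap) {
      if (inflightRequestMap.isEmpty()) {
        return new ArrayList<>();
      }
      List<F> pending = new ArrayList<>(inflightRequestMap.values());
      if (shouldClear) {
        inflightRequestMap.clear();
      }
      return pending;
    }
  }

  int size() {
    return inflightRequestMap.size();
  }
}
```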



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -225,16 +262,16 @@ public void processMarkerCreationRequests(
               log.warn("Failed to execute early conflict detection. Marker creation will continue.", e);
               // When early conflict detection fails to execute, we still allow the marker creation
               // to continue
-              addMarkerToMap(fileIndex, markerName);
+              addMarkerToMap(fileIndex, markerName, requestId);
               future.setIsSuccessful(true);
               shouldFlushMarkers = true;
               continue;
             }
           }
-          addMarkerToMap(fileIndex, markerName);
+          addMarkerToMap(fileIndex, markerName, requestId);
+          future.setIsSuccessful(true);

Review Comment:
   Yes. The line `future.setIsSuccessful(idempotentMatch);` inside `isMarkerAlreadySeen` should take care of it now.
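The body of `isMarkerAlreadySeen` is not shown in this hunk, so the following is only a sketch of the assumed semantics: a marker that already exists counts as success when it was recorded with the same requestId (a retried or duplicated request) and as failure when another request created it. `IdempotentMarkerSketch`, `Result`, and `process` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class IdempotentMarkerSketch {
  // Hypothetical bookkeeping: marker name -> requestId that created it. A plain HashMap is
  // assumed to be safe because a single batch thread processes requests; unlike
  // ConcurrentHashMap it also accepts the null requestId that recovery may pass.
  private final Map<String, String> markerToRequestId = new HashMap<>();

  // Minimal stand-in for the success flag on MarkerCreationFuture.
  static final class Result {
    boolean successful;
  }

  boolean isMarkerAlreadySeen(String markerName, String requestId, Result result) {
    if (!markerToRequestId.containsKey(markerName)) {
      return false;
    }
    // Same requestId: duplicate of a request that already succeeded, so report success.
    // Different requestId: another request owns the marker, so report failure.
    boolean idempotentMatch = Objects.equals(markerToRequestId.get(markerName), requestId);
    result.successful = idempotentMatch;
    return true;
  }

  // Assumed shape of one iteration of the batch loop in processMarkerCreationRequests.
  boolean process(String markerName, String requestId) {
    Result result = new Result();
    if (!isMarkerAlreadySeen(markerName, requestId, result)) {
      markerToRequestId.put(markerName, requestId);
      result.successful = true;
    }
    return result.successful;
  }
}
```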



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -80,8 +83,9 @@ public class MarkerDirState implements Serializable {
   // {@code true} means the file is in use by a {@code BatchCreateMarkerRunnable}.
   // Index of the list is used for the filename, i.e., "1" -> "MARKERS1"
   private final List<Boolean> threadUseStatus;
-  // A list of pending futures from async marker creation requests
-  private final List<MarkerCreationFuture> markerCreationFutures = new ArrayList<>();
+  // Map of in-flight requests: (markerName + "|" + requestId) -> MarkerCreationFuture
+  // Used for BOTH deduplication AND batch marker creation requests queue
+  private final Map<String, MarkerCreationFuture> inflightRequestMap = new ConcurrentHashMap<>();

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
