the-other-tim-brown commented on code in PR #18073:
URL: https://github.com/apache/hudi/pull/18073#discussion_r2788444284
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -119,15 +130,27 @@ public boolean exists() {
}
/**
- * Adds a {@code MarkerCreationCompletableFuture} instance from a marker
- * creation request to the queue.
+ * Checks if an identical in-flight request exists and returns it, or creates a new future.
+ * This deduplicates concurrent requests with the same marker and requestId.
*
- * @param future {@code MarkerCreationCompletableFuture} instance.
+ * @param context Javalin context
+ * @param markerName Marker name
+ * @param requestId Request ID (non-null from clients, can be null only during recovery)
+ * @return Existing future if found, otherwise a new future
*/
- public void addMarkerCreationFuture(MarkerCreationFuture future) {
- synchronized (markerCreationFutures) {
- markerCreationFutures.add(future);
+ public MarkerCreationFuture getOrCreateMarkerCreationFuture(Context context, String markerName, String requestId) {
+ String dedupKey = markerName + "|" + (requestId != null ? requestId : NULL_REQUEST_ID);
+
+ MarkerCreationFuture existingFuture = inflightRequestMap.get(dedupKey);
Review Comment:
I'm wondering if we should use `computeIfAbsent` to avoid any race condition
between this check and the later insertion of the new future below.
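A minimal sketch of the `computeIfAbsent` approach, assuming `inflightRequestMap` is (or becomes) a `ConcurrentHashMap`. `SketchMarkerFuture`, `DedupSketch`, `inflightCount` and the sentinel string are hypothetical stand-ins, and the Javalin `Context` parameter is omitted for brevity:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for MarkerCreationFuture; only what dedup needs is modeled.
class SketchMarkerFuture extends CompletableFuture<String> {
  final String markerName;
  final String requestId;

  SketchMarkerFuture(String markerName, String requestId) {
    this.markerName = markerName;
    this.requestId = requestId;
  }
}

class DedupSketch {
  // Assumed sentinel value; mirrors NULL_REQUEST_ID in the diff.
  static final String NULL_REQUEST_ID = "__null__";

  private final Map<String, SketchMarkerFuture> inflightRequestMap = new ConcurrentHashMap<>();

  SketchMarkerFuture getOrCreateMarkerCreationFuture(String markerName, String requestId) {
    String dedupKey = markerName + "|" + (requestId != null ? requestId : NULL_REQUEST_ID);
    // ConcurrentHashMap.computeIfAbsent performs the lookup and the insertion
    // atomically for this key, so concurrent callers with the same key share one future.
    return inflightRequestMap.computeIfAbsent(
        dedupKey, k -> new SketchMarkerFuture(markerName, requestId));
  }

  int inflightCount() {
    return inflightRequestMap.size();
  }
}
```

This removes the window between `get` and `put` without adding an explicit lock.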
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -174,23 +197,35 @@ public List<MarkerCreationFuture> fetchPendingMarkerCreationRequests() {
}
/**
- * @param shouldClear Should clear the internal request list or not.
+ * @param shouldClear Should clear the internal request map or not.
* @return futures of pending marker creation requests.
*/
public List<MarkerCreationFuture> getPendingMarkerCreationRequests(boolean shouldClear) {
- List<MarkerCreationFuture> pendingFutures;
- synchronized (markerCreationFutures) {
- if (markerCreationFutures.isEmpty()) {
- return new ArrayList<>();
- }
- pendingFutures = new ArrayList<>(markerCreationFutures);
- if (shouldClear) {
- markerCreationFutures.clear();
Review Comment:
This section is no longer in a synchronized block, so it may be susceptible
to race conditions: for example, a value could be added after line 208 executes
and then be cleared at line 210, causing the request to be dropped.
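One possible fix, sketched under the assumption that `inflightRequestMap` is a `ConcurrentHashMap`: instead of copying `values()` and then calling `clear()`, remove entries key by key and return exactly what was removed. `PendingRequestDrainSketch` and its `add`/`size` helpers are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PendingRequestDrainSketch<V> {
  private final Map<String, V> inflightRequestMap = new ConcurrentHashMap<>();

  void add(String key, V value) {
    inflightRequestMap.putIfAbsent(key, value);
  }

  int size() {
    return inflightRequestMap.size();
  }

  List<V> getPendingMarkerCreationRequests(boolean shouldClear) {
    List<V> pending = new ArrayList<>();
    // ConcurrentHashMap iterators are weakly consistent and never throw
    // ConcurrentModificationException, so removing while iterating is safe.
    for (String key : inflightRequestMap.keySet()) {
      // Every removed value is returned; an entry added after the loop has
      // passed its key stays in the map for the next batch instead of being cleared.
      V value = shouldClear ? inflightRequestMap.remove(key) : inflightRequestMap.get(key);
      if (value != null) {
        pending.add(value);
      }
    }
    return pending;
  }
}
```

Wrapping the original copy-then-clear in a `synchronized` block shared with the insertion path would also work; the per-key removal just avoids a lock.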
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -265,13 +286,14 @@ public void processMarkerCreationRequests(
*/
public boolean deleteAllMarkers() {
boolean result = FSUtils.deleteDir(hoodieEngineContext, storage, markerDirPath, parallelism);
- allMarkers.clear();
+ markerToRequestIdMap.clear();
fileMarkersMap.clear();
return result;
}
/**
* Syncs all markers maintained in the underlying files under the marker directory in the file system.
+ * When TLS recovers from crash, we have no request IDs so store sentinel value for each marker.
Review Comment:
Bump on this: I still feel that storing a sentinel request ID for recovered markers introduces unnecessary risk.
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -225,16 +262,16 @@ public void processMarkerCreationRequests(
log.warn("Failed to execute early conflict detection. Marker creation will continue.", e);
// When early conflict detection fails to execute, we still allow the marker creation
// to continue
- addMarkerToMap(fileIndex, markerName);
+ addMarkerToMap(fileIndex, markerName, requestId);
future.setIsSuccessful(true);
shouldFlushMarkers = true;
continue;
}
}
- addMarkerToMap(fileIndex, markerName);
+ addMarkerToMap(fileIndex, markerName, requestId);
+ future.setIsSuccessful(true);
Review Comment:
In the case where `exists` is `true`, the future's `isSuccessful` used to be
set to `false`, but now it is not set at all. Is that intentional?
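If the old behavior should be kept, a minimal sketch is to set the flag explicitly in the `exists` branch rather than rely on the field's default. `FlagFuture`, `ExistsBranchSketch` and the `containsKey` check are hypothetical simplifications; the real `exists` computation in the PR may differ:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified future: only the success flag is modeled.
class FlagFuture {
  private boolean isSuccessful;

  void setIsSuccessful(boolean value) {
    isSuccessful = value;
  }

  boolean isSuccessful() {
    return isSuccessful;
  }
}

class ExistsBranchSketch {
  // Hypothetical marker -> requestId map, echoing markerToRequestIdMap in the diff.
  private final Map<String, String> markerToRequestIdMap = new HashMap<>();

  void process(FlagFuture future, String markerName, String requestId) {
    boolean exists = markerToRequestIdMap.containsKey(markerName);
    if (exists) {
      // Explicitly report failure, matching the pre-PR behavior.
      future.setIsSuccessful(false);
      return;
    }
    markerToRequestIdMap.put(markerName, requestId);
    future.setIsSuccessful(true);
  }
}
```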
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -174,23 +197,35 @@ public List<MarkerCreationFuture> fetchPendingMarkerCreationRequests() {
}
/**
- * @param shouldClear Should clear the internal request list or not.
+ * @param shouldClear Should clear the internal request map or not.
* @return futures of pending marker creation requests.
*/
public List<MarkerCreationFuture> getPendingMarkerCreationRequests(boolean shouldClear) {
- List<MarkerCreationFuture> pendingFutures;
- synchronized (markerCreationFutures) {
- if (markerCreationFutures.isEmpty()) {
- return new ArrayList<>();
- }
- pendingFutures = new ArrayList<>(markerCreationFutures);
- if (shouldClear) {
- markerCreationFutures.clear();
- }
+ if (inflightRequestMap.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<MarkerCreationFuture> pendingFutures = new ArrayList<>(inflightRequestMap.values());
+ if (shouldClear) {
+ inflightRequestMap.clear();
}
return pendingFutures;
}
+ private boolean isMarkerAlreadySeen(MarkerCreationFuture future, String markerName, String requestId) {
Review Comment:
Let's add a quick javadoc explaining that this method is meant to return
`true` if the marker was created by another request.
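Something along these lines could work. This is a sketch that assumes the method compares the request ID recorded for the marker against the caller's; `MarkerSeenSketch` and `record` are hypothetical, and the unused `future` parameter is dropped for brevity:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class MarkerSeenSketch {
  // Hypothetical stand-in for markerToRequestIdMap.
  private final Map<String, String> markerToRequestIdMap = new HashMap<>();

  void record(String markerName, String requestId) {
    markerToRequestIdMap.put(markerName, requestId);
  }

  /**
   * Returns {@code true} if the marker has already been created by a
   * different request, so this request should not be treated as a retry
   * of its own earlier call.
   *
   * @param markerName marker name
   * @param requestId  ID of the request asking to create the marker
   * @return {@code true} if another request created the marker; {@code false}
   *         if the marker is new or was created by this same request
   */
  boolean isMarkerAlreadySeen(String markerName, String requestId) {
    String owner = markerToRequestIdMap.get(markerName);
    // Objects.equals handles a null requestId without throwing.
    return owner != null && !Objects.equals(owner, requestId);
  }
}
```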