nsivabalan commented on code in PR #18073:
URL: https://github.com/apache/hudi/pull/18073#discussion_r2760308179
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -211,30 +220,42 @@ public void processMarkerCreationRequests(
synchronized (markerCreationProcessingLock) {
for (MarkerCreationFuture future : pendingMarkerCreationFutures) {
String markerName = future.getMarkerName();
- boolean exists = allMarkers.contains(markerName);
- if (!exists) {
- if (conflictDetectionStrategy.isPresent()) {
- try {
-
conflictDetectionStrategy.get().detectAndResolveConflictIfNecessary();
- } catch (HoodieEarlyConflictDetectionException he) {
- log.error("Detected the write conflict due to a concurrent
writer, "
- + "failing the marker creation as the early conflict
detection is enabled", he);
- future.setIsSuccessful(false);
- continue;
- } catch (Exception e) {
- log.warn("Failed to execute early conflict detection. Marker
creation will continue.", e);
- // When early conflict detection fails to execute, we still
allow the marker creation
- // to continue
- addMarkerToMap(fileIndex, markerName);
- future.setIsSuccessful(true);
- shouldFlushMarkers = true;
- continue;
- }
+ String requestId = future.getRequestId();
+ // Idempotent retry: marker already created with same requestId
+ if (markerToRequestIdMap.containsKey(markerName)) {
+ String existingRequestId = markerToRequestIdMap.get(markerName);
+ String normalizedRequestId = requestId == null ? NULL_REQUEST_ID :
requestId;
Review Comment:
How could the incoming requestId be null?
Once this release goes out, any marker request to TLS should have a request
ID set, right?
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -211,30 +220,42 @@ public void processMarkerCreationRequests(
synchronized (markerCreationProcessingLock) {
for (MarkerCreationFuture future : pendingMarkerCreationFutures) {
String markerName = future.getMarkerName();
- boolean exists = allMarkers.contains(markerName);
- if (!exists) {
- if (conflictDetectionStrategy.isPresent()) {
- try {
-
conflictDetectionStrategy.get().detectAndResolveConflictIfNecessary();
- } catch (HoodieEarlyConflictDetectionException he) {
- log.error("Detected the write conflict due to a concurrent
writer, "
- + "failing the marker creation as the early conflict
detection is enabled", he);
- future.setIsSuccessful(false);
- continue;
- } catch (Exception e) {
- log.warn("Failed to execute early conflict detection. Marker
creation will continue.", e);
- // When early conflict detection fails to execute, we still
allow the marker creation
- // to continue
- addMarkerToMap(fileIndex, markerName);
- future.setIsSuccessful(true);
- shouldFlushMarkers = true;
- continue;
- }
+ String requestId = future.getRequestId();
+ // Idempotent retry: marker already created with same requestId
+ if (markerToRequestIdMap.containsKey(markerName)) {
+ String existingRequestId = markerToRequestIdMap.get(markerName);
+ String normalizedRequestId = requestId == null ? NULL_REQUEST_ID :
requestId;
+ boolean idempotentMatch =
normalizedRequestId.equals(existingRequestId)
+ || NULL_REQUEST_ID.equals(existingRequestId)
+ || NULL_REQUEST_ID.equals(normalizedRequestId);
+ if (idempotentMatch) {
+ future.setIsSuccessful(true);
+ } else {
+ future.setIsSuccessful(false);
}
- addMarkerToMap(fileIndex, markerName);
- shouldFlushMarkers = true;
+ continue;
Review Comment:
We can also move the entire idempotency check into a private method and keep
this method lean.
##########
hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java:
##########
@@ -120,6 +122,87 @@ void testCreateMarkerAPIWithDifferentSchemes() throws
IOException {
assertMarkerCreation(tempDir.resolve("base-path-2").toUri().toString(),
"test2:/");
}
+ @Test
+ void testMarkerIdempotency() throws IOException, InterruptedException {
+ String basePath =
tempDir.resolve("base-path-idempotency").toUri().toString();
+ HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath,
getTableType());
+ String markerDir = metaClient.getMarkerFolderPath("102");
+ String markerName = "partition1/file1.parquet.marker.CREATE";
+ String requestId1 = java.util.UUID.randomUUID().toString();
+ String requestId2 = java.util.UUID.randomUUID().toString();
+
+ Map<String, String> queryParameters = new HashMap<>();
+ queryParameters.put(BASEPATH_PARAM, basePath);
+ queryParameters.put(MARKER_DIR_PATH_PARAM, markerDir);
+ queryParameters.put(MARKER_NAME_PARAM, markerName);
+ queryParameters.put(MARKER_REQUEST_ID_PARAM, requestId1);
+
+ // First request
+ boolean result1 = timelineServiceClient.makeRequest(
+ TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL)
+ .addQueryParams(queryParameters)
+ .build())
+ .getDecodedContent(new TypeReference<Boolean>() {});
+ assertTrue(result1, "First marker creation should succeed");
+
+ // Give server time to process
+ Thread.sleep(500);
+
+ // Retry with same requestId (simulating timeout + retry) should succeed
(idempotent)
+ boolean result2 = timelineServiceClient.makeRequest(
+ TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL)
+ .addQueryParams(queryParameters)
+ .build())
+ .getDecodedContent(new TypeReference<Boolean>() {});
+ assertTrue(result2, "Retry with same requestId should succeed
(idempotent)");
+
+ // Retry with different requestId (distinct logical request) should fail
+ queryParameters.put(MARKER_REQUEST_ID_PARAM, requestId2);
+ boolean result3 = timelineServiceClient.makeRequest(
+ TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL)
+ .addQueryParams(queryParameters)
+ .build())
+ .getDecodedContent(new TypeReference<Boolean>() {});
+ assertFalse(result3, "Retry with different requestId should fail");
+ }
+
+ @Test
+ void testMarkerBackwardCompatibilityNullExistingRequestId() throws
IOException, InterruptedException {
Review Comment:
I am not sure we need this test.
Once someone rolls out a release with this fix, both the driver (where the TLS
runs) and the executors get the new code base. So there won't be a case where
the driver and TLS are running new code while the executors are running old
code, or the other way round.
##########
hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java:
##########
@@ -120,6 +122,87 @@ void testCreateMarkerAPIWithDifferentSchemes() throws
IOException {
assertMarkerCreation(tempDir.resolve("base-path-2").toUri().toString(),
"test2:/");
}
+ @Test
+ void testMarkerIdempotency() throws IOException, InterruptedException {
+ String basePath =
tempDir.resolve("base-path-idempotency").toUri().toString();
+ HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath,
getTableType());
+ String markerDir = metaClient.getMarkerFolderPath("102");
+ String markerName = "partition1/file1.parquet.marker.CREATE";
+ String requestId1 = java.util.UUID.randomUUID().toString();
+ String requestId2 = java.util.UUID.randomUUID().toString();
+
+ Map<String, String> queryParameters = new HashMap<>();
+ queryParameters.put(BASEPATH_PARAM, basePath);
+ queryParameters.put(MARKER_DIR_PATH_PARAM, markerDir);
+ queryParameters.put(MARKER_NAME_PARAM, markerName);
+ queryParameters.put(MARKER_REQUEST_ID_PARAM, requestId1);
+
+ // First request
+ boolean result1 = timelineServiceClient.makeRequest(
+ TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL)
+ .addQueryParams(queryParameters)
+ .build())
+ .getDecodedContent(new TypeReference<Boolean>() {});
+ assertTrue(result1, "First marker creation should succeed");
+
+ // Give server time to process
+ Thread.sleep(500);
Review Comment:
Can we poll the marker list through the TLS get APIs and exit early once the
marker is already present, instead of sleeping for a fixed time?
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -65,13 +66,14 @@
@Slf4j
public class MarkerDirState implements Serializable {
+ private static final String NULL_REQUEST_ID = "";
// Marker directory
private final StoragePath markerDirPath;
private final HoodieStorage storage;
private final Registry metricsRegistry;
- // A cached copy of all markers in memory
- @Getter
- private final Set<String> allMarkers = new HashSet<>();
+ // Marker name -> request ID (empty string sentinel when TLS recovers from
crash or for legacy clients).
Review Comment:
we can remove mention of `legacy clients` in the comments here.
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -265,13 +286,14 @@ public void processMarkerCreationRequests(
*/
public boolean deleteAllMarkers() {
boolean result = FSUtils.deleteDir(hoodieEngineContext, storage,
markerDirPath, parallelism);
- allMarkers.clear();
+ markerToRequestIdMap.clear();
fileMarkersMap.clear();
return result;
}
/**
* Syncs all markers maintained in the underlying files under the marker
directory in the file system.
+ * When TLS recovers from crash, we have no request IDs so store sentinel
value for each marker.
Review Comment:
But to fail the request, we end up looking it up in `markerToRequestIdMap`,
right? So, to even fail a marker request for an already existing marker, we
need to populate entries in the map.
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -301,9 +325,10 @@ private void syncMarkersFromFileSystem() {
*
* @param fileIndex Marker file index number.
* @param markerName Marker name.
+ * @param requestId Optional request ID for idempotency; null for legacy or
recovery.
*/
- private void addMarkerToMap(int fileIndex, String markerName) {
- allMarkers.add(markerName);
+ private void addMarkerToMap(int fileIndex, String markerName, String
requestId) {
+ markerToRequestIdMap.put(markerName, requestId == null ? NULL_REQUEST_ID :
requestId);
Review Comment:
why account for null requestId?
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java:
##########
@@ -211,30 +220,42 @@ public void processMarkerCreationRequests(
synchronized (markerCreationProcessingLock) {
for (MarkerCreationFuture future : pendingMarkerCreationFutures) {
String markerName = future.getMarkerName();
- boolean exists = allMarkers.contains(markerName);
- if (!exists) {
- if (conflictDetectionStrategy.isPresent()) {
- try {
-
conflictDetectionStrategy.get().detectAndResolveConflictIfNecessary();
- } catch (HoodieEarlyConflictDetectionException he) {
- log.error("Detected the write conflict due to a concurrent
writer, "
- + "failing the marker creation as the early conflict
detection is enabled", he);
- future.setIsSuccessful(false);
- continue;
- } catch (Exception e) {
- log.warn("Failed to execute early conflict detection. Marker
creation will continue.", e);
- // When early conflict detection fails to execute, we still
allow the marker creation
- // to continue
- addMarkerToMap(fileIndex, markerName);
- future.setIsSuccessful(true);
- shouldFlushMarkers = true;
- continue;
- }
+ String requestId = future.getRequestId();
+ // Idempotent retry: marker already created with same requestId
+ if (markerToRequestIdMap.containsKey(markerName)) {
+ String existingRequestId = markerToRequestIdMap.get(markerName);
+ String normalizedRequestId = requestId == null ? NULL_REQUEST_ID :
requestId;
+ boolean idempotentMatch =
normalizedRequestId.equals(existingRequestId)
+ || NULL_REQUEST_ID.equals(existingRequestId)
+ || NULL_REQUEST_ID.equals(normalizedRequestId);
+ if (idempotentMatch) {
+ future.setIsSuccessful(true);
+ } else {
+ future.setIsSuccessful(false);
}
- addMarkerToMap(fileIndex, markerName);
- shouldFlushMarkers = true;
+ continue;
Review Comment:
essentially, we can replace
```
boolean exists = allMarkers.contains(markerName);
```
with
```
boolean exists = isMarkerAlreadySeen(markerName, requestId);
```
then, we do not need to touch any other line in this code block.
feel free to rename `exists` to some other apt name if need be
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]