huaxingao commented on code in PR #4659:
URL: https://github.com/apache/polaris/pull/4659#discussion_r3399218639


##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -460,15 +490,138 @@ public void authorizeCreateTableDirect(
     }
   }
 
+  /**
+   * Create a table, optionally honoring an {@code Idempotency-Key} from the REST request.
+   *
+   * <p>When an idempotency key is supplied and the feature is enabled:
+   *
+   * <ol>
+   *   <li>Authorization runs first, so idempotency cannot bypass it.
+   *   <li>Pre-flight loads any prior record for {@code (realm, key)}. A match (same caller, same
+   *       resource binding) replays the response from authoritative catalog state; a binding
+   *       mismatch raises 422.
+   *   <li>On a fresh key, the table is created and the record is inserted afterwards. A concurrent
+   *       caller that wins the race causes our insert to return DUPLICATE — we then replay too, so
+   *       the response is equivalent to what the winner returned.
+   * </ol>
+   *
+   * <p>No response body is stored. Replays go through {@code loadTable +
+   * buildLoadTableResponseWithDelegationCredentials}, which re-vends fresh credentials for the
+   * current caller.
+   */
   public LoadTableResponse createTableDirect(
       Namespace namespace,
       CreateTableRequest request,
       EnumSet<AccessDelegationMode> delegationModes,
-      Optional<String> refreshCredentialsEndpoint) {
+      Optional<String> refreshCredentialsEndpoint,
+      Optional<String> idempotencyKey) {
 
     authorizeCreateTableDirect(namespace, request, !delegationModes.isEmpty());
     Optional<AccessDelegationMode> resolvedMode = resolveAccessDelegationModes(delegationModes);
 
+    // Idempotency is opt-in per request (key present) and per deployment (feature enabled). When
+    // off, the pre-flight / race-resolution / record steps below are simply skipped, leaving the
+    // plain create path. A binding mismatch surfaces as IdempotencyConflictException, which
+    // IcebergExceptionMapper maps to HTTP 422 — no inline translation needed.
+    boolean idempotent = idempotencyKey.isPresent() && idempotencySupport().isEnabled();
+    IdempotentOperation operation = IdempotentOperation.CREATE_TABLE;
+    String key = idempotencyKey.orElse(null);
+    String principalHash =
+        idempotent ? idempotencySupport().principalHash(polarisPrincipal()) : null;
+    String resourceHash =
+        idempotent
+            ? idempotencySupport()
+                .resourceHash(
+                    operation,
+                    namespace.toString(),
+                    request.name(),
+                    resolvedMode.map(Enum::name).orElse("none"))
+            : null;
+
+    // Pre-flight: replay a prior success for this key.
+    if (idempotent) {
+      IdempotencyHandlerSupport.Outcome preflight =
+          idempotencySupport().preflight(key, operation, resourceHash, principalHash);

Review Comment:
   Done. `preflight()` now takes the raw inputs `(Optional<UUID> key,
   PolarisPrincipal, operation, resource-identity components)` and owns the whole
   decision: it computes the hashes and returns a top-level `IdempotencyOutcome`.
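
   For readers following the thread, here is a minimal sketch of the shape described in this reply: a pre-flight that accepts raw inputs, derives the hashes itself, and returns a single top-level outcome. It is an illustration only. Every name below (`PreflightSketch`, `Outcome`, `Binding`, `BindingMismatchException`, the in-memory map, the SHA-256/Base64 hashing) is an assumption and is not taken from the PR.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; none of these names come from the Polaris code base.
class PreflightSketch {

  /** Top-level result of the pre-flight decision. */
  sealed interface Outcome {
    /** No key supplied: run the plain, non-idempotent path. */
    record NotIdempotent() implements Outcome {}

    /** Key not seen before: run the operation, then record it with these hashes. */
    record Fresh(UUID key, String principalHash, String resourceHash) implements Outcome {}

    /** Key seen before with the same binding: replay from current state. */
    record Duplicate(Binding existing) implements Outcome {}
  }

  /** The (caller, resource) pair a key is bound to. */
  record Binding(String principalHash, String resourceHash) {}

  /** Stand-in for an exception that a mapper could translate to HTTP 422. */
  static class BindingMismatchException extends RuntimeException {
    BindingMismatchException(String message) {
      super(message);
    }
  }

  // In-memory stand-in for a persistent idempotency store.
  private final Map<UUID, Binding> store = new ConcurrentHashMap<>();

  /** Takes raw inputs, computes both hashes, and owns the whole decision. */
  Outcome preflight(
      Optional<UUID> key, String principal, String operation, String... resourceParts) {
    if (key.isEmpty()) {
      return new Outcome.NotIdempotent();
    }
    String principalHash = sha256(principal);
    String resourceHash = sha256(operation + "\n" + String.join("\n", resourceParts));
    Binding existing = store.get(key.get());
    if (existing == null) {
      return new Outcome.Fresh(key.get(), principalHash, resourceHash);
    }
    if (!existing.equals(new Binding(principalHash, resourceHash))) {
      throw new BindingMismatchException("idempotency key reused with a different binding");
    }
    return new Outcome.Duplicate(existing);
  }

  /** Records a successful operation; returns false if another caller recorded first. */
  boolean record(Outcome.Fresh fresh) {
    Binding binding = new Binding(fresh.principalHash(), fresh.resourceHash());
    return store.putIfAbsent(fresh.key(), binding) == null;
  }

  private static String sha256(String input) {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
      return Base64.getEncoder().encodeToString(hash);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 should be available on every JVM", e);
    }
  }
}
```

   With this shape the handler no longer needs the `idempotent` flag or the nullable hash locals: it can branch once on the returned outcome, and the hashes travel inside the `Fresh` value to the later record step.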
  



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -460,15 +490,138 @@ public void authorizeCreateTableDirect(
     }
   }
 
+  /**
+   * Create a table, optionally honoring an {@code Idempotency-Key} from the REST request.
+   *
+   * <p>When an idempotency key is supplied and the feature is enabled:
+   *
+   * <ol>
+   *   <li>Authorization runs first, so idempotency cannot bypass it.
+   *   <li>Pre-flight loads any prior record for {@code (realm, key)}. A match (same caller, same
+   *       resource binding) replays the response from authoritative catalog state; a binding
+   *       mismatch raises 422.
+   *   <li>On a fresh key, the table is created and the record is inserted afterwards. A concurrent
+   *       caller that wins the race causes our insert to return DUPLICATE — we then replay too, so
+   *       the response is equivalent to what the winner returned.
+   * </ol>
+   *
+   * <p>No response body is stored. Replays go through {@code loadTable +
+   * buildLoadTableResponseWithDelegationCredentials}, which re-vends fresh credentials for the
+   * current caller.
+   */
   public LoadTableResponse createTableDirect(
       Namespace namespace,
       CreateTableRequest request,
       EnumSet<AccessDelegationMode> delegationModes,
-      Optional<String> refreshCredentialsEndpoint) {
+      Optional<String> refreshCredentialsEndpoint,
+      Optional<String> idempotencyKey) {
 
     authorizeCreateTableDirect(namespace, request, !delegationModes.isEmpty());
     Optional<AccessDelegationMode> resolvedMode = resolveAccessDelegationModes(delegationModes);
 
+    // Idempotency is opt-in per request (key present) and per deployment (feature enabled). When
+    // off, the pre-flight / race-resolution / record steps below are simply skipped, leaving the
+    // plain create path. A binding mismatch surfaces as IdempotencyConflictException, which
+    // IcebergExceptionMapper maps to HTTP 422 — no inline translation needed.
+    boolean idempotent = idempotencyKey.isPresent() && idempotencySupport().isEnabled();
+    IdempotentOperation operation = IdempotentOperation.CREATE_TABLE;
+    String key = idempotencyKey.orElse(null);
+    String principalHash =
+        idempotent ? idempotencySupport().principalHash(polarisPrincipal()) : null;
+    String resourceHash =
+        idempotent
+            ? idempotencySupport()
+                .resourceHash(
+                    operation,
+                    namespace.toString(),
+                    request.name(),
+                    resolvedMode.map(Enum::name).orElse("none"))
+            : null;
+
+    // Pre-flight: replay a prior success for this key.
+    if (idempotent) {
+      IdempotencyHandlerSupport.Outcome preflight =
+          idempotencySupport().preflight(key, operation, resourceHash, principalHash);
+      if (preflight instanceof IdempotencyHandlerSupport.Outcome.Duplicate dup) {
+        return replayCreateTableDirect(
+            namespace, request, resolvedMode, refreshCredentialsEndpoint, dup.existing());
+      }
+    }
+
+    // Run the operation. A concurrent request carrying the same key can win the catalog-level race
+    // and make this attempt fail with AlreadyExistsException; if that winner recorded a matching
+    // idempotency outcome, replay it instead of returning a 409.
+    LoadTableResponse response;
+    try {
+      response = doCreateTableDirect(namespace, request, resolvedMode, refreshCredentialsEndpoint);
+    } catch (AlreadyExistsException e) {
+      if (idempotent) {
+        Optional<IdempotencyRecord> raceWinner =
+            resolveConcurrentDuplicate(key, operation, resourceHash, principalHash);
+        if (raceWinner.isPresent()) {
+          return replayCreateTableDirect(
+              namespace, request, resolvedMode, refreshCredentialsEndpoint, raceWinner.get());
+        }
+      }
+      // Not a same-key retry: the table genuinely pre-existed, so this is a real conflict.
+      throw e;
+    }
+
+    // Record the successful outcome. If a concurrent caller recorded first, replay theirs.
+    if (idempotent) {
+      String metadataLocation = response.tableMetadata().metadataFileLocation();
+      IdempotencyHandlerSupport.Outcome recordOutcome =
+          idempotencySupport()
+              .recordOutcome(key, operation, resourceHash, principalHash, 200, metadataLocation);
+      if (recordOutcome instanceof IdempotencyHandlerSupport.Outcome.Duplicate dup) {
+        // Another caller raced ahead and recorded first. Replay so the response is the same shape
+        // (and credentials are freshly vended for this caller) as what the race winner returned.
+        return replayCreateTableDirect(
+            namespace, request, resolvedMode, refreshCredentialsEndpoint, dup.existing());
+      }
+    }
+    return response;
+  }
+
+  /**
+   * After a concurrent {@code createTable} loses the catalog race (AlreadyExistsException), checks
+   * whether the race winner recorded a matching idempotency outcome so this caller can replay it.
+   *
+   * <p>The winner records its outcome only after committing the table, so a loser may observe the
+   * conflict slightly before the record is visible; this retries the lookup a bounded number of
+   * times. A binding mismatch surfaces as {@link IdempotencyConflictException} (mapped to 422).
+   * Returns empty if no matching record appears, meaning the conflict was a genuine pre-existing
+   * table rather than a same-key retry.
+   */
+  private Optional<IdempotencyRecord> resolveConcurrentDuplicate(

Review Comment:
   Done
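
   The bounded lookup described in the Javadoc quoted above can be sketched generically. This is only an illustration of the retry pattern, not the code in the PR: the class name `BoundedLookupSketch`, the method `lookupWithRetry`, and the attempt count and pause parameters are all assumptions.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper; the PR's resolveConcurrentDuplicate may be structured differently.
class BoundedLookupSketch {

  /**
   * Calls {@code lookup} up to {@code maxAttempts} times, pausing between attempts, and returns
   * the first non-empty result. Returns empty if no record shows up, which the caller would
   * treat as a genuine pre-existing table rather than a same-key retry.
   */
  static <T> Optional<T> lookupWithRetry(
      Supplier<Optional<T>> lookup, int maxAttempts, Duration pause) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      Optional<T> found = lookup.get();
      if (found.isPresent()) {
        return found;
      }
      if (attempt < maxAttempts) {
        try {
          Thread.sleep(pause.toMillis());
        } catch (InterruptedException e) {
          // Preserve the interrupt flag and stop waiting.
          Thread.currentThread().interrupt();
          return Optional.empty();
        }
      }
    }
    return Optional.empty();
  }
}
```

   Keeping the attempt count small bounds the added latency on the path where the table really did pre-exist, since that caller must exhaust the retries before its 409 is returned.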


