dimas-b commented on code in PR #4659:
URL: https://github.com/apache/polaris/pull/4659#discussion_r3391430766


##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -416,30 +431,43 @@ public ListTablesResponse listTables(Namespace namespace) {
   }
 
   /**
-   * Create a table.
+   * Convenience entry point used by tests that exercise the authorization wiring without going
+   * through the REST adapter. Production callers go through the four-arg overload via the adapter.
    *
    * @param namespace the namespace to create the table in
    * @param request the table creation request
    * @return ETagged {@link LoadTableResponse} to uniquely identify the table metadata
    */
+  @VisibleForTesting
   public LoadTableResponse createTableDirect(Namespace namespace, CreateTableRequest request) {
     return createTableDirect(
-        namespace, request, EnumSet.noneOf(AccessDelegationMode.class), Optional.empty());
+        namespace,
+        request,
+        EnumSet.noneOf(AccessDelegationMode.class),
+        Optional.empty(),
+        Optional.empty());
   }
 
   /**
-   * Create a table.
+   * Convenience entry point used by tests for the write-delegation variant. Production callers go
+   * through the adapter, which resolves the delegation modes and refresh-credentials endpoint from
+   * the request.
    *
    * @param namespace the namespace to create the table in
    * @param request the table creation request
    * @return ETagged {@link LoadTableResponse} to uniquely identify the table metadata
    */
+  @VisibleForTesting
   public LoadTableResponse createTableDirectWithWriteDelegation(

Review Comment:
   Side note: this method is not used on any production call path. We should remove it and refactor the tests to call the production methods instead.



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java:
##########
@@ -81,17 +82,38 @@ public class IcebergCatalogAdapter
   private final CatalogPrefixParser prefixParser;
   private final ReservedProperties reservedProperties;
   private final IcebergCatalogHandlerFactory handlerFactory;
+  private final IdempotencyHandlerSupport idempotencySupport;
 
   @Inject
   public IcebergCatalogAdapter(
       CallContext callContext,
       CatalogPrefixParser prefixParser,
       ReservedProperties reservedProperties,
-      IcebergCatalogHandlerFactory handlerFactory) {
+      IcebergCatalogHandlerFactory handlerFactory,
+      IdempotencyHandlerSupport idempotencySupport) {
     this.realmConfig = callContext.getRealmConfig();
     this.prefixParser = prefixParser;
     this.reservedProperties = reservedProperties;
     this.handlerFactory = handlerFactory;
+    this.idempotencySupport = idempotencySupport;
+  }
+
+  /**
+   * Validates the {@code Idempotency-Key} parameter auto-bound from the request header by the
+   * OpenAPI-generated stub. Returns the normalised key string when present and idempotency is
+   * enabled; otherwise {@link Optional#empty()}.
+   *
+   * @throws BadRequestException if the header is present but not a valid UUIDv7
+   */
+  private Optional<String> validatedIdempotencyKey(UUID idempotencyKey) {
+    if (idempotencyKey == null) {
+      return Optional.empty();
+    }
+    try {
+      return idempotencySupport.validatedKey(idempotencyKey.toString());
+    } catch (IllegalArgumentException e) {
+      throw new BadRequestException("%s", e.getMessage());

Review Comment:
   Doesn't `IllegalArgumentException` result in a 400 response by itself (same 
as `BadRequestException`)?



##########
runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyHandlerSupport.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.idempotency;
+
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.core.HttpHeaders;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.regex.Pattern;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.IdempotencyRecord;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.core.persistence.IdempotencyStoreFactory;
+
+/**
+ * Handler-side helper for the single-transaction ("optimistic commit") 
idempotency model.
+ *
+ * <p>Responsibilities:
+ *
+ * <ul>
+ *   <li>Read and validate the {@code Idempotency-Key} request header (UUIDv7 
only).
+ *   <li>Compute the principal/resource hashes that form the binding stored 
alongside each record.
+ *   <li>Pre-flight: look up an existing record for the same {@code (realm, 
key)} and dispatch into
+ *       {@link Outcome#owned()} or {@link 
Outcome#duplicate(IdempotencyRecord)} for the handler.
+ *   <li>Record the terminal outcome after a successful operation, returning 
{@link Outcome#owned()}
+ *       on win and {@link Outcome#duplicate(IdempotencyRecord)} on a 
race-driven duplicate.
+ * </ul>
+ *
+ * <p>This bean is {@link RequestScoped}: a single request operates within one 
realm, so the
+ * realm-scoped {@link IdempotencyStore} is resolved once (lazily) from {@link
+ * IdempotencyStoreFactory} for the request's {@link RealmContext}. When 
idempotency is disabled the
+ * store is never resolved — the bean is an inert shell. No response body is 
stored; duplicate
+ * responses are rebuilt from authoritative catalog state by the handler 
itself.
+ */
+@RequestScoped
+public class IdempotencyHandlerSupport {
+
+  // RFC 9562 UUID v7 has version nibble 7 in time_hi_and_version.
+  private static final Pattern UUID_V7_PATTERN =
+      Pattern.compile(
+          
"^[0-9a-f]{8}-[0-9a-f]{4}-7[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$",
+          Pattern.CASE_INSENSITIVE);
+
+  @Inject IdempotencyConfiguration configuration;
+  @Inject IdempotencyStoreFactory storeFactory;
+  @Inject RealmContext realmContext;
+  @Inject Clock clock;
+
+  // Resolved lazily on first use within the request; never resolved when 
idempotency is disabled.
+  private IdempotencyStore store;
+
+  /**
+   * Returns an instance with idempotency permanently disabled. Useful for 
test fixtures that need a
+   * non-null {@code IdempotencyHandlerSupport} but exercise non-idempotent 
code paths.
+   */
+  public static IdempotencyHandlerSupport disabled() {
+    IdempotencyHandlerSupport instance = new IdempotencyHandlerSupport();
+    instance.configuration = DisabledConfiguration.INSTANCE;
+    return instance;
+  }
+
+  /** Returns {@code true} if handler-level idempotency is enabled. */
+  public boolean isEnabled() {
+    return configuration != null && configuration.enabled();
+  }
+
+  /**
+   * Reads and validates the idempotency key from the request headers using 
the deploy-time
+   * configured header name from {@link IdempotencyConfiguration#keyHeader()}.
+   *
+   * @return validated key, or {@link Optional#empty()} if {@code httpHeaders} 
is null, the header
+   *     is absent / blank, or idempotency is disabled
+   * @throws IllegalArgumentException if the header is present but not a valid 
UUIDv7 (callers
+   *     translate this into a 400 Bad Request)
+   */
+  public Optional<String> validatedKey(@Nullable HttpHeaders httpHeaders) {
+    if (httpHeaders == null) {
+      return Optional.empty();
+    }
+    return 
validatedKey(httpHeaders.getHeaderString(configuration.keyHeader()));
+  }
+
+  /**
+   * Validates a raw header value. UUIDv7 is required so that the key has 
enough entropy to be a
+   * meaningful idempotency boundary.
+   */
+  public Optional<String> validatedKey(@Nullable String headerValue) {
+    if (!isEnabled() || headerValue == null) {
+      return Optional.empty();
+    }
+    String trimmed = headerValue.trim();
+    if (trimmed.isEmpty()) {
+      return Optional.empty();
+    }
+    if (!UUID_V7_PATTERN.matcher(trimmed).matches()) {
+      throw new IllegalArgumentException(
+          "Idempotency-Key must be a UUIDv7; got: " + summarizeKey(trimmed));
+    }
+    try {
+      UUID.fromString(trimmed);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("Idempotency-Key must be a valid 
UUID");
+    }
+    return Optional.of(trimmed.toLowerCase(Locale.ROOT));
+  }
+
+  /**
+   * SHA-256 (hex) of the calling principal's identity, bound to the realm. 
The input is the
+   * principal name, the realm id, and the activated role set, canonicalised 
so the hash is
+   * deterministic and order-independent.
+   *
+   * <p>Roles are part of the binding so two callers that share a name but 
differ in activated roles
+   * do not collide. Principal properties are intentionally excluded: they are 
admin-mutable and not
+   * authentication context.
+   */
+  public String principalHash(PolarisPrincipal principal, String realmId) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("name=").append(principal.getName()).append('|');
+    sb.append("realm=").append(realmId).append('|');
+    sb.append("roles=");
+    new TreeSet<>(principal.getRoles()).forEach(r -> sb.append(r).append(','));
+    return sha256Hex(sb.toString());
+  }
+
+  /**
+   * SHA-256 hex of an arbitrary string used as the resource binding 
component. The caller supplies
+   * a stable resource identity (e.g. operation, namespace, name and 
access-delegation modes); the
+   * request payload itself is intentionally not part of the binding.
+   */
+  public String resourceHash(String value) {
+    return sha256Hex(value);
+  }
+
+  /**
+   * Pre-flight: look up an existing record for {@code (realm, key)} and 
dispatch the handler.
+   *
+   * <p>If an existing record is found, the binding is validated against the 
current request and a
+   * mismatch raises {@link ConflictException} (handler maps to HTTP 422). If 
the binding matches,
+   * the existing record is returned as a {@link Outcome.Duplicate} so the 
handler can rebuild the
+   * response from current catalog state.
+   *
+   * <p>If no record exists, this returns {@link Outcome.Owned}; the handler 
should perform the
+   * operation and then call {@link #recordOutcome(String, String, String, 
String, int, String)} to
+   * commit a record.
+   */
+  public Outcome preflight(
+      String idempotencyKey, String operationType, String resourceHash, String 
principalHash) {
+    Optional<IdempotencyRecord> existing =
+        store().load(realmContext.getRealmIdentifier(), idempotencyKey);
+    if (existing.isEmpty()) {
+      return Outcome.owned();
+    }
+    return matchOrConflict(existing.get(), operationType, resourceHash, 
principalHash);
+  }
+
+  /**
+   * Records the terminal outcome of an operation that just completed.
+   *
+   * <p>The insert is atomic on {@code (realm, key)}; if another caller raced 
ahead and inserted
+   * first, the returned {@link Outcome} carries that existing record so the 
handler can rebuild an
+   * equivalent response from current state.
+   */
+  public Outcome recordOutcome(
+      String idempotencyKey,
+      String operationType,
+      String resourceHash,
+      String principalHash,
+      int httpStatus,
+      @Nullable String metadataLocation) {
+    Instant now = clock.instant();
+    Instant expiresAt = now.plus(configuration.ttl());
+    IdempotencyStore.RecordResult result =
+        store()
+            .recordIfAbsent(
+                realmContext.getRealmIdentifier(),
+                idempotencyKey,
+                operationType,
+                resourceHash,
+                principalHash,
+                httpStatus,
+                metadataLocation,
+                now,
+                expiresAt);
+    if (result.type() == IdempotencyStore.RecordResultType.OWNED) {
+      return Outcome.owned();
+    }
+    IdempotencyRecord existing =
+        result
+            .existing()
+            .orElseThrow(() -> new IllegalStateException("DUPLICATE result 
without record"));
+    return matchOrConflict(existing, operationType, resourceHash, 
principalHash);
+  }
+
+  /**
+   * Resolves (once per request) the realm-scoped {@link IdempotencyStore} for 
the request's realm.
+   * Only called from the idempotency code paths, so it is never invoked when 
the feature is
+   * disabled.
+   */
+  private IdempotencyStore store() {
+    if (store == null) {
+      store = storeFactory.getOrCreateIdempotencyStore(realmContext);
+    }
+    return store;
+  }
+
+  private static Outcome matchOrConflict(
+      IdempotencyRecord existing,
+      String expectedOperationType,
+      String expectedResourceHash,
+      String expectedPrincipalHash) {
+    if (!expectedPrincipalHash.equals(existing.principalHash())) {
+      throw new ConflictException(
+          "Idempotency-Key already used by a different caller for the same key");
+    }
+    if (!expectedResourceHash.equals(existing.resourceHash())
+        || !expectedOperationType.equals(existing.operationType())) {
+      throw new ConflictException(
+          "Idempotency-Key already used for a different operation or resource");
+    }

Review Comment:
   nit: Is it really necessary to distinguish these two cases? Both branches throw the same `ConflictException`; only the message differs.
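
A minimal sketch of what collapsing the two checks could look like, assuming the three binding fields are compared as one unit. `StoredBinding`, `requireSameBinding`, and the nested `ConflictException` are hypothetical stand-ins for illustration, not the PR's types.

```java
final class BindingCheckSketch {
  /** Hypothetical stand-in for the stored record; only the compared fields are modelled. */
  record StoredBinding(String operationType, String resourceHash, String principalHash) {}

  /** Hypothetical stand-in for the PR's ConflictException. */
  static final class ConflictException extends RuntimeException {
    ConflictException(String message) {
      super(message);
    }
  }

  /**
   * Single check: any difference in operation, resource, or principal is reported the same way.
   * Record equality compares all three components.
   */
  static void requireSameBinding(StoredBinding existing, StoredBinding expected) {
    if (!existing.equals(expected)) {
      throw new ConflictException("Idempotency-Key already used for a different request");
    }
  }
}
```

The trade-off is a less specific error message in exchange for one branch; whether callers need to tell the two cases apart is the open question.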



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -460,15 +488,167 @@ public void authorizeCreateTableDirect(
     }
   }
 
+  /**
+   * Create a table, optionally honoring an {@code Idempotency-Key} from the 
REST request.
+   *
+   * <p>When an idempotency key is supplied and the feature is enabled:
+   *
+   * <ol>
+   *   <li>Authorization runs first, so idempotency cannot bypass it.
+   *   <li>Pre-flight loads any prior record for {@code (realm, key)}. A match 
(same caller, same
+   *       resource binding) replays the response from authoritative catalog 
state; a binding
+   *       mismatch raises 422.
+   *   <li>On a fresh key, the table is created and the record is inserted 
afterwards. A concurrent
+   *       caller that wins the race causes our insert to return DUPLICATE — 
we then replay too, so
+   *       the response is equivalent to what the winner returned.
+   * </ol>
+   *
+   * <p>No response body is stored. Replays go through {@code loadTable +
+   * buildLoadTableResponseWithDelegationCredentials}, which re-vends fresh 
credentials for the
+   * current caller.
+   */
   public LoadTableResponse createTableDirect(
       Namespace namespace,
       CreateTableRequest request,
       EnumSet<AccessDelegationMode> delegationModes,
-      Optional<String> refreshCredentialsEndpoint) {
+      Optional<String> refreshCredentialsEndpoint,
+      Optional<String> idempotencyKey) {
 
     authorizeCreateTableDirect(namespace, request, !delegationModes.isEmpty());
     Optional<AccessDelegationMode> resolvedMode = 
resolveAccessDelegationModes(delegationModes);
 
+    if (idempotencyKey.isEmpty() || !idempotencySupport().isEnabled()) {
+      return doCreateTableDirect(namespace, request, resolvedMode, 
refreshCredentialsEndpoint);
+    }
+    return createTableDirectIdempotently(
+        namespace,
+        request,
+        delegationModes,
+        resolvedMode,
+        refreshCredentialsEndpoint,
+        idempotencyKey.get());
+  }
+
+  private LoadTableResponse createTableDirectIdempotently(
+      Namespace namespace,
+      CreateTableRequest request,
+      EnumSet<AccessDelegationMode> delegationModes,
+      Optional<AccessDelegationMode> resolvedMode,
+      Optional<String> refreshCredentialsEndpoint,
+      String idempotencyKey) {
+
+    String operationType = "create-table";
+    String principalHash =
+        idempotencySupport().principalHash(polarisPrincipal(), 
realmContext().getRealmIdentifier());
+    String resourceHash =
+        idempotencySupport()
+            .resourceHash(
+                "create-table:"
+                    + namespace.toString()
+                    + ":"
+                    + request.name()
+                    + ":"
+                    + delegationModesToken(delegationModes));
+
+    // Pre-flight: replay a prior success, or 422 on a binding mismatch (same key, different
+    // request/principal).
+    try {
+      IdempotencyHandlerSupport.Outcome preflight =
+          idempotencySupport()
+              .preflight(idempotencyKey, operationType, resourceHash, principalHash);
+      if (preflight instanceof IdempotencyHandlerSupport.Outcome.Duplicate dup) {
+        return replayCreateTableDirect(
+            namespace, request, resolvedMode, refreshCredentialsEndpoint, dup.existing());
+      }
+    } catch (IdempotencyHandlerSupport.ConflictException e) {
+      throw new UnprocessableEntityException("%s", e.getMessage());

Review Comment:
   Why not add a mapping to `PolarisExceptionMapper` instead of catching and rethrowing here, and make `ConflictException` a top-level class?
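
To illustrate the idea without depending on JAX-RS, here is a framework-free sketch of a central exception-to-status table. Everything in it is hypothetical: `ExceptionMappingSketch`, `statusFor`, and the nested `ConflictException` are invented for illustration and do not reflect how `PolarisExceptionMapper` is actually implemented. The 400 entry for `IllegalArgumentException` is likewise an assumption, not confirmed behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ExceptionMappingSketch {
  /** Hypothetical top-level-style exception; stands in for the PR's ConflictException. */
  static final class ConflictException extends RuntimeException {
    ConflictException(String message) {
      super(message);
    }
  }

  // Insertion-ordered so that more specific types can be listed before broader ones.
  private static final Map<Class<? extends Throwable>, Integer> STATUS_BY_TYPE =
      new LinkedHashMap<>();

  static {
    STATUS_BY_TYPE.put(ConflictException.class, 422);
    STATUS_BY_TYPE.put(IllegalArgumentException.class, 400); // assumption, see lead-in
  }

  /** Returns the HTTP status for the first matching entry, or 500 when nothing matches. */
  static int statusFor(Throwable error) {
    for (Map.Entry<Class<? extends Throwable>, Integer> entry : STATUS_BY_TYPE.entrySet()) {
      if (entry.getKey().isInstance(error)) {
        return entry.getValue();
      }
    }
    return 500;
  }
}
```

With a table like this, the handler can let `ConflictException` propagate and drop the local `try`/`catch` that rewraps it.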



##########
runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyHandlerSupport.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.idempotency;
+
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.core.HttpHeaders;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.regex.Pattern;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.IdempotencyRecord;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.core.persistence.IdempotencyStoreFactory;
+
+/**
+ * Handler-side helper for the single-transaction ("optimistic commit") 
idempotency model.
+ *
+ * <p>Responsibilities:
+ *
+ * <ul>
+ *   <li>Read and validate the {@code Idempotency-Key} request header (UUIDv7 
only).
+ *   <li>Compute the principal/resource hashes that form the binding stored 
alongside each record.
+ *   <li>Pre-flight: look up an existing record for the same {@code (realm, 
key)} and dispatch into
+ *       {@link Outcome#owned()} or {@link 
Outcome#duplicate(IdempotencyRecord)} for the handler.
+ *   <li>Record the terminal outcome after a successful operation, returning 
{@link Outcome#owned()}
+ *       on win and {@link Outcome#duplicate(IdempotencyRecord)} on a 
race-driven duplicate.
+ * </ul>
+ *
+ * <p>This bean is {@link RequestScoped}: a single request operates within one 
realm, so the
+ * realm-scoped {@link IdempotencyStore} is resolved once (lazily) from {@link
+ * IdempotencyStoreFactory} for the request's {@link RealmContext}. When 
idempotency is disabled the
+ * store is never resolved — the bean is an inert shell. No response body is 
stored; duplicate
+ * responses are rebuilt from authoritative catalog state by the handler 
itself.
+ */
+@RequestScoped
+public class IdempotencyHandlerSupport {
+
+  // RFC 9562 UUID v7 has version nibble 7 in time_hi_and_version.
+  private static final Pattern UUID_V7_PATTERN =
+      Pattern.compile(
+          
"^[0-9a-f]{8}-[0-9a-f]{4}-7[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$",
+          Pattern.CASE_INSENSITIVE);
+
+  @Inject IdempotencyConfiguration configuration;
+  @Inject IdempotencyStoreFactory storeFactory;
+  @Inject RealmContext realmContext;
+  @Inject Clock clock;
+
+  // Resolved lazily on first use within the request; never resolved when 
idempotency is disabled.
+  private IdempotencyStore store;
+
+  /**
+   * Returns an instance with idempotency permanently disabled. Useful for 
test fixtures that need a
+   * non-null {@code IdempotencyHandlerSupport} but exercise non-idempotent 
code paths.
+   */
+  public static IdempotencyHandlerSupport disabled() {
+    IdempotencyHandlerSupport instance = new IdempotencyHandlerSupport();
+    instance.configuration = DisabledConfiguration.INSTANCE;
+    return instance;
+  }
+
+  /** Returns {@code true} if handler-level idempotency is enabled. */
+  public boolean isEnabled() {
+    return configuration != null && configuration.enabled();
+  }
+
+  /**
+   * Reads and validates the idempotency key from the request headers using 
the deploy-time
+   * configured header name from {@link IdempotencyConfiguration#keyHeader()}.
+   *
+   * @return validated key, or {@link Optional#empty()} if {@code httpHeaders} 
is null, the header
+   *     is absent / blank, or idempotency is disabled
+   * @throws IllegalArgumentException if the header is present but not a valid 
UUIDv7 (callers
+   *     translate this into a 400 Bad Request)
+   */
+  public Optional<String> validatedKey(@Nullable HttpHeaders httpHeaders) {
+    if (httpHeaders == null) {
+      return Optional.empty();
+    }
+    return 
validatedKey(httpHeaders.getHeaderString(configuration.keyHeader()));
+  }
+
+  /**
+   * Validates a raw header value. UUIDv7 is required so that the key has 
enough entropy to be a
+   * meaningful idempotency boundary.
+   */
+  public Optional<String> validatedKey(@Nullable String headerValue) {
+    if (!isEnabled() || headerValue == null) {
+      return Optional.empty();
+    }
+    String trimmed = headerValue.trim();
+    if (trimmed.isEmpty()) {
+      return Optional.empty();
+    }
+    if (!UUID_V7_PATTERN.matcher(trimmed).matches()) {
+      throw new IllegalArgumentException(
+          "Idempotency-Key must be a UUIDv7; got: " + summarizeKey(trimmed));
+    }
+    try {
+      UUID.fromString(trimmed);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("Idempotency-Key must be a valid 
UUID");
+    }
+    return Optional.of(trimmed.toLowerCase(Locale.ROOT));
+  }
+
+  /**
+   * SHA-256 (hex) of the calling principal's identity, bound to the realm. 
The input is the
+   * principal name, the realm id, and the activated role set, canonicalised 
so the hash is
+   * deterministic and order-independent.
+   *
+   * <p>Roles are part of the binding so two callers that share a name but 
differ in activated roles
+   * do not collide. Principal properties are intentionally excluded: they are 
admin-mutable and not
+   * authentication context.
+   */
+  public String principalHash(PolarisPrincipal principal, String realmId) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("name=").append(principal.getName()).append('|');
+    sb.append("realm=").append(realmId).append('|');
+    sb.append("roles=");
+    new TreeSet<>(principal.getRoles()).forEach(r -> sb.append(r).append(','));
+    return sha256Hex(sb.toString());
+  }
+
+  /**
+   * SHA-256 hex of an arbitrary string used as the resource binding 
component. The caller supplies
+   * a stable resource identity (e.g. operation, namespace, name and 
access-delegation modes); the
+   * request payload itself is intentionally not part of the binding.
+   */
+  public String resourceHash(String value) {
+    return sha256Hex(value);
+  }
+
+  /**
+   * Pre-flight: look up an existing record for {@code (realm, key)} and 
dispatch the handler.
+   *
+   * <p>If an existing record is found, the binding is validated against the 
current request and a
+   * mismatch raises {@link ConflictException} (handler maps to HTTP 422). If 
the binding matches,
+   * the existing record is returned as a {@link Outcome.Duplicate} so the 
handler can rebuild the
+   * response from current catalog state.
+   *
+   * <p>If no record exists, this returns {@link Outcome.Owned}; the handler 
should perform the
+   * operation and then call {@link #recordOutcome(String, String, String, 
String, int, String)} to
+   * commit a record.
+   */
+  public Outcome preflight(
+      String idempotencyKey, String operationType, String resourceHash, String 
principalHash) {
+    Optional<IdempotencyRecord> existing =
+        store().load(realmContext.getRealmIdentifier(), idempotencyKey);
+    if (existing.isEmpty()) {
+      return Outcome.owned();
+    }
+    return matchOrConflict(existing.get(), operationType, resourceHash, 
principalHash);
+  }
+
+  /**
+   * Records the terminal outcome of an operation that just completed.
+   *
+   * <p>The insert is atomic on {@code (realm, key)}; if another caller raced 
ahead and inserted
+   * first, the returned {@link Outcome} carries that existing record so the 
handler can rebuild an
+   * equivalent response from current state.
+   */
+  public Outcome recordOutcome(
+      String idempotencyKey,
+      String operationType,
+      String resourceHash,
+      String principalHash,
+      int httpStatus,
+      @Nullable String metadataLocation) {
+    Instant now = clock.instant();
+    Instant expiresAt = now.plus(configuration.ttl());
+    IdempotencyStore.RecordResult result =
+        store()
+            .recordIfAbsent(
+                realmContext.getRealmIdentifier(),
+                idempotencyKey,
+                operationType,
+                resourceHash,
+                principalHash,
+                httpStatus,
+                metadataLocation,
+                now,
+                expiresAt);
+    if (result.type() == IdempotencyStore.RecordResultType.OWNED) {
+      return Outcome.owned();
+    }
+    IdempotencyRecord existing =
+        result
+            .existing()
+            .orElseThrow(() -> new IllegalStateException("DUPLICATE result 
without record"));
+    return matchOrConflict(existing, operationType, resourceHash, 
principalHash);
+  }
+
+  /**
+   * Resolves (once per request) the realm-scoped {@link IdempotencyStore} for 
the request's realm.
+   * Only called from the idempotency code paths, so it is never invoked when 
the feature is
+   * disabled.
+   */
+  private IdempotencyStore store() {
+    if (store == null) {
+      store = storeFactory.getOrCreateIdempotencyStore(realmContext);
+    }
+    return store;
+  }
+
+  private static Outcome matchOrConflict(
+      IdempotencyRecord existing,
+      String expectedOperationType,
+      String expectedResourceHash,
+      String expectedPrincipalHash) {
+    if (!expectedPrincipalHash.equals(existing.principalHash())) {
+      throw new ConflictException(
+          "Idempotency-Key already used by a different caller for the same 
key");
+    }
+    if (!expectedResourceHash.equals(existing.resourceHash())
+        || !expectedOperationType.equals(existing.operationType())) {
+      throw new ConflictException(
+          "Idempotency-Key already used for a different operation or 
resource");
+    }
+    return Outcome.duplicate(existing);
+  }
+
+  private static String sha256Hex(@Nonnull String input) {

Review Comment:
   Why not `DigestUtils.sha256Hex()`?
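
For comparison, a hand-rolled helper of this kind usually looks like the stdlib-only sketch below. The PR's actual method body is not shown in this hunk, so this is an assumption about its shape, and `Sha256Sketch` is a hypothetical name. `DigestUtils.sha256Hex(String)` from Apache Commons Codec should produce the same lowercase hex digest of the UTF-8 bytes in a single call.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

final class Sha256Sketch {
  /** Lowercase hex SHA-256 of the UTF-8 encoding of {@code input}. Requires Java 17+. */
  static String sha256Hex(String input) {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
      return HexFormat.of().formatHex(hash);
    } catch (NoSuchAlgorithmException e) {
      // SHA-256 is a required algorithm on every Java platform, so this is not expected.
      throw new IllegalStateException("SHA-256 not available", e);
    }
  }
}
```

The library call removes the checked-exception handling and the hex encoding, which is most of the hand-written code.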



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -460,15 +488,167 @@ public void authorizeCreateTableDirect(
     }
   }
 
+  /**
+   * Create a table, optionally honoring an {@code Idempotency-Key} from the 
REST request.
+   *
+   * <p>When an idempotency key is supplied and the feature is enabled:
+   *
+   * <ol>
+   *   <li>Authorization runs first, so idempotency cannot bypass it.
+   *   <li>Pre-flight loads any prior record for {@code (realm, key)}. A match 
(same caller, same
+   *       resource binding) replays the response from authoritative catalog 
state; a binding
+   *       mismatch raises 422.
+   *   <li>On a fresh key, the table is created and the record is inserted 
afterwards. A concurrent
+   *       caller that wins the race causes our insert to return DUPLICATE — 
we then replay too, so
+   *       the response is equivalent to what the winner returned.
+   * </ol>
+   *
+   * <p>No response body is stored. Replays go through {@code loadTable +
+   * buildLoadTableResponseWithDelegationCredentials}, which re-vends fresh 
credentials for the
+   * current caller.
+   */
   public LoadTableResponse createTableDirect(
       Namespace namespace,
       CreateTableRequest request,
       EnumSet<AccessDelegationMode> delegationModes,
-      Optional<String> refreshCredentialsEndpoint) {
+      Optional<String> refreshCredentialsEndpoint,
+      Optional<String> idempotencyKey) {
 
     authorizeCreateTableDirect(namespace, request, !delegationModes.isEmpty());
     Optional<AccessDelegationMode> resolvedMode = 
resolveAccessDelegationModes(delegationModes);
 
+    if (idempotencyKey.isEmpty() || !idempotencySupport().isEnabled()) {
+      return doCreateTableDirect(namespace, request, resolvedMode, 
refreshCredentialsEndpoint);
+    }
+    return createTableDirectIdempotently(
+        namespace,
+        request,
+        delegationModes,
+        resolvedMode,
+        refreshCredentialsEndpoint,
+        idempotencyKey.get());
+  }
+
+  private LoadTableResponse createTableDirectIdempotently(
+      Namespace namespace,
+      CreateTableRequest request,
+      EnumSet<AccessDelegationMode> delegationModes,
+      Optional<AccessDelegationMode> resolvedMode,
+      Optional<String> refreshCredentialsEndpoint,
+      String idempotencyKey) {
+
+    String operationType = "create-table";
+    String principalHash =
+        idempotencySupport().principalHash(polarisPrincipal(), 
realmContext().getRealmIdentifier());
+    String resourceHash =
+        idempotencySupport()
+            .resourceHash(
+                "create-table:"
+                    + namespace.toString()
+                    + ":"
+                    + request.name()
+                    + ":"
+                    + delegationModesToken(delegationModes));
+
+    // Pre-flight: replay a prior success, or 422 on a binding mismatch (same 
key, different
+    // request/principal).
+    try {
+      IdempotencyHandlerSupport.Outcome preflight =
+          idempotencySupport()
+              .preflight(idempotencyKey, operationType, resourceHash, 
principalHash);
+      if (preflight instanceof IdempotencyHandlerSupport.Outcome.Duplicate 
dup) {
+        return replayCreateTableDirect(
+            namespace, request, resolvedMode, refreshCredentialsEndpoint, 
dup.existing());
+      }
+    } catch (IdempotencyHandlerSupport.ConflictException e) {
+      throw new UnprocessableEntityException("%s", e.getMessage());
+    }
+
+    // Run the operation. A concurrent request carrying the same key can win 
the catalog-level race
+    // and make this attempt fail with AlreadyExistsException; if that winner 
recorded a matching
+    // idempotency outcome, replay it instead of returning a 409.
+    LoadTableResponse response;
+    try {
+      response = doCreateTableDirect(namespace, request, resolvedMode, 
refreshCredentialsEndpoint);
+    } catch (AlreadyExistsException e) {
+      try {
+        Optional<IdempotencyRecord> raceWinner =
+            resolveConcurrentDuplicate(idempotencyKey, operationType, 
resourceHash, principalHash);

Review Comment:
   The current code interleaves the idempotency logic with the existing logic 
(`doCreateTableDirect()` specifically). That makes it hard to follow and will 
probably make it harder to maintain.
   
   WDYT about using a single code flow for both cases, where certain actions 
become no-ops when idempotency is not enabled? For example, 
`resolveConcurrentDuplicate()` here could return an empty `Optional` when 
idempotency is disabled.
   
   The `preflight()` call above could also be conditional.
   
   All in all, I think we can fold this method into `createTableDirect()`.
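   
   A minimal sketch of the single-flow idea, under stated assumptions:
   `IdempotencyGate`, `NoOpGate`, `InMemoryGate` and `SingleFlowSketch` are
   hypothetical names, results are plain strings, and `IllegalStateException`
   stands in for `AlreadyExistsException`. It is not the PR's code, only an
   illustration of how a disabled helper could turn each step into a no-op.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical stand-in for IdempotencyHandlerSupport; method shapes are assumptions.
interface IdempotencyGate {
  Optional<String> preflight(Optional<String> key);

  Optional<String> resolveConcurrentDuplicate(Optional<String> key);

  void record(Optional<String> key, String result);
}

// Disabled mode: every idempotency step is a no-op.
final class NoOpGate implements IdempotencyGate {
  @Override
  public Optional<String> preflight(Optional<String> key) {
    return Optional.empty();
  }

  @Override
  public Optional<String> resolveConcurrentDuplicate(Optional<String> key) {
    return Optional.empty();
  }

  @Override
  public void record(Optional<String> key, String result) {}
}

// Enabled mode, backed by a map purely for illustration.
final class InMemoryGate implements IdempotencyGate {
  private final Map<String, String> records = new HashMap<>();

  @Override
  public Optional<String> preflight(Optional<String> key) {
    return key.map(records::get); // empty when the key is absent or unknown
  }

  @Override
  public Optional<String> resolveConcurrentDuplicate(Optional<String> key) {
    return key.map(records::get);
  }

  @Override
  public void record(Optional<String> key, String result) {
    key.ifPresent(k -> records.putIfAbsent(k, result));
  }
}

final class SingleFlowSketch {
  // One flow for both modes: with NoOpGate this reduces to create.get().
  static String createTable(IdempotencyGate gate, Optional<String> key, Supplier<String> create) {
    Optional<String> prior = gate.preflight(key);
    if (prior.isPresent()) {
      return "replayed:" + prior.get();
    }
    String created;
    try {
      created = create.get();
    } catch (IllegalStateException alreadyExists) {
      // Replay the race winner if there is one; otherwise surface the original failure.
      return gate.resolveConcurrentDuplicate(key)
          .map(winner -> "replayed:" + winner)
          .orElseThrow(() -> alreadyExists);
    }
    gate.record(key, created);
    return created;
  }
}
```

   With this shape the handler has no `if (enabled)` branch of its own; the
   choice between `NoOpGate` and a real implementation is made once, wherever
   the helper is constructed.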



##########
runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyHandlerSupport.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.idempotency;
+
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.core.HttpHeaders;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.regex.Pattern;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.IdempotencyRecord;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.core.persistence.IdempotencyStoreFactory;
+
+/**
+ * Handler-side helper for the single-transaction ("optimistic commit") 
idempotency model.
+ *
+ * <p>Responsibilities:
+ *
+ * <ul>
+ *   <li>Read and validate the {@code Idempotency-Key} request header (UUIDv7 
only).
+ *   <li>Compute the principal/resource hashes that form the binding stored 
alongside each record.
+ *   <li>Pre-flight: look up an existing record for the same {@code (realm, 
key)} and dispatch into
+ *       {@link Outcome#owned()} or {@link 
Outcome#duplicate(IdempotencyRecord)} for the handler.
+ *   <li>Record the terminal outcome after a successful operation, returning 
{@link Outcome#owned()}
+ *       on win and {@link Outcome#duplicate(IdempotencyRecord)} on a 
race-driven duplicate.
+ * </ul>
+ *
+ * <p>This bean is {@link RequestScoped}: a single request operates within one 
realm, so the
+ * realm-scoped {@link IdempotencyStore} is resolved once (lazily) from {@link
+ * IdempotencyStoreFactory} for the request's {@link RealmContext}. When 
idempotency is disabled the
+ * store is never resolved — the bean is an inert shell. No response body is 
stored; duplicate
+ * responses are rebuilt from authoritative catalog state by the handler 
itself.
+ */
+@RequestScoped
+public class IdempotencyHandlerSupport {
+
+  // RFC 9562 UUID v7 has version nibble 7 in time_hi_and_version.
+  private static final Pattern UUID_V7_PATTERN =
+      Pattern.compile(
+          
"^[0-9a-f]{8}-[0-9a-f]{4}-7[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$",
+          Pattern.CASE_INSENSITIVE);
+
+  @Inject IdempotencyConfiguration configuration;
+  @Inject IdempotencyStoreFactory storeFactory;
+  @Inject RealmContext realmContext;
+  @Inject Clock clock;
+
+  // Resolved lazily on first use within the request; never resolved when 
idempotency is disabled.
+  private IdempotencyStore store;
+
+  /**
+   * Returns an instance with idempotency permanently disabled. Useful for 
test fixtures that need a
+   * non-null {@code IdempotencyHandlerSupport} but exercise non-idempotent 
code paths.
+   */
+  public static IdempotencyHandlerSupport disabled() {
+    IdempotencyHandlerSupport instance = new IdempotencyHandlerSupport();
+    instance.configuration = DisabledConfiguration.INSTANCE;
+    return instance;
+  }
+
+  /** Returns {@code true} if handler-level idempotency is enabled. */
+  public boolean isEnabled() {
+    return configuration != null && configuration.enabled();
+  }
+
+  /**
+   * Reads and validates the idempotency key from the request headers using 
the deploy-time
+   * configured header name from {@link IdempotencyConfiguration#keyHeader()}.
+   *
+   * @return validated key, or {@link Optional#empty()} if {@code httpHeaders} 
is null, the header
+   *     is absent / blank, or idempotency is disabled
+   * @throws IllegalArgumentException if the header is present but not a valid 
UUIDv7 (callers
+   *     translate this into a 400 Bad Request)
+   */
+  public Optional<String> validatedKey(@Nullable HttpHeaders httpHeaders) {
+    if (httpHeaders == null) {
+      return Optional.empty();
+    }
+    return 
validatedKey(httpHeaders.getHeaderString(configuration.keyHeader()));
+  }
+
+  /**
+   * Validates a raw header value. UUIDv7 is required so that the key has 
enough entropy to be a
+   * meaningful idempotency boundary.
+   */
+  public Optional<String> validatedKey(@Nullable String headerValue) {
+    if (!isEnabled() || headerValue == null) {
+      return Optional.empty();
+    }
+    String trimmed = headerValue.trim();
+    if (trimmed.isEmpty()) {
+      return Optional.empty();
+    }
+    if (!UUID_V7_PATTERN.matcher(trimmed).matches()) {
+      throw new IllegalArgumentException(
+          "Idempotency-Key must be a UUIDv7; got: " + summarizeKey(trimmed));
+    }
+    try {
+      UUID.fromString(trimmed);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("Idempotency-Key must be a valid 
UUID");
+    }
+    return Optional.of(trimmed.toLowerCase(Locale.ROOT));
+  }
+
+  /**
+   * SHA-256 (hex) of the calling principal's identity, bound to the realm. 
The input is the
+   * principal name, the realm id, and the activated role set, canonicalised 
so the hash is
+   * deterministic and order-independent.
+   *
+   * <p>Roles are part of the binding so two callers that share a name but 
differ in activated roles
+   * do not collide. Principal properties are intentionally excluded: they are 
admin-mutable and not
+   * authentication context.
+   */
+  public String principalHash(PolarisPrincipal principal, String realmId) {

Review Comment:
   This class has a `RealmContext` now. Can we remove the `realmId` parameter?



##########
persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java:
##########
@@ -20,74 +20,36 @@
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.time.Instant;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import javax.sql.DataSource;
 import org.apache.polaris.core.entity.IdempotencyRecord;
 import org.apache.polaris.core.persistence.IdempotencyPersistenceException;
 import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
 import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;
 import org.apache.polaris.persistence.relational.jdbc.QueryGenerator;
-import 
org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration;
 import org.apache.polaris.persistence.relational.jdbc.models.Converter;
 import 
org.apache.polaris.persistence.relational.jdbc.models.ModelIdempotencyRecord;
 import org.jspecify.annotations.NonNull;
 
+/**
+ * JDBC-backed {@link IdempotencyStore}.
+ *
+ * <p>Implements the "optimistic commit" model: a row is inserted only after 
the originating
+ * operation has finalized. Race conditions between concurrent retries are 
detected via the table's
+ * {@code (realm_id, idempotency_key)} primary key — a duplicate INSERT 
surfaces as a constraint
+ * violation, which we translate into a {@link RecordResultType#DUPLICATE} 
along with the existing
+ * row.
+ */
 public class RelationalJdbcIdempotencyStore implements IdempotencyStore {
 
   private final DatasourceOperations datasourceOperations;
 
-  public RelationalJdbcIdempotencyStore(
-      @NonNull DataSource dataSource, @NonNull RelationalJdbcConfiguration cfg)
-      throws SQLException {
-    this.datasourceOperations = new DatasourceOperations(dataSource, cfg);
-  }
-
-  @Override
-  public ReserveResult reserve(
-      String realmId,
-      String idempotencyKey,
-      String operationType,
-      String normalizedResourceId,
-      Instant expiresAt,
-      String executorId,
-      Instant now) {
-    try {
-      // Build insert values directly to avoid requiring an 
Immutables-generated model type.
-      Map<String, Object> insertMap = new LinkedHashMap<>();
-      insertMap.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);
-      insertMap.put(ModelIdempotencyRecord.OPERATION_TYPE, operationType);
-      insertMap.put(ModelIdempotencyRecord.RESOURCE_ID, normalizedResourceId);
-      insertMap.put(ModelIdempotencyRecord.HTTP_STATUS, null);
-      insertMap.put(ModelIdempotencyRecord.ERROR_SUBTYPE, null);
-      insertMap.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, null);
-      insertMap.put(ModelIdempotencyRecord.RESPONSE_HEADERS, null);
-      insertMap.put(ModelIdempotencyRecord.FINALIZED_AT, null);
-      insertMap.put(ModelIdempotencyRecord.CREATED_AT, Timestamp.from(now));
-      insertMap.put(ModelIdempotencyRecord.UPDATED_AT, Timestamp.from(now));
-      insertMap.put(ModelIdempotencyRecord.HEARTBEAT_AT, Timestamp.from(now));
-      insertMap.put(ModelIdempotencyRecord.EXECUTOR_ID, executorId);
-      insertMap.put(ModelIdempotencyRecord.EXPIRES_AT, 
Timestamp.from(expiresAt));
-
-      List<Object> values = insertMap.values().stream().toList();
-      QueryGenerator.PreparedQuery insert =
-          QueryGenerator.generateInsertQuery(
-              ModelIdempotencyRecord.ALL_COLUMNS,
-              ModelIdempotencyRecord.TABLE_NAME,
-              values,
-              realmId);
-      datasourceOperations.executeUpdate(insert);
-      return new ReserveResult(ReserveResultType.OWNED, Optional.empty());
-    } catch (SQLException e) {
-      if (datasourceOperations.isUniquenessConstraintViolation(e)) {
-        return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId, 
idempotencyKey));
-      }
-      throw new IdempotencyPersistenceException("Failed to reserve idempotency 
key", e);
-    }
+  public RelationalJdbcIdempotencyStore(@NonNull DatasourceOperations 
datasourceOperations) {
+    this.datasourceOperations = datasourceOperations;

Review Comment:
   `JdbcBasePersistenceImpl` takes a `realmId` as a constructor parameter and 
is effectively a request-scoped bean... Why not follow the same pattern here?



##########
runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyHandlerSupport.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.idempotency;
+
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.core.HttpHeaders;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.regex.Pattern;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.IdempotencyRecord;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.core.persistence.IdempotencyStoreFactory;
+
+/**
+ * Handler-side helper for the single-transaction ("optimistic commit") 
idempotency model.
+ *
+ * <p>Responsibilities:
+ *
+ * <ul>
+ *   <li>Read and validate the {@code Idempotency-Key} request header (UUIDv7 
only).
+ *   <li>Compute the principal/resource hashes that form the binding stored 
alongside each record.
+ *   <li>Pre-flight: look up an existing record for the same {@code (realm, 
key)} and dispatch into
+ *       {@link Outcome#owned()} or {@link 
Outcome#duplicate(IdempotencyRecord)} for the handler.
+ *   <li>Record the terminal outcome after a successful operation, returning 
{@link Outcome#owned()}
+ *       on win and {@link Outcome#duplicate(IdempotencyRecord)} on a 
race-driven duplicate.
+ * </ul>
+ *
+ * <p>This bean is {@link RequestScoped}: a single request operates within one 
realm, so the
+ * realm-scoped {@link IdempotencyStore} is resolved once (lazily) from {@link
+ * IdempotencyStoreFactory} for the request's {@link RealmContext}. When 
idempotency is disabled the
+ * store is never resolved — the bean is an inert shell. No response body is 
stored; duplicate
+ * responses are rebuilt from authoritative catalog state by the handler 
itself.
+ */
+@RequestScoped
+public class IdempotencyHandlerSupport {
+
+  // RFC 9562 UUID v7 has version nibble 7 in time_hi_and_version.
+  private static final Pattern UUID_V7_PATTERN =
+      Pattern.compile(
+          
"^[0-9a-f]{8}-[0-9a-f]{4}-7[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$",
+          Pattern.CASE_INSENSITIVE);
+
+  @Inject IdempotencyConfiguration configuration;
+  @Inject IdempotencyStoreFactory storeFactory;
+  @Inject RealmContext realmContext;
+  @Inject Clock clock;
+
+  // Resolved lazily on first use within the request; never resolved when 
idempotency is disabled.
+  private IdempotencyStore store;
+
+  /**
+   * Returns an instance with idempotency permanently disabled. Useful for 
test fixtures that need a
+   * non-null {@code IdempotencyHandlerSupport} but exercise non-idempotent 
code paths.
+   */
+  public static IdempotencyHandlerSupport disabled() {
+    IdempotencyHandlerSupport instance = new IdempotencyHandlerSupport();
+    instance.configuration = DisabledConfiguration.INSTANCE;
+    return instance;
+  }
+
+  /** Returns {@code true} if handler-level idempotency is enabled. */
+  public boolean isEnabled() {
+    return configuration != null && configuration.enabled();
+  }
+
+  /**
+   * Reads and validates the idempotency key from the request headers using 
the deploy-time
+   * configured header name from {@link IdempotencyConfiguration#keyHeader()}.
+   *
+   * @return validated key, or {@link Optional#empty()} if {@code httpHeaders} 
is null, the header
+   *     is absent / blank, or idempotency is disabled
+   * @throws IllegalArgumentException if the header is present but not a valid 
UUIDv7 (callers
+   *     translate this into a 400 Bad Request)
+   */
+  public Optional<String> validatedKey(@Nullable HttpHeaders httpHeaders) {
+    if (httpHeaders == null) {
+      return Optional.empty();
+    }
+    return 
validatedKey(httpHeaders.getHeaderString(configuration.keyHeader()));
+  }
+
+  /**
+   * Validates a raw header value. UUIDv7 is required so that the key has 
enough entropy to be a
+   * meaningful idempotency boundary.
+   */
+  public Optional<String> validatedKey(@Nullable String headerValue) {
+    if (!isEnabled() || headerValue == null) {
+      return Optional.empty();
+    }
+    String trimmed = headerValue.trim();
+    if (trimmed.isEmpty()) {
+      return Optional.empty();
+    }
+    if (!UUID_V7_PATTERN.matcher(trimmed).matches()) {
+      throw new IllegalArgumentException(
+          "Idempotency-Key must be a UUIDv7; got: " + summarizeKey(trimmed));
+    }
+    try {
+      UUID.fromString(trimmed);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("Idempotency-Key must be a valid 
UUID");
+    }
+    return Optional.of(trimmed.toLowerCase(Locale.ROOT));
+  }
+
+  /**
+   * SHA-256 (hex) of the calling principal's identity, bound to the realm. 
The input is the
+   * principal name, the realm id, and the activated role set, canonicalised 
so the hash is
+   * deterministic and order-independent.
+   *
+   * <p>Roles are part of the binding so two callers that share a name but 
differ in activated roles
+   * do not collide. Principal properties are intentionally excluded: they are 
admin-mutable and not
+   * authentication context.
+   */
+  public String principalHash(PolarisPrincipal principal, String realmId) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("name=").append(principal.getName()).append('|');
+    sb.append("realm=").append(realmId).append('|');
+    sb.append("roles=");
+    new TreeSet<>(principal.getRoles()).forEach(r -> sb.append(r).append(','));
+    return sha256Hex(sb.toString());

Review Comment:
   Should this be performed by `PolarisPrincipal` itself with the contract that 
materially different principals yield different hashes?
   
   @adutra : WDYT?
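   
   To make the proposed contract concrete, here is a hedged sketch of a
   principal type that owns its hash. `PrincipalSketch` and `identityHash` are
   hypothetical (they are not part of `PolarisPrincipal`), and the
   length-prefixed encoding is an assumption added for illustration, not what
   the PR does.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for PolarisPrincipal; identityHash is not an existing method.
final class PrincipalSketch {
  private final String name;
  private final Set<String> roles;

  PrincipalSketch(String name, Set<String> roles) {
    this.name = name;
    this.roles = new TreeSet<>(roles); // sorted copy, so role order never affects the hash
  }

  // SHA-256 (hex) over name, realm and sorted roles.
  String identityHash(String realmId) {
    StringBuilder sb = new StringBuilder();
    appendField(sb, name);
    appendField(sb, realmId);
    for (String role : roles) {
      appendField(sb, role);
    }
    return sha256Hex(sb.toString());
  }

  // Length-prefix each field so values containing the separator cannot collide.
  private static void appendField(StringBuilder sb, String value) {
    sb.append(value.length()).append(':').append(value).append(';');
  }

  private static String sha256Hex(String input) {
    try {
      byte[] digest =
          MessageDigest.getInstance("SHA-256").digest(input.getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder(digest.length * 2);
      for (byte b : digest) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 must be available on every Java platform", e);
    }
  }
}
```

   Length-prefixing is one way to back the "materially different principals
   yield different hashes" contract: a plain delimiter-joined string can
   serialize two different inputs identically when a name or role contains the
   delimiter.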



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -509,6 +689,64 @@ public LoadTableResponse createTableDirect(
     throw new IllegalStateException("Cannot wrap catalog that does not produce 
BaseTable");
   }
 
+  /**
+   * Replay path for an idempotent {@code createTableDirect}: load the 
existing table and rebuild a
+   * response with freshly-vended credentials for the current caller. No 
credentials from the
+   * original call are stored or returned.
+   *
+   * <p>Authorization is not repeated here: {@link 
#authorizeCreateTableDirect} already ran for the
+   * current caller in {@link #createTableDirect} before the idempotency 
lookup, and the duplicate
+   * was matched on the same {@code principalHash} and request binding.
+   *
+   * <p>The replay reflects <em>current</em> catalog state rather than the 
original response bytes
+   * (no response body is stored). To avoid silently returning a materially 
different table, if the
+   * table has advanced beyond the metadata location captured when the key was 
recorded, this raises
+   * 422 instead of returning divergent state.
+   */
+  private LoadTableResponse replayCreateTableDirect(
+      Namespace namespace,
+      CreateTableRequest request,
+      Optional<AccessDelegationMode> resolvedMode,
+      Optional<String> refreshCredentialsEndpoint,
+      IdempotencyRecord existing) {
+    TableIdentifier tableIdentifier = TableIdentifier.of(namespace, 
request.name());
+    // If the original create succeeded but the table is no longer there 
(manual drop, retention,
+    // etc.), loadTable raises NoSuchTableException, surfacing the same 
not-found the client would
+    // get from a regular load.
+    Table table = baseCatalog.loadTable(tableIdentifier);
+    if (table instanceof BaseTable baseTable) {
+      TableMetadata tableMetadata = baseTable.operations().current();
+      if (existing.metadataLocation() != null
+          && 
!existing.metadataLocation().equals(tableMetadata.metadataFileLocation())) {
+        throw new UnprocessableEntityException(
+            "Idempotency-Key replay failed: table %s has changed since it was 
created with this key",
+            tableIdentifier);
+      }
+      return buildLoadTableResponseWithDelegationCredentials(
+              tableIdentifier,
+              tableMetadata,
+              resolvedMode,
+              Set.of(
+                  PolarisStorageActions.READ,
+                  PolarisStorageActions.WRITE,
+                  PolarisStorageActions.LIST),
+              refreshCredentialsEndpoint)
+          .build();
+    }
+    if (table instanceof BaseMetadataTable) {
+      throw notFoundExceptionForTableLikeEntity(
+          tableIdentifier, PolarisEntitySubType.ICEBERG_TABLE);
+    }
+    throw new IllegalStateException("Cannot wrap catalog that does not produce 
BaseTable");
+  }
+
+  private static String delegationModesToken(EnumSet<AccessDelegationMode> 
delegationModes) {

Review Comment:
   Should this not be delegated to `IdempotencyHandlerSupport` for the sake of 
consolidating all fundamental logic in one class?
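   
   A rough sketch of what the consolidated helper could look like.
   `DelegationModeSketch` and `ResourceBindingSketch` are hypothetical names,
   the enum constants are illustrative stand-ins for `AccessDelegationMode`,
   and the `"none"` token for an empty set is an assumption.

```java
import java.util.EnumSet;
import java.util.stream.Collectors;

// Hypothetical stand-in for AccessDelegationMode; the constants are illustrative.
enum DelegationModeSketch {
  VENDED_CREDENTIALS,
  REMOTE_SIGNING
}

final class ResourceBindingSketch {
  // EnumSet iterates in declaration order, so the token is deterministic.
  static String delegationModesToken(EnumSet<DelegationModeSketch> modes) {
    if (modes.isEmpty()) {
      return "none";
    }
    return modes.stream().map(Enum::name).collect(Collectors.joining(","));
  }

  // Builds the pre-hash binding string in one place instead of in the handler.
  static String createTableBinding(
      String namespace, String table, EnumSet<DelegationModeSketch> modes) {
    return "create-table:" + namespace + ":" + table + ":" + delegationModesToken(modes);
  }
}
```

   Keeping the string layout next to the hashing code means a handler only
   passes typed arguments and cannot drift from the format used at record time.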



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -460,15 +488,167 @@ public void authorizeCreateTableDirect(
     }
   }
 
+  /**
+   * Create a table, optionally honoring an {@code Idempotency-Key} from the 
REST request.
+   *
+   * <p>When an idempotency key is supplied and the feature is enabled:
+   *
+   * <ol>
+   *   <li>Authorization runs first, so idempotency cannot bypass it.
+   *   <li>Pre-flight loads any prior record for {@code (realm, key)}. A match 
(same caller, same
+   *       resource binding) replays the response from authoritative catalog 
state; a binding
+   *       mismatch raises 422.
+   *   <li>On a fresh key, the table is created and the record is inserted 
afterwards. A concurrent
+   *       caller that wins the race causes our insert to return DUPLICATE — 
we then replay too, so
+   *       the response is equivalent to what the winner returned.
+   * </ol>
+   *
+   * <p>No response body is stored. Replays go through {@code loadTable +
+   * buildLoadTableResponseWithDelegationCredentials}, which re-vends fresh 
credentials for the
+   * current caller.
+   */
   public LoadTableResponse createTableDirect(
       Namespace namespace,
       CreateTableRequest request,
       EnumSet<AccessDelegationMode> delegationModes,
-      Optional<String> refreshCredentialsEndpoint) {
+      Optional<String> refreshCredentialsEndpoint,
+      Optional<String> idempotencyKey) {
 
     authorizeCreateTableDirect(namespace, request, !delegationModes.isEmpty());
     Optional<AccessDelegationMode> resolvedMode = 
resolveAccessDelegationModes(delegationModes);
 
+    if (idempotencyKey.isEmpty() || !idempotencySupport().isEnabled()) {
+      return doCreateTableDirect(namespace, request, resolvedMode, 
refreshCredentialsEndpoint);
+    }
+    return createTableDirectIdempotently(
+        namespace,
+        request,
+        delegationModes,
+        resolvedMode,
+        refreshCredentialsEndpoint,
+        idempotencyKey.get());
+  }
+
+  private LoadTableResponse createTableDirectIdempotently(
+      Namespace namespace,
+      CreateTableRequest request,
+      EnumSet<AccessDelegationMode> delegationModes,
+      Optional<AccessDelegationMode> resolvedMode,
+      Optional<String> refreshCredentialsEndpoint,
+      String idempotencyKey) {
+
+    String operationType = "create-table";
+    String principalHash =
+        idempotencySupport().principalHash(polarisPrincipal(), 
realmContext().getRealmIdentifier());
+    String resourceHash =
+        idempotencySupport()
+            .resourceHash(
+                "create-table:"
+                    + namespace.toString()
+                    + ":"
+                    + request.name()
+                    + ":"
+                    + delegationModesToken(delegationModes));

Review Comment:
   Why not use only `resolvedMode` here?



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -460,15 +488,167 @@ public void authorizeCreateTableDirect(
     }
   }
 
+  /**
+   * Create a table, optionally honoring an {@code Idempotency-Key} from the 
REST request.
+   *
+   * <p>When an idempotency key is supplied and the feature is enabled:
+   *
+   * <ol>
+   *   <li>Authorization runs first, so idempotency cannot bypass it.
+   *   <li>Pre-flight loads any prior record for {@code (realm, key)}. A match 
(same caller, same
+   *       resource binding) replays the response from authoritative catalog 
state; a binding
+   *       mismatch raises 422.
+   *   <li>On a fresh key, the table is created and the record is inserted 
afterwards. A concurrent
+   *       caller that wins the race causes our insert to return DUPLICATE — 
we then replay too, so
+   *       the response is equivalent to what the winner returned.
+   * </ol>
+   *
+   * <p>No response body is stored. Replays go through {@code loadTable +
+   * buildLoadTableResponseWithDelegationCredentials}, which re-vends fresh 
credentials for the
+   * current caller.
+   */
   public LoadTableResponse createTableDirect(
       Namespace namespace,
       CreateTableRequest request,
       EnumSet<AccessDelegationMode> delegationModes,
-      Optional<String> refreshCredentialsEndpoint) {
+      Optional<String> refreshCredentialsEndpoint,
+      Optional<String> idempotencyKey) {
 
     authorizeCreateTableDirect(namespace, request, !delegationModes.isEmpty());
     Optional<AccessDelegationMode> resolvedMode = 
resolveAccessDelegationModes(delegationModes);
 
+    if (idempotencyKey.isEmpty() || !idempotencySupport().isEnabled()) {
+      return doCreateTableDirect(namespace, request, resolvedMode, 
refreshCredentialsEndpoint);
+    }
+    return createTableDirectIdempotently(
+        namespace,
+        request,
+        delegationModes,
+        resolvedMode,
+        refreshCredentialsEndpoint,
+        idempotencyKey.get());
+  }
+
+  private LoadTableResponse createTableDirectIdempotently(
+      Namespace namespace,
+      CreateTableRequest request,
+      EnumSet<AccessDelegationMode> delegationModes,
+      Optional<AccessDelegationMode> resolvedMode,
+      Optional<String> refreshCredentialsEndpoint,
+      String idempotencyKey) {
+
+    String operationType = "create-table";
+    String principalHash =
+        idempotencySupport().principalHash(polarisPrincipal(), 
realmContext().getRealmIdentifier());
+    String resourceHash =
+        idempotencySupport()
+            .resourceHash(
+                "create-table:"
+                    + namespace.toString()
+                    + ":"
+                    + request.name()
+                    + ":"
+                    + delegationModesToken(delegationModes));
+
+    // Pre-flight: replay a prior success, or 422 on a binding mismatch (same 
key, different
+    // request/principal).
+    try {
+      IdempotencyHandlerSupport.Outcome preflight =
+          idempotencySupport()
+              .preflight(idempotencyKey, operationType, resourceHash, 
principalHash);
+      if (preflight instanceof IdempotencyHandlerSupport.Outcome.Duplicate 
dup) {
+        return replayCreateTableDirect(
+            namespace, request, resolvedMode, refreshCredentialsEndpoint, 
dup.existing());
+      }
+    } catch (IdempotencyHandlerSupport.ConflictException e) {
+      throw new UnprocessableEntityException("%s", e.getMessage());
+    }
+
+    // Run the operation. A concurrent request carrying the same key can win 
the catalog-level race
+    // and make this attempt fail with AlreadyExistsException; if that winner 
recorded a matching
+    // idempotency outcome, replay it instead of returning a 409.
+    LoadTableResponse response;
+    try {
+      response = doCreateTableDirect(namespace, request, resolvedMode, 
refreshCredentialsEndpoint);
+    } catch (AlreadyExistsException e) {
+      try {
+        Optional<IdempotencyRecord> raceWinner =
+            resolveConcurrentDuplicate(idempotencyKey, operationType, 
resourceHash, principalHash);
+        if (raceWinner.isPresent()) {
+          return replayCreateTableDirect(

Review Comment:
   Would it make sense to refactor `loadTable(...)`, call it here, and add a 
new optional parameter for the exact metadata location check?
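   
   As a sketch of the shape this could take (all names hypothetical, strings
   standing in for the table and its metadata location, and
   `IllegalStateException` standing in for the 422 error):

```java
import java.util.Optional;

final class LoadWithLocationCheckSketch {
  // currentLocation stands in for the metadata location read from the loaded table.
  static String loadTable(
      String tableName, String currentLocation, Optional<String> expectedLocation) {
    // Regular loads pass Optional.empty() and skip the check entirely.
    if (expectedLocation.isPresent() && !expectedLocation.get().equals(currentLocation)) {
      throw new IllegalStateException(
          "table " + tableName + " has changed since it was created with this key");
    }
    return tableName + "@" + currentLocation;
  }
}
```

   A replay would pass the location stored in the idempotency record, while
   ordinary callers keep their current behavior by passing an empty `Optional`.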



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -460,15 +488,167 @@ public void authorizeCreateTableDirect(
     }
   }
 
+  /**
+   * Create a table, optionally honoring an {@code Idempotency-Key} from the 
REST request.
+   *
+   * <p>When an idempotency key is supplied and the feature is enabled:
+   *
+   * <ol>
+   *   <li>Authorization runs first, so idempotency cannot bypass it.
+   *   <li>Pre-flight loads any prior record for {@code (realm, key)}. A match 
(same caller, same
+   *       resource binding) replays the response from authoritative catalog 
state; a binding
+   *       mismatch raises 422.
+   *   <li>On a fresh key, the table is created and the record is inserted 
afterwards. A concurrent
+   *       caller that wins the race causes our insert to return DUPLICATE — 
we then replay too, so
+   *       the response is equivalent to what the winner returned.
+   * </ol>
+   *
+   * <p>No response body is stored. Replays go through {@code loadTable +
+   * buildLoadTableResponseWithDelegationCredentials}, which re-vends fresh 
credentials for the
+   * current caller.
+   */
   public LoadTableResponse createTableDirect(
       Namespace namespace,
       CreateTableRequest request,
       EnumSet<AccessDelegationMode> delegationModes,
-      Optional<String> refreshCredentialsEndpoint) {
+      Optional<String> refreshCredentialsEndpoint,
+      Optional<String> idempotencyKey) {
 
     authorizeCreateTableDirect(namespace, request, !delegationModes.isEmpty());
     Optional<AccessDelegationMode> resolvedMode = 
resolveAccessDelegationModes(delegationModes);
 
+    if (idempotencyKey.isEmpty() || !idempotencySupport().isEnabled()) {
+      return doCreateTableDirect(namespace, request, resolvedMode, 
refreshCredentialsEndpoint);
+    }
+    return createTableDirectIdempotently(
+        namespace,
+        request,
+        delegationModes,
+        resolvedMode,
+        refreshCredentialsEndpoint,
+        idempotencyKey.get());
+  }
+
+  private LoadTableResponse createTableDirectIdempotently(
+      Namespace namespace,
+      CreateTableRequest request,
+      EnumSet<AccessDelegationMode> delegationModes,
+      Optional<AccessDelegationMode> resolvedMode,
+      Optional<String> refreshCredentialsEndpoint,
+      String idempotencyKey) {
+
+    String operationType = "create-table";

Review Comment:
   enum?
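   
   For instance, something along these lines, where `IdempotentOperationSketch`
   is a hypothetical name and `"create-table"` is the only token taken from the
   diff above:

```java
// Hypothetical enum replacing the "create-table" string literal.
enum IdempotentOperationSketch {
  CREATE_TABLE("create-table");

  private final String token;

  IdempotentOperationSketch(String token) {
    this.token = token;
  }

  // Stable value to persist in the idempotency record.
  String token() {
    return token;
  }

  // Reverse lookup for records read back from storage.
  static IdempotentOperationSketch fromToken(String token) {
    for (IdempotentOperationSketch op : values()) {
      if (op.token.equals(token)) {
        return op;
      }
    }
    throw new IllegalArgumentException("Unknown operation type: " + token);
  }
}
```

   Persisting an explicit token rather than `name()` keeps stored records valid
   if a constant is later renamed.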



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
