dimas-b commented on code in PR #4659:
URL: https://github.com/apache/polaris/pull/4659#discussion_r3397982566
##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -509,6 +662,75 @@ public LoadTableResponse createTableDirect(
throw new IllegalStateException("Cannot wrap catalog that does not produce
BaseTable");
}
+ /**
+ * Replay path for an idempotent {@code createTableDirect}: load the
existing table and rebuild a
+ * response with freshly-vended credentials for the current caller. No
credentials from the
+ * original call are stored or returned.
+ *
+ * <p>Authorization is not repeated here: {@link
#authorizeCreateTableDirect} already ran for the
+ * current caller in {@link #createTableDirect} before the idempotency
lookup, and the duplicate
+ * was matched on the same {@code principalHash} and request binding.
+ *
+ * <p>The replay reflects <em>current</em> catalog state rather than the
original response bytes
+ * (no response body is stored). To avoid silently returning a materially
different table, if the
+ * table has advanced beyond the metadata location captured when the key was
recorded, this raises
+ * 422 instead of returning divergent state.
+ */
+ private LoadTableResponse replayCreateTableDirect(
Review Comment:
This method is called from only one place and merely redirects again to
`buildLoadTableResponseForExistingTable()`. Why not inline it?
##########
runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyHandlerSupport.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.idempotency;
+
+import jakarta.annotation.Nullable;
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.core.HttpHeaders;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.regex.Pattern;
+import org.apache.polaris.core.DigestUtils;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.IdempotencyRecord;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.core.persistence.IdempotencyStoreFactory;
+
+/**
+ * Handler-side helper for the single-transaction ("optimistic commit")
idempotency model.
+ *
+ * <p>Responsibilities:
+ *
+ * <ul>
+ * <li>Read and validate the {@code Idempotency-Key} request header (UUIDv7
only).
+ * <li>Compute the principal/resource hashes that form the binding stored
alongside each record.
+ * <li>Pre-flight: look up an existing record for the same {@code (realm,
key)} and dispatch into
+ * {@link Outcome#owned()} or {@link
Outcome#duplicate(IdempotencyRecord)} for the handler.
+ * <li>Record the terminal outcome after a successful operation, returning
{@link Outcome#owned()}
+ * on win and {@link Outcome#duplicate(IdempotencyRecord)} on a
race-driven duplicate.
+ * </ul>
+ *
+ * <p>This bean is {@link RequestScoped}: a single request operates within one
realm, so the
+ * realm-scoped {@link IdempotencyStore} is resolved once (lazily) from {@link
+ * IdempotencyStoreFactory} for the request's {@link RealmContext}. When
idempotency is disabled the
+ * store is never resolved — the bean is an inert shell. No response body is
stored; duplicate
+ * responses are rebuilt from authoritative catalog state by the handler
itself.
+ */
+@RequestScoped
+public class IdempotencyHandlerSupport {
+
+ // RFC 9562 UUID v7 has version nibble 7 in time_hi_and_version.
+ private static final Pattern UUID_V7_PATTERN =
+ Pattern.compile(
+
"^[0-9a-f]{8}-[0-9a-f]{4}-7[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$",
+ Pattern.CASE_INSENSITIVE);
+
+ @Inject IdempotencyConfiguration configuration;
+ @Inject IdempotencyStoreFactory storeFactory;
+ @Inject RealmContext realmContext;
+ @Inject Clock clock;
+
+ // Resolved lazily on first use within the request; never resolved when
idempotency is disabled.
+ private IdempotencyStore store;
+
+ /**
+ * Returns an instance with idempotency permanently disabled. Useful for
test fixtures that need a
+ * non-null {@code IdempotencyHandlerSupport} but exercise non-idempotent
code paths.
+ */
+ public static IdempotencyHandlerSupport disabled() {
+ IdempotencyHandlerSupport instance = new IdempotencyHandlerSupport();
+ instance.configuration = DisabledConfiguration.INSTANCE;
+ return instance;
+ }
+
+ /** Returns {@code true} if handler-level idempotency is enabled. */
+ public boolean isEnabled() {
+ return configuration != null && configuration.enabled();
+ }
+
+ /**
+ * Reads and validates the idempotency key from the request headers using
the deploy-time
+ * configured header name from {@link IdempotencyConfiguration#keyHeader()}.
+ *
+ * @return validated key, or {@link Optional#empty()} if {@code httpHeaders}
is null, the header
+ * is absent / blank, or idempotency is disabled
+ * @throws IllegalArgumentException if the header is present but not a valid
UUIDv7 (callers
+ * translate this into a 400 Bad Request)
+ */
+ public Optional<String> validatedKey(@Nullable HttpHeaders httpHeaders) {
+ if (httpHeaders == null) {
+ return Optional.empty();
+ }
+ return
validatedKey(httpHeaders.getHeaderString(configuration.keyHeader()));
+ }
+
+ /**
+ * Validates a raw header value. UUIDv7 is required so that the key has
enough entropy to be a
+ * meaningful idempotency boundary.
+ */
+ public Optional<String> validatedKey(@Nullable String headerValue) {
+ if (!isEnabled() || headerValue == null) {
+ return Optional.empty();
+ }
+ String trimmed = headerValue.trim();
+ if (trimmed.isEmpty()) {
+ return Optional.empty();
+ }
+ if (!UUID_V7_PATTERN.matcher(trimmed).matches()) {
Review Comment:
Isn't it easier to check `UUID.version()` (and `UUID.variant()`)? Note that all actual call paths
leading here already have the `UUID` object.
##########
runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyHandlerSupport.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.idempotency;
+
+import jakarta.annotation.Nullable;
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.core.HttpHeaders;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.regex.Pattern;
+import org.apache.polaris.core.DigestUtils;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.IdempotencyRecord;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.core.persistence.IdempotencyStoreFactory;
+
+/**
+ * Handler-side helper for the single-transaction ("optimistic commit")
idempotency model.
+ *
+ * <p>Responsibilities:
+ *
+ * <ul>
+ * <li>Read and validate the {@code Idempotency-Key} request header (UUIDv7
only).
+ * <li>Compute the principal/resource hashes that form the binding stored
alongside each record.
+ * <li>Pre-flight: look up an existing record for the same {@code (realm,
key)} and dispatch into
+ * {@link Outcome#owned()} or {@link
Outcome#duplicate(IdempotencyRecord)} for the handler.
+ * <li>Record the terminal outcome after a successful operation, returning
{@link Outcome#owned()}
+ * on win and {@link Outcome#duplicate(IdempotencyRecord)} on a
race-driven duplicate.
+ * </ul>
+ *
+ * <p>This bean is {@link RequestScoped}: a single request operates within one
realm, so the
+ * realm-scoped {@link IdempotencyStore} is resolved once (lazily) from {@link
+ * IdempotencyStoreFactory} for the request's {@link RealmContext}. When
idempotency is disabled the
+ * store is never resolved — the bean is an inert shell. No response body is
stored; duplicate
+ * responses are rebuilt from authoritative catalog state by the handler
itself.
+ */
+@RequestScoped
+public class IdempotencyHandlerSupport {
+
+ // RFC 9562 UUID v7 has version nibble 7 in time_hi_and_version.
+ private static final Pattern UUID_V7_PATTERN =
+ Pattern.compile(
+
"^[0-9a-f]{8}-[0-9a-f]{4}-7[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$",
+ Pattern.CASE_INSENSITIVE);
+
+ @Inject IdempotencyConfiguration configuration;
+ @Inject IdempotencyStoreFactory storeFactory;
+ @Inject RealmContext realmContext;
+ @Inject Clock clock;
+
+ // Resolved lazily on first use within the request; never resolved when
idempotency is disabled.
+ private IdempotencyStore store;
+
+ /**
+ * Returns an instance with idempotency permanently disabled. Useful for
test fixtures that need a
+ * non-null {@code IdempotencyHandlerSupport} but exercise non-idempotent
code paths.
+ */
+ public static IdempotencyHandlerSupport disabled() {
+ IdempotencyHandlerSupport instance = new IdempotencyHandlerSupport();
+ instance.configuration = DisabledConfiguration.INSTANCE;
+ return instance;
+ }
+
+ /** Returns {@code true} if handler-level idempotency is enabled. */
+ public boolean isEnabled() {
+ return configuration != null && configuration.enabled();
+ }
+
+ /**
+ * Reads and validates the idempotency key from the request headers using
the deploy-time
+ * configured header name from {@link IdempotencyConfiguration#keyHeader()}.
+ *
+ * @return validated key, or {@link Optional#empty()} if {@code httpHeaders}
is null, the header
+ * is absent / blank, or idempotency is disabled
+ * @throws IllegalArgumentException if the header is present but not a valid
UUIDv7 (callers
+ * translate this into a 400 Bad Request)
+ */
+ public Optional<String> validatedKey(@Nullable HttpHeaders httpHeaders) {
Review Comment:
Never called?
##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java:
##########
@@ -16,152 +16,103 @@
*/
package org.apache.polaris.core.persistence;
+import com.google.common.annotations.Beta;
import java.time.Instant;
import java.util.Optional;
import org.apache.polaris.core.entity.IdempotencyRecord;
/**
- * Abstraction for persisting and querying idempotency records.
+ * SPI for persisting and looking up handler-level idempotency records.
*
- * <p>An {@link IdempotencyStore} is responsible for:
+ * <p>Polaris uses an "optimistic commit" model: a record is inserted only
<em>after</em> the
+ * originating operation has reached a terminal HTTP status. The SPI exposes
just two state
+ * transitions:
*
* <ul>
- * <li>Reserving an idempotency key for a particular operation and resource
- * <li>Recording completion status and response metadata
- * <li>Allowing callers to look up existing records to detect duplicates
- * <li>Expiring and purging old reservations
+ * <li>{@link #load(String)} to detect a duplicate request before doing any
work;
+ * <li>{@link #recordIfAbsent(String, String, String, String, int, String,
Instant, Instant)} to
+ * atomically insert the record after the operation has finalized,
returning {@link
+ * RecordResult#OWNED} when the caller wins the race and {@link
RecordResult#DUPLICATE} (with
+ * the existing record) when another caller raced ahead.
* </ul>
*
- * <p>Implementations must be thread-safe if used concurrently.
+ * <p>There is no in-progress / lease / heartbeat state in this design, and no
response body is
+ * stored — duplicate requests rebuild an equivalent response from
authoritative catalog state.
+ *
+ * <p>The handler-level idempotency design always runs after authorization, so
the store does not
+ * need to enforce identity itself; it only needs to persist {@code
principalHash} so the handler
+ * can validate it on replay and reject cross-principal cache hits.
+ *
+ * <p>A store instance is bound to a single realm at construction (see {@link
+ * IdempotencyStoreFactory#getOrCreateIdempotencyStore}), so the realm is not
part of any method
+ * signature. Implementations must be thread-safe.
*/
+@Beta
public interface IdempotencyStore {
- /** High-level outcome of attempting to reserve an idempotency key. */
- enum ReserveResultType {
- /** The caller successfully acquired ownership of the idempotency key. */
+ /** Outcome of an attempted record insertion. */
+ enum RecordResultType {
+ /** The caller successfully inserted a new idempotency record. */
OWNED,
- /** A reservation already exists for the key; the caller does not own it.
*/
+ /** A record already exists for the same key in this realm; the caller did
not insert. */
DUPLICATE
}
/**
- * Result of attempting to update the heartbeat for an in-progress
idempotency record.
- *
- * <p>This allows callers to distinguish between different "no update"
scenarios instead of
- * overloading them into a boolean.
- */
- enum HeartbeatResult {
- /** The heartbeat was successfully updated for the in-progress record
owned by this executor. */
- UPDATED,
-
- /**
- * The idempotency record exists but has already been finalized with an
HTTP status; there is no
- * longer an in-progress reservation to heartbeat.
- */
- FINALIZED,
-
- /**
- * No idempotency record exists for the specified realm and key. This can
happen if the record
- * was never created or has already been purged.
- */
- NOT_FOUND,
-
- /**
- * An in-progress idempotency record exists for the key, but it is owned
by a different
- * executor. The caller should stop heartbeating as it no longer owns the
reservation.
- */
- LOST_OWNERSHIP
- }
-
- /**
- * Result of a {@link #reserve(String, String, String, String, Instant,
String, Instant)} call,
- * including the outcome and, when applicable, the existing idempotency
record.
- *
- * @param type outcome of the reservation attempt
- * @param existing existing idempotency record, when {@link #type ()} is
{@link
- * ReserveResultType#DUPLICATE}, otherwise {@link Optional#empty()}.
- */
- record ReserveResult(ReserveResultType type, Optional<IdempotencyRecord>
existing) {}
-
- /**
- * Attempts to reserve an idempotency key for a given operation and resource.
- *
- * <p>If no record exists yet, the implementation should create a new
reservation owned by {@code
- * executorId}. If a record already exists, the implementation should return
{@link
- * ReserveResultType#DUPLICATE} along with the existing record.
- *
- * @param realmId logical tenant or realm identifier
- * @param idempotencyKey application-provided idempotency key
- * @param operationType logical operation name (e.g., {@code "commit-table"})
- * @param normalizedResourceId normalized identifier of the affected resource
- * @param expiresAt timestamp after which the reservation is considered
expired
- * @param executorId identifier of the caller attempting the reservation
- * @param now timestamp representing the current time
- * @return {@link ReserveResult} describing whether the caller owns the
reservation or hit a
- * duplicate
- */
- ReserveResult reserve(
- String realmId,
- String idempotencyKey,
- String operationType,
- String normalizedResourceId,
- Instant expiresAt,
- String executorId,
- Instant now);
-
- /**
- * Loads an existing idempotency record for the given realm and key, if
present.
+ * Result of {@link #recordIfAbsent(String, String, String, String, int,
String, Instant,
+ * Instant)}, including the outcome and, when {@link
RecordResultType#DUPLICATE}, the existing
+ * record so the caller can compare bindings without an extra round-trip.
*
- * @param realmId logical tenant or realm identifier
- * @param idempotencyKey application-provided idempotency key
- * @return the corresponding {@link IdempotencyRecord}, if it exists
+ * <p>Invariant: when {@link #type()} is {@link RecordResultType#DUPLICATE},
{@link #existing()}
+ * is always present. Implementations that cannot reload the conflicting
record must raise an
+ * exception rather than return a {@code DUPLICATE} with an empty {@code
existing}.
*/
- Optional<IdempotencyRecord> load(String realmId, String idempotencyKey);
+ record RecordResult(RecordResultType type, Optional<IdempotencyRecord>
existing) {}
/**
- * Updates the heartbeat for an in-progress reservation to indicate that the
executor is still
- * actively processing.
+ * Loads an existing record for the given key in this store's realm, if
present.
*
- * @param realmId logical tenant or realm identifier
* @param idempotencyKey application-provided idempotency key
- * @param executorId identifier of the executor that owns the reservation
- * @param now timestamp representing the current time
- * @return {@link HeartbeatResult} describing whether the heartbeat was
updated or why it was not
*/
- HeartbeatResult updateHeartbeat(
- String realmId, String idempotencyKey, String executorId, Instant now);
+ Optional<IdempotencyRecord> load(String idempotencyKey);
/**
- * Marks an idempotency record as finalized, recording HTTP status and
response metadata.
+ * Atomically inserts an idempotency record if no record exists yet for
{@code idempotencyKey} in
+ * this store's realm; otherwise returns the existing record.
*
- * <p>Implementations should be tolerant of idempotent re-finalization
attempts and typically
- * return {@code false} when a record was already finalized.
+ * <p>This is the only "write" path in the SPI. It is invoked after the
originating operation has
+ * succeeded, so {@code httpStatus} is always set.
*
- * @param realmId logical tenant or realm identifier
* @param idempotencyKey application-provided idempotency key
- * @param httpStatus HTTP status code returned to the client, or {@code
null} if not applicable
- * @param errorSubtype optional error subtype or code, if the operation
failed
- * @param responseSummary short, serialized representation of the response
body
- * @param responseHeaders serialized representation of response headers
- * @param finalizedAt timestamp when the operation completed
- * @return {@code true} if the record was transitioned to a finalized state,
{@code false}
- * otherwise
+ * @param operationType logical operation name (e.g. {@code "create-table"})
+ * @param resourceHash opaque hash of the request-derived resource binding
(not a human-readable
+ * identifier, and not the request payload); persisted so replay can
detect reuse of the same
+ * key for a different resource
+ * @param principalHash hash of the caller principal identity; persisted so
replay can verify the
+ * same caller and reject cross-principal cache hits
+ * @param httpStatus HTTP status code returned to the client
+ * @param metadataLocation resource state pointer captured at record time
(for tables, the
+ * metadata-file location); used on replay to detect the resource
advancing beyond the
+ * originally-created state; may be {@code null}
+ * @param createdAt timestamp representing when the record was created
+ * @param expiresAt timestamp after which the record is eligible for purging
*/
- boolean finalizeRecord(
- String realmId,
+ RecordResult recordIfAbsent(
String idempotencyKey,
Review Comment:
The Idempotency Key is required to be a UUID at the API layer, so why not use
`UUID` as the parameter type here?
I think it would improve clarity.
##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -460,15 +490,138 @@ public void authorizeCreateTableDirect(
}
}
+ /**
+ * Create a table, optionally honoring an {@code Idempotency-Key} from the
REST request.
+ *
+ * <p>When an idempotency key is supplied and the feature is enabled:
+ *
+ * <ol>
+ * <li>Authorization runs first, so idempotency cannot bypass it.
+ * <li>Pre-flight loads any prior record for {@code (realm, key)}. A match
(same caller, same
+ * resource binding) replays the response from authoritative catalog
state; a binding
+ * mismatch raises 422.
+ * <li>On a fresh key, the table is created and the record is inserted
afterwards. A concurrent
+ * caller that wins the race causes our insert to return DUPLICATE —
we then replay too, so
+ * the response is equivalent to what the winner returned.
+ * </ol>
+ *
+ * <p>No response body is stored. Replays go through {@code loadTable +
+ * buildLoadTableResponseWithDelegationCredentials}, which re-vends fresh
credentials for the
+ * current caller.
+ */
public LoadTableResponse createTableDirect(
Namespace namespace,
CreateTableRequest request,
EnumSet<AccessDelegationMode> delegationModes,
- Optional<String> refreshCredentialsEndpoint) {
+ Optional<String> refreshCredentialsEndpoint,
+ Optional<String> idempotencyKey) {
authorizeCreateTableDirect(namespace, request, !delegationModes.isEmpty());
Optional<AccessDelegationMode> resolvedMode =
resolveAccessDelegationModes(delegationModes);
+ // Idempotency is opt-in per request (key present) and per deployment
(feature enabled). When
+ // off, the pre-flight / race-resolution / record steps below are simply
skipped, leaving the
+ // plain create path. A binding mismatch surfaces as
IdempotencyConflictException, which
+ // IcebergExceptionMapper maps to HTTP 422 — no inline translation needed.
+ boolean idempotent = idempotencyKey.isPresent() &&
idempotencySupport().isEnabled();
+ IdempotentOperation operation = IdempotentOperation.CREATE_TABLE;
+ String key = idempotencyKey.orElse(null);
+ String principalHash =
+ idempotent ? idempotencySupport().principalHash(polarisPrincipal()) :
null;
+ String resourceHash =
+ idempotent
+ ? idempotencySupport()
+ .resourceHash(
+ operation,
+ namespace.toString(),
+ request.name(),
+ resolvedMode.map(Enum::name).orElse("none"))
+ : null;
+
+ // Pre-flight: replay a prior success for this key.
+ if (idempotent) {
+ IdempotencyHandlerSupport.Outcome preflight =
+ idempotencySupport().preflight(key, operation, resourceHash,
principalHash);
Review Comment:
Suggestion: pass all the inputs required for this decision into `preflight()`,
i.e. principal, operation, request, etc. Do not perform any extra work like
hashing upfront; instead, always call `preflight()`, which will compute the hashes, etc.
If idempotency is disabled, it will return something like `Outcome.Disabled`.
Basically, `Outcome` will enrich and replace the `idempotent` flag.
Actually, `Outcome` can probably be a top-level class (e.g.
`IdempotencyOutcome`) because it is used in the main "logic" here.
WDYT?
##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -460,15 +490,138 @@ public void authorizeCreateTableDirect(
}
}
+ /**
+ * Create a table, optionally honoring an {@code Idempotency-Key} from the
REST request.
+ *
+ * <p>When an idempotency key is supplied and the feature is enabled:
+ *
+ * <ol>
+ * <li>Authorization runs first, so idempotency cannot bypass it.
+ * <li>Pre-flight loads any prior record for {@code (realm, key)}. A match
(same caller, same
+ * resource binding) replays the response from authoritative catalog
state; a binding
+ * mismatch raises 422.
+ * <li>On a fresh key, the table is created and the record is inserted
afterwards. A concurrent
+ * caller that wins the race causes our insert to return DUPLICATE —
we then replay too, so
+ * the response is equivalent to what the winner returned.
+ * </ol>
+ *
+ * <p>No response body is stored. Replays go through {@code loadTable +
+ * buildLoadTableResponseWithDelegationCredentials}, which re-vends fresh
credentials for the
+ * current caller.
+ */
public LoadTableResponse createTableDirect(
Namespace namespace,
CreateTableRequest request,
EnumSet<AccessDelegationMode> delegationModes,
- Optional<String> refreshCredentialsEndpoint) {
+ Optional<String> refreshCredentialsEndpoint,
+ Optional<String> idempotencyKey) {
authorizeCreateTableDirect(namespace, request, !delegationModes.isEmpty());
Optional<AccessDelegationMode> resolvedMode =
resolveAccessDelegationModes(delegationModes);
+ // Idempotency is opt-in per request (key present) and per deployment
(feature enabled). When
+ // off, the pre-flight / race-resolution / record steps below are simply
skipped, leaving the
+ // plain create path. A binding mismatch surfaces as
IdempotencyConflictException, which
+ // IcebergExceptionMapper maps to HTTP 422 — no inline translation needed.
+ boolean idempotent = idempotencyKey.isPresent() &&
idempotencySupport().isEnabled();
+ IdempotentOperation operation = IdempotentOperation.CREATE_TABLE;
+ String key = idempotencyKey.orElse(null);
+ String principalHash =
+ idempotent ? idempotencySupport().principalHash(polarisPrincipal()) :
null;
+ String resourceHash =
+ idempotent
+ ? idempotencySupport()
+ .resourceHash(
+ operation,
+ namespace.toString(),
+ request.name(),
+ resolvedMode.map(Enum::name).orElse("none"))
+ : null;
+
+ // Pre-flight: replay a prior success for this key.
+ if (idempotent) {
+ IdempotencyHandlerSupport.Outcome preflight =
+ idempotencySupport().preflight(key, operation, resourceHash,
principalHash);
+ if (preflight instanceof IdempotencyHandlerSupport.Outcome.Duplicate
dup) {
+ return replayCreateTableDirect(
+ namespace, request, resolvedMode, refreshCredentialsEndpoint,
dup.existing());
+ }
+ }
+
+ // Run the operation. A concurrent request carrying the same key can win
the catalog-level race
+ // and make this attempt fail with AlreadyExistsException; if that winner
recorded a matching
+ // idempotency outcome, replay it instead of returning a 409.
+ LoadTableResponse response;
+ try {
+ response = doCreateTableDirect(namespace, request, resolvedMode,
refreshCredentialsEndpoint);
+ } catch (AlreadyExistsException e) {
+ if (idempotent) {
+ Optional<IdempotencyRecord> raceWinner =
+ resolveConcurrentDuplicate(key, operation, resourceHash,
principalHash);
+ if (raceWinner.isPresent()) {
+ return replayCreateTableDirect(
+ namespace, request, resolvedMode, refreshCredentialsEndpoint,
raceWinner.get());
+ }
+ }
+ // Not a same-key retry: the table genuinely pre-existed, so this is a
real conflict.
+ throw e;
+ }
+
+ // Record the successful outcome. If a concurrent caller recorded first,
replay theirs.
+ if (idempotent) {
+ String metadataLocation =
response.tableMetadata().metadataFileLocation();
+ IdempotencyHandlerSupport.Outcome recordOutcome =
+ idempotencySupport()
+ .recordOutcome(key, operation, resourceHash, principalHash, 200,
metadataLocation);
+ if (recordOutcome instanceof IdempotencyHandlerSupport.Outcome.Duplicate
dup) {
+ // Another caller raced ahead and recorded first. Replay so the
response is the same shape
+ // (and credentials are freshly vended for this caller) as what the
race winner returned.
+ return replayCreateTableDirect(
+ namespace, request, resolvedMode, refreshCredentialsEndpoint,
dup.existing());
+ }
+ }
+ return response;
+ }
+
+ /**
+ * After a concurrent {@code createTable} loses the catalog race
(AlreadyExistsException), checks
+ * whether the race winner recorded a matching idempotency outcome so this
caller can replay it.
+ *
+ * <p>The winner records its outcome only after committing the table, so a
loser may observe the
+ * conflict slightly before the record is visible; this retries the lookup a
bounded number of
+ * times. A binding mismatch surfaces as {@link
IdempotencyConflictException} (mapped to 422).
+ * Returns empty if no matching record appears, meaning the conflict was a
genuine pre-existing
+ * table rather than a same-key retry.
+ */
+ private Optional<IdempotencyRecord> resolveConcurrentDuplicate(
Review Comment:
It feels like this method naturally belongs in `IdempotencyHandlerSupport`.
We could define a lightweight (immutable) holder object for its parameters
for ease of invocation / sharing with `preflight()`... WDYT?
##########
runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyHandlerSupport.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.idempotency;
+
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.core.HttpHeaders;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.regex.Pattern;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.IdempotencyRecord;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.core.persistence.IdempotencyStoreFactory;
+
+/**
+ * Handler-side helper for the single-transaction ("optimistic commit")
idempotency model.
+ *
+ * <p>Responsibilities:
+ *
+ * <ul>
+ * <li>Read and validate the {@code Idempotency-Key} request header (UUIDv7
only).
+ * <li>Compute the principal/resource hashes that form the binding stored
alongside each record.
+ * <li>Pre-flight: look up an existing record for the same {@code (realm,
key)} and dispatch into
+ * {@link Outcome#owned()} or {@link
Outcome#duplicate(IdempotencyRecord)} for the handler.
+ * <li>Record the terminal outcome after a successful operation, returning
{@link Outcome#owned()}
+ * on win and {@link Outcome#duplicate(IdempotencyRecord)} on a
race-driven duplicate.
+ * </ul>
+ *
+ * <p>This bean is {@link RequestScoped}: a single request operates within one
realm, so the
+ * realm-scoped {@link IdempotencyStore} is resolved once (lazily) from {@link
+ * IdempotencyStoreFactory} for the request's {@link RealmContext}. When
idempotency is disabled the
+ * store is never resolved — the bean is an inert shell. No response body is
stored; duplicate
+ * responses are rebuilt from authoritative catalog state by the handler
itself.
+ */
+@RequestScoped
+public class IdempotencyHandlerSupport {
+
+ // RFC 9562 UUID v7 has version nibble 7 in time_hi_and_version (first hex digit of the third
+ // group). The first hex digit of the fourth group must be one of 8, 9, a, b, i.e. the variant
+ // bits are 10xx. Matching is case-insensitive; accepted keys are lower-cased by validatedKey.
+ private static final Pattern UUID_V7_PATTERN =
+ Pattern.compile(
+
"^[0-9a-f]{8}-[0-9a-f]{4}-7[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$",
+ Pattern.CASE_INSENSITIVE);
+
+ // Injected by CDI. When an instance is built via disabled(), only configuration is set and the
+ // remaining injected fields stay null; isEnabled() also treats a null configuration as disabled.
+ @Inject IdempotencyConfiguration configuration;
+ @Inject IdempotencyStoreFactory storeFactory;
+ @Inject RealmContext realmContext;
+ @Inject Clock clock;
+
+ // Resolved lazily on first use within the request; never resolved when idempotency is disabled.
+ private IdempotencyStore store;
+
+ /**
+  * Creates an instance with idempotency permanently switched off, backed by
+  * {@code DisabledConfiguration.INSTANCE}. Intended for test fixtures that need a non-null
+  * {@code IdempotencyHandlerSupport} while exercising non-idempotent code paths.
+  *
+  * <p>Only the configuration is populated; the store factory, realm context and clock stay null.
+  */
+ public static IdempotencyHandlerSupport disabled() {
+   IdempotencyHandlerSupport inert = new IdempotencyHandlerSupport();
+   inert.configuration = DisabledConfiguration.INSTANCE;
+   return inert;
+ }
+
+ /**
+  * Reports whether handler-level idempotency is active.
+  *
+  * @return {@code false} when no configuration is present or the configuration reports
+  *     idempotency as disabled; {@code true} otherwise
+  */
+ public boolean isEnabled() {
+   if (configuration == null) {
+     return false;
+   }
+   return configuration.enabled();
+ }
+
+ /**
+  * Reads and validates the idempotency key from the request headers, using the header name
+  * configured at deploy time via {@link IdempotencyConfiguration#keyHeader()}.
+  *
+  * <p>The enabled check runs before the configuration is consulted. An instance without a
+  * configuration (for which {@link #isEnabled()} is {@code false}) therefore yields an empty
+  * result instead of failing with a {@link NullPointerException}.
+  *
+  * @param httpHeaders request headers; may be {@code null}
+  * @return validated, lower-cased key, or {@link Optional#empty()} if {@code httpHeaders} is null,
+  *     the header is absent / blank, or idempotency is disabled
+  * @throws IllegalArgumentException if the header is present but not a valid UUIDv7 (callers
+  *     translate this into a 400 Bad Request)
+  */
+ public Optional<String> validatedKey(@Nullable HttpHeaders httpHeaders) {
+   if (httpHeaders == null || !isEnabled()) {
+     return Optional.empty();
+   }
+   return validatedKey(httpHeaders.getHeaderString(configuration.keyHeader()));
+ }
+
+ /**
+  * Validates a raw idempotency header value. UUIDv7 is required so that the key has enough
+  * entropy to be a meaningful idempotency boundary.
+  *
+  * @param headerValue raw header value; may be {@code null}
+  * @return the trimmed key lower-cased with {@link Locale#ROOT}, or {@link Optional#empty()} if
+  *     idempotency is disabled or the value is null / blank
+  * @throws IllegalArgumentException if the value is present but not a valid UUIDv7
+  */
+ public Optional<String> validatedKey(@Nullable String headerValue) {
+   if (!isEnabled() || headerValue == null) {
+     return Optional.empty();
+   }
+   String trimmed = headerValue.trim();
+   if (trimmed.isEmpty()) {
+     return Optional.empty();
+   }
+   if (!UUID_V7_PATTERN.matcher(trimmed).matches()) {
+     throw new IllegalArgumentException(
+         "Idempotency-Key must be a UUIDv7; got: " + summarizeKey(trimmed));
+   }
+   try {
+     // Defence in depth: the pattern above already enforces the 8-4-4-4-12 hex layout, so this
+     // parse is not expected to fail.
+     UUID.fromString(trimmed);
+   } catch (IllegalArgumentException e) {
+     // Keep the parser's exception as the cause so the failure remains diagnosable.
+     throw new IllegalArgumentException("Idempotency-Key must be a valid UUID", e);
+   }
+   return Optional.of(trimmed.toLowerCase(Locale.ROOT));
+ }
+
+ /**
+ * SHA-256 (hex) of the calling principal's identity, bound to the realm.
The input is the
+ * principal name, the realm id, and the activated role set, canonicalised
so the hash is
+ * deterministic and order-independent.
+ *
+ * <p>Roles are part of the binding so two callers that share a name but
differ in activated roles
+ * do not collide. Principal properties are intentionally excluded: they are
admin-mutable and not
+ * authentication context.
+ */
+ public String principalHash(PolarisPrincipal principal, String realmId) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("name=").append(principal.getName()).append('|');
+ sb.append("realm=").append(realmId).append('|');
+ sb.append("roles=");
+ new TreeSet<>(principal.getRoles()).forEach(r -> sb.append(r).append(','));
+ return sha256Hex(sb.toString());
Review Comment:
(posting this comment just to group unresolved issues under the latest
review).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]