dimas-b commented on code in PR #4659: URL: https://github.com/apache/polaris/pull/4659#discussion_r3403946535
########## runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyHandlerSupport.java: ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.idempotency; + +import jakarta.annotation.Nullable; +import jakarta.enterprise.context.RequestScoped; +import jakarta.inject.Inject; +import java.time.Clock; +import java.time.Instant; +import java.util.Optional; +import java.util.TreeSet; +import java.util.UUID; +import org.apache.polaris.core.DigestUtils; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.IdempotencyRecord; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.core.persistence.IdempotencyStoreFactory; + +/** + * Handler-side helper for the single-transaction ("optimistic commit") idempotency model. + * + * <p>Responsibilities: + * + * <ul> + * <li>Read and validate the {@code Idempotency-Key} request header (UUIDv7 only). + * <li>Compute the principal/resource hashes that form the binding stored alongside each record. 
+ * <li>Pre-flight: from the raw request inputs, decide whether idempotency applies and, if so, + * look up an existing record for the same {@code (realm, key)}, returning an {@link + * IdempotencyOutcome} for the handler to branch on. + * <li>Record the terminal outcome after a successful operation, returning {@link + * IdempotencyOutcome.Owned} on win and {@link IdempotencyOutcome.Duplicate} on a race-driven + * duplicate. + * <li>Resolve a concurrent create-table race by polling for the winner's record. + * </ul> + * + * <p>This bean is {@link RequestScoped}: a single request operates within one realm, so the + * realm-scoped {@link IdempotencyStore} is resolved once (lazily) from {@link + * IdempotencyStoreFactory} for the request's {@link RealmContext}. When idempotency is disabled the + * store is never resolved — the bean is an inert shell. No response body is stored; duplicate + * responses are rebuilt from authoritative catalog state by the handler itself. + */ +@RequestScoped +public class IdempotencyHandlerSupport { + + // Bounded lookup for the idempotency record written by a concurrent create-table race winner, + // which records only after committing the table (so a loser may briefly not see it yet). Backoff + // is exponential: 5+10+20+40+80 = 155ms total budget across 5 attempts. If the winner's record is + // still not visible after that (e.g. a long GC pause or slow store write), the original 409 + // surfaces instead of a replay — correct but not ideal; widen the budget if this proves too + // tight. + private static final int CONCURRENT_REPLAY_MAX_ATTEMPTS = 5; + private static final long CONCURRENT_REPLAY_INITIAL_BACKOFF_MILLIS = 5; + + @Inject IdempotencyConfiguration configuration; + @Inject IdempotencyStoreFactory storeFactory; + @Inject RealmContext realmContext; + @Inject Clock clock; + + // Resolved lazily on first use within the request; never resolved when idempotency is disabled. 
+ private IdempotencyStore store; + + /** + * Returns an instance with idempotency permanently disabled. Useful for test fixtures that need a + * non-null {@code IdempotencyHandlerSupport} but exercise non-idempotent code paths. + */ + public static IdempotencyHandlerSupport disabled() { + IdempotencyHandlerSupport instance = new IdempotencyHandlerSupport(); + instance.configuration = DisabledConfiguration.INSTANCE; + return instance; + } + + /** Returns {@code true} if handler-level idempotency is enabled. */ + public boolean isEnabled() { + return configuration != null && configuration.enabled(); + } + + /** + * Validates that the {@code Idempotency-Key} is a UUIDv7. UUIDv7 is required so the key carries a + * timestamp and enough entropy to be a meaningful idempotency boundary. The header is already + * parsed into a {@link UUID} upstream (the JAX-RS binding), so this only enforces the version. + * + * @return the key, or {@link Optional#empty()} if {@code idempotencyKey} is null or idempotency + * is disabled + * @throws IllegalArgumentException if the key is not a UUIDv7 (callers translate this into a 400 + * Bad Request) + */ + public Optional<UUID> validatedKey(@Nullable UUID idempotencyKey) { + if (!isEnabled() || idempotencyKey == null) { + return Optional.empty(); + } + // RFC 9562 UUIDv7: version 7 with the RFC 4122/9562 variant (2). + if (idempotencyKey.version() != 7 || idempotencyKey.variant() != 2) { + throw new IllegalArgumentException( + "Idempotency-Key must be a UUIDv7; got: " + idempotencyKey); + } + return Optional.of(idempotencyKey); + } + + /** + * SHA-256 (hex) of the calling principal's identity, bound to the request's realm. The input is + * the principal name, the realm id, and the activated role set, canonicalised so the hash is + * deterministic and order-independent. + * + * <p>Roles are part of the binding so two callers that share a name but differ in activated roles + * do not collide. 
Principal properties are intentionally excluded: they are admin-mutable and not + * authentication context. + */ + public String principalHash(PolarisPrincipal principal) { + StringBuilder sb = new StringBuilder(); + sb.append("name=").append(principal.getName()).append('|'); + sb.append("realm=").append(realmContext.getRealmIdentifier()).append('|'); + sb.append("roles="); + new TreeSet<>(principal.getRoles()).forEach(r -> sb.append(r).append(',')); + return DigestUtils.sha256Hex(sb.toString()); + } + + /** + * SHA-256 hex of the resource-binding component, built from the operation and its stable resource + * identity components (e.g. namespace, name, resolved access-delegation mode). The canonical + * format lives here so all binding/hashing logic stays in one place; callers only supply the + * identity components. The request payload itself is intentionally not part of the binding. + */ + public String resourceHash(IdempotentOperation operation, String... components) { + StringBuilder sb = new StringBuilder(operation.wireName()); + for (String component : components) { + sb.append(':').append(component == null ? "" : component); + } + return DigestUtils.sha256Hex(sb.toString()); + } + + /** + * Pre-flight: decide whether idempotency applies to this request and, if so, look up an existing + * record for {@code (realm, key)}. + * + * <p>All of the work lives here: if idempotency is disabled or no key was supplied, this returns + * {@link IdempotencyOutcome.Disabled} and the handler runs its plain path. Otherwise the + * principal/resource hashes are computed from the raw inputs and the store is consulted: + * + * <ul> + * <li>No record → {@link IdempotencyOutcome.Owned} (carrying the computed binding); the handler + * performs the operation and then calls {@link #recordOutcome}. + * <li>Matching record → {@link IdempotencyOutcome.Duplicate}; the handler rebuilds the response + * from current catalog state. 
+ * <li>Record with a different binding → {@link IdempotencyConflictException} (mapped to 422). + * </ul> + * + * @param idempotencyKey the validated key, or {@link Optional#empty()} when absent/disabled + * @param principal the calling principal (hashed into the binding) + * @param operation the idempotent operation being attempted + * @param resourceComponents the stable resource-identity components (e.g. namespace, name, + * resolved access-delegation mode) + */ + public IdempotencyOutcome preflight( + Optional<UUID> idempotencyKey, + PolarisPrincipal principal, + IdempotentOperation operation, + String... resourceComponents) { + if (!isEnabled() || idempotencyKey.isEmpty()) { + return IdempotencyOutcome.disabled(); + } + IdempotencyOutcome.Owned owned = + IdempotencyOutcome.owned( + idempotencyKey.get(), + operation, + resourceHash(operation, resourceComponents), + principalHash(principal)); + return lookup(owned); + } + + /** + * Records the terminal outcome of an operation that just completed, using the binding carried by + * the {@link IdempotencyOutcome.Owned} returned from {@link #preflight}. + * + * <p>The insert is atomic on {@code (realm, key)}; if another caller raced ahead and inserted + * first, this returns {@link IdempotencyOutcome.Duplicate} carrying that existing record so the + * handler can rebuild an equivalent response from current state. Otherwise the same {@code owned} + * outcome is returned to signal a clean win. 
+ */ + public IdempotencyOutcome recordOutcome( + IdempotencyOutcome.Owned owned, int httpStatus, @Nullable String metadataLocation) { + Instant now = clock.instant(); + Instant expiresAt = now.plus(configuration.ttl()); + IdempotencyStore.RecordResult result = + store() + .recordIfAbsent( + owned.idempotencyKey(), + owned.operation().wireName(), + owned.resourceHash(), + owned.principalHash(), + httpStatus, + metadataLocation, + now, + expiresAt); + if (result.type() == IdempotencyStore.RecordResultType.OWNED) { + return owned; + } + IdempotencyRecord existing = + result + .existing() + .orElseThrow(() -> new IllegalStateException("DUPLICATE result without record")); + return matchOrConflict(existing, owned); + } + + /** + * After a concurrent {@code createTable} loses the catalog race (AlreadyExistsException), checks + * whether the race winner recorded a matching idempotency outcome so this caller can replay it. + * + * <p>The winner records its outcome only after committing the table, so a loser may observe the + * conflict slightly before the record is visible; this polls a bounded number of times. A binding + * mismatch surfaces as {@link IdempotencyConflictException} (mapped to 422). Returns empty if no + * matching record appears, meaning the conflict was a genuine pre-existing table rather than a + * same-key retry. + */ + public Optional<IdempotencyRecord> resolveConcurrentDuplicate(IdempotencyOutcome.Owned owned) { + long backoffMillis = CONCURRENT_REPLAY_INITIAL_BACKOFF_MILLIS; + for (int attempt = 0; attempt < CONCURRENT_REPLAY_MAX_ATTEMPTS; attempt++) { + if (lookup(owned) instanceof IdempotencyOutcome.Duplicate dup) { + return Optional.of(dup.existing()); + } + try { + Thread.sleep(backoffMillis); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + backoffMillis *= 2; + } + return Optional.empty(); + } + + /** Single store lookup: returns {@code owned} if no record exists, else matches or conflicts. 
*/ + private IdempotencyOutcome lookup(IdempotencyOutcome.Owned owned) { + Optional<IdempotencyRecord> existing = store().load(owned.idempotencyKey()); + if (existing.isEmpty()) { + return owned; Review Comment: The term "owned" feels a bit misleading in this context. How can this node own something that is not reflected in the store? How about `IdempotencyOutcome.New`? ########## runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java: ########## @@ -460,14 +485,108 @@ public void authorizeCreateTableDirect( } } + /** + * Create a table, optionally honoring an {@code Idempotency-Key} from the REST request. + * + * <p>When an idempotency key is supplied and the feature is enabled: + * + * <ol> + * <li>Authorization runs first, so idempotency cannot bypass it. + * <li>Pre-flight loads any prior record for {@code (realm, key)}. A match (same caller, same + * resource binding) replays the response from authoritative catalog state; a binding + * mismatch raises 422. + * <li>On a fresh key, the table is created and the record is inserted afterwards. A concurrent + * caller that wins the race causes our insert to return DUPLICATE — we then replay too, so + * the response is equivalent to what the winner returned. + * </ol> + * + * <p>No response body is stored. Replays go through {@code loadTable + + * buildLoadTableResponseWithDelegationCredentials}, which re-vends fresh credentials for the + * current caller. 
+ */ public LoadTableResponse createTableDirect( Namespace namespace, CreateTableRequest request, EnumSet<AccessDelegationMode> delegationModes, - Optional<String> refreshCredentialsEndpoint) { + Optional<String> refreshCredentialsEndpoint, + Optional<UUID> idempotencyKey) { authorizeCreateTableDirect(namespace, request, !delegationModes.isEmpty()); Optional<AccessDelegationMode> resolvedMode = resolveAccessDelegationModes(delegationModes); + TableIdentifier tableIdentifier = TableIdentifier.of(namespace, request.name()); + + // Pre-flight owns the full idempotency decision: it returns Disabled when the feature is off or + // no key was supplied (plain create path), Duplicate to replay a prior success, or Owned to + // proceed. Authorization already ran above, so a replay reloads current catalog state (no + // response body is stored) and re-vends credentials for this caller; + // buildLoadTableResponseForExistingTable raises 422 if the table has advanced beyond the + // metadata location captured when the key was recorded. A binding mismatch surfaces as + // IdempotencyConflictException, which IcebergExceptionMapper maps to HTTP 422. + IdempotencyOutcome preflight = + idempotencySupport() + .preflight( + idempotencyKey, + polarisPrincipal(), + IdempotentOperation.CREATE_TABLE, + namespace.toString(), + request.name(), + resolvedMode.map(Enum::name).orElse("none")); + if (preflight instanceof IdempotencyOutcome.Duplicate dup) { + return buildLoadTableResponseForExistingTable( + tableIdentifier, + resolvedMode, + CREATE_TABLE_STORAGE_ACTIONS, + refreshCredentialsEndpoint, + dup.existing().metadataLocation()); + } + + // Run the operation. A concurrent request carrying the same key can win the catalog-level race + // and make this attempt fail with AlreadyExistsException; if that winner recorded a matching + // idempotency outcome, replay it instead of returning a 409. 
+ LoadTableResponse response; + try { + response = doCreateTableDirect(namespace, request, resolvedMode, refreshCredentialsEndpoint); + } catch (AlreadyExistsException e) { + if (preflight instanceof IdempotencyOutcome.Owned owned) { + Optional<IdempotencyRecord> raceWinner = + idempotencySupport().resolveConcurrentDuplicate(owned); + if (raceWinner.isPresent()) { + return buildLoadTableResponseForExistingTable( + tableIdentifier, + resolvedMode, + CREATE_TABLE_STORAGE_ACTIONS, + refreshCredentialsEndpoint, + raceWinner.get().metadataLocation()); + } + } + // Not a same-key retry: the table genuinely pre-existed, so this is a real conflict. + throw e; + } + + // Record the successful outcome. If a concurrent caller recorded first, replay theirs. + if (preflight instanceof IdempotencyOutcome.Owned owned) { + String metadataLocation = response.tableMetadata().metadataFileLocation(); + IdempotencyOutcome recordOutcome = + idempotencySupport().recordOutcome(owned, 200, metadataLocation); Review Comment: suggestion: same here: let's send the whole `preflight` and `response` into `recordOutcome()` and let the latter make the decision whether to write or not based on the `preflight` value. ########## runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java: ########## @@ -460,14 +485,108 @@ public void authorizeCreateTableDirect( } } + /** + * Create a table, optionally honoring an {@code Idempotency-Key} from the REST request. + * + * <p>When an idempotency key is supplied and the feature is enabled: + * + * <ol> + * <li>Authorization runs first, so idempotency cannot bypass it. + * <li>Pre-flight loads any prior record for {@code (realm, key)}. A match (same caller, same + * resource binding) replays the response from authoritative catalog state; a binding + * mismatch raises 422. + * <li>On a fresh key, the table is created and the record is inserted afterwards. 
A concurrent + * caller that wins the race causes our insert to return DUPLICATE — we then replay too, so + * the response is equivalent to what the winner returned. + * </ol> + * + * <p>No response body is stored. Replays go through {@code loadTable + + * buildLoadTableResponseWithDelegationCredentials}, which re-vends fresh credentials for the + * current caller. + */ public LoadTableResponse createTableDirect( Namespace namespace, CreateTableRequest request, EnumSet<AccessDelegationMode> delegationModes, - Optional<String> refreshCredentialsEndpoint) { + Optional<String> refreshCredentialsEndpoint, + Optional<UUID> idempotencyKey) { authorizeCreateTableDirect(namespace, request, !delegationModes.isEmpty()); Optional<AccessDelegationMode> resolvedMode = resolveAccessDelegationModes(delegationModes); + TableIdentifier tableIdentifier = TableIdentifier.of(namespace, request.name()); + + // Pre-flight owns the full idempotency decision: it returns Disabled when the feature is off or + // no key was supplied (plain create path), Duplicate to replay a prior success, or Owned to + // proceed. Authorization already ran above, so a replay reloads current catalog state (no + // response body is stored) and re-vends credentials for this caller; + // buildLoadTableResponseForExistingTable raises 422 if the table has advanced beyond the + // metadata location captured when the key was recorded. A binding mismatch surfaces as + // IdempotencyConflictException, which IcebergExceptionMapper maps to HTTP 422. 
+ IdempotencyOutcome preflight = + idempotencySupport() + .preflight( + idempotencyKey, + polarisPrincipal(), + IdempotentOperation.CREATE_TABLE, + namespace.toString(), + request.name(), + resolvedMode.map(Enum::name).orElse("none")); + if (preflight instanceof IdempotencyOutcome.Duplicate dup) { + return buildLoadTableResponseForExistingTable( + tableIdentifier, + resolvedMode, + CREATE_TABLE_STORAGE_ACTIONS, + refreshCredentialsEndpoint, + dup.existing().metadataLocation()); + } + + // Run the operation. A concurrent request carrying the same key can win the catalog-level race + // and make this attempt fail with AlreadyExistsException; if that winner recorded a matching + // idempotency outcome, replay it instead of returning a 409. + LoadTableResponse response; + try { + response = doCreateTableDirect(namespace, request, resolvedMode, refreshCredentialsEndpoint); + } catch (AlreadyExistsException e) { + if (preflight instanceof IdempotencyOutcome.Owned owned) { + Optional<IdempotencyRecord> raceWinner = + idempotencySupport().resolveConcurrentDuplicate(owned); Review Comment: nit: I'd send the whole `preflight` object into `resolveConcurrentDuplicate()` and let the latter do the `instanceof`. This way `IdempotencyHandlerSupport` owns all the critical idempotency decisions. ########## runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyStoreFactoryProducer.java: ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.idempotency; + +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Any; +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.inject.Produces; +import jakarta.inject.Singleton; +import org.apache.polaris.core.persistence.IdempotencyStoreFactory; + +/** + * Produces the active {@link IdempotencyStoreFactory} by selecting the registered backend whose + * {@link Identifier} matches {@link IdempotencyConfiguration#type()}. + */ +@ApplicationScoped +public class IdempotencyStoreFactoryProducer { + + @Produces + @Singleton + public IdempotencyStoreFactory idempotencyStoreFactory( Review Comment: nit: I'm not sure producing a `@Singleton` from an `@ApplicationScoped` is ideal... perhaps this method should be `@ApplicationScoped` too 🤔 ... but it's a minor point. ########## runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java: ########## @@ -460,14 +485,108 @@ public void authorizeCreateTableDirect( } } + /** + * Create a table, optionally honoring an {@code Idempotency-Key} from the REST request. + * + * <p>When an idempotency key is supplied and the feature is enabled: + * + * <ol> + * <li>Authorization runs first, so idempotency cannot bypass it. + * <li>Pre-flight loads any prior record for {@code (realm, key)}. A match (same caller, same + * resource binding) replays the response from authoritative catalog state; a binding + * mismatch raises 422. 
+ * <li>On a fresh key, the table is created and the record is inserted afterwards. A concurrent + * caller that wins the race causes our insert to return DUPLICATE — we then replay too, so + * the response is equivalent to what the winner returned. + * </ol> + * + * <p>No response body is stored. Replays go through {@code loadTable + + * buildLoadTableResponseWithDelegationCredentials}, which re-vends fresh credentials for the + * current caller. + */ public LoadTableResponse createTableDirect( Namespace namespace, CreateTableRequest request, EnumSet<AccessDelegationMode> delegationModes, - Optional<String> refreshCredentialsEndpoint) { + Optional<String> refreshCredentialsEndpoint, + Optional<UUID> idempotencyKey) { authorizeCreateTableDirect(namespace, request, !delegationModes.isEmpty()); Optional<AccessDelegationMode> resolvedMode = resolveAccessDelegationModes(delegationModes); + TableIdentifier tableIdentifier = TableIdentifier.of(namespace, request.name()); + + // Pre-flight owns the full idempotency decision: it returns Disabled when the feature is off or + // no key was supplied (plain create path), Duplicate to replay a prior success, or Owned to + // proceed. Authorization already ran above, so a replay reloads current catalog state (no + // response body is stored) and re-vends credentials for this caller; + // buildLoadTableResponseForExistingTable raises 422 if the table has advanced beyond the + // metadata location captured when the key was recorded. A binding mismatch surfaces as + // IdempotencyConflictException, which IcebergExceptionMapper maps to HTTP 422. 
+ IdempotencyOutcome preflight = + idempotencySupport() + .preflight( + idempotencyKey, + polarisPrincipal(), + IdempotentOperation.CREATE_TABLE, + namespace.toString(), + request.name(), + resolvedMode.map(Enum::name).orElse("none")); + if (preflight instanceof IdempotencyOutcome.Duplicate dup) { + return buildLoadTableResponseForExistingTable( + tableIdentifier, + resolvedMode, + CREATE_TABLE_STORAGE_ACTIONS, + refreshCredentialsEndpoint, + dup.existing().metadataLocation()); + } + + // Run the operation. A concurrent request carrying the same key can win the catalog-level race + // and make this attempt fail with AlreadyExistsException; if that winner recorded a matching + // idempotency outcome, replay it instead of returning a 409. + LoadTableResponse response; + try { + response = doCreateTableDirect(namespace, request, resolvedMode, refreshCredentialsEndpoint); + } catch (AlreadyExistsException e) { + if (preflight instanceof IdempotencyOutcome.Owned owned) { + Optional<IdempotencyRecord> raceWinner = + idempotencySupport().resolveConcurrentDuplicate(owned); + if (raceWinner.isPresent()) { + return buildLoadTableResponseForExistingTable( + tableIdentifier, + resolvedMode, + CREATE_TABLE_STORAGE_ACTIONS, + refreshCredentialsEndpoint, + raceWinner.get().metadataLocation()); + } + } + // Not a same-key retry: the table genuinely pre-existed, so this is a real conflict. + throw e; + } + + // Record the successful outcome. If a concurrent caller recorded first, replay theirs. + if (preflight instanceof IdempotencyOutcome.Owned owned) { + String metadataLocation = response.tableMetadata().metadataFileLocation(); + IdempotencyOutcome recordOutcome = + idempotencySupport().recordOutcome(owned, 200, metadataLocation); + if (recordOutcome instanceof IdempotencyOutcome.Duplicate dup) { + // Another caller raced ahead and recorded first. Replay so the response is the same shape Review Comment: How is that possible with createTable? 
🤔 Only one of the actors should be able to go past lines 550-560, I'd think 🤔 If the key got reused for another request type, `recordOutcome` would throw, right? ########## persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java: ########## @@ -42,51 +43,25 @@ public interface ModelIdempotencyRecord extends Converter<IdempotencyRecord> { String TABLE_NAME = "idempotency_records"; - // Logical tenant / realm identifier. String REALM_ID = "realm_id"; - // Client-provided idempotency key. String IDEMPOTENCY_KEY = "idempotency_key"; - // Logical operation type (e.g. commit-table). String OPERATION_TYPE = "operation_type"; - // Normalized identifier of the affected resource. - String RESOURCE_ID = "resource_id"; - - // Final HTTP status code once the operation is completed (null while in-progress). + String RESOURCE_HASH = "resource_hash"; + String PRINCIPAL_HASH = "principal_hash"; Review Comment: just wondering: do we really need two hashes here? Is it not sufficient to hash all relevant inputs into one persisted value? ########## runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyHandlerSupport.java: ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.idempotency; + +import jakarta.annotation.Nullable; +import jakarta.enterprise.context.RequestScoped; +import jakarta.inject.Inject; +import java.time.Clock; +import java.time.Instant; +import java.util.Optional; +import java.util.TreeSet; +import java.util.UUID; +import org.apache.polaris.core.DigestUtils; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.IdempotencyRecord; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.core.persistence.IdempotencyStoreFactory; + +/** + * Handler-side helper for the single-transaction ("optimistic commit") idempotency model. + * + * <p>Responsibilities: + * + * <ul> + * <li>Read and validate the {@code Idempotency-Key} request header (UUIDv7 only). + * <li>Compute the principal/resource hashes that form the binding stored alongside each record. + * <li>Pre-flight: from the raw request inputs, decide whether idempotency applies and, if so, + * look up an existing record for the same {@code (realm, key)}, returning an {@link + * IdempotencyOutcome} for the handler to branch on. + * <li>Record the terminal outcome after a successful operation, returning {@link + * IdempotencyOutcome.Owned} on win and {@link IdempotencyOutcome.Duplicate} on a race-driven + * duplicate. + * <li>Resolve a concurrent create-table race by polling for the winner's record. + * </ul> + * + * <p>This bean is {@link RequestScoped}: a single request operates within one realm, so the + * realm-scoped {@link IdempotencyStore} is resolved once (lazily) from {@link + * IdempotencyStoreFactory} for the request's {@link RealmContext}. When idempotency is disabled the + * store is never resolved — the bean is an inert shell. 
No response body is stored; duplicate + * responses are rebuilt from authoritative catalog state by the handler itself. + */ +@RequestScoped +public class IdempotencyHandlerSupport { + + // Bounded lookup for the idempotency record written by a concurrent create-table race winner, + // which records only after committing the table (so a loser may briefly not see it yet). Backoff + // is exponential: 5+10+20+40+80 = 155ms total budget across 5 attempts. If the winner's record is + // still not visible after that (e.g. a long GC pause or slow store write), the original 409 + // surfaces instead of a replay — correct but not ideal; widen the budget if this proves too + // tight. + private static final int CONCURRENT_REPLAY_MAX_ATTEMPTS = 5; + private static final long CONCURRENT_REPLAY_INITIAL_BACKOFF_MILLIS = 5; + + @Inject IdempotencyConfiguration configuration; + @Inject IdempotencyStoreFactory storeFactory; + @Inject RealmContext realmContext; + @Inject Clock clock; + + // Resolved lazily on first use within the request; never resolved when idempotency is disabled. + private IdempotencyStore store; + + /** + * Returns an instance with idempotency permanently disabled. Useful for test fixtures that need a + * non-null {@code IdempotencyHandlerSupport} but exercise non-idempotent code paths. + */ + public static IdempotencyHandlerSupport disabled() { + IdempotencyHandlerSupport instance = new IdempotencyHandlerSupport(); + instance.configuration = DisabledConfiguration.INSTANCE; + return instance; + } + + /** Returns {@code true} if handler-level idempotency is enabled. */ + public boolean isEnabled() { + return configuration != null && configuration.enabled(); + } + + /** + * Validates that the {@code Idempotency-Key} is a UUIDv7. UUIDv7 is required so the key carries a + * timestamp and enough entropy to be a meaningful idempotency boundary. The header is already + * parsed into a {@link UUID} upstream (the JAX-RS binding), so this only enforces the version. 
+ * + * @return the key, or {@link Optional#empty()} if {@code idempotencyKey} is null or idempotency + * is disabled + * @throws IllegalArgumentException if the key is not a UUIDv7 (callers translate this into a 400 + * Bad Request) + */ + public Optional<UUID> validatedKey(@Nullable UUID idempotencyKey) { + if (!isEnabled() || idempotencyKey == null) { + return Optional.empty(); + } + // RFC 9562 UUIDv7: version 7 with the RFC 4122/9562 variant (2). + if (idempotencyKey.version() != 7 || idempotencyKey.variant() != 2) { + throw new IllegalArgumentException( + "Idempotency-Key must be a UUIDv7; got: " + idempotencyKey); + } + return Optional.of(idempotencyKey); + } + + /** + * SHA-256 (hex) of the calling principal's identity, bound to the request's realm. The input is + * the principal name, the realm id, and the activated role set, canonicalised so the hash is + * deterministic and order-independent. + * + * <p>Roles are part of the binding so two callers that share a name but differ in activated roles + * do not collide. Principal properties are intentionally excluded: they are admin-mutable and not + * authentication context. + */ + public String principalHash(PolarisPrincipal principal) { + StringBuilder sb = new StringBuilder(); + sb.append("name=").append(principal.getName()).append('|'); + sb.append("realm=").append(realmContext.getRealmIdentifier()).append('|'); + sb.append("roles="); + new TreeSet<>(principal.getRoles()).forEach(r -> sb.append(r).append(',')); + return DigestUtils.sha256Hex(sb.toString()); + } + + /** + * SHA-256 hex of the resource-binding component, built from the operation and its stable resource + * identity components (e.g. namespace, name, resolved access-delegation mode). The canonical + * format lives here so all binding/hashing logic stays in one place; callers only supply the + * identity components. The request payload itself is intentionally not part of the binding. 
+ */ + public String resourceHash(IdempotentOperation operation, String... components) { + StringBuilder sb = new StringBuilder(operation.wireName()); + for (String component : components) { + sb.append(':').append(component == null ? "" : component); + } + return DigestUtils.sha256Hex(sb.toString()); + } + + /** + * Pre-flight: decide whether idempotency applies to this request and, if so, look up an existing + * record for {@code (realm, key)}. + * + * <p>All of the work lives here: if idempotency is disabled or no key was supplied, this returns + * {@link IdempotencyOutcome.Disabled} and the handler runs its plain path. Otherwise the + * principal/resource hashes are computed from the raw inputs and the store is consulted: + * + * <ul> + * <li>No record → {@link IdempotencyOutcome.Owned} (carrying the computed binding); the handler + * performs the operation and then calls {@link #recordOutcome}. + * <li>Matching record → {@link IdempotencyOutcome.Duplicate}; the handler rebuilds the response + * from current catalog state. + * <li>Record with a different binding → {@link IdempotencyConflictException} (mapped to 422). + * </ul> + * + * @param idempotencyKey the validated key, or {@link Optional#empty()} when absent/disabled + * @param principal the calling principal (hashed into the binding) + * @param operation the idempotent operation being attempted + * @param resourceComponents the stable resource-identity components (e.g. namespace, name, + * resolved access-delegation mode) + */ + public IdempotencyOutcome preflight( + Optional<UUID> idempotencyKey, + PolarisPrincipal principal, + IdempotentOperation operation, + String... 
resourceComponents) { + if (!isEnabled() || idempotencyKey.isEmpty()) { + return IdempotencyOutcome.disabled(); + } + IdempotencyOutcome.Owned owned = + IdempotencyOutcome.owned( + idempotencyKey.get(), + operation, + resourceHash(operation, resourceComponents), + principalHash(principal)); + return lookup(owned); + } + + /** + * Records the terminal outcome of an operation that just completed, using the binding carried by + * the {@link IdempotencyOutcome.Owned} returned from {@link #preflight}. + * + * <p>The insert is atomic on {@code (realm, key)}; if another caller raced ahead and inserted + * first, this returns {@link IdempotencyOutcome.Duplicate} carrying that existing record so the + * handler can rebuild an equivalent response from current state. Otherwise the same {@code owned} + * outcome is returned to signal a clean win. + */ + public IdempotencyOutcome recordOutcome( + IdempotencyOutcome.Owned owned, int httpStatus, @Nullable String metadataLocation) { + Instant now = clock.instant(); + Instant expiresAt = now.plus(configuration.ttl()); + IdempotencyStore.RecordResult result = + store() + .recordIfAbsent( + owned.idempotencyKey(), + owned.operation().wireName(), + owned.resourceHash(), + owned.principalHash(), + httpStatus, + metadataLocation, + now, + expiresAt); + if (result.type() == IdempotencyStore.RecordResultType.OWNED) { + return owned; + } + IdempotencyRecord existing = + result + .existing() + .orElseThrow(() -> new IllegalStateException("DUPLICATE result without record")); + return matchOrConflict(existing, owned); + } + + /** + * After a concurrent {@code createTable} loses the catalog race (AlreadyExistsException), checks + * whether the race winner recorded a matching idempotency outcome so this caller can replay it. + * + * <p>The winner records its outcome only after committing the table, so a loser may observe the + * conflict slightly before the record is visible; this polls a bounded number of times. 
A binding + * mismatch surfaces as {@link IdempotencyConflictException} (mapped to 422). Returns empty if no + * matching record appears, meaning the conflict was a genuine pre-existing table rather than a + * same-key retry. + */ + public Optional<IdempotencyRecord> resolveConcurrentDuplicate(IdempotencyOutcome.Owned owned) { + long backoffMillis = CONCURRENT_REPLAY_INITIAL_BACKOFF_MILLIS; + for (int attempt = 0; attempt < CONCURRENT_REPLAY_MAX_ATTEMPTS; attempt++) { + if (lookup(owned) instanceof IdempotencyOutcome.Duplicate dup) { + return Optional.of(dup.existing()); + } + try { + Thread.sleep(backoffMillis); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + backoffMillis *= 2; + } + return Optional.empty(); + } + + /** Single store lookup: returns {@code owned} if no record exists, else matches or conflicts. */ + private IdempotencyOutcome lookup(IdempotencyOutcome.Owned owned) { + Optional<IdempotencyRecord> existing = store().load(owned.idempotencyKey()); + if (existing.isEmpty()) { Review Comment: suggested by IJ: `return existing.map(record -> matchOrConflict(record, owned)).orElse(owned);` ########## runtime/service/src/main/java/org/apache/polaris/service/exception/IcebergExceptionMapper.java: ########## @@ -176,6 +177,7 @@ static int mapExceptionToResponseCode(RuntimeException rex) { case AlreadyExistsException e -> Status.CONFLICT.getStatusCode(); case CommitFailedException e -> Status.CONFLICT.getStatusCode(); case UnprocessableEntityException e -> 422; + case IdempotencyConflictException e -> 422; Review Comment: This mapper is supposed to handle Iceberg exceptions (I think), but `IdempotencyConflictException` is a Polaris exception 🤔 Should we make `IdempotencyConflictException` extend `PolarisException` and map it in `PolarisExceptionMapper`? 
########## runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyHandlerSupport.java: ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.idempotency; + +import jakarta.annotation.Nullable; +import jakarta.enterprise.context.RequestScoped; +import jakarta.inject.Inject; +import java.time.Clock; +import java.time.Instant; +import java.util.Optional; +import java.util.TreeSet; +import java.util.UUID; +import org.apache.polaris.core.DigestUtils; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.IdempotencyRecord; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.core.persistence.IdempotencyStoreFactory; + +/** + * Handler-side helper for the single-transaction ("optimistic commit") idempotency model. + * + * <p>Responsibilities: + * + * <ul> + * <li>Read and validate the {@code Idempotency-Key} request header (UUIDv7 only). + * <li>Compute the principal/resource hashes that form the binding stored alongside each record. 
+ * <li>Pre-flight: from the raw request inputs, decide whether idempotency applies and, if so, + * look up an existing record for the same {@code (realm, key)}, returning an {@link + * IdempotencyOutcome} for the handler to branch on. + * <li>Record the terminal outcome after a successful operation, returning {@link + * IdempotencyOutcome.Owned} on win and {@link IdempotencyOutcome.Duplicate} on a race-driven + * duplicate. + * <li>Resolve a concurrent create-table race by polling for the winner's record. + * </ul> + * + * <p>This bean is {@link RequestScoped}: a single request operates within one realm, so the + * realm-scoped {@link IdempotencyStore} is resolved once (lazily) from {@link + * IdempotencyStoreFactory} for the request's {@link RealmContext}. When idempotency is disabled the + * store is never resolved — the bean is an inert shell. No response body is stored; duplicate + * responses are rebuilt from authoritative catalog state by the handler itself. + */ +@RequestScoped +public class IdempotencyHandlerSupport { + + // Bounded lookup for the idempotency record written by a concurrent create-table race winner, + // which records only after committing the table (so a loser may briefly not see it yet). Backoff + // is exponential: 5+10+20+40+80 = 155ms total budget across 5 attempts. If the winner's record is + // still not visible after that (e.g. a long GC pause or slow store write), the original 409 + // surfaces instead of a replay — correct but not ideal; widen the budget if this proves too + // tight. + private static final int CONCURRENT_REPLAY_MAX_ATTEMPTS = 5; + private static final long CONCURRENT_REPLAY_INITIAL_BACKOFF_MILLIS = 5; + + @Inject IdempotencyConfiguration configuration; + @Inject IdempotencyStoreFactory storeFactory; + @Inject RealmContext realmContext; + @Inject Clock clock; + + // Resolved lazily on first use within the request; never resolved when idempotency is disabled. 
+ private IdempotencyStore store; + + /** + * Returns an instance with idempotency permanently disabled. Useful for test fixtures that need a + * non-null {@code IdempotencyHandlerSupport} but exercise non-idempotent code paths. + */ + public static IdempotencyHandlerSupport disabled() { + IdempotencyHandlerSupport instance = new IdempotencyHandlerSupport(); + instance.configuration = DisabledConfiguration.INSTANCE; + return instance; + } + + /** Returns {@code true} if handler-level idempotency is enabled. */ + public boolean isEnabled() { + return configuration != null && configuration.enabled(); Review Comment: How can `configuration` be `null`? Why? ########## runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyHandlerSupport.java: ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.idempotency; + +import jakarta.annotation.Nullable; +import jakarta.enterprise.context.RequestScoped; +import jakarta.inject.Inject; +import java.time.Clock; +import java.time.Instant; +import java.util.Optional; +import java.util.TreeSet; +import java.util.UUID; +import org.apache.polaris.core.DigestUtils; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.IdempotencyRecord; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.core.persistence.IdempotencyStoreFactory; + +/** + * Handler-side helper for the single-transaction ("optimistic commit") idempotency model. + * + * <p>Responsibilities: + * + * <ul> + * <li>Read and validate the {@code Idempotency-Key} request header (UUIDv7 only). + * <li>Compute the principal/resource hashes that form the binding stored alongside each record. + * <li>Pre-flight: from the raw request inputs, decide whether idempotency applies and, if so, + * look up an existing record for the same {@code (realm, key)}, returning an {@link + * IdempotencyOutcome} for the handler to branch on. + * <li>Record the terminal outcome after a successful operation, returning {@link + * IdempotencyOutcome.Owned} on win and {@link IdempotencyOutcome.Duplicate} on a race-driven + * duplicate. + * <li>Resolve a concurrent create-table race by polling for the winner's record. + * </ul> + * + * <p>This bean is {@link RequestScoped}: a single request operates within one realm, so the + * realm-scoped {@link IdempotencyStore} is resolved once (lazily) from {@link + * IdempotencyStoreFactory} for the request's {@link RealmContext}. When idempotency is disabled the + * store is never resolved — the bean is an inert shell. No response body is stored; duplicate + * responses are rebuilt from authoritative catalog state by the handler itself. 
+ */ +@RequestScoped +public class IdempotencyHandlerSupport { + + // Bounded lookup for the idempotency record written by a concurrent create-table race winner, + // which records only after committing the table (so a loser may briefly not see it yet). Backoff + // is exponential: 5+10+20+40+80 = 155ms total budget across 5 attempts. If the winner's record is + // still not visible after that (e.g. a long GC pause or slow store write), the original 409 + // surfaces instead of a replay — correct but not ideal; widen the budget if this proves too + // tight. + private static final int CONCURRENT_REPLAY_MAX_ATTEMPTS = 5; + private static final long CONCURRENT_REPLAY_INITIAL_BACKOFF_MILLIS = 5; Review Comment: Should we make them configurable via `IdempotencyConfiguration`? ########## runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyHandlerSupport.java: ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.idempotency; + +import jakarta.annotation.Nullable; +import jakarta.enterprise.context.RequestScoped; +import jakarta.inject.Inject; +import java.time.Clock; +import java.time.Instant; +import java.util.Optional; +import java.util.TreeSet; +import java.util.UUID; +import org.apache.polaris.core.DigestUtils; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.IdempotencyRecord; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.core.persistence.IdempotencyStoreFactory; + +/** + * Handler-side helper for the single-transaction ("optimistic commit") idempotency model. + * + * <p>Responsibilities: + * + * <ul> + * <li>Read and validate the {@code Idempotency-Key} request header (UUIDv7 only). + * <li>Compute the principal/resource hashes that form the binding stored alongside each record. + * <li>Pre-flight: from the raw request inputs, decide whether idempotency applies and, if so, + * look up an existing record for the same {@code (realm, key)}, returning an {@link + * IdempotencyOutcome} for the handler to branch on. + * <li>Record the terminal outcome after a successful operation, returning {@link + * IdempotencyOutcome.Owned} on win and {@link IdempotencyOutcome.Duplicate} on a race-driven + * duplicate. + * <li>Resolve a concurrent create-table race by polling for the winner's record. + * </ul> + * + * <p>This bean is {@link RequestScoped}: a single request operates within one realm, so the + * realm-scoped {@link IdempotencyStore} is resolved once (lazily) from {@link + * IdempotencyStoreFactory} for the request's {@link RealmContext}. When idempotency is disabled the + * store is never resolved — the bean is an inert shell. No response body is stored; duplicate + * responses are rebuilt from authoritative catalog state by the handler itself. 
+ */ +@RequestScoped +public class IdempotencyHandlerSupport { + + // Bounded lookup for the idempotency record written by a concurrent create-table race winner, + // which records only after committing the table (so a loser may briefly not see it yet). Backoff + // is exponential: 5+10+20+40+80 = 155ms total budget across 5 attempts. If the winner's record is + // still not visible after that (e.g. a long GC pause or slow store write), the original 409 + // surfaces instead of a replay — correct but not ideal; widen the budget if this proves too + // tight. + private static final int CONCURRENT_REPLAY_MAX_ATTEMPTS = 5; + private static final long CONCURRENT_REPLAY_INITIAL_BACKOFF_MILLIS = 5; + + @Inject IdempotencyConfiguration configuration; + @Inject IdempotencyStoreFactory storeFactory; + @Inject RealmContext realmContext; + @Inject Clock clock; + + // Resolved lazily on first use within the request; never resolved when idempotency is disabled. + private IdempotencyStore store; + + /** + * Returns an instance with idempotency permanently disabled. Useful for test fixtures that need a + * non-null {@code IdempotencyHandlerSupport} but exercise non-idempotent code paths. + */ + public static IdempotencyHandlerSupport disabled() { + IdempotencyHandlerSupport instance = new IdempotencyHandlerSupport(); + instance.configuration = DisabledConfiguration.INSTANCE; Review Comment: This is not used on "prod" call paths, it seems 🤔 Can it be moved to test code? ########## runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyMaintenance.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.idempotency; + +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.runtime.StartupEvent; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.vertx.core.Vertx; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import java.time.Clock; +import java.time.Instant; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.core.persistence.IdempotencyStoreFactory; +import org.apache.polaris.service.context.RealmContextConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Background maintenance for handler-level idempotency: periodically purges expired records. + * + * <p>On {@link StartupEvent}, registers a Vert.x periodic timer that ticks at {@link + * IdempotencyConfiguration#purgeInterval()}. Each tick offloads the actual store I/O to the worker + * pool to avoid blocking the event loop. A non-reentrant flag prevents overlapping purges if a tick + * takes longer than the configured interval. The timer is cancelled on {@link ShutdownEvent}. + * + * <p>The timer is a no-op unless {@link IdempotencyConfiguration#purgeEnabled()} is {@code true}. 
+ * In multi-node deployments, enabling purge on all replicas may cause unnecessary contention; + * operators can either disable it on most replicas or run an external scheduled job instead. + */ +@ApplicationScoped +public class IdempotencyMaintenance { + + private static final Logger LOGGER = LoggerFactory.getLogger(IdempotencyMaintenance.class); + + @Inject IdempotencyConfiguration configuration; + @Inject RealmContextConfiguration realmContextConfiguration; + @Inject IdempotencyStoreFactory storeFactory; + @Inject Clock clock; + @Inject Vertx vertx; + + private volatile Long purgeTimerId; + private final AtomicBoolean purgeRunning = new AtomicBoolean(false); + + void onStart(@Observes StartupEvent event) { + if (!configuration.enabled() || !configuration.purgeEnabled()) { + return; + } + long intervalMs = configuration.purgeInterval().toMillis(); + purgeTimerId = + vertx.setPeriodic( + intervalMs, + ignored -> { + if (!purgeRunning.compareAndSet(false, true)) { + return; + } + Infrastructure.getDefaultWorkerPool() + .execute( + () -> { + try { + purgeOnce(); Review Comment: Should this be an Admin Tool command perhaps? Similar to NoSQL and Metrics maintenance? Cf. https://lists.apache.org/thread/5nst0f2ygnl2gj3j910q7m8nk2fvokc7 ########## polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStoreFactory.java: ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.polaris.core.persistence; + +import com.google.common.annotations.Beta; +import org.apache.polaris.core.context.RealmContext; + +/** + * Factory for per-realm {@link IdempotencyStore} instances. + * + * <p>This SPI is intentionally kept separate from {@link MetaStoreManagerFactory}: handler-level + * idempotency persistence is conceptually independent from the catalog metastore, and a deployment + * may want to back it with a different store (e.g. a dedicated PostgreSQL instance) than the + * primary metastore. Backends register implementations with {@link + * io.smallrye.common.annotation.Identifier} matching the {@code polaris.idempotency.type} + * configuration value. + * + * <p>Returned instances must be safe to share across threads. + */ +@Beta +public interface IdempotencyStoreFactory { + + /** Returns the {@link IdempotencyStore} to use for the given realm. */ + IdempotencyStore getOrCreateIdempotencyStore(RealmContext realmContext); Review Comment: This method is _not_ used in `polaris-core`; it is used only in `runtime/service`, where `IdempotencyStoreFactoryProducer` lives, but its whole purpose appears to be to support lazy init in `IdempotencyHandlerSupport`. Plus it's a new interface.
So, I propose to simplify injections: * Remove this factory * Add a local (not for reuse) class `IdempotencyStoreProducer` in `runtime/service` * Make `IdempotencyStoreFactoryProducer` produce an app-scoped `IdempotencyStoreProducer` (may be an inner class) * `IdempotencyStoreProducer` has a request-scoped producer method for `store(RealmContext)` * If disabled, `IdempotencyStoreProducer` will produce a do-nothing impl. * `IdempotencyHandlerSupport` receives a normal `IdempotencyStore` (not a factory). No lazy-init logic necessary. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
