huaxingao commented on code in PR #4659: URL: https://github.com/apache/polaris/pull/4659#discussion_r3406143589
########## runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyHandlerSupport.java: ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.idempotency; + +import jakarta.annotation.Nullable; +import jakarta.enterprise.context.RequestScoped; +import jakarta.inject.Inject; +import java.time.Clock; +import java.time.Instant; +import java.util.Optional; +import java.util.TreeSet; +import java.util.UUID; +import org.apache.polaris.core.DigestUtils; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.IdempotencyRecord; +import org.apache.polaris.core.persistence.IdempotencyStore; +import org.apache.polaris.core.persistence.IdempotencyStoreFactory; + +/** + * Handler-side helper for the single-transaction ("optimistic commit") idempotency model. + * + * <p>Responsibilities: + * + * <ul> + * <li>Read and validate the {@code Idempotency-Key} request header (UUIDv7 only). + * <li>Compute the principal/resource hashes that form the binding stored alongside each record. 
+ * <li>Pre-flight: from the raw request inputs, decide whether idempotency applies and, if so, + * look up an existing record for the same {@code (realm, key)}, returning an {@link + * IdempotencyOutcome} for the handler to branch on. + * <li>Record the terminal outcome after a successful operation, returning {@link + * IdempotencyOutcome.Owned} on win and {@link IdempotencyOutcome.Duplicate} on a race-driven + * duplicate. + * <li>Resolve a concurrent create-table race by polling for the winner's record. + * </ul> + * + * <p>This bean is {@link RequestScoped}: a single request operates within one realm, so the + * realm-scoped {@link IdempotencyStore} is resolved once (lazily) from {@link + * IdempotencyStoreFactory} for the request's {@link RealmContext}. When idempotency is disabled the + * store is never resolved — the bean is an inert shell. No response body is stored; duplicate + * responses are rebuilt from authoritative catalog state by the handler itself. + */ +@RequestScoped +public class IdempotencyHandlerSupport { + + // Bounded lookup for the idempotency record written by a concurrent create-table race winner, + // which records only after committing the table (so a loser may briefly not see it yet). Backoff + // is exponential: 5+10+20+40+80 = 155ms total budget across 5 attempts. If the winner's record is + // still not visible after that (e.g. a long GC pause or slow store write), the original 409 + // surfaces instead of a replay — correct but not ideal; widen the budget if this proves too + // tight. + private static final int CONCURRENT_REPLAY_MAX_ATTEMPTS = 5; + private static final long CONCURRENT_REPLAY_INITIAL_BACKOFF_MILLIS = 5; Review Comment: Done. Exposed both via `IdempotencyConfiguration` (`concurrentReplayMaxAttempts`, default 5; `concurrentReplayInitialBackoff`, default PT0.005S). Defaults match the previous hardcoded values. 
########## runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java: ########## @@ -460,14 +485,108 @@ public void authorizeCreateTableDirect( } } + /** + * Create a table, optionally honoring an {@code Idempotency-Key} from the REST request. + * + * <p>When an idempotency key is supplied and the feature is enabled: + * + * <ol> + * <li>Authorization runs first, so idempotency cannot bypass it. + * <li>Pre-flight loads any prior record for {@code (realm, key)}. A match (same caller, same + * resource binding) replays the response from authoritative catalog state; a binding + * mismatch raises 422. + * <li>On a fresh key, the table is created and the record is inserted afterwards. A concurrent + * caller that wins the race causes our insert to return DUPLICATE — we then replay too, so + * the response is equivalent to what the winner returned. + * </ol> + * + * <p>No response body is stored. Replays go through {@code loadTable + + * buildLoadTableResponseWithDelegationCredentials}, which re-vends fresh credentials for the + * current caller. + */ public LoadTableResponse createTableDirect( Namespace namespace, CreateTableRequest request, EnumSet<AccessDelegationMode> delegationModes, - Optional<String> refreshCredentialsEndpoint) { + Optional<String> refreshCredentialsEndpoint, + Optional<UUID> idempotencyKey) { authorizeCreateTableDirect(namespace, request, !delegationModes.isEmpty()); Optional<AccessDelegationMode> resolvedMode = resolveAccessDelegationModes(delegationModes); + TableIdentifier tableIdentifier = TableIdentifier.of(namespace, request.name()); + + // Pre-flight owns the full idempotency decision: it returns Disabled when the feature is off or + // no key was supplied (plain create path), Duplicate to replay a prior success, or Owned to + // proceed. 
Authorization already ran above, so a replay reloads current catalog state (no + // response body is stored) and re-vends credentials for this caller; + // buildLoadTableResponseForExistingTable raises 422 if the table has advanced beyond the + // metadata location captured when the key was recorded. A binding mismatch surfaces as + // IdempotencyConflictException, which IcebergExceptionMapper maps to HTTP 422. + IdempotencyOutcome preflight = + idempotencySupport() + .preflight( + idempotencyKey, + polarisPrincipal(), + IdempotentOperation.CREATE_TABLE, + namespace.toString(), + request.name(), + resolvedMode.map(Enum::name).orElse("none")); + if (preflight instanceof IdempotencyOutcome.Duplicate dup) { + return buildLoadTableResponseForExistingTable( + tableIdentifier, + resolvedMode, + CREATE_TABLE_STORAGE_ACTIONS, + refreshCredentialsEndpoint, + dup.existing().metadataLocation()); + } + + // Run the operation. A concurrent request carrying the same key can win the catalog-level race + // and make this attempt fail with AlreadyExistsException; if that winner recorded a matching + // idempotency outcome, replay it instead of returning a 409. + LoadTableResponse response; + try { + response = doCreateTableDirect(namespace, request, resolvedMode, refreshCredentialsEndpoint); + } catch (AlreadyExistsException e) { + if (preflight instanceof IdempotencyOutcome.Owned owned) { + Optional<IdempotencyRecord> raceWinner = + idempotencySupport().resolveConcurrentDuplicate(owned); + if (raceWinner.isPresent()) { + return buildLoadTableResponseForExistingTable( + tableIdentifier, + resolvedMode, + CREATE_TABLE_STORAGE_ACTIONS, + refreshCredentialsEndpoint, + raceWinner.get().metadataLocation()); + } + } + // Not a same-key retry: the table genuinely pre-existed, so this is a real conflict. + throw e; + } + + // Record the successful outcome. If a concurrent caller recorded first, replay theirs. 
+ if (preflight instanceof IdempotencyOutcome.Owned owned) { + String metadataLocation = response.tableMetadata().metadataFileLocation(); + IdempotencyOutcome recordOutcome = + idempotencySupport().recordOutcome(owned, 200, metadataLocation); + if (recordOutcome instanceof IdempotencyOutcome.Duplicate dup) { + // Another caller raced ahead and recorded first. Replay so the response is the same shape Review Comment: You are right — the Duplicate-replay branch here was unreachable, so I removed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
