huaxingao commented on code in PR #3205: URL: https://github.com/apache/polaris/pull/3205#discussion_r2638325266
########## polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.polaris.core.persistence; + +import java.time.Instant; +import java.util.Optional; +import org.apache.polaris.idempotency.IdempotencyRecord; + +/** + * Abstraction for persisting and querying idempotency records. + * + * <p>An {@link IdempotencyStore} is responsible for: + * + * <ul> + * <li>Reserving an idempotency key for a particular operation and resource + * <li>Recording completion status and response metadata + * <li>Allowing callers to look up existing records to detect duplicates + * <li>Expiring and purging old reservations + * </ul> + * + * <p>Implementations must be thread-safe if used concurrently. + */ +public interface IdempotencyStore { + + /** High-level outcome of attempting to reserve an idempotency key. */ + enum ReserveResultType { + /** The caller successfully acquired ownership of the idempotency key. */ + OWNED, + /** A reservation already exists for the key; the caller does not own it. */ + DUPLICATE + } + + /** + * Result of a {@link #reserve(String, String, String, String, Instant, String, Instant)} call, + * including the outcome and, when applicable, the existing idempotency record. + */ + final class ReserveResult { + private final ReserveResultType type; + private final Optional<IdempotencyRecord> existing; + + public ReserveResult(ReserveResultType type, Optional<IdempotencyRecord> existing) { + this.type = type; + this.existing = existing == null ? Optional.empty() : existing; + } + + /** + * Returns the outcome of the reservation attempt. + * + * @return the {@link ReserveResultType} + */ + public ReserveResultType getType() { + return type; + } + + /** + * Returns the existing idempotency record when {@link #getType()} is {@link + * ReserveResultType#DUPLICATE}, otherwise {@link Optional#empty()}. + * + * @return the existing {@link IdempotencyRecord}, if present + */ + public Optional<IdempotencyRecord> getExisting() { + return existing; + } + } + + /** + * Attempts to reserve an idempotency key for a given operation and resource. + * + * <p>If no record exists yet, the implementation should create a new reservation owned by {@code + * executorId}. If a record already exists, the implementation should return {@link + * ReserveResultType#DUPLICATE} along with the existing record. + * + * @param realmId logical tenant or realm identifier + * @param idempotencyKey application-provided idempotency key + * @param operationType logical operation name (e.g., {@code "commit-table"}) + * @param normalizedResourceId normalized identifier of the affected resource + * @param expiresAt timestamp after which the reservation is considered expired + * @param executorId identifier of the caller attempting the reservation + * @param now timestamp representing the current time + * @return {@link ReserveResult} describing whether the caller owns the reservation or hit a + * duplicate + */ + ReserveResult reserve( + String realmId, + String idempotencyKey, + String operationType, + String normalizedResourceId, + Instant expiresAt, + String executorId, + Instant now); + + /** + * Loads an existing idempotency record for the given realm and key, if present. + * + * @param realmId logical tenant or realm identifier + * @param idempotencyKey application-provided idempotency key + * @return the corresponding {@link IdempotencyRecord}, if it exists + */ + Optional<IdempotencyRecord> load(String realmId, String idempotencyKey); + + /** + * Updates the heartbeat for an in-progress reservation to indicate that the executor is still + * actively processing. + * + * @param realmId logical tenant or realm identifier + * @param idempotencyKey application-provided idempotency key + * @param executorId identifier of the executor that owns the reservation + * @param now timestamp representing the current time + * @return {@code true} if the heartbeat was updated, {@code false} otherwise Review Comment: Good catch, you’re right: updateHeartbeat can return false in exactly the three cases you described. The intent is for these to be normal outcomes, not “exception” cases. From the caller’s point of view, updateHeartbeat is basically “refresh the heartbeat if I still own this in‑progress reservation.” Returning true means “yes, you still own it and we updated the row,” and returning false means “there is no matching in‑progress row you own anymore, so stop heartbeating and treat this reservation as lost.” Actual failures (SQL errors, connectivity issues, etc.) will still surface as exceptions from the implementation. I’ll update the Javadoc to spell out the three false scenarios and clarify this contract. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
