dennishuo commented on code in PR #3205:
URL: https://github.com/apache/polaris/pull/3205#discussion_r2636798316


##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.polaris.core.persistence;
+
+import java.time.Instant;
+import java.util.Optional;
+import org.apache.polaris.idempotency.IdempotencyRecord;
+
+/**
+ * Abstraction for persisting and querying idempotency records.
+ *
+ * <p>An {@link IdempotencyStore} is responsible for:
+ *
+ * <ul>
+ *   <li>Reserving an idempotency key for a particular operation and resource
+ *   <li>Recording completion status and response metadata
+ *   <li>Allowing callers to look up existing records to detect duplicates
+ *   <li>Expiring and purging old reservations
+ * </ul>
+ *
+ * <p>Implementations must be thread-safe if used concurrently.
+ */
+public interface IdempotencyStore {
+
+  /** High-level outcome of attempting to reserve an idempotency key. */
+  enum ReserveResultType {
+    /** The caller successfully acquired ownership of the idempotency key. */
+    OWNED,
+    /** A reservation already exists for the key; the caller does not own it. */
+    DUPLICATE
+  }
+
+  /**
+   * Result of a {@link #reserve(String, String, String, String, Instant, String, Instant)} call,
+   * including the outcome and, when applicable, the existing idempotency record.
+   */
+  final class ReserveResult {
+    private final ReserveResultType type;
+    private final Optional<IdempotencyRecord> existing;
+
+    public ReserveResult(ReserveResultType type, Optional<IdempotencyRecord> existing) {
+      this.type = type;
+      this.existing = existing == null ? Optional.empty() : existing;
+    }
+
+    /**
+     * Returns the outcome of the reservation attempt.
+     *
+     * @return the {@link ReserveResultType}
+     */
+    public ReserveResultType getType() {
+      return type;
+    }
+
+    /**
+     * Returns the existing idempotency record when {@link #getType()} is {@link
+     * ReserveResultType#DUPLICATE}, otherwise {@link Optional#empty()}.
+     *
+     * @return the existing {@link IdempotencyRecord}, if present
+     */
+    public Optional<IdempotencyRecord> getExisting() {
+      return existing;
+    }
+  }
+
+  /**
+   * Attempts to reserve an idempotency key for a given operation and resource.
+   *
+   * <p>If no record exists yet, the implementation should create a new reservation owned by {@code
+   * executorId}. If a record already exists, the implementation should return {@link
+   * ReserveResultType#DUPLICATE} along with the existing record.
+   *
+   * @param realmId logical tenant or realm identifier
+   * @param idempotencyKey application-provided idempotency key
+   * @param operationType logical operation name (e.g., {@code "commit-table"})
+   * @param normalizedResourceId normalized identifier of the affected resource
+   * @param expiresAt timestamp after which the reservation is considered expired
+   * @param executorId identifier of the caller attempting the reservation
+   * @param now timestamp representing the current time
+   * @return {@link ReserveResult} describing whether the caller owns the reservation or hit a
+   *     duplicate
+   */
+  ReserveResult reserve(
+      String realmId,
+      String idempotencyKey,
+      String operationType,
+      String normalizedResourceId,
+      Instant expiresAt,
+      String executorId,
+      Instant now);
+
+  /**
+   * Loads an existing idempotency record for the given realm and key, if present.
+   *
+   * @param realmId logical tenant or realm identifier
+   * @param idempotencyKey application-provided idempotency key
+   * @return the corresponding {@link IdempotencyRecord}, if it exists
+   */
+  Optional<IdempotencyRecord> load(String realmId, String idempotencyKey);
+
+  /**
+   * Updates the heartbeat for an in-progress reservation to indicate that the executor is still
+   * actively processing.
+   *
+   * @param realmId logical tenant or realm identifier
+   * @param idempotencyKey application-provided idempotency key
+   * @param executorId identifier of the executor that owns the reservation
+   * @param now timestamp representing the current time
+   * @return {@code true} if the heartbeat was updated, {@code false} otherwise

Review Comment:
   My cursory reading of the query used in the Postgres impl suggests there might be distinct causes of failing to update the heartbeat, specifically:
   
   1. If the `executorId` doesn't match the executor that owns the in-progress reservation
   2. If the record has already been finalized with an HTTP status
   3. If the `idempotencyKey` doesn't exist yet
   
   We should probably document these cases in the javadoc if they are indeed the intended scenarios. Is returning `false` meant to signal a coding error (that is, callers should already have ruled out all of these situations by calling `reserve` first), or is it an expected occurrence? If it only happens as a result of bugs, an exception is probably better than a boolean return value.
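To make the question concrete, here is a minimal in-memory sketch, assuming the three failure causes listed above are the intended ones, in which the heartbeat update reports why it did not apply instead of returning a bare `false`. `HeartbeatOutcome`, `Row`, and `InMemoryHeartbeatStore` are hypothetical names for illustration only; they are not part of the PR, and checking finalization before ownership is an arbitrary choice.

```java
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class HeartbeatSketch {

  /** Hypothetical replacement for the boolean returned by updateHeartbeat. */
  enum HeartbeatOutcome {
    UPDATED, // record exists, is IN_PROGRESS, and is owned by the caller
    NOT_FOUND, // cause 3: no record for (realmId, idempotencyKey)
    NOT_OWNER, // cause 1: IN_PROGRESS but owned by a different executor
    ALREADY_FINALIZED // cause 2: an HTTP status has already been written
  }

  /** Hypothetical mutable row holding a subset of the idempotency_records columns. */
  static final class Row {
    final String executorId;
    Integer httpStatus; // null while IN_PROGRESS
    Instant heartbeatAt;

    Row(String executorId) {
      this.executorId = executorId;
    }
  }

  static final class InMemoryHeartbeatStore {
    // Keyed by (realmId, idempotencyKey), mirroring the table's primary key.
    private final Map<List<String>, Row> rows = new ConcurrentHashMap<>();

    void put(String realmId, String idempotencyKey, Row row) {
      rows.put(List.of(realmId, idempotencyKey), row);
    }

    HeartbeatOutcome updateHeartbeat(
        String realmId, String idempotencyKey, String executorId, Instant now) {
      Row row = rows.get(List.of(realmId, idempotencyKey));
      if (row == null) {
        return HeartbeatOutcome.NOT_FOUND;
      }
      // Check and write under the row's lock so that a finalization which also
      // synchronizes on the row cannot interleave with this update.
      synchronized (row) {
        if (row.httpStatus != null) {
          return HeartbeatOutcome.ALREADY_FINALIZED;
        }
        if (!row.executorId.equals(executorId)) {
          return HeartbeatOutcome.NOT_OWNER;
        }
        row.heartbeatAt = now;
        return HeartbeatOutcome.UPDATED;
      }
    }
  }

  public static void main(String[] args) {
    InMemoryHeartbeatStore store = new InMemoryHeartbeatStore();
    store.put("realm-a", "key-1", new Row("pod-1"));
    Instant now = Instant.parse("2025-01-01T00:00:00Z");
    System.out.println(store.updateHeartbeat("realm-a", "key-1", "pod-1", now)); // UPDATED
    System.out.println(store.updateHeartbeat("realm-a", "key-1", "pod-2", now)); // NOT_OWNER
    System.out.println(store.updateHeartbeat("realm-a", "key-2", "pod-1", now)); // NOT_FOUND
  }
}
```

If callers are always expected to hold a successful `reserve` first, outcomes such as `NOT_FOUND` would indicate a bug and could be thrown as exceptions instead, leaving only the genuinely expected outcomes in the return type.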



##########
persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql:
##########
@@ -134,3 +134,31 @@ CREATE TABLE IF NOT EXISTS events (
     additional_properties JSONB NOT NULL DEFAULT '{}'::JSONB,
     PRIMARY KEY (event_id)
 );
+
+-- Idempotency records (key-only idempotency; durable replay)
+CREATE TABLE IF NOT EXISTS idempotency_records (
+    realm_id TEXT NOT NULL,
+    idempotency_key TEXT NOT NULL,
+    operation_type TEXT NOT NULL,
+    resource_id TEXT NOT NULL,
+
+    -- Finalization/replay
+    http_status INTEGER,                 -- NULL while IN_PROGRESS; set only on finalized 2xx/terminal 4xx
+    error_subtype TEXT,                  -- optional: e.g., already_exists, namespace_not_empty, idempotency_replay_failed
+    response_summary TEXT,               -- minimal body to reproduce equivalent response (JSON string)
+    response_headers TEXT,               -- small whitelisted headers to replay (JSON string)
+    finalized_at TIMESTAMP,              -- when http_status was written
+
+    -- Liveness/ops
+    created_at TIMESTAMP NOT NULL,
+    updated_at TIMESTAMP NOT NULL,
+    heartbeat_at TIMESTAMP,              -- updated by owner while IN_PROGRESS
+    executor_id TEXT,                    -- owner pod/worker id
+    expires_at TIMESTAMP,
+
+    PRIMARY KEY (realm_id, idempotency_key)
+);
+
+-- Helpful indexes
+CREATE INDEX IF NOT EXISTS idx_idemp_expires ON idempotency_records (expires_at);
+CREATE INDEX IF NOT EXISTS idx_idemp_active  ON idempotency_records (http_status, heartbeat_at);

Review Comment:
   What's the expected access pattern for this index? Do we expect it to participate in the `updateHeartbeat` call? If we really intend to use it for anything beyond a single `realm_id, idempotency_key` lookup, we probably want it prefixed with `realm_id`, for example if we later add some kind of `listRecentIdempotencyKeys` call based on `heartbeat_at`. But even then it's not clear that we want `http_status` as the leading column. It's probably best to add the index in the same PR that adds a use case for it, so I'd just remove it for now unless I've misunderstood the existing query patterns.
   
   If we're only thinking of serving the additional `WHERE` clauses in update/finalize, where `realm_id, idempotency_key` are already predicates, we probably don't want a whole explicit index for this: the primary key already narrows those statements to at most one row.
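For illustration, a conditional heartbeat update of the shape discussed here might look like the following JDBC sketch. The SQL text and the `HeartbeatSql` class are assumptions inferred from the failure causes in the first comment, not the PR's actual query. Because `realm_id = ? AND idempotency_key = ?` covers the full primary key, Postgres can locate at most one row through the primary-key index and evaluate the `executor_id` and `http_status` predicates against that row alone, so an extra index on `(http_status, heartbeat_at)` would not be expected to help this statement.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;

final class HeartbeatSql {
  // Hypothetical statement: realm_id and idempotency_key form the full primary key,
  // so the remaining predicates only filter the single row that key identifies.
  static final String UPDATE_HEARTBEAT =
      "UPDATE idempotency_records"
          + " SET heartbeat_at = ?, updated_at = ?"
          + " WHERE realm_id = ? AND idempotency_key = ?"
          + " AND executor_id = ? AND http_status IS NULL";

  /** Returns true if the in-progress row owned by executorId was updated. */
  static boolean updateHeartbeat(
      Connection conn, String realmId, String idempotencyKey, String executorId, Instant now)
      throws SQLException {
    try (PreparedStatement ps = conn.prepareStatement(UPDATE_HEARTBEAT)) {
      Timestamp ts = Timestamp.from(now);
      ps.setTimestamp(1, ts); // heartbeat_at
      ps.setTimestamp(2, ts); // updated_at
      ps.setString(3, realmId);
      ps.setString(4, idempotencyKey);
      ps.setString(5, executorId);
      // At most one row can match because of the primary-key predicates.
      return ps.executeUpdate() == 1;
    }
  }
}
```

Note that a single update count cannot tell the three failure causes apart; distinguishing them would need a follow-up read of the row.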



##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java:
##########
@@ -0,0 +1,159 @@
+public interface IdempotencyStore {
+
+  /**
+   * Updates the heartbeat for an in-progress reservation to indicate that the executor is still
+   * actively processing.
+   *
+   * @param realmId logical tenant or realm identifier
+   * @param idempotencyKey application-provided idempotency key
+   * @param executorId identifier of the executor that owns the reservation
+   * @param now timestamp representing the current time
+   * @return {@code true} if the heartbeat was updated, {@code false} otherwise
+   */
+  boolean updateHeartbeat(String realmId, String idempotencyKey, String executorId, Instant now);
+
+  /**
+   * Marks an idempotency record as finalized, recording HTTP status and response metadata.
+   *
+   * <p>Implementations should be tolerant of idempotent re-finalization attempts and typically
+   * return {@code false} when a record was already finalized.
+   *
+   * @param realmId logical tenant or realm identifier
+   * @param idempotencyKey application-provided idempotency key
+   * @param httpStatus HTTP status code returned to the client, or {@code null} if not applicable
+   * @param errorSubtype optional error subtype or code, if the operation failed
+   * @param responseSummary short, serialized representation of the response body
+   * @param responseHeaders serialized representation of response headers
+   * @param finalizedAt timestamp when the operation completed
+   * @return {@code true} if the record was transitioned to a finalized state, {@code false}
+   *     otherwise
+   */
+  boolean finalizeRecord(
+      String realmId,
+      String idempotencyKey,
+      Integer httpStatus,
+      String errorSubtype,
+      String responseSummary,
+      String responseHeaders,
+      Instant finalizedAt);
+
+  /**
+   * Purges records whose expiration time is strictly before the given instant.
+   *
+   * @param before cutoff instant; records expiring before this time may be removed
+   * @return number of records that were purged
+   */
+  int purgeExpired(Instant before);

Review Comment:
   Do we have any precedent yet for cross-realm mutations in the existing persistence functionality? I can see why purging across all realms is appealing in the typical case where a single Postgres database serves all realms, but IIRC it is still open-ended whether the persistence factory may route different realms to different Postgres databases or even entirely different instances. In that case we'd expect the async job that purges these records to be able to purge each realm independently.
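One way to structure the realm-scoped alternative, sketched under the assumption of a hypothetical `purgeExpired(String realmId, Instant before)` signature and a hypothetical per-realm routing function (neither exists in the PR): the maintenance driver iterates over realms and purges each one independently, so a deployment that routes realms to different databases can resolve a different store per realm.

```java
import java.time.Instant;
import java.util.List;
import java.util.function.Function;

/** Hypothetical realm-scoped variant of IdempotencyStore#purgeExpired. */
interface RealmScopedPurge {
  /** Purges records of {@code realmId} whose expiration time is strictly before {@code before}. */
  int purgeExpired(String realmId, Instant before);
}

/** Hypothetical maintenance driver that never issues a cross-realm purge. */
final class PurgeDriver {
  // Resolves the store responsible for a realm; in a routed deployment this
  // could return stores backed by different databases.
  private final Function<String, RealmScopedPurge> storeForRealm;

  PurgeDriver(Function<String, RealmScopedPurge> storeForRealm) {
    this.storeForRealm = storeForRealm;
  }

  /** Purges each realm independently and returns the total number of purged records. */
  int purgeAll(List<String> realmIds, Instant before) {
    int total = 0;
    for (String realmId : realmIds) {
      total += storeForRealm.apply(realmId).purgeExpired(realmId, before);
    }
    return total;
  }

  public static void main(String[] args) {
    // Toy stores: every realm reports purging exactly one record.
    PurgeDriver driver = new PurgeDriver(realm -> (realmId, before) -> 1);
    System.out.println(driver.purgeAll(List.of("realm-a", "realm-b"), Instant.now())); // 2
  }
}
```

Where the realm list comes from (configuration, a realm registry, or something else) is left open here, since that is exactly the design question raised above.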



##########
persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql:
##########
@@ -134,3 +134,31 @@ CREATE TABLE IF NOT EXISTS events (
     additional_properties JSONB NOT NULL DEFAULT '{}'::JSONB,
     PRIMARY KEY (event_id)
 );
+
+-- Helpful indexes
+CREATE INDEX IF NOT EXISTS idx_idemp_expires ON idempotency_records (expires_at);

Review Comment:
   Same as my comment in `IdempotencyStore`: keeping `realm_id` as a prefix of this index might better reflect the control flow we'd need in the purge driver to support more sophisticated realm-routing use cases (e.g., we can't assume those use cases are satisfied by a global cross-realm purge call; the driver might instead need some realm iterator interface and call purge individually for each realm).
   
   That said, the realm-splitting design is something we need to consider for all other async maintenance operations as well, and I may be forgetting existing precedent for cross-realm maintenance. If so, I'm happy to align with that precedent.
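To illustrate why the column order matters: with an index ordered by `(realm_id, expires_at)`, the expired records of one realm form a single contiguous key range, so a per-realm purge is one range scan, whereas with `expires_at` alone the entries of all realms are interleaved. The sketch below models such an index with a `TreeMap` over a hypothetical `IndexKey` record (Java 16+); `idempotency_key` is appended only to keep keys unique. It shows the ordering argument, not how Postgres actually executes the `DELETE`.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;

record IndexKey(String realmId, Instant expiresAt, String idempotencyKey) {}

final class RealmPrefixedIndex {
  // Realm first, then expiry, then key, mirroring a hypothetical index on
  // (realm_id, expires_at, idempotency_key). Assumes expiresAt is non-null.
  private static final Comparator<IndexKey> ORDER =
      Comparator.comparing(IndexKey::realmId)
          .thenComparing(IndexKey::expiresAt)
          .thenComparing(IndexKey::idempotencyKey);

  private final NavigableMap<IndexKey, Boolean> index = new TreeMap<>(ORDER);

  void add(String realmId, Instant expiresAt, String idempotencyKey) {
    index.put(new IndexKey(realmId, expiresAt, idempotencyKey), Boolean.TRUE);
  }

  /** Removes entries of realmId with expiresAt strictly before {@code before}; returns the count. */
  int purgeExpired(String realmId, Instant before) {
    // Smallest possible key within the realm (inclusive lower bound).
    IndexKey from = new IndexKey(realmId, Instant.MIN, "");
    // First possible key with expiresAt == before (exclusive upper bound).
    IndexKey to = new IndexKey(realmId, before, "");
    NavigableMap<IndexKey, Boolean> range = index.subMap(from, true, to, false);
    int purged = range.size();
    range.clear(); // subMap is a view, so this removes the entries from the index
    return purged;
  }

  int size() {
    return index.size();
  }

  public static void main(String[] args) {
    RealmPrefixedIndex idx = new RealmPrefixedIndex();
    Instant t0 = Instant.parse("2025-01-01T00:00:00Z");
    idx.add("realm-a", t0, "k1");
    idx.add("realm-a", t0.plusSeconds(60), "k2");
    idx.add("realm-b", t0, "k3");
    System.out.println(idx.purgeExpired("realm-a", t0.plusSeconds(60))); // 1
    System.out.println(idx.size()); // 2
  }
}
```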



##########
persistence/relational-jdbc/src/main/resources/postgres/schema-v3.sql:
##########
@@ -134,3 +134,31 @@ CREATE TABLE IF NOT EXISTS events (
     additional_properties JSONB NOT NULL DEFAULT '{}'::JSONB,
     PRIMARY KEY (event_id)
 );
+
+-- Idempotency records (key-only idempotency; durable replay)
+CREATE TABLE IF NOT EXISTS idempotency_records (
+    realm_id TEXT NOT NULL,
+    idempotency_key TEXT NOT NULL,
+    operation_type TEXT NOT NULL,
+    resource_id TEXT NOT NULL,
+
+    -- Finalization/replay
+    http_status INTEGER,                 -- NULL while IN_PROGRESS; set only on finalized 2xx/terminal 4xx
+    error_subtype TEXT,                  -- optional: e.g., already_exists, namespace_not_empty, idempotency_replay_failed
+    response_summary TEXT,               -- minimal body to reproduce equivalent response (JSON string)

Review Comment:
   These field descriptions should also be copied as javadoc comments onto the accessors in `IdempotencyRecord`, since that's where the persistence-agnostic, source-of-truth description of Polaris types and interfaces lives.
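A sketch of what that could look like, copying the schema comments onto accessor javadoc. `IdempotencyRecordAccessors` and its getter names are hypothetical, since the actual shape of `IdempotencyRecord` isn't shown here; only the finalization/replay fields are covered, and the remaining columns would follow the same pattern. The `isFinalized()` default is an illustrative addition derived from the "NULL while IN_PROGRESS" comment.

```java
import java.time.Instant;

/**
 * Hypothetical accessor surface for IdempotencyRecord, with the schema comments moved into
 * persistence-agnostic javadoc. Method names are illustrative, not the PR's actual API.
 */
interface IdempotencyRecordAccessors {

  /**
   * HTTP status returned to the client. {@code null} while the operation is IN_PROGRESS; set only
   * when the record is finalized with a 2xx or terminal 4xx response.
   */
  Integer getHttpStatus();

  /**
   * Optional error subtype, e.g. {@code already_exists}, {@code namespace_not_empty}, or {@code
   * idempotency_replay_failed}.
   */
  String getErrorSubtype();

  /** Minimal response body (JSON string) sufficient to reproduce an equivalent response. */
  String getResponseSummary();

  /** Small set of whitelisted response headers (JSON string) to replay. */
  String getResponseHeaders();

  /** Time at which {@link #getHttpStatus()} was written; {@code null} until then. */
  Instant getFinalizedAt();

  /** A record is finalized once it carries an HTTP status. */
  default boolean isFinalized() {
    return getHttpStatus() != null;
  }
}
```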


