Copilot commented on code in PR #4659:
URL: https://github.com/apache/polaris/pull/4659#discussion_r3377120951


##########
persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java:
##########
@@ -134,99 +93,41 @@ public Map<String, Object> toMap(
   }
 
   @Override
-  public HeartbeatResult updateHeartbeat(
-      String realmId, String idempotencyKey, String executorId, Instant now) {
-    Optional<IdempotencyRecord> existing = load(realmId, idempotencyKey);
-    if (existing.isEmpty()) {
-      return HeartbeatResult.NOT_FOUND;
-    }
-
-    IdempotencyRecord record = existing.get();
-    if (record.httpStatus() != null) {
-      return HeartbeatResult.FINALIZED;
-    }
-    if (record.executorId() == null || 
!record.executorId().equals(executorId)) {
-      return HeartbeatResult.LOST_OWNERSHIP;
-    }
-
-    QueryGenerator.PreparedQuery update =
-        QueryGenerator.generateUpdateQuery(
-            ModelIdempotencyRecord.ALL_COLUMNS,
-            ModelIdempotencyRecord.TABLE_NAME,
-            Map.of(
-                ModelIdempotencyRecord.HEARTBEAT_AT,
-                Timestamp.from(now),
-                ModelIdempotencyRecord.UPDATED_AT,
-                Timestamp.from(now)),
-            Map.of(
-                ModelIdempotencyRecord.REALM_ID,
-                realmId,
-                ModelIdempotencyRecord.IDEMPOTENCY_KEY,
-                idempotencyKey,
-                ModelIdempotencyRecord.EXECUTOR_ID,
-                executorId),
-            Map.of(),
-            Map.of(),
-            Set.of(ModelIdempotencyRecord.HTTP_STATUS),
-            Set.of());
-
-    try {
-      int updated = datasourceOperations.executeUpdate(update);
-      if (updated > 0) {
-        return HeartbeatResult.UPDATED;
-      }
-    } catch (SQLException e) {
-      throw new IdempotencyPersistenceException("Failed to update idempotency 
heartbeat", e);
-    }
-
-    // Raced with finalize/ownership loss; re-check to return a meaningful 
result.
-    Optional<IdempotencyRecord> after = load(realmId, idempotencyKey);
-    if (after.isEmpty()) {
-      return HeartbeatResult.NOT_FOUND;
-    }
-    if (after.get().httpStatus() != null) {
-      return HeartbeatResult.FINALIZED;
-    }
-    return HeartbeatResult.LOST_OWNERSHIP;
-  }
-
-  @Override
-  public boolean finalizeRecord(
+  public RecordResult recordIfAbsent(
       String realmId,
       String idempotencyKey,
-      Integer httpStatus,
+      String operationType,
+      String normalizedResourceId,
+      String principalHash,
+      int httpStatus,
       String errorSubtype,
-      String responseSummary,
-      String responseHeaders,
-      Instant finalizedAt) {
-    // Use ordered/set maps so we can include nullable values (Map.of 
disallows nulls).
-    Map<String, Object> setClause = new LinkedHashMap<>();
-    setClause.put(ModelIdempotencyRecord.HTTP_STATUS, httpStatus);
-    setClause.put(ModelIdempotencyRecord.ERROR_SUBTYPE, errorSubtype);
-    setClause.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, responseSummary);
-    setClause.put(ModelIdempotencyRecord.RESPONSE_HEADERS, responseHeaders);
-    setClause.put(ModelIdempotencyRecord.FINALIZED_AT, 
Timestamp.from(finalizedAt));
-    setClause.put(ModelIdempotencyRecord.UPDATED_AT, 
Timestamp.from(finalizedAt));
-
-    Map<String, Object> whereEquals = new HashMap<>();
-    whereEquals.put(ModelIdempotencyRecord.REALM_ID, realmId);
-    whereEquals.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);
-
-    QueryGenerator.PreparedQuery update =
-        QueryGenerator.generateUpdateQuery(
-            ModelIdempotencyRecord.ALL_COLUMNS,
-            ModelIdempotencyRecord.TABLE_NAME,
-            setClause,
-            whereEquals,
-            Map.of(),
-            Map.of(),
-            Set.of(ModelIdempotencyRecord.HTTP_STATUS),
-            Set.of());
-
+      Instant createdAt,
+      Instant expiresAt) {
     try {
-      return datasourceOperations.executeUpdate(update) > 0;
+      Map<String, Object> insertMap = new LinkedHashMap<>();
+      insertMap.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);
+      insertMap.put(ModelIdempotencyRecord.OPERATION_TYPE, operationType);
+      insertMap.put(ModelIdempotencyRecord.RESOURCE_ID, normalizedResourceId);
+      insertMap.put(ModelIdempotencyRecord.PRINCIPAL_HASH, principalHash);
+      insertMap.put(ModelIdempotencyRecord.HTTP_STATUS, httpStatus);
+      insertMap.put(ModelIdempotencyRecord.ERROR_SUBTYPE, errorSubtype);
+      insertMap.put(ModelIdempotencyRecord.CREATED_AT, 
Timestamp.from(createdAt));
+      insertMap.put(ModelIdempotencyRecord.EXPIRES_AT, 
Timestamp.from(expiresAt));
+
+      List<Object> values = insertMap.values().stream().toList();
+      QueryGenerator.PreparedQuery insert =
+          QueryGenerator.generateInsertQuery(
+              ModelIdempotencyRecord.ALL_COLUMNS,
+              ModelIdempotencyRecord.TABLE_NAME,
+              values,
+              realmId);
+      datasourceOperations.executeUpdate(insert);
+      return new RecordResult(RecordResultType.OWNED, Optional.empty());
     } catch (SQLException e) {
-      throw new IdempotencyPersistenceException("Failed to finalize 
idempotency record", e);
+      if (datasourceOperations.isUniquenessConstraintViolation(e)) {
+        return new RecordResult(RecordResultType.DUPLICATE, load(realmId, 
idempotencyKey));
+      }
+      throw new IdempotencyPersistenceException("Failed to record idempotency 
entry", e);
     }

Review Comment:
   On a uniqueness-constraint violation you return `DUPLICATE` with the result of
`load(...)` as the existing record. Under a concurrent purge/cleanup (or if the
conflicting insert belongs to another transaction that later rolls back), `load(...)`
can return `Optional.empty()` even though our insert just failed with a uniqueness
violation. That can cascade into an `IllegalStateException` in
`IdempotencyHandlerSupport`, which assumes `DUPLICATE` always includes an existing
record. To make this robust, ensure `DUPLICATE` always carries a record. For example,
retry the load/insert in a bounded loop, or throw a dedicated persistence exception if
the record cannot be retrieved. Either way, handler logic never sees a duplicate without
an existing row.



##########
runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyStoreFactoryProducer.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.idempotency;
+
+import io.smallrye.common.annotation.Identifier;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Any;
+import jakarta.enterprise.inject.Instance;
+import jakarta.enterprise.inject.Produces;
+import jakarta.inject.Singleton;
+import org.apache.polaris.core.persistence.IdempotencyStoreFactory;
+
+/**
+ * Produces the active {@link IdempotencyStoreFactory} by selecting the 
registered backend whose
+ * {@link Identifier} matches {@link IdempotencyConfiguration#type()}.
+ */
+@ApplicationScoped
+public class IdempotencyStoreFactoryProducer {
+
+  @Produces
+  @Singleton
+  public IdempotencyStoreFactory idempotencyStoreFactory(
+      IdempotencyConfiguration configuration, @Any 
Instance<IdempotencyStoreFactory> factories) {
+    return factories.select(Identifier.Literal.of(configuration.type())).get();
+  }

Review Comment:
   This producer always selects a backend based on `configuration.type()`, even when
`configuration.enabled()` is false. As a result, a deployment can fail at startup
because of an unavailable or misspelled backend identifier, even though idempotency is
explicitly disabled. Consider short-circuiting when disabled (e.g., returning a
known-safe default or no-op factory). Also consider translating an unsatisfied
selection into a clearer configuration error that applies only when `enabled=true`.
For example, check `Instance.isResolvable()` on the selected instance before calling
`get()`, and name the configured `type` in the error message.



##########
runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java:
##########
@@ -202,6 +204,15 @@ public Builder withEventDelegator(boolean 
useEventDelegator) {
       return this;
     }
 
+    /**
+     * Wires a specific {@link IdempotencyHandlerSupport} into the handler and 
adapter. When unset,
+     * idempotency is disabled (the default for the vast majority of tests).
+     */
+    public Builder idempotencySupport(IdempotencyHandlerSupport 
idempotencySupport) {
+      this.idempotencySupport = idempotencySupport;
+      return this;
+    }

Review Comment:
   This builder method's name is inconsistent with the nearby `withEventDelegator(...)`
(and similar fluent setters). For discoverability and consistency, consider renaming it
to `withIdempotencySupport(...)`, or aligning the surrounding builder method names on a
single convention.



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -460,15 +478,114 @@ public void authorizeCreateTableDirect(
     }
   }
 
+  /**
+   * Create a table, optionally honoring an {@code Idempotency-Key} from the 
REST request.
+   *
+   * <p>When an idempotency key is supplied and the feature is enabled:
+   *
+   * <ol>
+   *   <li>Authorization runs first, so idempotency cannot bypass it.
+   *   <li>Pre-flight loads any prior record for {@code (realm, key)}. A match 
(same caller, same
+   *       resource binding) replays the response from authoritative catalog 
state; a binding
+   *       mismatch raises 422.
+   *   <li>On a fresh key, the table is created and the record is inserted 
afterwards. A concurrent
+   *       caller that wins the race causes our insert to return DUPLICATE — 
we then replay too, so
+   *       the response is equivalent to what the winner returned.
+   * </ol>
+   *
+   * <p>No response body is stored. Replays go through {@code loadTable +
+   * buildLoadTableResponseWithDelegationCredentials}, which re-vends fresh 
credentials for the
+   * current caller.
+   */
   public LoadTableResponse createTableDirect(
       Namespace namespace,
       CreateTableRequest request,
       EnumSet<AccessDelegationMode> delegationModes,
-      Optional<String> refreshCredentialsEndpoint) {
+      Optional<String> refreshCredentialsEndpoint,
+      Optional<String> idempotencyKey) {
 
     authorizeCreateTableDirect(namespace, request, !delegationModes.isEmpty());
     Optional<AccessDelegationMode> resolvedMode = 
resolveAccessDelegationModes(delegationModes);
 
+    if (idempotencyKey.isEmpty() || !idempotencySupport().isEnabled()) {
+      return doCreateTableDirect(namespace, request, resolvedMode, 
refreshCredentialsEndpoint);
+    }
+    return createTableDirectIdempotently(
+        namespace,
+        request,
+        delegationModes,
+        resolvedMode,
+        refreshCredentialsEndpoint,
+        idempotencyKey.get());
+  }
+
+  private LoadTableResponse createTableDirectIdempotently(
+      Namespace namespace,
+      CreateTableRequest request,
+      EnumSet<AccessDelegationMode> delegationModes,
+      Optional<AccessDelegationMode> resolvedMode,
+      Optional<String> refreshCredentialsEndpoint,
+      String idempotencyKey) {
+
+    String operationType = "create-table";
+    String principalHash =
+        idempotencySupport().principalHash(polarisPrincipal(), 
realmContext().getRealmIdentifier());
+    String resourceId =
+        idempotencySupport()
+            .resourceHash(
+                "create-table:"
+                    + namespace.toString()
+                    + ":"
+                    + request.name()
+                    + ":"
+                    + delegationModesToken(delegationModes));

Review Comment:
   The value passed as `normalizedResourceId` is a SHA-256 hash of a derived string.
However, the persistence schema and model documentation describe `resource_id` /
`normalizedResourceId` as a normalized, request-derived identifier (typically
human-readable). This mismatch contradicts the comments in `schema-v4.sql` and the
model docs. It also makes operational debugging and support harder, because operators
cannot inspect the stored bindings. Consider one of two options:
(a) store the canonical, human-readable resource identifier (e.g.,
`catalog/namespace/table` plus the delegation token), optionally with a hash in a
separate field; or
(b) rename the field and update the docs everywhere to make clear that it is a binding
hash rather than a normalized resource id.



##########
runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyMaintenance.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.idempotency;
+
+import io.quarkus.runtime.ShutdownEvent;
+import io.quarkus.runtime.StartupEvent;
+import io.smallrye.mutiny.infrastructure.Infrastructure;
+import io.vertx.core.Vertx;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.event.Observes;
+import jakarta.inject.Inject;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.core.persistence.IdempotencyStoreFactory;
+import org.apache.polaris.service.context.RealmContextConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background maintenance for handler-level idempotency: periodically purges 
expired records.
+ *
+ * <p>On {@link StartupEvent}, registers a Vert.x periodic timer that ticks at 
{@link
+ * IdempotencyConfiguration#purgeInterval()}. Each tick offloads the actual 
store I/O to the worker
+ * pool to avoid blocking the event loop. A non-reentrant flag prevents 
overlapping purges if a tick
+ * takes longer than the configured interval. The timer is cancelled on {@link 
ShutdownEvent}.
+ *
+ * <p>The timer is a no-op unless {@link 
IdempotencyConfiguration#purgeEnabled()} is {@code true}.
+ * In multi-node deployments, enabling purge on all replicas may cause 
unnecessary contention;
+ * operators can either disable it on most replicas or run an external 
scheduled job instead.
+ */
+@ApplicationScoped
+public class IdempotencyMaintenance {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IdempotencyMaintenance.class);
+
+  @Inject IdempotencyConfiguration configuration;
+  @Inject RealmContextConfiguration realmContextConfiguration;
+  @Inject IdempotencyStoreFactory storeFactory;
+  @Inject Clock clock;
+  @Inject Vertx vertx;
+
+  private volatile Long purgeTimerId;
+  private final AtomicBoolean purgeRunning = new AtomicBoolean(false);
+
+  void onStart(@Observes StartupEvent event) {
+    if (!configuration.enabled() || !configuration.purgeEnabled()) {
+      return;
+    }
+    long intervalMs = configuration.purgeInterval().toMillis();
+    purgeTimerId =
+        vertx.setPeriodic(
+            intervalMs,
+            ignored -> {
+              if (!purgeRunning.compareAndSet(false, true)) {
+                return;
+              }
+              Infrastructure.getDefaultWorkerPool()
+                  .execute(
+                      () -> {
+                        try {
+                          purgeOnce();
+                        } finally {
+                          purgeRunning.set(false);
+                        }
+                      });
+            });
+  }

Review Comment:
   This introduces important operational behavior: periodic purge scheduling plus a
non-overlap guard via `purgeRunning`. There is no test coverage validating that:
(1) the timer is not registered when `enabled=false` or `purgeEnabled=false`; and
(2) overlapping ticks are suppressed while a purge is in progress.
A focused unit test would help prevent regressions in purge scheduling and reentrancy
behavior, e.g., one using a controlled/fake Vert.x scheduler and controlled worker
execution. Note that exercising (2) likely requires injecting the worker executor
rather than calling the static `Infrastructure.getDefaultWorkerPool()` directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to