dimas-b commented on code in PR #4269:
URL: https://github.com/apache/polaris/pull/4269#discussion_r3235517648


##########
persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java:
##########
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.polaris.persistence.relational.jdbc.idempotency;
-
-import jakarta.annotation.Nonnull;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import javax.sql.DataSource;
-import org.apache.polaris.core.entity.IdempotencyRecord;
-import org.apache.polaris.core.persistence.IdempotencyPersistenceException;
-import org.apache.polaris.core.persistence.IdempotencyStore;
-import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;
-import org.apache.polaris.persistence.relational.jdbc.QueryGenerator;
-import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration;
-import org.apache.polaris.persistence.relational.jdbc.models.Converter;
-import org.apache.polaris.persistence.relational.jdbc.models.ModelIdempotencyRecord;
-
-public class RelationalJdbcIdempotencyStore implements IdempotencyStore {

Review Comment:
   I actually think it is good to keep idempotency code in a separate class and 
avoid mixing it into `JdbcBasePersistenceImpl`.
   
   This may become more relevant if my point about having a separate 
`IdempotencyPersistence` factory is accepted.



##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/LocalPolarisMetaStoreManagerFactory.java:
##########
@@ -161,6 +162,13 @@ public synchronized TransactionalPersistence getOrCreateSession(RealmContext rea
     return sessionSupplierMap.get(realmContext.getRealmIdentifier()).get();
   }
 
+  @Override
+  public synchronized IdempotencyPersistence getOrCreateIdempotencyPersistence(
+      RealmContext realmContext) {
+    return idempotencyPersistenceMap.computeIfAbsent(
+        realmContext.getRealmIdentifier(), k -> new InMemoryIdempotencyPersistence());

Review Comment:
   I'm not sure this is correct. A `LocalPolarisMetaStoreManagerFactory` may have sub-classes that are not "in memory" (e.g. in downstream projects).
   
   While overriding this method is possible in such cases, the default 
behaviour would not be correct.
   
   This is another point for introducing a separate factory for 
`IdempotencyPersistence`.



##########
polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java:
##########
@@ -50,115 +52,17 @@ public record IdempotencyRecord(
     String idempotencyKey,
     String operationType,
     String normalizedResourceId,
+    String principalHash,

Review Comment:
   Since this class existed before, and current PR concentrates on integrating 
idempotency support into Polaris services, would you mind extracting functional 
changes into a new PR to be merged before this PR?
   
   I believe this will make reviews easier by isolating functional changes from infrastructural changes.



##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/MetaStoreManagerFactory.java:
##########
@@ -34,6 +34,14 @@ public interface MetaStoreManagerFactory {
 
   BasePersistence getOrCreateSession(RealmContext realmContext);
 
+  /**
+   * Returns the per-realm {@link IdempotencyPersistence}. Implementations are free to back this
+   * with the same storage as {@link #getOrCreateSession(RealmContext)} or with a different one
+   * (e.g. an in-memory store for dev mode). Backends that do not support handler-level idempotency
+   * should throw {@link UnsupportedOperationException}.
+   */
+  IdempotencyPersistence getOrCreateIdempotencyPersistence(RealmContext realmContext);

Review Comment:
   Let's introduce a separate factory for `IdempotencyPersistence`.
   
   I do believe it is functionally not related to the "Metastore" Manager.
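A minimal sketch of what such a separate factory might look like, assuming a hypothetical `IdempotencyPersistenceFactory` interface (not part of the PR) and trimmed stand-ins for `RealmContext` and `IdempotencyPersistence`. Because each backend would supply its own factory, neither `MetaStoreManagerFactory` nor subclasses such as `LocalPolarisMetaStoreManagerFactory` would need an idempotency default:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Trimmed stand-in for Polaris' RealmContext: only the realm identifier is needed here. */
interface RealmContext {
  String getRealmIdentifier();
}

/** Placeholder for the IdempotencyPersistence SPI; its methods are omitted in this sketch. */
interface IdempotencyPersistence {}

/** Hypothetical factory, deliberately independent of MetaStoreManagerFactory. */
interface IdempotencyPersistenceFactory {
  /** Returns the IdempotencyPersistence for the given realm, creating it on first use. */
  IdempotencyPersistence getOrCreateIdempotencyPersistence(RealmContext realmContext);
}

/** Example implementation that caches one persistence instance per realm identifier. */
final class PerRealmIdempotencyPersistenceFactory implements IdempotencyPersistenceFactory {
  private final Map<String, IdempotencyPersistence> byRealm = new ConcurrentHashMap<>();
  private final Supplier<? extends IdempotencyPersistence> creator;

  PerRealmIdempotencyPersistenceFactory(Supplier<? extends IdempotencyPersistence> creator) {
    this.creator = creator;
  }

  @Override
  public IdempotencyPersistence getOrCreateIdempotencyPersistence(RealmContext realmContext) {
    // computeIfAbsent is atomic on ConcurrentHashMap, so no `synchronized` is needed here.
    return byRealm.computeIfAbsent(realmContext.getRealmIdentifier(), realm -> creator.get());
  }
}
```

Wired through CDI, presumably each backend would expose its own factory bean (e.g. an in-memory one for dev mode, a JDBC one for relational deployments), and `IdempotencyHandlerSupport` would inject that factory instead of `MetaStoreManagerFactory`.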



##########
polaris-core/src/test/java/org/apache/polaris/core/persistence/IdempotencyPersistenceSpiTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.polaris.core.persistence;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+import org.apache.polaris.core.entity.IdempotencyRecord;
+import org.junit.jupiter.api.Test;
+
+/**
+ * SPI conformance tests for {@link IdempotencyPersistence} implementations against the in-memory
+ * implementation. The same scenarios are covered against the JDBC implementation in {@code
+ * RelationalJdbcIdempotencyPersistencePostgresIT}.

Review Comment:
   `RelationalJdbcIdempotencyPersistencePostgresIT` does not exist, does it?



##########
runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyHandlerSupport.java:
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.idempotency;
+
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.core.HttpHeaders;
+import java.net.InetAddress;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.entity.IdempotencyRecord;
+import org.apache.polaris.core.persistence.IdempotencyPersistence;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reusable helper for handler-level idempotency.
+ *
+ * <p>Encapsulates the parts that are independent of any specific handler method: idempotency-key
+ * validation (UUID v7), principal/resource hashing, executor id resolution, polling for in-progress
+ * duplicates, and thin wrappers around the {@link IdempotencyPersistence} reserve / cancel /
+ * finalize operations.
+ *
+ * <p>All configuration (whether the feature is on, header name, executor identity, TTLs,
+ * in-progress wait, lease TTL) comes from {@link IdempotencyConfiguration} via CDI as a single
+ * deployment-wide source. Per-realm or per-catalog overrides are intentionally not modelled in this
+ * iteration; if operators need them later they can be added without changing handler-side call
+ * shapes.
+ *
+ * <p>The actual persistence is sourced per-realm from {@link
+ * MetaStoreManagerFactory#getOrCreateIdempotencyPersistence(org.apache.polaris.core.context.RealmContext)},
+ * which each backend implements independently of {@link
+ * org.apache.polaris.core.persistence.BasePersistence}.
+ *
+ * <p>The handler decides what to do on each {@link Outcome}: for {@link Outcome#owned()} it
+ * executes the operation and finalizes; for {@link Outcome#duplicate(IdempotencyRecord)} it
+ * rebuilds the response from authoritative state (no stored response replay).
+ */
+@ApplicationScoped
+public class IdempotencyHandlerSupport {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IdempotencyHandlerSupport.class);
+
+  // RFC 9562 UUID v7 has version nibble 7 in time_hi_and_version.
+  private static final Pattern UUID_V7_PATTERN =
+      Pattern.compile(
+          "^[0-9a-f]{8}-[0-9a-f]{4}-7[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$",
+          Pattern.CASE_INSENSITIVE);
+
+  @Inject IdempotencyConfiguration configuration;
+  @Inject MetaStoreManagerFactory metaStoreManagerFactory;
+  @Inject Clock clock;
+
+  private final AtomicReference<String> resolvedExecutorId = new AtomicReference<>();
+
+  /**
+   * Test-only factory that builds an instance without going through CDI. Tests pass a {@code
+   * persistenceLookup} function returning the {@link IdempotencyPersistence} for a given realm id;
+   * production code goes through {@link MetaStoreManagerFactory#getOrCreateSession}.
+   */
+  public static IdempotencyHandlerSupport forTesting(
+      IdempotencyConfiguration configuration,
+      Function<String, IdempotencyPersistence> persistenceLookup,
+      Clock clock) {
+    IdempotencyHandlerSupport support = new IdempotencyHandlerSupport();
+    support.configuration = configuration;
+    support.clock = clock;
+    support.testPersistenceLookup = persistenceLookup;
+    return support;
+  }
+
+  // Test-only override; null in production.
+  private Function<String, IdempotencyPersistence> testPersistenceLookup;
+
+  /** Returns true if handler-level idempotency is enabled. */
+  public boolean isEnabled() {
+    return configuration.enabled();

Review Comment:
   I believe it is preferable to control idempotency-related call paths in a way that takes the actual Java code into account. For example, if the idempotency SPI is not implemented, it should not be called even if the configuration enables it.
   
   This can be achieved via a dedicated factory (injected by CDI). The factory could provide this flag in a way consistent with the implementation code.
   
   Configuration would be done in a way similar to `PolarisAuthorizerFactory`.
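A rough sketch of that idea, assuming a hypothetical `isSupported()` capability flag on the proposed factory (the names are illustrative and are not taken from the PR or from `PolarisAuthorizerFactory`). The handler-side check combines the configuration flag with what the selected implementation actually provides:

```java
/** Placeholder for the IdempotencyPersistence SPI; its methods are omitted in this sketch. */
interface IdempotencyPersistence {}

/** Hypothetical factory that also reports whether its backend implements the SPI. */
interface IdempotencyPersistenceFactory {
  /** True only if getOrCreate(...) returns a working implementation for this backend. */
  boolean isSupported();

  IdempotencyPersistence getOrCreate(String realmId);
}

/** Hypothetical factory bean for backends that do not implement idempotency at all. */
final class UnsupportedIdempotencyPersistenceFactory implements IdempotencyPersistenceFactory {
  @Override
  public boolean isSupported() {
    return false;
  }

  @Override
  public IdempotencyPersistence getOrCreate(String realmId) {
    throw new UnsupportedOperationException("Idempotency is not supported by this backend");
  }
}

/** Handler-side gate: active only when configuration enables it AND the backend supports it. */
final class IdempotencyGate {
  private final boolean configuredEnabled;
  private final IdempotencyPersistenceFactory factory;

  IdempotencyGate(boolean configuredEnabled, IdempotencyPersistenceFactory factory) {
    this.configuredEnabled = configuredEnabled;
    this.factory = factory;
  }

  boolean isEnabled() {
    // The SPI is never reached on backends that lack it, regardless of configuration.
    return configuredEnabled && factory.isSupported();
  }
}
```

With this shape, `isEnabled()` can no longer report `true` for a backend whose factory would only throw `UnsupportedOperationException`.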



##########
polaris-core/src/test/java/org/apache/polaris/core/persistence/IdempotencyPersistenceSpiTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.polaris.core.persistence;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Optional;
+import org.apache.polaris.core.entity.IdempotencyRecord;
+import org.junit.jupiter.api.Test;
+
+/**
+ * SPI conformance tests for {@link IdempotencyPersistence} implementations 
against the in-memory
+ * implementation. The same scenarios are covered against the JDBC 
implementation in {@code

Review Comment:
   If this is meant to be a conformance test suite, should it not be shared among backend-specific implementations (rather than copied)?
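One common way to share such a suite, sketched here under stated assumptions: an abstract base class holds the scenarios, and each backend module subclasses it, supplying only a fresh persistence instance. In JUnit 5 the scenarios would be `@Test` methods inherited by every subclass (with the base class published in a shared test artifact); this sketch uses plain Java and a heavily simplified stand-in SPI (only `reserve` by realm and key, a hypothetical signature) so that it runs on its own:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Heavily simplified stand-in for the SPI: only "first reserve wins" is modelled. */
interface IdempotencyPersistence {
  enum ReserveResultType {
    OWNED,
    DUPLICATE
  }

  ReserveResultType reserve(String realmId, String idempotencyKey);
}

/** Backend-agnostic scenarios; each backend test module would subclass this once. */
abstract class IdempotencyPersistenceConformance {
  /** Each backend supplies a fresh, empty persistence instance per scenario. */
  protected abstract IdempotencyPersistence newPersistence();

  void firstReserveOwnsKey() {
    IdempotencyPersistence p = newPersistence();
    check(p.reserve("realm", "key-1") == IdempotencyPersistence.ReserveResultType.OWNED,
        "first reserve should own the key");
  }

  void secondReserveIsDuplicate() {
    IdempotencyPersistence p = newPersistence();
    p.reserve("realm", "key-1");
    check(p.reserve("realm", "key-1") == IdempotencyPersistence.ReserveResultType.DUPLICATE,
        "second reserve of the same key should be a duplicate");
  }

  void keysAreScopedPerRealm() {
    IdempotencyPersistence p = newPersistence();
    p.reserve("realm-a", "key-1");
    check(p.reserve("realm-b", "key-1") == IdempotencyPersistence.ReserveResultType.OWNED,
        "the same key in another realm should be independent");
  }

  /** Runs every scenario; a JUnit version would let the framework discover @Test methods. */
  void runAll() {
    firstReserveOwnsKey();
    secondReserveIsDuplicate();
    keysAreScopedPerRealm();
  }

  private static void check(boolean condition, String message) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }
}

/** In-memory backend plugged into the shared suite; a JDBC module would add its own subclass. */
final class InMemoryIdempotencyConformance extends IdempotencyPersistenceConformance {
  @Override
  protected IdempotencyPersistence newPersistence() {
    Map<List<String>, Boolean> reserved = new ConcurrentHashMap<>();
    // putIfAbsent returns null only for the caller that inserted the (realm, key) pair first.
    return (realmId, key) ->
        reserved.putIfAbsent(List.of(realmId, key), Boolean.TRUE) == null
            ? IdempotencyPersistence.ReserveResultType.OWNED
            : IdempotencyPersistence.ReserveResultType.DUPLICATE;
  }
}
```

The point of the design is that a new backend gets the full scenario list by writing a single `newPersistence()` override, so the in-memory and JDBC runs cannot drift apart.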



##########
persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatabaseType.java:
##########
@@ -53,9 +53,9 @@ public String getDisplayName() {
    */
   public int getLatestSchemaVersion() {
     return switch (this) {
-      case POSTGRES -> 4; // PostgreSQL has schemas v1, v2, v3, v4
-      case COCKROACHDB -> 4; // CockroachDB schema version kept in sync with PostgreSQL
-      case H2 -> 4; // H2 uses same schemas as PostgreSQL
+      case POSTGRES -> 5; // PostgreSQL has schemas v1, v2, v3, v4, v5

Review Comment:
   This is ok, but it might be best to wait until 1.5.0 is branched to avoid 
major schema changes just before the release... WDYT?



##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyPersistence.java:
##########
@@ -84,30 +103,38 @@ enum HeartbeatResult {
  record ReserveResult(ReserveResultType type, Optional<IdempotencyRecord> existing) {}
 
   /**
-   * Attempts to reserve an idempotency key for a given operation and resource.
+   * Attempts to reserve an idempotency key for a given operation, resource, and caller.
    *
    * <p>If no record exists yet, the implementation should create a new reservation owned by {@code
    * executorId}. If a record already exists, the implementation should return {@link
-   * ReserveResultType#DUPLICATE} along with the existing record.
+   * ReserveResultType#DUPLICATE} along with the existing record. The caller is responsible for
+   * comparing the existing record's {@code principalHash} and {@code normalizedResourceId} against
+   * the current request and rejecting mismatches as conflicts.
    *
    * @param realmId logical tenant or realm identifier
    * @param idempotencyKey application-provided idempotency key
-   * @param operationType logical operation name (e.g., {@code "commit-table"})
+   * @param operationType logical operation name (e.g., {@code "create-table"})
    * @param normalizedResourceId normalized identifier of the affected resource
+   * @param principalHash hash of the caller principal identity (e.g., {@code SHA256(name + ":" +
+   *     realmId)}); persisted so replay can verify the same caller
    * @param expiresAt timestamp after which the reservation is considered expired
    * @param executorId identifier of the caller attempting the reservation
    * @param now timestamp representing the current time
    * @return {@link ReserveResult} describing whether the caller owns the reservation or hit a
    *     duplicate
    */
-  ReserveResult reserve(
+  default ReserveResult reserve(
       String realmId,
       String idempotencyKey,
       String operationType,
       String normalizedResourceId,
+      String principalHash,
       Instant expiresAt,
       String executorId,
-      Instant now);
+      Instant now) {
+    throw new UnsupportedOperationException(

Review Comment:
   Even in this PR, the `default` qualifier does not appear to be necessary... 
Removing it would resolve my concern 🙂 
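For illustration, a trimmed sketch of the SPI with `reserve` left abstract (no `default` body), plus the caller-side check that the Javadoc above delegates to handlers: comparing `principalHash` and `normalizedResourceId` when the result is `DUPLICATE`. The record and interface are simplified stand-ins for the PR's types, and `IdempotencyCallerChecks` is a hypothetical helper. The hash follows the Javadoc's example `SHA256(name + ":" + realmId)`; hex encoding (via `java.util.HexFormat`, Java 17+) and treating a `DUPLICATE` without an existing record as a conflict are assumptions:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.Optional;

/** Trimmed stand-in for IdempotencyRecord: only the fields used below. */
record IdempotencyRecord(String idempotencyKey, String normalizedResourceId, String principalHash) {}

/** Simplified SPI: reserve is abstract, so every implementation must provide it. */
interface IdempotencyPersistence {
  enum ReserveResultType {
    OWNED,
    DUPLICATE
  }

  record ReserveResult(ReserveResultType type, Optional<IdempotencyRecord> existing) {}

  // No `default` body: a backend that omits this method fails to compile instead of
  // throwing UnsupportedOperationException at request time.
  ReserveResult reserve(
      String realmId, String idempotencyKey, String normalizedResourceId, String principalHash);
}

/** Hypothetical helper for the caller-side responsibilities described in the Javadoc. */
final class IdempotencyCallerChecks {
  private IdempotencyCallerChecks() {}

  /** Hex-encoded SHA-256 of principalName + ":" + realmId. */
  static String principalHash(String principalName, String realmId) {
    try {
      MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
      byte[] digest =
          sha256.digest((principalName + ":" + realmId).getBytes(StandardCharsets.UTF_8));
      return HexFormat.of().formatHex(digest);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 must be available on every Java platform", e);
    }
  }

  /** A DUPLICATE counts as a replay only if both caller and resource match the stored record. */
  static boolean isConflict(
      IdempotencyPersistence.ReserveResult result,
      String normalizedResourceId,
      String principalHash) {
    if (result.type() != IdempotencyPersistence.ReserveResultType.DUPLICATE) {
      return false;
    }
    return result
        .existing()
        .map(
            r ->
                !r.principalHash().equals(principalHash)
                    || !r.normalizedResourceId().equals(normalizedResourceId))
        .orElse(true); // Assumption: DUPLICATE without a record is treated as a conflict.
  }
}
```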



-- 
This is an automated message from the Apache Git Service.