dimas-b commented on code in PR #4269: URL: https://github.com/apache/polaris/pull/4269#discussion_r3235517648
########## persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java: ########## @@ -1,250 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.polaris.persistence.relational.jdbc.idempotency; - -import jakarta.annotation.Nonnull; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.time.Instant; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import javax.sql.DataSource; -import org.apache.polaris.core.entity.IdempotencyRecord; -import org.apache.polaris.core.persistence.IdempotencyPersistenceException; -import org.apache.polaris.core.persistence.IdempotencyStore; -import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations; -import org.apache.polaris.persistence.relational.jdbc.QueryGenerator; -import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration; -import org.apache.polaris.persistence.relational.jdbc.models.Converter; -import org.apache.polaris.persistence.relational.jdbc.models.ModelIdempotencyRecord; - -public 
class RelationalJdbcIdempotencyStore implements IdempotencyStore { Review Comment: I actually think it is good to keep idempotency code in a separate class and avoid mixing it into `JdbcBasePersistenceImpl`. This may become more relevant if my point about having a separate `IdempotencyPersistence` factory is accepted. ########## polaris-core/src/main/java/org/apache/polaris/core/persistence/LocalPolarisMetaStoreManagerFactory.java: ########## @@ -161,6 +162,13 @@ public synchronized TransactionalPersistence getOrCreateSession(RealmContext rea return sessionSupplierMap.get(realmContext.getRealmIdentifier()).get(); } + @Override + public synchronized IdempotencyPersistence getOrCreateIdempotencyPersistence( + RealmContext realmContext) { + return idempotencyPersistenceMap.computeIfAbsent( + realmContext.getRealmIdentifier(), k -> new InMemoryIdempotencyPersistence()); Review Comment: I'm not sure this is correct. A `LocalPolarisMetaStoreManagerFactory` may have sub-classes that are not "in memory" (e.g. in downstream projects). While overriding this method is possible in such cases, the default behaviour would not be correct. This is another point for introducing a separate factory for `IdempotencyPersistence`. ########## polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java: ########## @@ -50,115 +52,17 @@ public record IdempotencyRecord( String idempotencyKey, String operationType, String normalizedResourceId, + String principalHash, Review Comment: Since this class existed before, and the current PR concentrates on integrating idempotency support into Polaris services, would you mind extracting functional changes into a new PR to be merged before this PR? I believe this will make reviews easier by isolating functional changes from infrastructural changes.
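The separate-factory suggestion in the comments above could look roughly like the following minimal sketch. Every type name here (`IdempotencyPersistenceFactory`, `IdempotencyPersistenceStub`, `InMemoryIdempotencyPersistenceFactory`, `UnsupportedIdempotencyPersistenceFactory`, `IdempotencyGate`) is hypothetical and not part of Polaris or this PR; the stub interface merely stands in for the real `IdempotencyPersistence` SPI. The idea it illustrates: the in-memory default lives in its own factory rather than in `LocalPolarisMetaStoreManagerFactory`, and the factory can report whether the backend implements the SPI at all. How a factory would be selected (e.g. via CDI, similar to `PolarisAuthorizerFactory`) is not shown.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the real IdempotencyPersistence SPI (reserve, finalize, ...).
interface IdempotencyPersistenceStub {}

// Hypothetical dedicated factory, kept separate from MetaStoreManagerFactory.
interface IdempotencyPersistenceFactory {
  /** Whether this backend actually implements the idempotency SPI. */
  boolean isSupported();

  /** Returns the per-realm persistence; only meaningful when isSupported() is true. */
  IdempotencyPersistenceStub getOrCreate(String realmId);
}

/** In-memory variant (e.g. for dev mode): one store per realm, created lazily. */
final class InMemoryIdempotencyPersistenceFactory implements IdempotencyPersistenceFactory {
  private final Map<String, IdempotencyPersistenceStub> stores = new ConcurrentHashMap<>();

  @Override
  public boolean isSupported() {
    return true;
  }

  @Override
  public IdempotencyPersistenceStub getOrCreate(String realmId) {
    // computeIfAbsent on ConcurrentHashMap creates at most one store per realm.
    return stores.computeIfAbsent(realmId, k -> new IdempotencyPersistenceStub() {});
  }
}

/** Variant for backends without idempotency support: never hands out a store. */
final class UnsupportedIdempotencyPersistenceFactory implements IdempotencyPersistenceFactory {
  @Override
  public boolean isSupported() {
    return false;
  }

  @Override
  public IdempotencyPersistenceStub getOrCreate(String realmId) {
    throw new UnsupportedOperationException("idempotency is not supported by this backend");
  }
}

/** Handler-side guard: idempotency runs only if it is both configured and implemented. */
final class IdempotencyGate {
  private final boolean configuredEnabled;
  private final IdempotencyPersistenceFactory factory;

  IdempotencyGate(boolean configuredEnabled, IdempotencyPersistenceFactory factory) {
    this.configuredEnabled = configuredEnabled;
    this.factory = factory;
  }

  boolean isEnabled() {
    return configuredEnabled && factory.isSupported();
  }
}
```

With this shape, a non-in-memory subclass of `LocalPolarisMetaStoreManagerFactory` would not silently inherit an in-memory idempotency store, and enabling the feature in configuration would have no effect on a backend whose factory reports no support.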
########## polaris-core/src/main/java/org/apache/polaris/core/persistence/MetaStoreManagerFactory.java: ########## @@ -34,6 +34,14 @@ public interface MetaStoreManagerFactory { BasePersistence getOrCreateSession(RealmContext realmContext); + /** + * Returns the per-realm {@link IdempotencyPersistence}. Implementations are free to back this + * with the same storage as {@link #getOrCreateSession(RealmContext)} or with a different one + * (e.g. an in-memory store for dev mode). Backends that do not support handler-level idempotency + * should throw {@link UnsupportedOperationException}. + */ + IdempotencyPersistence getOrCreateIdempotencyPersistence(RealmContext realmContext); Review Comment: Let's introduce a separate factory for `IdempotencyPersistence`. I do believe it is functionally not related to the "Metastore" Manager. ########## polaris-core/src/test/java/org/apache/polaris/core/persistence/IdempotencyPersistenceSpiTest.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.polaris.core.persistence; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Optional; +import org.apache.polaris.core.entity.IdempotencyRecord; +import org.junit.jupiter.api.Test; + +/** + * SPI conformance tests for {@link IdempotencyPersistence} implementations against the in-memory + * implementation. The same scenarios are covered against the JDBC implementation in {@code + * RelationalJdbcIdempotencyPersistencePostgresIT}. Review Comment: `RelationalJdbcIdempotencyPersistencePostgresIT` does not exist, does it? ########## runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyHandlerSupport.java: ########## @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.idempotency; + +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.core.HttpHeaders; +import java.net.InetAddress; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Locale; +import java.util.Optional; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.regex.Pattern; +import org.apache.polaris.core.auth.PolarisPrincipal; +import org.apache.polaris.core.entity.IdempotencyRecord; +import org.apache.polaris.core.persistence.IdempotencyPersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Reusable helper for handler-level idempotency. + * + * <p>Encapsulates the parts that are independent of any specific handler method: idempotency-key + * validation (UUID v7), principal/resource hashing, executor id resolution, polling for in-progress + * duplicates, and thin wrappers around the {@link IdempotencyPersistence} reserve / cancel / + * finalize operations. + * + * <p>All configuration (whether the feature is on, header name, executor identity, TTLs, + * in-progress wait, lease TTL) comes from {@link IdempotencyConfiguration} via CDI as a single + * deployment-wide source. Per-realm or per-catalog overrides are intentionally not modelled in this + * iteration; if operators need them later they can be added without changing handler-side call + * shapes. 
+ * + * <p>The actual persistence is sourced per-realm from {@link + * MetaStoreManagerFactory#getOrCreateIdempotencyPersistence(org.apache.polaris.core.context.RealmContext)}, + * which each backend implements independently of {@link + * org.apache.polaris.core.persistence.BasePersistence}. + * + * <p>The handler decides what to do on each {@link Outcome}: for {@link Outcome#owned()} it + * executes the operation and finalizes; for {@link Outcome#duplicate(IdempotencyRecord)} it + * rebuilds the response from authoritative state (no stored response replay). + */ +@ApplicationScoped +public class IdempotencyHandlerSupport { + + private static final Logger LOGGER = LoggerFactory.getLogger(IdempotencyHandlerSupport.class); + + // RFC 9562 UUID v7 has version nibble 7 in time_hi_and_version. + private static final Pattern UUID_V7_PATTERN = + Pattern.compile( + "^[0-9a-f]{8}-[0-9a-f]{4}-7[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$", + Pattern.CASE_INSENSITIVE); + + @Inject IdempotencyConfiguration configuration; + @Inject MetaStoreManagerFactory metaStoreManagerFactory; + @Inject Clock clock; + + private final AtomicReference<String> resolvedExecutorId = new AtomicReference<>(); + + /** + * Test-only factory that builds an instance without going through CDI. Tests pass a {@code + * persistenceLookup} function returning the {@link IdempotencyPersistence} for a given realm id; + * production code goes through {@link MetaStoreManagerFactory#getOrCreateSession}. + */ + public static IdempotencyHandlerSupport forTesting( + IdempotencyConfiguration configuration, + Function<String, IdempotencyPersistence> persistenceLookup, + Clock clock) { + IdempotencyHandlerSupport support = new IdempotencyHandlerSupport(); + support.configuration = configuration; + support.clock = clock; + support.testPersistenceLookup = persistenceLookup; + return support; + } + + // Test-only override; null in production. 
+ private Function<String, IdempotencyPersistence> testPersistenceLookup; + + /** Returns true if handler-level idempotency is enabled. */ + public boolean isEnabled() { + return configuration.enabled(); Review Comment: I believe it is preferable to control idempotency-related call paths in a way that takes the actual Java code into account. For example, if the idempotency SPI is not implemented, it should not be called even if configuration enables it. This can be achieved via a dedicated factory (injected by CDI). The factory could provide this flag in a way that is consistent with the implementation code. Configuration would be done in a way similar to `PolarisAuthorizerFactory`. ########## polaris-core/src/test/java/org/apache/polaris/core/persistence/IdempotencyPersistenceSpiTest.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.polaris.core.persistence; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Optional; +import org.apache.polaris.core.entity.IdempotencyRecord; +import org.junit.jupiter.api.Test; + +/** + * SPI conformance tests for {@link IdempotencyPersistence} implementations against the in-memory + * implementation. The same scenarios are covered against the JDBC implementation in {@code Review Comment: If this is meant to be a conformance test suite, should it not be shared among the backend-specific implementations (rather than copied)? ########## persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatabaseType.java: ########## @@ -53,9 +53,9 @@ public String getDisplayName() { */ public int getLatestSchemaVersion() { return switch (this) { - case POSTGRES -> 4; // PostgreSQL has schemas v1, v2, v3, v4 - case COCKROACHDB -> 4; // CockroachDB schema version kept in sync with PostgreSQL - case H2 -> 4; // H2 uses same schemas as PostgreSQL + case POSTGRES -> 5; // PostgreSQL has schemas v1, v2, v3, v4, v5 Review Comment: This is OK, but it might be best to wait until 1.5.0 is branched to avoid major schema changes just before the release... WDYT? ########## polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyPersistence.java: ########## @@ -84,30 +103,38 @@ enum HeartbeatResult { record ReserveResult(ReserveResultType type, Optional<IdempotencyRecord> existing) {} /** - * Attempts to reserve an idempotency key for a given operation and resource. + * Attempts to reserve an idempotency key for a given operation, resource, and caller. * * <p>If no record exists yet, the implementation should create a new reservation owned by {@code * executorId}. If a record already exists, the implementation should return {@link - * ReserveResultType#DUPLICATE} along with the existing record.
 + ReserveResultType#DUPLICATE} along with the existing record. The caller is responsible for + * comparing the existing record's {@code principalHash} and {@code normalizedResourceId} against + * the current request and rejecting mismatches as conflicts. * * @param realmId logical tenant or realm identifier * @param idempotencyKey application-provided idempotency key - * @param operationType logical operation name (e.g., {@code "commit-table"}) + * @param operationType logical operation name (e.g., {@code "create-table"}) * @param normalizedResourceId normalized identifier of the affected resource + * @param principalHash hash of the caller principal identity (e.g., {@code SHA256(name + ":" + + * realmId)}); persisted so replay can verify the same caller * @param expiresAt timestamp after which the reservation is considered expired * @param executorId identifier of the caller attempting the reservation * @param now timestamp representing the current time * @return {@link ReserveResult} describing whether the caller owns the reservation or hit a * duplicate */ - ReserveResult reserve( + default ReserveResult reserve( String realmId, String idempotencyKey, String operationType, String normalizedResourceId, + String principalHash, Instant expiresAt, String executorId, - Instant now); + Instant now) { + throw new UnsupportedOperationException( Review Comment: Even in this PR, the `default` qualifier does not appear to be necessary... Removing it would resolve my concern 🙂
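One way to see why dropping `default` from `reserve(...)` helps: with a default body that throws, a backend that forgets to implement the method still compiles and only fails when the method is called at runtime; with an abstract method, `javac` rejects the incomplete implementation outright. The sketch below uses simplified, hypothetical interfaces (`DefaultedStore`, `AbstractStore`) whose `reserve` takes only two parameters and returns a plain string instead of the real `ReserveResult`; it is an illustration of the language behaviour, not the Polaris SPI.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** With a `default` body, an implementation that forgets reserve(...) still compiles. */
interface DefaultedStore {
  default String reserve(String realmId, String idempotencyKey) {
    throw new UnsupportedOperationException("reserve is not implemented");
  }
}

/** Without `default`, the compiler forces every implementation to provide reserve(...). */
interface AbstractStore {
  String reserve(String realmId, String idempotencyKey);
}

/** Compiles, but fails only once reserve(...) is actually called. */
final class ForgetfulStore implements DefaultedStore {}

/**
 * By contrast, "final class ForgetfulAbstractStore implements AbstractStore {}" is rejected by
 * javac. A backend that truly lacks support must say so explicitly, as below.
 */
final class ExplicitlyUnsupportedStore implements AbstractStore {
  @Override
  public String reserve(String realmId, String idempotencyKey) {
    throw new UnsupportedOperationException("idempotency is not supported by this backend");
  }
}

/** Minimal in-memory implementation of the simplified contract. */
final class InMemoryStore implements AbstractStore {
  // Keys are (realmId, idempotencyKey) pairs; List.of gives value-based equality.
  private final Set<List<String>> reserved = ConcurrentHashMap.newKeySet();

  @Override
  public String reserve(String realmId, String idempotencyKey) {
    // Set.add returns false when the pair was already reserved.
    return reserved.add(List.of(realmId, idempotencyKey)) ? "OWNED" : "DUPLICATE";
  }
}
```

The trade-off is that every backend, including ones that will never support idempotency, must carry an explicit override; in exchange, the missing implementation becomes a deliberate, visible decision rather than an inherited runtime surprise.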
