huaxingao commented on code in PR #3803:
URL: https://github.com/apache/polaris/pull/3803#discussion_r2831656111


##########
runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyFilter.java:
##########
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.idempotency;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.smallrye.mutiny.Uni;
+import io.smallrye.mutiny.infrastructure.Infrastructure;
+import io.vertx.core.Vertx;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.Priorities;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.container.ContainerResponseContext;
+import jakarta.ws.rs.core.MediaType;
+import jakarta.ws.rs.core.Response;
+import jakarta.ws.rs.core.SecurityContext;
+import java.lang.reflect.Method;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.CatalogEntity;
+import org.apache.polaris.core.entity.IdempotencyRecord;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
+import org.apache.polaris.core.persistence.resolver.Resolver;
+import org.apache.polaris.core.persistence.resolver.ResolverFactory;
+import org.apache.polaris.core.persistence.resolver.ResolverStatus;
+import org.apache.polaris.service.catalog.CatalogPrefixParser;
+import org.apache.polaris.service.catalog.iceberg.IcebergCatalogAdapter;
+import org.apache.polaris.service.context.RealmContextFilter;
+import org.jboss.resteasy.reactive.server.ServerRequestFilter;
+import org.jboss.resteasy.reactive.server.ServerResponseFilter;
+import org.jboss.resteasy.reactive.server.core.CurrentRequestManager;
+import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
+import org.jboss.resteasy.reactive.server.spi.ResteasyReactiveResourceInfo;
+
+/**
+ * HTTP idempotency integration at the request/response layer.
+ *
+ * <p>For in-scope requests (see {@link IdempotencyConfiguration#scopes()}), 
this filter reserves an
+ * idempotency key before executing the request, and replays the previously 
finalized response when
+ * a duplicate key is received. For owned requests, it finalizes the response 
summary/headers for
+ * future replay.
+ */
+public class IdempotencyFilter {
+
+  private static final String PROP_IDEMPOTENCY_KEY = "idempotency.key";
+  private static final String PROP_IDEMPOTENCY_OPERATION = 
"idempotency.operation";
+  private static final String PROP_IDEMPOTENCY_RESOURCE = 
"idempotency.resource";
+  private static final String PROP_IDEMPOTENCY_OWNED = "idempotency.owned";
+  private static final String PROP_IDEMPOTENCY_HEARTBEAT_TIMER_ID = 
"idempotency.heartbeat.timerId";
+
+  @Inject IdempotencyConfiguration configuration;
+  @Inject IdempotencyStore store;
+  @Inject Clock clock;
+  @Inject ObjectMapper objectMapper;
+  @Inject Vertx vertx;
+  @Inject ResolverFactory resolverFactory;
+  @Inject CatalogPrefixParser prefixParser;
+
+  @ServerRequestFilter(priority = Priorities.AUTHORIZATION + 10)
+  public Uni<Response> reserveOrReplay(ContainerRequestContext rc) {
+    if (!configuration.enabled()) {
+      return Uni.createFrom().nullItem();
+    }
+
+    SecurityContext securityContext = rc.getSecurityContext();
+
+    String rawKey = rc.getHeaderString(configuration.keyHeader());
+    if (rawKey == null || rawKey.isBlank()) {
+      return Uni.createFrom().nullItem();
+    }
+    String key = normalizeIdempotencyKey(rawKey);
+    if (key == null) {
+      return Uni.createFrom()
+          .item(
+              error(
+                  400,
+                  "idempotency_key_invalid",
+                  "Idempotency-Key must be a UUIDv7 string (RFC 9562)"));
+    }
+
+    boolean scopedMode = !configuration.scopes().isEmpty();
+    if (!scopedMode) {
+      // Without an explicit scope allowlist, only apply idempotency to 
Iceberg REST catalog
+      // mutating endpoints under /v1/{prefix}/...
+      if (!isMutatingMethod(rc.getMethod()) || !isIcebergMutationEndpoint(rc)) 
{
+        return Uni.createFrom().nullItem();
+      }
+    }
+
+    RealmContext realmContext = (RealmContext) 
rc.getProperty(RealmContextFilter.REALM_CONTEXT_KEY);
+    if (realmContext == null) {
+      // RealmContextFilter should run before this; treat missing realm as a 
server error.
+      return Uni.createFrom().item(error(500, "MissingRealmContext", "Missing 
realm context"));
+    }
+
+    // If scopes are configured, apply idempotency only to matching endpoints 
and use the configured
+    // operationType for stable binding.
+    IdempotencyConfiguration.Scope scope = matchScope(rc);
+    if (scope == null && scopedMode) {
+      return Uni.createFrom().nullItem();
+    }
+
+    PolarisPrincipal principal =
+        securityContext != null && securityContext.getUserPrincipal() 
instanceof PolarisPrincipal p
+            ? p
+            : null;
+    if (principal != null && 
!isPolarisManagedInternalIcebergCatalogRequest(rc, principal)) {
+      // for federated/external catalogs, Polaris may not enforce idempotency 
end-to-end,
+      // so treat Idempotency-Key as a no-op (do not reserve/finalize/replay).
+      return Uni.createFrom().nullItem();
+    }
+
+    String realmId = realmContext.getRealmIdentifier();
+    String operationType = scope == null ? normalizeOperationType(rc) : 
scope.operationType();
+    String resourceId = normalizeResourceId(rc, scope != null);
+
+    Instant now = clock.instant();
+    Instant expiresAt =
+        now.plusSeconds(Math.max(0L, configuration.ttlSeconds()))
+            .plusSeconds(Math.max(0L, configuration.ttlGraceSeconds()));
+
+    final IdempotencyStore.ReserveResult r;
+    try {
+      r =
+          store.reserve(

Review Comment:
   Fixed — moved idempotency store calls off the event-loop (run reserve() on 
the worker pool; dispatch finalize/cancel/heartbeat to the worker pool too)



##########
runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyMaintenance.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.idempotency;
+
+import io.quarkus.runtime.ShutdownEvent;
+import io.quarkus.runtime.StartupEvent;
+import io.vertx.core.Vertx;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.event.Observes;
+import jakarta.inject.Inject;
+import java.time.Clock;
+import java.time.Instant;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.service.context.RealmContextConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Background maintenance for idempotency (purge of expired keys). */
+@ApplicationScoped
+public class IdempotencyMaintenance {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IdempotencyMaintenance.class);
+
+  @Inject IdempotencyConfiguration configuration;
+  @Inject RealmContextConfiguration realmContextConfiguration;
+  @Inject IdempotencyStore store;
+  @Inject Clock clock;
+  @Inject Vertx vertx;
+
+  private volatile Long purgeTimerId;
+
+  void onStart(@Observes StartupEvent event) {
+    if (!configuration.enabled() || !configuration.purgeEnabled()) {
+      return;
+    }
+    long intervalMs = Math.max(1000L, configuration.purgeIntervalSeconds() * 
1000L);
+    purgeTimerId = vertx.setPeriodic(intervalMs, ignored -> purgeOnce());

Review Comment:
   Good catch. setPeriodic runs on a Vert.x event-loop thread. I changed it so 
purgeOnce() runs on the worker pool instead, and I added a guard so two purges 
can’t run at the same time.



##########
runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java:
##########
@@ -217,6 +219,20 @@ public MetaStoreManagerFactory metaStoreManagerFactory(
     return 
metaStoreManagerFactories.select(Identifier.Literal.of(config.type())).get();
   }
 
+  @Produces
+  @ApplicationScoped
+  public IdempotencyStoreFactory idempotencyStoreFactory(
+      PersistenceConfiguration config,
+      @Any Instance<IdempotencyStoreFactory> idempotencyFactories) {
+    return 
idempotencyFactories.select(Identifier.Literal.of(config.type())).get();
+  }
+
+  @Produces
+  @ApplicationScoped
+  public IdempotencyStore idempotencyStore(IdempotencyStoreFactory factory) {
+    return factory.create();

Review Comment:
   I saw Polaris uses this factory + @Identifier pattern. I’m following the 
same approach for IdempotencyStoreFactory for consistency.can you reply in 



##########
runtime/service/src/main/java/org/apache/polaris/service/idempotency/IdempotencyFilter.java:
##########
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.idempotency;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.smallrye.mutiny.Uni;
+import io.smallrye.mutiny.infrastructure.Infrastructure;
+import io.vertx.core.Vertx;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.Priorities;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.container.ContainerResponseContext;
+import jakarta.ws.rs.core.MediaType;
+import jakarta.ws.rs.core.Response;
+import jakarta.ws.rs.core.SecurityContext;
+import java.lang.reflect.Method;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.CatalogEntity;
+import org.apache.polaris.core.entity.IdempotencyRecord;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
+import org.apache.polaris.core.persistence.resolver.Resolver;
+import org.apache.polaris.core.persistence.resolver.ResolverFactory;
+import org.apache.polaris.core.persistence.resolver.ResolverStatus;
+import org.apache.polaris.service.catalog.CatalogPrefixParser;
+import org.apache.polaris.service.catalog.iceberg.IcebergCatalogAdapter;
+import org.apache.polaris.service.context.RealmContextFilter;
+import org.jboss.resteasy.reactive.server.ServerRequestFilter;
+import org.jboss.resteasy.reactive.server.ServerResponseFilter;
+import org.jboss.resteasy.reactive.server.core.CurrentRequestManager;
+import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
+import org.jboss.resteasy.reactive.server.spi.ResteasyReactiveResourceInfo;
+
+/**
+ * HTTP idempotency integration at the request/response layer.
+ *
+ * <p>For in-scope requests (see {@link IdempotencyConfiguration#scopes()}), 
this filter reserves an
+ * idempotency key before executing the request, and replays the previously 
finalized response when
+ * a duplicate key is received. For owned requests, it finalizes the response 
summary/headers for
+ * future replay.
+ */
+public class IdempotencyFilter {
+
+  private static final String PROP_IDEMPOTENCY_KEY = "idempotency.key";
+  private static final String PROP_IDEMPOTENCY_OPERATION = 
"idempotency.operation";
+  private static final String PROP_IDEMPOTENCY_RESOURCE = 
"idempotency.resource";
+  private static final String PROP_IDEMPOTENCY_OWNED = "idempotency.owned";
+  private static final String PROP_IDEMPOTENCY_HEARTBEAT_TIMER_ID = 
"idempotency.heartbeat.timerId";
+
+  @Inject IdempotencyConfiguration configuration;
+  @Inject IdempotencyStore store;
+  @Inject Clock clock;
+  @Inject ObjectMapper objectMapper;
+  @Inject Vertx vertx;
+  @Inject ResolverFactory resolverFactory;
+  @Inject CatalogPrefixParser prefixParser;
+
+  @ServerRequestFilter(priority = Priorities.AUTHORIZATION + 10)
+  public Uni<Response> reserveOrReplay(ContainerRequestContext rc) {
+    if (!configuration.enabled()) {
+      return Uni.createFrom().nullItem();

Review Comment:
   It probably wouldn’t work for our use case. Quarkus discovers 
@ServerRequestFilter at build time, but polaris.idempotency.enabled is a 
runtime config. So we can’t reliably “not register the filter” based on that 
flag.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to