dimas-b commented on code in PR #4061:
URL: https://github.com/apache/polaris/pull/4061#discussion_r2991079943


##########
runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventMetadataFactory.java:
##########
@@ -100,23 +99,22 @@ private Optional<PolarisPrincipal> getUser() {
   }
 
   /**
-   * Extracts the request ID from the current request context.
+   * Extracts the request ID from the current {@link RequestIdSupplier}, if one is resolvable.
    *
-   * <p>Note: we must avoid injecting {@link jakarta.ws.rs.container.ContainerRequestContext} here,
-   * because this may cause some tests to fail, e.g. when running with no active request scope.
-   *
-   * <p>Using {@code Instance<ContainerRequestContext>} injection doesn't work either, because it's
-   * a special bean that always appears resolvable, even when it's not.
+   * <p>On normal HTTP request threads the supplier is backed by the request-scoped {@link
+   * org.apache.polaris.service.context.catalog.RequestIdHolder}. On async task threads it is
+   * populated by {@link org.apache.polaris.service.task.RequestIdPropagator}.
    */
   private Optional<String> getRequestId() {
-    // See org.jboss.resteasy.reactive.server.injection.ContextProducers
-    ResteasyReactiveRequestContext context = CurrentRequestManager.get();
-    if (context != null) {
-      ContainerRequestContextImpl request = context.getContainerRequestContext();
-      String requestId = (String) request.getProperty(RequestIdFilter.REQUEST_ID_KEY);
-      return Optional.ofNullable(requestId);
+    if (!requestIdSupplier.isResolvable()) {
+      return Optional.empty();

Review Comment:
   I believe `RequestIdSupplier` should always be resolvable, but the actual 
request ID may not be available in all contexts.
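
To illustrate the distinction between "the supplier is resolvable" and "a request ID is present", here is a minimal plain-Java sketch. `OptionalRequestIdSupplier` and `EventMetadataSketch` are hypothetical stand-ins, not the Polaris types, and the CDI wiring is left out on purpose; the assumption is only that the supplier can always be injected directly and reports a missing ID as an empty `Optional`.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical stand-in for an always-injectable supplier; not the Polaris interface. */
final class OptionalRequestIdSupplier {
  private final Supplier<String> currentRequestId;

  OptionalRequestIdSupplier(Supplier<String> currentRequestId) {
    this.currentRequestId = currentRequestId;
  }

  /** The supplier itself always exists; only the value may be missing. */
  Optional<String> requestId() {
    return Optional.ofNullable(currentRequestId.get());
  }
}

/** Hypothetical consumer: no isResolvable() check, absence is modelled by Optional. */
final class EventMetadataSketch {
  private final OptionalRequestIdSupplier requestIdSupplier;

  EventMetadataSketch(OptionalRequestIdSupplier requestIdSupplier) {
    this.requestIdSupplier = requestIdSupplier;
  }

  Optional<String> getRequestId() {
    return requestIdSupplier.requestId();
  }
}
```

With this shape the consumer has a single code path whether or not a request is in flight.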



##########
runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventMetadataFactory.java:
##########
@@ -23,24 +23,23 @@
 import io.quarkus.security.identity.CurrentIdentityAssociation;
 import io.quarkus.security.identity.SecurityIdentity;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.context.ContextNotActiveException;
 import jakarta.enterprise.inject.Instance;
 import jakarta.inject.Inject;
 import java.time.Clock;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.context.RealmContext;
-import org.apache.polaris.service.tracing.RequestIdFilter;
-import org.jboss.resteasy.reactive.server.core.CurrentRequestManager;
-import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
-import org.jboss.resteasy.reactive.server.jaxrs.ContainerRequestContextImpl;
+import org.apache.polaris.core.context.RequestIdSupplier;
 
 @ApplicationScoped
 public class PolarisEventMetadataFactory {
 
   @Inject Clock clock;
   @Inject CurrentIdentityAssociation currentIdentityAssociation;
   @Inject Instance<RealmContext> realmContext;
+  @Inject Instance<RequestIdSupplier> requestIdSupplier;

Review Comment:
   Why does it have to be an `Instance`?



##########
runtime/service/src/main/java/org/apache/polaris/service/task/PrincipalContextPropagator.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.task;
+
+import jakarta.annotation.Nullable;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.context.ContextNotActiveException;
+import jakarta.enterprise.inject.Instance;
+import jakarta.inject.Inject;
+import org.apache.polaris.core.auth.ImmutablePolarisPrincipal;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.service.context.catalog.PolarisPrincipalHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Propagates the authenticated principal across the async task boundary via {@link
+ * PolarisPrincipalHolder}.
+ *
+ * <p>A clone of the principal is captured at submission time so the task thread uses a stable
+ * snapshot that is independent of the originating request scope's lifecycle.
+ */
+@ApplicationScoped
+public class PrincipalContextPropagator implements AsyncContextPropagator {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PrincipalContextPropagator.class);
+
+  private final PolarisPrincipalHolder polarisPrincipalHolder;
+  private final Instance<PolarisPrincipal> polarisPrincipal;
+
+  @SuppressWarnings("unused") // Required by CDI
+  protected PrincipalContextPropagator() {
+    this(null, null);
+  }
+
+  @Inject
+  public PrincipalContextPropagator(
+      PolarisPrincipalHolder polarisPrincipalHolder, Instance<PolarisPrincipal> polarisPrincipal) {
+    this.polarisPrincipalHolder = polarisPrincipalHolder;
+    this.polarisPrincipal = polarisPrincipal;
+  }
+
+  @Nullable
+  @Override
+  public Object capture() {
+    PolarisPrincipal clone = null;
+    if (polarisPrincipal.isResolvable()) {
+      try {
+        // Clone to allow task thread get a stable snapshot regardless of the request scope lifecycle.
+        clone = ImmutablePolarisPrincipal.builder().from(polarisPrincipal.get()).build();
+      } catch (ContextNotActiveException e) {
+        // scope not active, return null
+      }
+    }
+    LOGGER.trace("capture principal={}", clone != null ? clone.getName() : null);
+    return clone;
+  }
+
+  @Override
+  public AutoCloseable restore(@Nullable Object capturedState) {
+    LOGGER.trace("restore principal={}", capturedState != null ? ((PolarisPrincipal) capturedState).getName() : null);
+    if (capturedState != null) {
+      polarisPrincipalHolder.set((PolarisPrincipal) capturedState);

Review Comment:
   Would it be possible to use generics and avoid arbitrary type casts here?
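
One way the generics suggestion could look, as a sketch only: the SPI takes a type parameter for the captured state, and a small record binds each propagator to its own state so the executor can hold a `List<Captured<?>>` without casting. `TypedContextPropagator`, `Captured`, `PrincipalNameHolder` and `PrincipalNamePropagator` are hypothetical names introduced here; they are not part of the PR.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical generic variant of the SPI: S is the type of the captured state. */
interface TypedContextPropagator<S> {
  S capture(); // may return null when nothing is available

  AutoCloseable restore(S state);

  /** Binds the state to its propagator while the type parameter is still known. */
  default Captured<S> snapshot() {
    return new Captured<>(this, capture());
  }
}

/** Pairs a propagator with the state it captured; S keeps the two consistent. */
record Captured<S>(TypedContextPropagator<S> propagator, S state) {
  AutoCloseable restore() {
    return propagator.restore(state);
  }
}

/** Stand-in for a request-scoped holder; not the Polaris class. */
final class PrincipalNameHolder {
  private final AtomicReference<String> value = new AtomicReference<>();

  String get() {
    return value.get();
  }

  void set(String v) {
    value.set(v);
  }
}

/** Example implementation: reads from one holder, writes into another. */
final class PrincipalNamePropagator implements TypedContextPropagator<String> {
  private final PrincipalNameHolder source;
  private final PrincipalNameHolder target;

  PrincipalNamePropagator(PrincipalNameHolder source, PrincipalNameHolder target) {
    this.source = source;
    this.target = target;
  }

  @Override
  public String capture() {
    return source.get();
  }

  @Override
  public AutoCloseable restore(String state) {
    String previous = target.get();
    target.set(state);
    return () -> target.set(previous); // put back whatever was there before
  }
}
```

Because `Captured.restore()` takes no argument of type `S`, it can be called through a wildcard reference, which is what removes the `(PolarisPrincipal) capturedState` style casts from the call site.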



##########
CHANGELOG.md:
##########
@@ -47,6 +47,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti
 
 ### New Features
 
+- Request IDs are now propagated to async task threads via CDI-injectable `RequestIdSupplier`, so task log messages carry the originating request's ID. Context propagation across task boundaries is now handled by the pluggable `AsyncContextPropagator` SPI.

Review Comment:
   nit: I tend to think "New Features" is for end users. This message is 
aimed at downstream developers, so it might belong in "Changes" instead.



##########
runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java:
##########
@@ -70,22 +67,23 @@ public class TaskExecutorImpl implements TaskExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutorImpl.class);
   private static final long TASK_RETRY_DELAY = 1000;
 
+  /** Pairs an {@link AsyncContextPropagator} with the opaque state it captured. */
+  record CapturedPropagator(AsyncContextPropagator propagator, @Nullable Object state) {}
+
   private final Executor executor;
   private final Clock clock;
   private final MetaStoreManagerFactory metaStoreManagerFactory;
   private final TaskFileIOSupplier fileIOSupplier;
-  private final RealmContextHolder realmContextHolder;
-  private final PolarisPrincipalHolder polarisPrincipalHolder;
-  private final PolarisPrincipal polarisPrincipal;
   private final List<TaskHandler> taskHandlers = new CopyOnWriteArrayList<>();
   private final Optional<TriConsumer<Long, Boolean, Throwable>> errorHandler;
   private final PolarisEventDispatcher polarisEventDispatcher;
   private final PolarisEventMetadataFactory eventMetadataFactory;
   @Nullable private final Tracer tracer;
+  private final Instance<AsyncContextPropagator> contextPropagators;

Review Comment:
   Why not inject a `List<AsyncContextPropagator>`?



##########
runtime/service/src/main/java/org/apache/polaris/service/task/PrincipalContextPropagator.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.task;
+
+import jakarta.annotation.Nullable;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.context.ContextNotActiveException;
+import jakarta.enterprise.inject.Instance;
+import jakarta.inject.Inject;
+import org.apache.polaris.core.auth.ImmutablePolarisPrincipal;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.service.context.catalog.PolarisPrincipalHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Propagates the authenticated principal across the async task boundary via {@link
+ * PolarisPrincipalHolder}.
+ *
+ * <p>A clone of the principal is captured at submission time so the task thread uses a stable
+ * snapshot that is independent of the originating request scope's lifecycle.
+ */
+@ApplicationScoped
+public class PrincipalContextPropagator implements AsyncContextPropagator {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PrincipalContextPropagator.class);
+
+  private final PolarisPrincipalHolder polarisPrincipalHolder;
+  private final Instance<PolarisPrincipal> polarisPrincipal;
+
+  @SuppressWarnings("unused") // Required by CDI
+  protected PrincipalContextPropagator() {
+    this(null, null);
+  }
+
+  @Inject
+  public PrincipalContextPropagator(
+      PolarisPrincipalHolder polarisPrincipalHolder, Instance<PolarisPrincipal> polarisPrincipal) {
+    this.polarisPrincipalHolder = polarisPrincipalHolder;
+    this.polarisPrincipal = polarisPrincipal;
+  }
+
+  @Nullable
+  @Override
+  public Object capture() {
+    PolarisPrincipal clone = null;
+    if (polarisPrincipal.isResolvable()) {

Review Comment:
   `PolarisPrincipal` should always be resolvable in contexts that initiate 
principal propagation. If it is not, it would be a bug in Polaris AuthN.



##########
runtime/service/src/main/java/org/apache/polaris/service/task/RequestIdPropagator.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.task;
+
+import jakarta.annotation.Nullable;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.context.ContextNotActiveException;
+import jakarta.inject.Inject;
+import org.apache.polaris.service.context.catalog.RequestIdHolder;
+import org.apache.polaris.service.tracing.RequestIdFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/**
+ * Propagates the request ID across the async task boundary.
+ *
+ * <p>At capture time the request ID is read from {@link RequestIdHolder}, which is populated by
+ * {@code RequestIdFilter} on HTTP request threads and by this propagator's {@link #restore} path
+ * on task threads (enabling nested task submission).
+ *
+ * <p>At restore time the ID is written to both the {@link RequestIdHolder} (so that {@code
+ * RequestIdSupplier} works in task threads) and to the SLF4J MDC (so that log messages emitted by
+ * the task carry the originating request ID).
+ *
+ * <p>MDC cleanup is performed by the returned {@link AutoCloseable} so that thread-pool threads are
+ * left in a clean state after the task completes.
+ */
+@ApplicationScoped
+public class RequestIdPropagator implements AsyncContextPropagator {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RequestIdPropagator.class);
+
+  private final RequestIdHolder requestIdHolder;
+
+  @SuppressWarnings("unused") // Required by CDI
+  protected RequestIdPropagator() {
+    this(null);
+  }
+
+  @Inject
+  public RequestIdPropagator(RequestIdHolder requestIdHolder) {
+    this.requestIdHolder = requestIdHolder;
+  }
+
+  @Nullable
+  @Override
+  public Object capture() {
+    String id = null;
+    try {
+      id = requestIdHolder.get();
+    } catch (ContextNotActiveException e) {
+      // scope not active, return null
+    }
+    LOGGER.trace("capture requestId={}", id);
+    return id;
+  }
+
+  @Override
+  public AutoCloseable restore(@Nullable Object capturedState) {
+    String requestId = (String) capturedState;
+    LOGGER.trace("restore requestId={}", requestId);
+    requestIdHolder.set(requestId);
+
+    if (requestId == null) {
+      return () -> {};
+    }
+
+    String previous = MDC.get(RequestIdFilter.REQUEST_ID_KEY);

Review Comment:
   Why rely on MDC (thread locals essentially), when we can leverage CDI-based 
injection that is aware of async processing?



##########
runtime/service/src/main/java/org/apache/polaris/service/task/RealmContextPropagator.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.task;
+
+import jakarta.annotation.Nullable;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.context.ContextNotActiveException;
+import jakarta.inject.Inject;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.service.context.catalog.RealmContextHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Propagates the realm context across the async task boundary via {@link RealmContextHolder}.
+ *
+ * <p>The full {@link RealmContext} object is captured — not just the realm 
identifier string — so
+ * that vendor-specific {@code RealmContext} implementations (which may carry 
additional routing
+ * information) survive the async boundary intact.
+ *
+ * <p>At capture time {@link RealmContextHolder} is request-scoped; its CDI proxy resolves to the
+ * holder in the currently active request scope. On a normal HTTP request thread that scope holds the
+ * realm set by the request filter. When an async task handler schedules a follow-up task (no active
+ * JAX-RS request), {@code RealmContextHolder} in that task's scope already contains the realm
+ * restored by this propagator's {@link #restore} path, so capture continues to work correctly for
+ * nested task submission.
+ */
+@ApplicationScoped
+public class RealmContextPropagator implements AsyncContextPropagator {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealmContextPropagator.class);
+
+  private final RealmContextHolder realmContextHolder;
+
+  @SuppressWarnings("unused") // Required by CDI
+  protected RealmContextPropagator() {
+    this(null);
+  }
+
+  @Inject
+  public RealmContextPropagator(RealmContextHolder realmContextHolder) {
+    this.realmContextHolder = realmContextHolder;
+  }
+
+  @Nullable
+  @Override
+  public Object capture() {

Review Comment:
   I believe we do not really need to expose the "value" object to callers.
   
   The propagator can handle state internally and expose an action object to 
the caller. For example:
   
   ```
   State s = p.capture();
   // switch to a different CDI context
   s.restore(); // calls realmContextHolder.set(...)
   ```
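
The pseudocode above could be fleshed out roughly as follows. This is a sketch under assumptions: `CapturedState`, `StatefulPropagator`, `RealmHolder` and `RealmPropagator` are hypothetical names, the realm is reduced to a `String`, and a single plain holder stands in for the request-scoped CDI bean.

```java
/** Hypothetical stand-in for the request-scoped RealmContextHolder. */
final class RealmHolder {
  private String realm;

  String get() {
    return realm;
  }

  void set(String realm) {
    this.realm = realm;
  }
}

/** Captured state that knows how to re-apply itself; callers never see the value. */
interface CapturedState {
  /** Applies the snapshot and returns a handle that undoes it. */
  AutoCloseable restore();
}

/** Hypothetical SPI shape: capture() returns an action object instead of an Object. */
interface StatefulPropagator {
  CapturedState capture();
}

final class RealmPropagator implements StatefulPropagator {
  private final RealmHolder holder;

  RealmPropagator(RealmHolder holder) {
    this.holder = holder;
  }

  @Override
  public CapturedState capture() {
    String realm = holder.get(); // snapshot taken on the submitting side
    return () -> {
      String previous = holder.get();
      holder.set(realm); // re-apply the snapshot on the task side
      return () -> holder.set(previous); // cleanup puts the old value back
    };
  }
}
```

The captured value lives only inside the closure, so the caller needs neither a cast nor knowledge of which propagator produced which state.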



##########
runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java:
##########
@@ -159,27 +153,32 @@ public void addTaskHandlerContext(long taskEntityId, CallContext callContext) {
 
     // Capture the metadata now in order to capture the principal and request ID, if any.
     PolarisEventMetadata eventMetadata = eventMetadataFactory.create();
-    tryHandleTask(taskEntityId, clone, eventMetadata, null, 1);
+
+    // Capture request-scoped context for propagation into the task thread.
+    // Each propagator independently snapshots its own piece of context.
+    List<CapturedPropagator> captured = new ArrayList<>();
+    for (AsyncContextPropagator propagator : contextPropagators) {
+      captured.add(new CapturedPropagator(propagator, propagator.capture()));

Review Comment:
   It looks like handling multiple `AsyncContextPropagator` instances deserves 
a helper class, so that callers like this `TaskExecutorImpl` only need to deal 
with one object (for simplicity). WDYT?
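
A possible shape for such a helper, sketched in plain Java. `ContextPropagator` mirrors the `capture()`/`restore(Object)` signatures visible in the diff but is redeclared here so the example compiles on its own; `CompositeContextPropagator` and `Snapshot` are hypothetical names. The sketch does not handle a `restore` call failing halfway through the list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Same shape as the SPI in the diff, redeclared so the sketch is self-contained. */
interface ContextPropagator {
  Object capture();

  AutoCloseable restore(Object capturedState);
}

/** Hypothetical helper: fans out to every propagator so callers hold one object. */
final class CompositeContextPropagator {
  private final List<ContextPropagator> propagators;

  CompositeContextPropagator(List<ContextPropagator> propagators) {
    this.propagators = List.copyOf(propagators);
  }

  /** Snapshots every propagator on the submitting thread. */
  Snapshot capture() {
    List<Object> states = new ArrayList<>(); // ArrayList tolerates null states
    for (ContextPropagator p : propagators) {
      states.add(p.capture());
    }
    return new Snapshot(propagators, states);
  }

  static final class Snapshot {
    private final List<ContextPropagator> propagators;
    private final List<Object> states;

    private Snapshot(List<ContextPropagator> propagators, List<Object> states) {
      this.propagators = propagators;
      this.states = states;
    }

    /** Restores in registration order; the returned handle closes in reverse order. */
    AutoCloseable restore() {
      Deque<AutoCloseable> scopes = new ArrayDeque<>();
      for (int i = 0; i < propagators.size(); i++) {
        scopes.push(propagators.get(i).restore(states.get(i)));
      }
      return () -> closeAll(scopes);
    }
  }

  /** Closes every scope even if one fails; later failures are attached as suppressed. */
  private static void closeAll(Deque<AutoCloseable> scopes) throws Exception {
    Exception first = null;
    while (!scopes.isEmpty()) {
      try {
        scopes.pop().close();
      } catch (Exception e) {
        if (first == null) first = e; else first.addSuppressed(e);
      }
    }
    if (first != null) throw first;
  }
}
```

The executor would then call `capture()` once at submission time and wrap the task body in a single try-with-resources over `snapshot.restore()`.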



