flyingImer commented on code in PR #4061:
URL: https://github.com/apache/polaris/pull/4061#discussion_r2991322121


##########
runtime/service/src/main/java/org/apache/polaris/service/task/PrincipalContextPropagator.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.task;
+
+import jakarta.annotation.Nullable;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.context.ContextNotActiveException;
+import jakarta.enterprise.inject.Instance;
+import jakarta.inject.Inject;
+import org.apache.polaris.core.auth.ImmutablePolarisPrincipal;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.service.context.catalog.PolarisPrincipalHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Propagates the authenticated principal across the async task boundary via {@link
+ * PolarisPrincipalHolder}.
+ *
+ * <p>A clone of the principal is captured at submission time so the task thread uses a stable
+ * snapshot that is independent of the originating request scope's lifecycle.
+ */
+@ApplicationScoped
+public class PrincipalContextPropagator implements AsyncContextPropagator {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PrincipalContextPropagator.class);
+
+  private final PolarisPrincipalHolder polarisPrincipalHolder;
+  private final Instance<PolarisPrincipal> polarisPrincipal;
+
+  @SuppressWarnings("unused") // Required by CDI
+  protected PrincipalContextPropagator() {
+    this(null, null);
+  }
+
+  @Inject
+  public PrincipalContextPropagator(
+      PolarisPrincipalHolder polarisPrincipalHolder, Instance<PolarisPrincipal> polarisPrincipal) {
+    this.polarisPrincipalHolder = polarisPrincipalHolder;
+    this.polarisPrincipal = polarisPrincipal;
+  }
+
+  @Nullable
+  @Override
+  public Object capture() {
+    PolarisPrincipal clone = null;
+    if (polarisPrincipal.isResolvable()) {
+      try {
+        // Clone to allow task thread get a stable snapshot regardless of the request scope lifecycle.
+        clone = ImmutablePolarisPrincipal.builder().from(polarisPrincipal.get()).build();
+      } catch (ContextNotActiveException e) {
+        // scope not active, return null
+      }
+    }
+    LOGGER.trace("capture principal={}", clone != null ? clone.getName() : null);
+    return clone;
+  }
+
+  @Override
+  public AutoCloseable restore(@Nullable Object capturedState) {
+    LOGGER.trace("restore principal={}", capturedState != null ? ((PolarisPrincipal) capturedState).getName() : null);
+    if (capturedState != null) {
+      polarisPrincipalHolder.set((PolarisPrincipal) capturedState);

Review Comment:
   If we adopt the State pattern from your earlier comment, the raw Object 
disappears from the public API entirely, i.e., each state object handles its own 
type internally. I believe that addresses this concern without adding generics 
to the interface.
   
   Alternatively, adding AsyncContextPropagator<T> would also work, but CDI 
Instance iteration loses generic type parameters, so the caller would still 
work with AsyncContextPropagator<?>. The type-safety gain would be limited to 
each implementation's own capture/restore pair.
   
   Happy to revisit if you see a case where explicit generics would still add 
value after the State refactoring.
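
For illustration, here is a minimal sketch of what a State-style contract could look like. `StatefulPropagator`, `CapturedState`, `Scope`, `StringHolder`, and `RequestIdStatePropagator` are hypothetical names that are not part of this PR, and the holder is a plain stand-in rather than the real CDI bean. The point is only that the captured value's type never leaves the implementation, so no `Object` cast appears in the API.

```java
/** Hypothetical undo handle; narrows close() so callers need no checked-exception handling. */
interface Scope extends AutoCloseable {
  @Override
  void close();
}

/** Hypothetical State-style contract: capture() returns an object that restores itself. */
interface StatefulPropagator {
  CapturedState capture();

  interface CapturedState {
    Scope restore();
  }
}

/** Stand-in for a holder bean such as RequestIdHolder (assumed get/set shape). */
final class StringHolder {
  private String value;

  String get() {
    return value;
  }

  void set(String value) {
    this.value = value;
  }
}

/** The String type stays inside this class, so no Object cast is needed anywhere. */
final class RequestIdStatePropagator implements StatefulPropagator {
  private final StringHolder holder;

  RequestIdStatePropagator(StringHolder holder) {
    this.holder = holder;
  }

  @Override
  public CapturedState capture() {
    final String id = holder.get(); // snapshot taken at submission time
    return () -> {
      String previous = holder.get();
      holder.set(id); // make the snapshot visible to the task
      return () -> holder.set(previous); // closing reinstates the earlier value
    };
  }
}
```

With this shape the executor would only ever hold a list of `CapturedState` objects and call `restore()` on each, which is why the generic parameter on the interface becomes unnecessary.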



##########
runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java:
##########
@@ -70,22 +67,23 @@ public class TaskExecutorImpl implements TaskExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutorImpl.class);
   private static final long TASK_RETRY_DELAY = 1000;
 
+  /** Pairs an {@link AsyncContextPropagator} with the opaque state it captured. */
+  record CapturedPropagator(AsyncContextPropagator propagator, @Nullable Object state) {}
+
   private final Executor executor;
   private final Clock clock;
   private final MetaStoreManagerFactory metaStoreManagerFactory;
   private final TaskFileIOSupplier fileIOSupplier;
-  private final RealmContextHolder realmContextHolder;
-  private final PolarisPrincipalHolder polarisPrincipalHolder;
-  private final PolarisPrincipal polarisPrincipal;
   private final List<TaskHandler> taskHandlers = new CopyOnWriteArrayList<>();
   private final Optional<TriConsumer<Long, Boolean, Throwable>> errorHandler;
   private final PolarisEventDispatcher polarisEventDispatcher;
   private final PolarisEventMetadataFactory eventMetadataFactory;
   @Nullable private final Tracer tracer;
+  private final Instance<AsyncContextPropagator> contextPropagators;

Review Comment:
   IIUC, Instance<T> is the standard CDI programmatic lookup, while List<T> 
injection is a Quarkus extension that is not part of the CDI spec. I went with 
Instance for portability, but I'm happy to switch if the project prefers List. 
Any preference?



##########
runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java:
##########
@@ -159,27 +153,32 @@ public void addTaskHandlerContext(long taskEntityId, CallContext callContext) {
 
     // Capture the metadata now in order to capture the principal and request ID, if any.
     PolarisEventMetadata eventMetadata = eventMetadataFactory.create();
-    tryHandleTask(taskEntityId, clone, eventMetadata, null, 1);
+
+    // Capture request-scoped context for propagation into the task thread.
+    // Each propagator independently snapshots its own piece of context.
+    List<CapturedPropagator> captured = new ArrayList<>();
+    for (AsyncContextPropagator propagator : contextPropagators) {
+      captured.add(new CapturedPropagator(propagator, propagator.capture()));

Review Comment:
   Agreed, this deserves a cleaner abstraction. I think this pairs naturally 
with the State pattern discussion above. The helper shape depends on whether we 
go with State.restore() or the current capture()/restore(Object) contract.
   
   I'd prefer to bundle this with that follow-up so we refactor once rather 
than twice. Sound reasonable?
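
To make the trade-off concrete, here is a rough sketch of one possible helper built on the current capture()/restore(Object) contract. `CapturedContext`, `captureAll`, `restoreAll`, `wrap`, and `LoggingPropagator` are hypothetical names, and the interface below is only a local mirror of the one in this PR so that the snippet compiles on its own. It assumes restore() never returns null.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Local mirror of the capture()/restore(Object) contract shown in the diff. */
interface AsyncContextPropagator {
  Object capture();

  AutoCloseable restore(Object capturedState);
}

/** Hypothetical helper: snapshots every propagator once, then restores them as a unit. */
final class CapturedContext {
  private static final class Entry {
    final AsyncContextPropagator propagator;
    final Object state;

    Entry(AsyncContextPropagator propagator, Object state) {
      this.propagator = propagator;
      this.state = state;
    }
  }

  private final List<Entry> entries = new ArrayList<>();

  private CapturedContext() {}

  /** Call on the submitting thread. */
  static CapturedContext captureAll(Iterable<? extends AsyncContextPropagator> propagators) {
    CapturedContext ctx = new CapturedContext();
    for (AsyncContextPropagator p : propagators) {
      ctx.entries.add(new Entry(p, p.capture()));
    }
    return ctx;
  }

  /** Call on the task thread; closing the result undoes the restores in reverse order. */
  AutoCloseable restoreAll() {
    Deque<AutoCloseable> scopes = new ArrayDeque<>();
    for (Entry e : entries) {
      scopes.push(e.propagator.restore(e.state)); // assumes restore() never returns null
    }
    return () -> {
      Exception first = null;
      for (AutoCloseable scope : scopes) { // a pushed deque iterates newest first
        try {
          scope.close();
        } catch (Exception ex) {
          if (first == null) {
            first = ex;
          } else {
            first.addSuppressed(ex);
          }
        }
      }
      if (first != null) {
        throw first;
      }
    };
  }

  /** Wraps a task so the captured context is active only while the task runs. */
  Runnable wrap(Runnable task) {
    return () -> {
      try (AutoCloseable ignored = restoreAll()) {
        task.run();
      } catch (RuntimeException e) {
        throw e;
      } catch (Exception e) {
        throw new IllegalStateException("context cleanup failed", e);
      }
    };
  }
}

/** Demo propagator that records the order of restore and close calls. */
final class LoggingPropagator implements AsyncContextPropagator {
  private final String name;
  private final List<String> log;

  LoggingPropagator(String name, List<String> log) {
    this.name = name;
    this.log = log;
  }

  @Override
  public Object capture() {
    return name;
  }

  @Override
  public AutoCloseable restore(Object capturedState) {
    log.add("restore " + capturedState);
    return () -> log.add("close " + capturedState);
  }
}
```

If we move to State.restore() instead, the `Entry` pair collapses into a single state object and the helper shrinks accordingly, which is the reason for wanting to do both changes together.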



##########
runtime/service/src/main/java/org/apache/polaris/service/task/RequestIdPropagator.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.task;
+
+import jakarta.annotation.Nullable;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.context.ContextNotActiveException;
+import jakarta.inject.Inject;
+import org.apache.polaris.service.context.catalog.RequestIdHolder;
+import org.apache.polaris.service.tracing.RequestIdFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/**
+ * Propagates the request ID across the async task boundary.
+ *
+ * <p>At capture time the request ID is read from {@link RequestIdHolder}, which is populated by
+ * {@code RequestIdFilter} on HTTP request threads and by this propagator's {@link #restore} path
+ * on task threads (enabling nested task submission).
+ *
+ * <p>At restore time the ID is written to both the {@link RequestIdHolder} (so that {@code
+ * RequestIdSupplier} works in task threads) and to the SLF4J MDC (so that log messages emitted by
+ * the task carry the originating request ID).
+ *
+ * <p>MDC cleanup is performed by the returned {@link AutoCloseable} so that thread-pool threads are
+ * left in a clean state after the task completes.
+ */
+@ApplicationScoped
+public class RequestIdPropagator implements AsyncContextPropagator {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RequestIdPropagator.class);
+
+  private final RequestIdHolder requestIdHolder;
+
+  @SuppressWarnings("unused") // Required by CDI
+  protected RequestIdPropagator() {
+    this(null);
+  }
+
+  @Inject
+  public RequestIdPropagator(RequestIdHolder requestIdHolder) {
+    this.requestIdHolder = requestIdHolder;
+  }
+
+  @Nullable
+  @Override
+  public Object capture() {
+    String id = null;
+    try {
+      id = requestIdHolder.get();
+    } catch (ContextNotActiveException e) {
+      // scope not active, return null
+    }
+    LOGGER.trace("capture requestId={}", id);
+    return id;
+  }
+
+  @Override
+  public AutoCloseable restore(@Nullable Object capturedState) {
+    String requestId = (String) capturedState;
+    LOGGER.trace("restore requestId={}", requestId);
+    requestIdHolder.set(requestId);
+
+    if (requestId == null) {
+      return () -> {};
+    }
+
+    String previous = MDC.get(RequestIdFilter.REQUEST_ID_KEY);

Review Comment:
   IIUC, they serve different purposes:
   
   - RequestIdHolder (CDI) -> programmatic access via @Inject RequestIdSupplier
   - MDC -> automatic log correlation — %X{requestId} in the log pattern means 
every log line carries the request ID without callers needing to inject anything
   
   Without MDC on the task thread, task log lines would silently lose the 
originating request ID, and I believe that ID is important for observability. 
The AutoCloseable cleanup ensures pooled threads don't leak stale IDs to the 
next task.
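
The leak that the cleanup guards against can be reproduced with a small dependency-free sketch. `FakeMdc` is a hypothetical ThreadLocal stand-in that mirrors only the get/put/remove calls of SLF4J's MDC, `MdcScopeSketch` and its methods are illustrative names, and the key value "requestId" is an assumption based on the %X{requestId} pattern mentioned above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Dependency-free stand-in for SLF4J's MDC: a per-thread String map. */
final class FakeMdc {
  private static final ThreadLocal<Map<String, String>> CONTEXT =
      ThreadLocal.withInitial(HashMap::new);

  static String get(String key) {
    return CONTEXT.get().get(key);
  }

  static void put(String key, String value) {
    CONTEXT.get().put(key, value);
  }

  static void remove(String key) {
    CONTEXT.get().remove(key);
  }
}

final class MdcScopeSketch {
  static final String REQUEST_ID_KEY = "requestId"; // assumed key name

  /** Undo handle whose close() throws no checked exception. */
  interface Scope extends AutoCloseable {
    @Override
    void close();
  }

  /** Sets the ID on the current thread; closing the result reinstates the previous value. */
  static Scope restore(String requestId) {
    if (requestId == null) {
      return () -> {};
    }
    String previous = FakeMdc.get(REQUEST_ID_KEY);
    FakeMdc.put(REQUEST_ID_KEY, requestId);
    return () -> {
      if (previous != null) {
        FakeMdc.put(REQUEST_ID_KEY, previous);
      } else {
        FakeMdc.remove(REQUEST_ID_KEY);
      }
    };
  }

  /** Runs two tasks on one pooled thread and returns what the second task sees. */
  static String idSeenByNextTask(boolean cleanUp) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      Runnable first = () -> {
        Scope scope = restore("req-1");
        if (cleanUp) {
          scope.close();
        }
      };
      Callable<String> second = () -> FakeMdc.get(REQUEST_ID_KEY);
      pool.submit(first).get(); // wait so the tasks run strictly one after another
      return pool.submit(second).get();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }
}
```

Calling `MdcScopeSketch.idSeenByNextTask(false)` lets the second task observe the first task's ID on the reused thread, while `idSeenByNextTask(true)` leaves the thread clean.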


