eric-maynard commented on code in PR #1000:
URL: https://github.com/apache/polaris/pull/1000#discussion_r1965977740


##########
service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java:
##########
@@ -91,39 +80,45 @@ public void addTaskHandler(TaskHandler taskHandler) {
   }
 
   /**
-   * Register a {@link RealmContext} for a specific task id. That task will be loaded and executed
-   * asynchronously with a copy of the provided {@link RealmContext} (because the realm context is a
-   * request-scoped component).
+   * Register a {@link CallContext} for a specific task id. That task will be loaded and executed
+   * asynchronously with a clone of the provided {@link CallContext}.
    */
   @Override
-  public void addTaskHandlerContext(long taskEntityId, RealmContext realmContext) {
-    tryHandleTask(taskEntityId, RealmContext.copyOf(realmContext), null, 1);
+  public void addTaskHandlerContext(long taskEntityId, CallContext callContext) {
+    // Unfortunately CallContext is a request-scoped bean and must be cloned now,
+    // because its usage inside the TaskExecutor thread pool will outlive its
+    // lifespan, so the original CallContext will eventually be closed while
+    // the task is still running.
+    // Note: PolarisCallContext has request-scoped beans as well, and must be cloned.
+    // FIXME replace with context propagation?
+    CallContext clone = CallContext.copyOf(callContext);
+    tryHandleTask(taskEntityId, clone, null, 1);
   }
 
   private @Nonnull CompletableFuture<Void> tryHandleTask(
-      long taskEntityId, RealmContext realmContext, Throwable e, int attempt) {
+      long taskEntityId, CallContext callContext, Throwable e, int attempt) {
     if (attempt > 3) {
       return CompletableFuture.failedFuture(e);
     }
     return CompletableFuture.runAsync(
-            () -> handleTask(taskEntityId, realmContext, attempt), executor)
+            () -> handleTask(taskEntityId, callContext, attempt), executor)
         .exceptionallyComposeAsync(
             (t) -> {
               LOGGER.warn("Failed to handle task entity id {}", taskEntityId, t);
-              return tryHandleTask(taskEntityId, realmContext, t, attempt + 1);
+              return tryHandleTask(taskEntityId, callContext, t, attempt + 1);
             },
             CompletableFuture.delayedExecutor(
                 TASK_RETRY_DELAY * (long) attempt, TimeUnit.MILLISECONDS, executor));
   }
 
-  protected void handleTask(long taskEntityId, RealmContext realmContext, int attempt) {
+  protected void handleTask(long taskEntityId, CallContext ctx, int attempt) {
+    // set the call context INSIDE the async task
+    CallContext.setCurrentContext(ctx);

Review Comment:
   Changed per this comment -- good catch!
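
For readers following along, here is a minimal sketch of why the context is set inside the async task rather than before submitting it. It assumes the "current context" is bound to the executing thread via a `ThreadLocal`; `ContextDemo`, `CURRENT`, `readInsideTask` and `readOutsideTask` are hypothetical names for illustration and are not Polaris code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextDemo {
  // Hypothetical stand-in for a thread-bound "current context" holder.
  private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

  /** Sets the context inside the task, i.e. on the worker thread that runs it. */
  static String readInsideTask(String ctx, ExecutorService executor) {
    return CompletableFuture.supplyAsync(
            () -> {
              CURRENT.set(ctx);
              try {
                return CURRENT.get();
              } finally {
                // Avoid leaking the value to the next task on this pooled thread.
                CURRENT.remove();
              }
            },
            executor)
        .join();
  }

  /** Sets the context on the caller thread only; the worker thread sees null. */
  static String readOutsideTask(String ctx, ExecutorService executor) {
    CURRENT.set(ctx);
    try {
      return CompletableFuture.supplyAsync(CURRENT::get, executor).join();
    } finally {
      CURRENT.remove();
    }
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      System.out.println(readInsideTask("realm-a", executor));
      System.out.println(readOutsideTask("realm-a", executor));
    } finally {
      executor.shutdown();
    }
  }
}
```

In this sketch only `readInsideTask` observes the value on the worker thread, because a plain `ThreadLocal` is not inherited by pool threads. Whether the real `CallContext` behaves the same way depends on how it is implemented.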



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
