adutra commented on code in PR #1000:
URL: https://github.com/apache/polaris/pull/1000#discussion_r1965155522
##########
service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java:
##########
@@ -91,39 +80,45 @@ public void addTaskHandler(TaskHandler taskHandler) {
}
/**
- * Register a {@link RealmContext} for a specific task id. That task will be
loaded and executed
- * asynchronously with a copy of the provided {@link RealmContext} (because
the realm context is a
- * request-scoped component).
+ * Register a {@link CallContext} for a specific task id. That task will be
loaded and executed
+ * asynchronously with a clone of the provided {@link CallContext}.
*/
@Override
- public void addTaskHandlerContext(long taskEntityId, RealmContext
realmContext) {
- tryHandleTask(taskEntityId, RealmContext.copyOf(realmContext), null, 1);
+ public void addTaskHandlerContext(long taskEntityId, CallContext
callContext) {
+ // Unfortunately CallContext is a request-scoped bean and must be cloned
now,
+ // because its usage inside the TaskExecutor thread pool will outlive its
+ // lifespan, so the original CallContext will eventually be closed while
+ // the task is still running.
+ // Note: PolarisCallContext has request-scoped beans as well, and must be
cloned.
+ // FIXME replace with context propagation?
+ CallContext clone = CallContext.copyOf(callContext);
+ tryHandleTask(taskEntityId, clone, null, 1);
}
private @Nonnull CompletableFuture<Void> tryHandleTask(
- long taskEntityId, RealmContext realmContext, Throwable e, int attempt) {
+ long taskEntityId, CallContext callContext, Throwable e, int attempt) {
if (attempt > 3) {
return CompletableFuture.failedFuture(e);
}
return CompletableFuture.runAsync(
- () -> handleTask(taskEntityId, realmContext, attempt), executor)
+ () -> handleTask(taskEntityId, callContext, attempt), executor)
.exceptionallyComposeAsync(
(t) -> {
LOGGER.warn("Failed to handle task entity id {}", taskEntityId,
t);
- return tryHandleTask(taskEntityId, realmContext, t, attempt + 1);
+ return tryHandleTask(taskEntityId, callContext, t, attempt + 1);
},
CompletableFuture.delayedExecutor(
TASK_RETRY_DELAY * (long) attempt, TimeUnit.MILLISECONDS,
executor));
}
- protected void handleTask(long taskEntityId, RealmContext realmContext, int
attempt) {
+ protected void handleTask(long taskEntityId, CallContext ctx, int attempt) {
+ // set the call context INSIDE the async task
+ CallContext.setCurrentContext(ctx);
Review Comment:
Yes that was done before:
https://github.com/apache/polaris/blob/4409252d135f62e9f9ae09d943c0c0f720c03efa/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java#L110-L112
(We weren't clearing the context in a finally block though – but I don't
think it's necessary since the thread local is being set at the very beginning
of every task execution. It wouldn't hurt to add it though.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]