This is an automated email from the ASF dual-hosted git repository.

ilgrosso pushed a commit to branch 4_0_X
in repository https://gitbox.apache.org/repos/asf/syncope.git


The following commit(s) were added to refs/heads/4_0_X by this push:
     new 72ec415b1c VirtualThreadPoolTaskExecutor improvements
72ec415b1c is described below

commit 72ec415b1c38fb46a91e456d60ed9446930050bf
Author: Francesco Chicchiriccò <ilgro...@apache.org>
AuthorDate: Thu Jul 3 15:47:32 2025 +0200

    VirtualThreadPoolTaskExecutor improvements
---
 .../core/rest/cxf/IdRepoRESTCXFContext.java        |  5 +-
 .../core/rest/cxf/service/SyncopeServiceImpl.java  |  6 +-
 .../provisioning/java/ProvisioningContext.java     |  8 +--
 .../provisioning/java/job/MacroJobDelegate.java    |  4 +-
 .../PriorityPropagationTaskExecutor.java           |  6 +-
 .../spring/task/VirtualThreadPoolTaskExecutor.java | 66 ++++++++++++++++++++--
 6 files changed, 75 insertions(+), 20 deletions(-)

diff --git a/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/IdRepoRESTCXFContext.java b/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/IdRepoRESTCXFContext.java
index 530becbf10..231a64f2a4 100644
--- a/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/IdRepoRESTCXFContext.java
+++ b/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/IdRepoRESTCXFContext.java
@@ -136,6 +136,7 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.PropertySource;
 import org.springframework.core.env.Environment;
+import org.springframework.core.task.AsyncTaskExecutor;
 
 @PropertySource("classpath:errorMessages.properties")
 @EnableConfigurationProperties(RESTProperties.class)
@@ -145,7 +146,7 @@ public class IdRepoRESTCXFContext {
     private static final Logger LOG = LoggerFactory.getLogger(IdRepoRESTCXFContext.class);
 
     @Bean
-    public VirtualThreadPoolTaskExecutor batchExecutor(final RESTProperties props) {
+    public AsyncTaskExecutor batchExecutor(final RESTProperties props) {
         VirtualThreadPoolTaskExecutor executor = new VirtualThreadPoolTaskExecutor();
         executor.setPoolSize(props.getBatchExecutor().getPoolSize());
         executor.setAwaitTerminationSeconds(props.getBatchExecutor().getAwaitTerminationSeconds());
@@ -463,7 +464,7 @@ public class IdRepoRESTCXFContext {
             final Bus bus,
             final SyncopeLogic syncopeLogic,
             @Qualifier("batchExecutor")
-            final VirtualThreadPoolTaskExecutor batchExecutor,
+            final AsyncTaskExecutor batchExecutor,
             final BatchDAO batchDAO,
             final EntityFactory entityFactory) {
 
diff --git a/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/service/SyncopeServiceImpl.java b/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/service/SyncopeServiceImpl.java
index 1a3d7ff551..785c0a10fd 100644
--- a/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/service/SyncopeServiceImpl.java
+++ b/core/idrepo/rest-cxf/src/main/java/org/apache/syncope/core/rest/cxf/service/SyncopeServiceImpl.java
@@ -51,7 +51,7 @@ import org.apache.syncope.core.persistence.api.entity.Batch;
 import org.apache.syncope.core.persistence.api.entity.EntityFactory;
 import org.apache.syncope.core.rest.cxf.batch.BatchProcess;
 import org.apache.syncope.core.spring.security.AuthContextUtils;
-import org.apache.syncope.core.spring.task.VirtualThreadPoolTaskExecutor;
+import org.springframework.core.task.AsyncTaskExecutor;
 import org.springframework.data.domain.Page;
 import org.springframework.data.domain.PageRequest;
 import org.springframework.data.domain.Sort;
@@ -61,7 +61,7 @@ public class SyncopeServiceImpl extends AbstractService implements SyncopeServic
 
     protected final SyncopeLogic logic;
 
-    protected final VirtualThreadPoolTaskExecutor batchExecutor;
+    protected final AsyncTaskExecutor batchExecutor;
 
     protected final Bus bus;
 
@@ -71,7 +71,7 @@ public class SyncopeServiceImpl extends AbstractService implements SyncopeServic
 
     public SyncopeServiceImpl(
             final SyncopeLogic logic,
-            final VirtualThreadPoolTaskExecutor batchExecutor,
+            final AsyncTaskExecutor batchExecutor,
             final Bus bus,
             final BatchDAO batchDAO,
             final EntityFactory entityFactory) {
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java
index 47b0a67cd8..88a3d2fcad 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java
@@ -195,7 +195,7 @@ public class ProvisioningContext {
      */
     @Bean
     @Primary
-    public VirtualThreadPoolTaskExecutor asyncConnectorFacadeExecutor(final ProvisioningProperties props) {
+    public AsyncTaskExecutor asyncConnectorFacadeExecutor(final ProvisioningProperties props) {
         VirtualThreadPoolTaskExecutor executor = new VirtualThreadPoolTaskExecutor();
         executor.setPoolSize(props.getAsyncConnectorFacadeExecutor().getPoolSize());
         executor.setAwaitTerminationSeconds(props.getAsyncConnectorFacadeExecutor().getAwaitTerminationSeconds());
@@ -208,7 +208,7 @@ public class ProvisioningContext {
     @Bean
     public AsyncConfigurer asyncConfigurer(
             @Qualifier("asyncConnectorFacadeExecutor")
-            final VirtualThreadPoolTaskExecutor asyncConnectorFacadeExecutor) {
+            final AsyncTaskExecutor asyncConnectorFacadeExecutor) {
 
         return new AsyncConfigurer() {
 
@@ -226,7 +226,7 @@ public class ProvisioningContext {
      * @return executor thread pool task executor
      */
     @Bean
-    public VirtualThreadPoolTaskExecutor propagationTaskExecutorAsyncExecutor(final ProvisioningProperties props) {
+    public AsyncTaskExecutor propagationTaskExecutorAsyncExecutor(final ProvisioningProperties props) {
         VirtualThreadPoolTaskExecutor executor = new VirtualThreadPoolTaskExecutor();
         executor.setPoolSize(props.getPropagationTaskExecutorAsyncExecutor().getPoolSize());
         executor.setWaitForTasksToCompleteOnShutdown(true);
@@ -459,7 +459,7 @@ public class ProvisioningContext {
     @Bean
     public PropagationTaskExecutor propagationTaskExecutor(
             @Qualifier("propagationTaskExecutorAsyncExecutor")
-            final VirtualThreadPoolTaskExecutor propagationTaskExecutorAsyncExecutor,
+            final AsyncTaskExecutor propagationTaskExecutorAsyncExecutor,
             final TaskUtilsFactory taskUtilsFactory,
             final AnyUtilsFactory anyUtilsFactory,
             final ConnectorManager connectorManager,
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/MacroJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/MacroJobDelegate.java
index a2fd5aa0df..e0ece47ee7 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/MacroJobDelegate.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/MacroJobDelegate.java
@@ -57,9 +57,9 @@ import org.apache.syncope.core.provisioning.api.macro.Command;
 import org.apache.syncope.core.provisioning.api.macro.MacroActions;
 import org.apache.syncope.core.provisioning.api.serialization.POJOHelper;
 import org.apache.syncope.core.spring.implementation.ImplementationManager;
-import org.apache.syncope.core.spring.task.VirtualThreadPoolTaskExecutor;
 import org.springframework.aop.support.AopUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.task.AsyncTaskExecutor;
 import org.springframework.security.concurrent.DelegatingSecurityContextCallable;
 import org.springframework.util.ReflectionUtils;
 
@@ -74,7 +74,7 @@ public class MacroJobDelegate extends AbstractSchedTaskJobDelegate<MacroTask> {
     protected Validator validator;
 
     @Resource(name = "batchExecutor")
-    protected VirtualThreadPoolTaskExecutor taskExecutor;
+    protected AsyncTaskExecutor taskExecutor;
 
     protected final Map<String, MacroActions> perContextActions = new ConcurrentHashMap<>();
 
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java
index f4bfc86950..94ea5cdbcf 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java
@@ -49,8 +49,8 @@ import org.apache.syncope.core.provisioning.api.propagation.PropagationTaskInfo;
 import org.apache.syncope.core.provisioning.java.pushpull.OutboundMatcher;
 import org.apache.syncope.core.provisioning.java.utils.ConnObjectUtils;
 import org.apache.syncope.core.spring.security.AuthContextUtils;
-import org.apache.syncope.core.spring.task.VirtualThreadPoolTaskExecutor;
 import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.core.task.AsyncTaskExecutor;
 import org.springframework.security.core.GrantedAuthority;
 import org.springframework.security.core.context.SecurityContextHolder;
 
@@ -63,7 +63,7 @@ import org.springframework.security.core.context.SecurityContextHolder;
  */
 public class PriorityPropagationTaskExecutor extends AbstractPropagationTaskExecutor {
 
-    protected final VirtualThreadPoolTaskExecutor taskExecutor;
+    protected final AsyncTaskExecutor taskExecutor;
 
     public PriorityPropagationTaskExecutor(
             final ConnectorManager connectorManager,
@@ -79,7 +79,7 @@ public class PriorityPropagationTaskExecutor extends AbstractPropagationTaskExec
             final OutboundMatcher outboundMatcher,
             final PlainAttrValidationManager validator,
             final ApplicationEventPublisher publisher,
-            final VirtualThreadPoolTaskExecutor taskExecutor) {
+            final AsyncTaskExecutor taskExecutor) {
 
         super(connectorManager,
                 connObjectUtils,
diff --git a/core/spring/src/main/java/org/apache/syncope/core/spring/task/VirtualThreadPoolTaskExecutor.java b/core/spring/src/main/java/org/apache/syncope/core/spring/task/VirtualThreadPoolTaskExecutor.java
index 6bcf57d0a8..fea02c8f2b 100644
--- a/core/spring/src/main/java/org/apache/syncope/core/spring/task/VirtualThreadPoolTaskExecutor.java
+++ b/core/spring/src/main/java/org/apache/syncope/core/spring/task/VirtualThreadPoolTaskExecutor.java
@@ -18,16 +18,18 @@
  */
 package org.apache.syncope.core.spring.task;
 
+import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.AbstractExecutorService;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import org.springframework.core.task.AsyncTaskExecutor;
 import org.springframework.core.task.SimpleAsyncTaskExecutor;
 import org.springframework.core.task.TaskDecorator;
-import org.springframework.core.task.support.ExecutorServiceAdapter;
 import org.springframework.scheduling.SchedulingTaskExecutor;
 import org.springframework.scheduling.concurrent.ExecutorConfigurationSupport;
 
@@ -39,6 +41,8 @@ public class VirtualThreadPoolTaskExecutor
 
     private int poolSize = -1;
 
+    private long taskTerminationTimeout;
+
     private TaskDecorator taskDecorator;
 
     private SimpleAsyncTaskExecutor executor;
@@ -53,10 +57,24 @@ public class VirtualThreadPoolTaskExecutor
     }
 
     /**
-     * @return the maximum number of managed threads
+     * Specify a timeout (in milliseconds) for task termination when closing
+     * this executor. The default is 0, not waiting for task termination at all.
+     * <p>
+     * Note that a concrete >0 timeout specified here will lead to the
+     * wrapping of every submitted task into a task-tracking runnable which
+     * involves considerable overhead in case of a high number of tasks.
+     * However, for a modest level of submissions with longer-running
+     * tasks, this is feasible in order to arrive at a graceful shutdown.
+     * <p>
+     * Note that {@code SimpleAsyncTaskExecutor} does not participate in
+     * a coordinated lifecycle stop but rather just awaits task termination
+     * on {@link #close()}.
+     *
+     * @param taskTerminationTimeout the timeout in milliseconds
+     * @see SimpleAsyncTaskExecutor#close
      */
-    public int getPoolSize() {
-        return poolSize;
+    public void setTaskTerminationTimeout(final long taskTerminationTimeout) {
+        this.taskTerminationTimeout = taskTerminationTimeout;
     }
 
     /**
@@ -94,10 +112,46 @@ public class VirtualThreadPoolTaskExecutor
 
         executor = new SimpleAsyncTaskExecutor(getThreadNamePrefix());
         executor.setVirtualThreads(true);
+        executor.setDaemon(true);
         executor.setConcurrencyLimit(poolSize);
+        if (taskTerminationTimeout >= 0) {
+            executor.setTaskTerminationTimeout(taskTerminationTimeout);
+        }
         Optional.ofNullable(taskDecorator).ifPresent(executor::setTaskDecorator);
 
-        return new ExecutorServiceAdapter(executor);
+        return new AbstractExecutorService() {
+
+            @Override
+            public void execute(final Runnable task) {
+                executor.execute(task);
+            }
+
+            @Override
+            public void shutdown() {
+                executor.close();
+            }
+
+            @Override
+            public List<Runnable> shutdownNow() {
+                executor.close();
+                return List.of();
+            }
+
+            @Override
+            public boolean isShutdown() {
+                return !executor.isActive();
+            }
+
+            @Override
+            public boolean isTerminated() {
+                return !executor.isActive();
+            }
+
+            @Override
+            public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
+                return !executor.isActive();
+            }
+        };
     }
 
     @Override
@@ -117,6 +171,6 @@ public class VirtualThreadPoolTaskExecutor
 
     @Override
     public void shutdown() {
-        // manual shutdown is not supported
+        executor.close();
     }
 }

Reply via email to