This is an automated email from the ASF dual-hosted git repository.

ahuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/causeway.git


The following commit(s) were added to refs/heads/master by this push:
     new ccc738d34b CAUSEWAY-2822: Wrapper: async executions must run within their own exclusive transaction
ccc738d34b is described below

commit ccc738d34b70fe6deb97e363c45d1e9aaa5b606b
Author: andi-huber <[email protected]>
AuthorDate: Mon May 22 11:17:38 2023 +0200

    CAUSEWAY-2822: Wrapper: async executions must run within their own
    exclusive transaction
---
 .../services/wrapper/control/AsyncControl.java     | 17 ++++----
 .../runtimeservices/task/TaskQueueService.java     | 51 ++++++++++++++++++++++
 .../wrapper/WrapperFactoryDefault.java             | 29 +++++++++++-
 3 files changed, 86 insertions(+), 11 deletions(-)

diff --git a/api/applib/src/main/java/org/apache/causeway/applib/services/wrapper/control/AsyncControl.java b/api/applib/src/main/java/org/apache/causeway/applib/services/wrapper/control/AsyncControl.java
index fc4e3917c3..e4d12f972f 100644
--- a/api/applib/src/main/java/org/apache/causeway/applib/services/wrapper/control/AsyncControl.java
+++ b/api/applib/src/main/java/org/apache/causeway/applib/services/wrapper/control/AsyncControl.java
@@ -23,13 +23,15 @@ import java.util.Locale;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.springframework.lang.Nullable;
+
 import org.apache.causeway.applib.clock.VirtualClock;
 import org.apache.causeway.applib.services.user.UserMemento;
+import org.apache.causeway.applib.services.wrapper.WrapperFactory;
 import org.apache.causeway.commons.internal.assertions._Assert;
 
 import lombok.Getter;
@@ -111,20 +113,17 @@ public class AsyncControl<R> extends ControlAbstract<AsyncControl<R>> {
         return super.with(exceptionHandler);
     }
 
-    @Getter @NonNull
-    private ExecutorService executorService =
-                            ForkJoinPool.commonPool();
+    @Getter @Nullable
+    private ExecutorService executorService = null;
 
     /**
      * Specifies the {@link ExecutorService} to use to obtain the thread
      * to invoke the action.
-     *
      * <p>
-     * The default executor service is the common pool.
-     * </p>
-     *
+     * The default is {@code null}, indicating that it is the {@link WrapperFactory}'s
+     * responsibility to provide a suitable {@link ExecutorService}.
      *
-     * @param executorService
+     * @param executorService the executor to use, or {@code null} to leave the choice to the {@link WrapperFactory}
      */
     public AsyncControl<R> with(final ExecutorService executorService) {
         this.executorService = executorService;
diff --git a/core/runtimeservices/src/main/java/org/apache/causeway/core/runtimeservices/task/TaskQueueService.java b/core/runtimeservices/src/main/java/org/apache/causeway/core/runtimeservices/task/TaskQueueService.java
new file mode 100644
index 0000000000..4cf5c72cd4
--- /dev/null
+++ b/core/runtimeservices/src/main/java/org/apache/causeway/core/runtimeservices/task/TaskQueueService.java
@@ -0,0 +1,51 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.causeway.core.runtimeservices.task;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.annotation.Priority;
+import javax.inject.Inject;
+import javax.inject.Named;
+
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
+import org.springframework.stereotype.Service;
+
+import org.apache.causeway.applib.annotation.PriorityPrecedence;
+import org.apache.causeway.core.runtimeservices.CausewayModuleCoreRuntimeServices;
+
+import lombok.Getter;
+import lombok.NonNull;
+
+@Service
+@Named(TaskQueueService.LOGICAL_TYPE_NAME)
+@Priority(PriorityPrecedence.MIDPOINT)
+@Qualifier("Default")
+public class TaskQueueService {
+
+    static final String LOGICAL_TYPE_NAME = CausewayModuleCoreRuntimeServices.NAMESPACE + ".TaskQueueService";
+
+    @Inject private SimpleAsyncTaskExecutor simpleAsyncTaskExecutor;
+
+    @Getter @NonNull
+    private ExecutorService commonExecutorService = Executors.newSingleThreadExecutor();
+
+}
diff --git a/core/runtimeservices/src/main/java/org/apache/causeway/core/runtimeservices/wrapper/WrapperFactoryDefault.java b/core/runtimeservices/src/main/java/org/apache/causeway/core/runtimeservices/wrapper/WrapperFactoryDefault.java
index 3fd05b30f3..c325748905 100644
--- a/core/runtimeservices/src/main/java/org/apache/causeway/core/runtimeservices/wrapper/WrapperFactoryDefault.java
+++ b/core/runtimeservices/src/main/java/org/apache/causeway/core/runtimeservices/wrapper/WrapperFactoryDefault.java
@@ -28,15 +28,16 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.BiConsumer;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import javax.annotation.Priority;
 import javax.inject.Inject;
 import javax.inject.Named;
 import javax.inject.Provider;
 
-import org.apache.causeway.core.runtimeservices.session.InteractionIdGenerator;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
@@ -96,6 +97,7 @@ import org.apache.causeway.core.metamodel.spec.feature.MixedInMember;
 import org.apache.causeway.core.metamodel.spec.feature.ObjectAction;
 import org.apache.causeway.core.metamodel.spec.feature.OneToOneAssociation;
 import org.apache.causeway.core.runtimeservices.CausewayModuleCoreRuntimeServices;
+import org.apache.causeway.core.runtimeservices.session.InteractionIdGenerator;
 import org.apache.causeway.core.runtimeservices.wrapper.dispatchers.InteractionEventDispatcher;
 import org.apache.causeway.core.runtimeservices.wrapper.dispatchers.InteractionEventDispatcherTypeSafe;
 import org.apache.causeway.core.runtimeservices.wrapper.handlers.DomainObjectInvocationHandler;
@@ -138,9 +140,13 @@ implements WrapperFactory, HasMetaModelContext {
         dispatchersByEventClass = new HashMap<>();
     private ProxyContextHandler proxyContextHandler;
 
+    private ExecutorService commonExecutorService;
+
     @PostConstruct
     public void init() {
 
+        this.commonExecutorService = newCommonExecutorService();
+
         val proxyCreator = new ProxyCreator(proxyFactoryService);
         proxyContextHandler = new ProxyContextHandler(proxyCreator);
 
@@ -163,6 +169,11 @@ implements WrapperFactory, HasMetaModelContext {
         putDispatcher(CollectionMethodEvent.class, InteractionListener::collectionMethodInvoked);
     }
 
+    @PreDestroy
+    public void close() {
+        commonExecutorService.shutdown();
+    }
+
     // -- WRAPPING
 
     @Override
@@ -397,7 +408,8 @@ implements WrapperFactory, HasMetaModelContext {
         asyncControl.setMethod(method);
         asyncControl.setBookmark(Bookmark.forOidDto(oidDto));
 
-        val executorService = asyncControl.getExecutorService();
+        val executorService = Optional.ofNullable(asyncControl.getExecutorService())
+                .orElse(commonExecutorService);
         val asyncTask = getServiceInjector().injectServicesInto(new AsyncTask<R>(
             asyncInteractionContext,
             Propagation.REQUIRES_NEW,
@@ -578,6 +590,8 @@ implements WrapperFactory, HasMetaModelContext {
     @RequiredArgsConstructor
     private static class AsyncTask<R> implements AsyncCallable<R> {
 
+        private static final long serialVersionUID = 1L;
+
         @Getter private final InteractionContext interactionContext;
         @Getter private final Propagation propagation;
         @Getter private final CommandDto commandDto;
@@ -666,4 +680,15 @@ implements WrapperFactory, HasMetaModelContext {
                     }).orElse(null));
     }
 
+    private final static int MIN_POOL_SIZE = 2; // at least 2
+    private final static int MAX_POOL_SIZE = 4; // max 4
+    private ExecutorService newCommonExecutorService() {
+        final int poolSize = Math.min(
+                MAX_POOL_SIZE,
+                Math.max(
+                        MIN_POOL_SIZE,
+                        Runtime.getRuntime().availableProcessors()));
+        return Executors.newFixedThreadPool(poolSize);
+    }
+
 }

Reply via email to