This is an automated email from the ASF dual-hosted git repository.
ahuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/causeway.git
The following commit(s) were added to refs/heads/master by this push:
new ccc738d34b CAUSEWAY-2822: Wrapper: async executions must run within
their own exclusive transaction
ccc738d34b is described below
commit ccc738d34b70fe6deb97e363c45d1e9aaa5b606b
Author: andi-huber <[email protected]>
AuthorDate: Mon May 22 11:17:38 2023 +0200
CAUSEWAY-2822: Wrapper: async executions must run within their own
exclusive transaction
---
.../services/wrapper/control/AsyncControl.java | 17 ++++----
.../runtimeservices/task/TaskQueueService.java | 51 ++++++++++++++++++++++
.../wrapper/WrapperFactoryDefault.java | 29 +++++++++++-
3 files changed, 86 insertions(+), 11 deletions(-)
diff --git
a/api/applib/src/main/java/org/apache/causeway/applib/services/wrapper/control/AsyncControl.java
b/api/applib/src/main/java/org/apache/causeway/applib/services/wrapper/control/AsyncControl.java
index fc4e3917c3..e4d12f972f 100644
---
a/api/applib/src/main/java/org/apache/causeway/applib/services/wrapper/control/AsyncControl.java
+++
b/api/applib/src/main/java/org/apache/causeway/applib/services/wrapper/control/AsyncControl.java
@@ -23,13 +23,15 @@ import java.util.Locale;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.springframework.lang.Nullable;
+
import org.apache.causeway.applib.clock.VirtualClock;
import org.apache.causeway.applib.services.user.UserMemento;
+import org.apache.causeway.applib.services.wrapper.WrapperFactory;
import org.apache.causeway.commons.internal.assertions._Assert;
import lombok.Getter;
@@ -111,20 +113,17 @@ public class AsyncControl<R> extends
ControlAbstract<AsyncControl<R>> {
return super.with(exceptionHandler);
}
- @Getter @NonNull
- private ExecutorService executorService =
- ForkJoinPool.commonPool();
+ @Getter @Nullable
+ private ExecutorService executorService = null;
/**
* Specifies the {@link ExecutorService} to use to obtain the thread
* to invoke the action.
- *
* <p>
- * The default executor service is the common pool.
- * </p>
- *
+ * The default is {@code null}, indicating, that its the {@link
WrapperFactory}'s
+ * responsibility to provide a suitable {@link ExecutorService}.
*
- * @param executorService
+ * @param executorService - null-able
*/
public AsyncControl<R> with(final ExecutorService executorService) {
this.executorService = executorService;
diff --git
a/core/runtimeservices/src/main/java/org/apache/causeway/core/runtimeservices/task/TaskQueueService.java
b/core/runtimeservices/src/main/java/org/apache/causeway/core/runtimeservices/task/TaskQueueService.java
new file mode 100644
index 0000000000..4cf5c72cd4
--- /dev/null
+++
b/core/runtimeservices/src/main/java/org/apache/causeway/core/runtimeservices/task/TaskQueueService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.causeway.core.runtimeservices.task;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.annotation.Priority;
+import javax.inject.Inject;
+import javax.inject.Named;
+
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
+import org.springframework.stereotype.Service;
+
+import org.apache.causeway.applib.annotation.PriorityPrecedence;
+import
org.apache.causeway.core.runtimeservices.CausewayModuleCoreRuntimeServices;
+
+import lombok.Getter;
+import lombok.NonNull;
+
+@Service
+@Named(TaskQueueService.LOGICAL_TYPE_NAME)
+@Priority(PriorityPrecedence.MIDPOINT)
+@Qualifier("Default")
+public class TaskQueueService {
+
+ static final String LOGICAL_TYPE_NAME =
CausewayModuleCoreRuntimeServices.NAMESPACE + ".TaskQueueService";
+
+ @Inject private SimpleAsyncTaskExecutor simpleAsyncTaskExecutor;
+
+ @Getter @NonNull
+ private ExecutorService commonExecutorService =
Executors.newSingleThreadExecutor();
+
+}
diff --git
a/core/runtimeservices/src/main/java/org/apache/causeway/core/runtimeservices/wrapper/WrapperFactoryDefault.java
b/core/runtimeservices/src/main/java/org/apache/causeway/core/runtimeservices/wrapper/WrapperFactoryDefault.java
index 3fd05b30f3..c325748905 100644
---
a/core/runtimeservices/src/main/java/org/apache/causeway/core/runtimeservices/wrapper/WrapperFactoryDefault.java
+++
b/core/runtimeservices/src/main/java/org/apache/causeway/core/runtimeservices/wrapper/WrapperFactoryDefault.java
@@ -28,15 +28,16 @@ import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
import javax.annotation.Priority;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Provider;
-import org.apache.causeway.core.runtimeservices.session.InteractionIdGenerator;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@@ -96,6 +97,7 @@ import
org.apache.causeway.core.metamodel.spec.feature.MixedInMember;
import org.apache.causeway.core.metamodel.spec.feature.ObjectAction;
import org.apache.causeway.core.metamodel.spec.feature.OneToOneAssociation;
import
org.apache.causeway.core.runtimeservices.CausewayModuleCoreRuntimeServices;
+import org.apache.causeway.core.runtimeservices.session.InteractionIdGenerator;
import
org.apache.causeway.core.runtimeservices.wrapper.dispatchers.InteractionEventDispatcher;
import
org.apache.causeway.core.runtimeservices.wrapper.dispatchers.InteractionEventDispatcherTypeSafe;
import
org.apache.causeway.core.runtimeservices.wrapper.handlers.DomainObjectInvocationHandler;
@@ -138,9 +140,13 @@ implements WrapperFactory, HasMetaModelContext {
dispatchersByEventClass = new HashMap<>();
private ProxyContextHandler proxyContextHandler;
+ private ExecutorService commonExecutorService;
+
@PostConstruct
public void init() {
+ this.commonExecutorService = newCommonExecutorService();
+
val proxyCreator = new ProxyCreator(proxyFactoryService);
proxyContextHandler = new ProxyContextHandler(proxyCreator);
@@ -163,6 +169,11 @@ implements WrapperFactory, HasMetaModelContext {
putDispatcher(CollectionMethodEvent.class,
InteractionListener::collectionMethodInvoked);
}
+ @PreDestroy
+ public void close() {
+ commonExecutorService.shutdown();
+ }
+
// -- WRAPPING
@Override
@@ -397,7 +408,8 @@ implements WrapperFactory, HasMetaModelContext {
asyncControl.setMethod(method);
asyncControl.setBookmark(Bookmark.forOidDto(oidDto));
- val executorService = asyncControl.getExecutorService();
+ val executorService =
Optional.ofNullable(asyncControl.getExecutorService())
+ .orElse(commonExecutorService);
val asyncTask = getServiceInjector().injectServicesInto(new
AsyncTask<R>(
asyncInteractionContext,
Propagation.REQUIRES_NEW,
@@ -578,6 +590,8 @@ implements WrapperFactory, HasMetaModelContext {
@RequiredArgsConstructor
private static class AsyncTask<R> implements AsyncCallable<R> {
+ private static final long serialVersionUID = 1L;
+
@Getter private final InteractionContext interactionContext;
@Getter private final Propagation propagation;
@Getter private final CommandDto commandDto;
@@ -666,4 +680,15 @@ implements WrapperFactory, HasMetaModelContext {
}).orElse(null));
}
+ private final static int MIN_POOL_SIZE = 2; // at least 2
+ private final static int MAX_POOL_SIZE = 4; // max 4
+ private ExecutorService newCommonExecutorService() {
+ final int poolSize = Math.min(
+ MAX_POOL_SIZE,
+ Math.max(
+ MIN_POOL_SIZE,
+ Runtime.getRuntime().availableProcessors()));
+ return Executors.newFixedThreadPool(poolSize);
+ }
+
}