This is an automated email from the ASF dual-hosted git repository.
mattrpav pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new edd358df8a [AMQ-9394] Tech Preview: Virtual Thread support
edd358df8a is described below
commit edd358df8ae00db7cd0984eaf900767512301787
Author: Matt Pavlovich <[email protected]>
AuthorDate: Tue Mar 5 14:51:57 2024 -0600
[AMQ-9394] Tech Preview: Virtual Thread support
---
.../org/apache/activemq/broker/BrokerService.java | 16 +-
.../org/apache/activemq/broker/jmx/BrokerView.java | 10 +
.../activemq/broker/jmx/BrokerViewMBean.java | 7 +
activemq-client-jdk21-test/pom.xml | 98 +++++++++
.../broker/VirtualThreadTaskRunnerBrokerTest.java | 37 ++++
activemq-client-jdk21/pom.xml | 237 +++++++++++++++++++++
.../activemq/thread/VirtualThreadExecutor.java | 80 +++++++
.../activemq/thread/VirtualThreadTaskRunner.java | 176 +++++++++++++++
.../org/apache/activemq/ActiveMQConnection.java | 15 +-
.../apache/activemq/ActiveMQConnectionFactory.java | 10 +
.../apache/activemq/annotation/Experimental.java | 46 ++++
.../apache/activemq/thread/TaskRunnerFactory.java | 76 ++++++-
.../org/apache/activemq/thread/TaskRunnerTest.java | 2 -
assembly/pom.xml | 4 +
assembly/src/main/descriptors/common-bin.xml | 10 +
pom.xml | 48 +++++
16 files changed, 865 insertions(+), 7 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 510b55f89c..1eeb6fb8b4 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -50,6 +50,7 @@ import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.ConfigurationException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
+import org.apache.activemq.annotation.Experimental;
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
@@ -223,6 +224,7 @@ public class BrokerService implements Service {
private boolean monitorConnectionSplits = false;
private int taskRunnerPriority = Thread.NORM_PRIORITY;
private boolean dedicatedTaskRunner;
+ private boolean virtualThreadTaskRunner;
private boolean cacheTempDestinations = false;// useful for failover
private int timeBeforePurgeTempDestinations = 5000;
private final List<Runnable> shutdownHooks = new ArrayList<>();
@@ -1269,7 +1271,7 @@ public class BrokerService implements Service {
public TaskRunnerFactory getTaskRunnerFactory() {
if (this.taskRunnerFactory == null) {
this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ
BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
- isDedicatedTaskRunner());
+ isDedicatedTaskRunner(), isVirtualThreadTaskRunner());
this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader());
}
return this.taskRunnerFactory;
@@ -1280,9 +1282,10 @@ public class BrokerService implements Service {
}
public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
+ // [AMQ-9394] TODO: Should we have a separate config flag for
virtualThread for persistence task runner?
if (taskRunnerFactory == null) {
persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence
Adaptor Task", persistenceThreadPriority,
- true, 1000, isDedicatedTaskRunner());
+ true, 1000, isDedicatedTaskRunner(),
isVirtualThreadTaskRunner());
}
return persistenceTaskRunnerFactory;
}
@@ -1891,6 +1894,15 @@ public class BrokerService implements Service {
this.dedicatedTaskRunner = dedicatedTaskRunner;
}
+ public boolean isVirtualThreadTaskRunner() {
+ return virtualThreadTaskRunner;
+ }
+
+ @Experimental("Tech Preview for Virtaul Thread support")
+ public void setVirtualThreadTaskRunner(boolean virtualThreadTaskRunner) {
+ this.virtualThreadTaskRunner = virtualThreadTaskRunner;
+ }
+
public boolean isCacheTempDestinations() {
return cacheTempDestinations;
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
index f5c8468297..4f0ab2c7a6 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
@@ -547,6 +547,16 @@ public class BrokerView implements BrokerViewMBean {
return brokerService.isSlave();
}
+ @Override
+ public boolean isDedicatedTaskRunner() {
+ return brokerService.isDedicatedTaskRunner();
+ }
+
+ @Override
+ public boolean isVirtualThreadTaskRunner() {
+ return brokerService.isVirtualThreadTaskRunner();
+ }
+
private ManagedRegionBroker safeGetBroker() {
if (broker == null) {
throw new IllegalStateException("Broker is not yet started.");
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
index 8e6ae70efe..98df113f69 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
@@ -353,4 +353,11 @@ public interface BrokerViewMBean extends Service {
@MBeanInfo(value="The total number of times that the max number of
uncommitted count has been exceeded across all destinations")
long getTotalMaxUncommittedExceededCount();
+
+ @MBeanInfo("Dedicated Task Runner enabled.")
+ boolean isDedicatedTaskRunner();
+
+ @MBeanInfo("Virtual Thread Task Runner enabled.")
+ boolean isVirtualThreadTaskRunner();
+
}
diff --git a/activemq-client-jdk21-test/pom.xml
b/activemq-client-jdk21-test/pom.xml
new file mode 100644
index 0000000000..5d7247064d
--- /dev/null
+++ b/activemq-client-jdk21-test/pom.xml
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-parent</artifactId>
+ <version>6.2.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>activemq-client-jdk21-test</artifactId>
+ <packaging>jar</packaging>
+ <name>ActiveMQ :: Client JDK 21 Test</name>
+ <description>Test module for activemq-client-jdk21 with tech preview
support for Virtual Threads</description>
+ <properties>
+ <maven.compiler.release>21</maven.compiler.release>
+ <maven.compiler.source>21</maven.compiler.source>
+ <maven.compiler.target>21</maven.compiler.target>
+ <maven.javadoc.skip>true</maven.javadoc.skip>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-client-jdk21</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-unit-tests</artifactId>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ <profiles>
+ <profile>
+ <id>jdk-17-skip</id>
+ <activation>
+ <jdk>[17,21)</jdk>
+ </activation>
+ <properties>
+ <!-- maven.bundle.skip>true</maven.bundle.skip -->
+ <maven.main.skip>true</maven.main.skip>
+ <maven.test.skip>true</maven.test.skip>
+ </properties>
+ </profile>
+ <profile>
+ <id>jdk-21-plus</id>
+ <activation>
+ <jdk>[21,)</jdk>
+ </activation>
+ </profile>
+ </profiles>
+</project>
diff --git
a/activemq-client-jdk21-test/src/test/java/org/apache/activemq/broker/VirtualThreadTaskRunnerBrokerTest.java
b/activemq-client-jdk21-test/src/test/java/org/apache/activemq/broker/VirtualThreadTaskRunnerBrokerTest.java
new file mode 100644
index 0000000000..88d67158ae
--- /dev/null
+++
b/activemq-client-jdk21-test/src/test/java/org/apache/activemq/broker/VirtualThreadTaskRunnerBrokerTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import junit.framework.Test;
+
+public class VirtualThreadTaskRunnerBrokerTest extends BrokerTest {
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService broker = super.createBroker();
+ broker.setVirtualThreadTaskRunner(true);
+ return broker;
+ }
+
+ public static Test suite() {
+ return suite(VirtualThreadTaskRunnerBrokerTest.class);
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+}
diff --git a/activemq-client-jdk21/pom.xml b/activemq-client-jdk21/pom.xml
new file mode 100644
index 0000000000..5474099455
--- /dev/null
+++ b/activemq-client-jdk21/pom.xml
@@ -0,0 +1,237 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-parent</artifactId>
+ <version>6.2.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>activemq-client-jdk21</artifactId>
+ <packaging>bundle</packaging>
+ <name>ActiveMQ :: Client JDK 21</name>
+ <description>ActiveMQ Client implementation compiled with JDK 21 and tech
preview support for Virtual Threads</description>
+ <properties>
+ <maven.compiler.release>21</maven.compiler.release>
+ <maven.compiler.source>21</maven.compiler.source>
+ <maven.compiler.target>21</maven.compiler.target>
+ <maven.javadoc.skip>true</maven.javadoc.skip>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-client</artifactId>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>jakarta.jms</groupId>
+ <artifactId>jakarta.jms-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>jakarta.jms</groupId>
+ <artifactId>jakarta.jms-api</artifactId>
+ <version>${jakarta-jms-api-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.fusesource.hawtbuf</groupId>
+ <artifactId>hawtbuf</artifactId>
+ <version>${hawtbuf-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jmdns</groupId>
+ <artifactId>jmdns</artifactId>
+ <optional>true</optional>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.thoughtworks.xstream</groupId>
+ <artifactId>xstream</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-unit-tests</artifactId>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <profiles>
+ <profile>
+ <id>jdk-17-skip</id>
+ <activation>
+ <jdk>[17,21)</jdk>
+ </activation>
+ <properties>
+ <!-- maven.bundle.skip>true</maven.bundle.skip -->
+ <maven.main.skip>true</maven.main.skip>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>jdk-21-plus</id>
+ <activation>
+ <jdk>[21,)</jdk>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>unpack-source</id>
+ <phase>initialize</phase>
+ <goals>
+ <goal>unpack</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-client</artifactId>
+ <classifier>sources</classifier>
+ <type>jar</type>
+
<outputDirectory>${project.build.directory}/copied-sources/activemq-client</outputDirectory>
+
<excludes>**/META-INF/*,**/META-INF/maven/**,**/zeroconf/**</excludes>
+ <includes>**/**</includes>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-java-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+
<outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
+ <resources>
+ <resource>
+
<directory>${project.build.directory}/copied-sources/activemq-client</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>copy-resources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+
<outputDirectory>${project.build.directory}/generated-resources/META-INF</outputDirectory>
+ <resources>
+ <resource>
+
<directory>${project.build.directory}/copied-sources/activemq-client/META-INF</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+
<source>${project.build.directory}/generated-sources</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-resource</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>add-resource</goal>
+ </goals>
+ <configuration>
+ <resources>
+ <resource>
+
<directory>${project.build.directory}/generated-resources</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ <inherited>true</inherited>
+ <configuration>
+ <instructions>
+ <Import-Package>
+ !java.*,
+ !com.google.errorprone.annotations,
+ !com.google.errorprone.annotations.concurrent,
+ com.thoughtworks.xstream.*;resolution:="optional",
+ *
+ </Import-Package>
+ <Private-Package>
+ com.google.errorprone.annotations,
+ com.google.errorprone.annotations.concurrent
+ </Private-Package>
+ <_noee>true</_noee>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git
a/activemq-client-jdk21/src/main/java/org/apache/activemq/thread/VirtualThreadExecutor.java
b/activemq-client-jdk21/src/main/java/org/apache/activemq/thread/VirtualThreadExecutor.java
new file mode 100644
index 0000000000..783e198f4d
--- /dev/null
+++
b/activemq-client-jdk21/src/main/java/org/apache/activemq/thread/VirtualThreadExecutor.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.thread;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.annotation.Experimental;
+import org.slf4j.Logger;
+
+/**
+ * [AMQ-9394] Virtual Thread support
+ *
+ * JDK 21 introduces experimental virtual thread support which allows detaching
of OS threads
+ * from JVM threads. This allows the Java VM to continue working on other
tasks while waiting
+ * for OS operations (specifically I/O) to complete.
+ *
+ * JDK 25 provides improvements to Virtual Threads to prevent thread pinning
in code blocks
+ * that use synchronized.
+ *
+ * ActiveMQ support for Virtual Threads is currently experimental and
profiling of various
+ * scenarios and end-user feedback is needed to identify hotspots in order to
realize
+ * the full performance benefit.
+ *
+ * Additionally, usage of ThreadLocal needs to be removed/refactored to use
ScopedValue
+ * or another approach altogether. ActiveMQ has only a few ThreadLocal usages,
but a key
+ * area is SSLContext support. JIRA [AMQ-9753] will address SSL Context ThreadLocal
usage.
+ *
+ * Status:
+ * v6.2.0 - Experimental Virtual Thread support introduced.
+ *
+ */
+@Experimental("Tech Preview for Virtual Thread support")
+public class VirtualThreadExecutor {
+
+ private VirtualThreadExecutor() {}
+
+ public static ExecutorService createVirtualThreadExecutorService(final
String name, final AtomicLong id, final Logger LOG) {
+
+ // [AMQ-9394] NOTE: Submitted JDK feature enhancement id: 9076243 to
allow AtomicLong thread id param
+ // https://bugs.java.com/bugdatabase/view_bug?bug_id=JDK-8320377
+ Thread.Builder.OfVirtual threadBuilderOfVirtual = Thread.ofVirtual()
+ .name(name)
+ .uncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(final Thread t, final
Throwable e) {
+ LOG.error("Error in thread '{}'", t.getName(), e);
+ }
+ });
+
+ // [AMQ-9394] Work around to have global thread id increment across
ThreadFactories
+ ThreadFactory virtualThreadFactory = threadBuilderOfVirtual.factory();
+ ThreadFactory atomicThreadFactory = new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread tmpThread = virtualThreadFactory.newThread(r);
+ tmpThread.setName(tmpThread.getName() + id.incrementAndGet());
+ return tmpThread;
+ }
+ };
+
+ return Executors.newThreadPerTaskExecutor(atomicThreadFactory); //
[AMQ-9394] Same as newVirtualThreadPerTaskExecutor
+ }
+}
diff --git
a/activemq-client-jdk21/src/main/java/org/apache/activemq/thread/VirtualThreadTaskRunner.java
b/activemq-client-jdk21/src/main/java/org/apache/activemq/thread/VirtualThreadTaskRunner.java
new file mode 100644
index 0000000000..0b58657e7e
--- /dev/null
+++
b/activemq-client-jdk21/src/main/java/org/apache/activemq/thread/VirtualThreadTaskRunner.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.thread;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class VirtualThreadTaskRunner implements TaskRunner {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(VirtualThreadTaskRunner.class);
+ private final int maxIterationsPerRun;
+ private final Executor executor;
+ private final Task task;
+ private final Lock taskRunnerLock = new ReentrantLock();
+ private final Runnable runable;
+ private final Condition runnableCondition;
+ private boolean queued;
+ private boolean shutdown;
+ private boolean iterating;
+ // [AMQ-9394] TODO: Remove references to the running thread where possible
+ private volatile Thread runningThread;
+
+ public VirtualThreadTaskRunner(Executor executor, final Task task, int
maxIterationsPerRun) {
+ this.executor = executor;
+ this.maxIterationsPerRun = maxIterationsPerRun;
+ this.task = task;
+ runable = new Runnable() {
+ @Override
+ public void run() {
+ runningThread = Thread.currentThread();
+ try {
+ runTask();
+ } finally {
+ LOG.trace("Run task done: {}", task);
+ runningThread = null;
+ }
+ }
+ };
+ this.runnableCondition = taskRunnerLock.newCondition();
+ }
+
+ /**
+ * We Expect MANY wakeup calls on the same TaskRunner.
+ */
+ @Override
+ public void wakeup() throws InterruptedException {
+ taskRunnerLock.lock();
+ try {
+ // When we get in here, we make some assumptions of state:
+ // queued=false, iterating=false: wakeup() has not been called and
+ // therefore task is not executing.
+ // queued=true, iterating=false: wakeup() was called but, task
+ // execution has not started yet
+ // queued=false, iterating=true : wakeup() was called, which caused
+ // task execution to start.
+ // queued=true, iterating=true : wakeup() called after task
+ // execution was started.
+
+ if (queued || shutdown) {
+ return;
+ }
+
+ queued = true;
+
+ // The runTask() method will do this for me once we are done
+ // iterating.
+ if (!iterating) {
+ executor.execute(runable);
+ }
+ } finally {
+ taskRunnerLock.unlock();
+ }
+ }
+
+ /**
+ * shut down the task
+ *
+ * @throws InterruptedException
+ */
+ @Override
+ public void shutdown(long timeout) throws InterruptedException {
+ LOG.trace("Shutdown timeout: {} task: {}", timeout, task);
+ taskRunnerLock.lock();
+ try {
+ shutdown = true;
+ // the check on the thread is done
+ // because a call to iterate can result in
+ // shutDown() being called, which would wait forever
+ // waiting for iterating to finish
+ if (runningThread != Thread.currentThread()) {
+ if (iterating) {
+ runnableCondition.await(timeout, TimeUnit.MILLISECONDS);
+ }
+ }
+ } finally {
+ taskRunnerLock.unlock();
+ }
+ }
+
+ @Override
+ public void shutdown() throws InterruptedException {
+ shutdown(0);
+ }
+
+ final void runTask() {
+
+ taskRunnerLock.lock();
+ try {
+ queued = false;
+ if (shutdown) {
+ iterating = false;
+ runnableCondition.signalAll();
+ return;
+ }
+ iterating = true;
+ } finally {
+ taskRunnerLock.unlock();
+ }
+
+ // Don't synchronize while we are iterating so that
+ // multiple wakeup() calls can be executed concurrently.
+ boolean done = false;
+ try {
+ for (int i = 0; i < maxIterationsPerRun; i++) {
+ LOG.trace("Running task iteration {} - {}", i, task);
+ if (!task.iterate()) {
+ done = true;
+ break;
+ }
+ }
+ } finally {
+ taskRunnerLock.lock();
+ try {
+ iterating = false;
+ runnableCondition.signalAll();
+ if (shutdown) {
+ queued = false;
+ runnableCondition.signalAll();
+ } else {
+ // If we could not iterate all the items
+ // then we need to re-queue.
+ if (!done) {
+ queued = true;
+ }
+
+ if (queued) {
+ executor.execute(runable);
+ }
+ }
+
+ } finally {
+ taskRunnerLock.unlock();
+ }
+ }
+ }
+}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index a91349b28b..74a25b728a 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -199,6 +199,7 @@ public class ActiveMQConnection implements Connection,
TopicConnection, QueueCon
private DestinationSource destinationSource;
private final Object ensureConnectionInfoSentMutex = new Object();
private boolean useDedicatedTaskRunner;
+ private boolean useVirtualThreadTaskRunner;
protected AtomicInteger transportInterruptionProcessingComplete = new
AtomicInteger(0);
private long consumerFailoverRedeliveryWaitPeriod;
private volatile Scheduler scheduler;
@@ -1068,10 +1069,22 @@ public class ActiveMQConnection implements Connection,
TopicConnection, QueueCon
this.useDedicatedTaskRunner = useDedicatedTaskRunner;
}
+ public boolean isUseVirtualThreadTaskRunner() {
+ return useVirtualThreadTaskRunner;
+ }
+
+ public void setUseVirtualThreadTaskRunner(boolean
useVirtualThreadTaskRunner) {
+ this.useVirtualThreadTaskRunner = useVirtualThreadTaskRunner;
+ }
+
public TaskRunnerFactory getSessionTaskRunner() {
synchronized (this) {
if (sessionTaskRunner == null) {
- sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session
Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000,
isUseDedicatedTaskRunner(), maxThreadPoolSize);
+ if(isUseVirtualThreadTaskRunner()) {
+ sessionTaskRunner = new TaskRunnerFactory("ActiveMQ
Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000,
isUseDedicatedTaskRunner(), isUseVirtualThreadTaskRunner());
+ } else {
+ sessionTaskRunner = new TaskRunnerFactory("ActiveMQ
Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000,
isUseDedicatedTaskRunner(), maxThreadPoolSize);
+ }
sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
}
}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
index 54969f39ce..ae57d2624b 100644
---
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
+++
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -148,6 +148,7 @@ public class ActiveMQConnectionFactory extends
JNDIBaseStorable implements Conne
private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
private int auditMaximumProducerNumber =
ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
private boolean useDedicatedTaskRunner;
+ private boolean useVirtualThreadTaskRunner;
private long consumerFailoverRedeliveryWaitPeriod = 0;
private boolean checkForDuplicates = true;
private ClientInternalExceptionListener clientInternalExceptionListener;
@@ -438,6 +439,7 @@ public class ActiveMQConnectionFactory extends
JNDIBaseStorable implements Conne
connection.setAuditDepth(getAuditDepth());
connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
+
connection.setUseVirtualThreadTaskRunner(isUseVirtualThreadTaskRunner());
connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
connection.setCheckForDuplicates(isCheckForDuplicates());
connection.setMessagePrioritySupported(isMessagePrioritySupported());
@@ -1148,6 +1150,14 @@ public class ActiveMQConnectionFactory extends
JNDIBaseStorable implements Conne
return useDedicatedTaskRunner;
}
+ public void setUseVirtualThreadTaskRunner(boolean
useVirtualThreadTaskRunner) {
+ this.useVirtualThreadTaskRunner = useVirtualThreadTaskRunner;
+ }
+
+ public boolean isUseVirtualThreadTaskRunner() {
+ return useVirtualThreadTaskRunner;
+ }
+
public void setConsumerFailoverRedeliveryWaitPeriod(long
consumerFailoverRedeliveryWaitPeriod) {
this.consumerFailoverRedeliveryWaitPeriod =
consumerFailoverRedeliveryWaitPeriod;
}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/annotation/Experimental.java
b/activemq-client/src/main/java/org/apache/activemq/annotation/Experimental.java
new file mode 100644
index 0000000000..62b6bd429c
--- /dev/null
+++
b/activemq-client/src/main/java/org/apache/activemq/annotation/Experimental.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * The Experimental annotation documents in-progress
+ * ActiveMQ features and communicates preview status
+ * of new features that may change or be removed
+ * between any release.
+ *
+ * @author Matt Pavlovich <[email protected]>
+ * @since 6.2.0
+ */
+@Documented
+@Retention(value=RetentionPolicy.CLASS)
+@Target(value={
+ ElementType.CONSTRUCTOR,
+ ElementType.FIELD,
+ ElementType.LOCAL_VARIABLE,
+ ElementType.METHOD,
+ ElementType.PACKAGE,
+ ElementType.PARAMETER,
+ ElementType.TYPE})
+public @interface Experimental {
+ String value();
+}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
b/activemq-client/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
index 002aec79d4..3c872bef3c 100644
---
a/activemq-client/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
+++
b/activemq-client/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
@@ -16,8 +16,12 @@
*/
package org.apache.activemq.thread;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@@ -56,6 +60,9 @@ public class TaskRunnerFactory implements Executor {
private RejectedExecutionHandler rejectedTaskHandler = null;
private ClassLoader threadClassLoader;
+ // [AMQ-9394] Virtual Thread support
+ private boolean virtualThreadTaskRunner = false;
+
public TaskRunnerFactory() {
this("ActiveMQ Task");
}
@@ -72,13 +79,22 @@ public class TaskRunnerFactory implements Executor {
this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner,
getDefaultMaximumPoolSize());
}
+ public TaskRunnerFactory(String name, int priority, boolean daemon, int
maxIterationsPerRun, boolean dedicatedTaskRunner, boolean
virtualThreadTaskRunner) {
+ this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner,
getDefaultMaximumPoolSize(), virtualThreadTaskRunner);
+ }
+
public TaskRunnerFactory(String name, int priority, boolean daemon, int
maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize) {
+ this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner,
maxThreadPoolSize, false);
+ }
+
+ public TaskRunnerFactory(String name, int priority, boolean daemon, int
maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize,
boolean virtualThreadTaskRunner) {
this.name = name;
this.priority = priority;
this.daemon = daemon;
this.maxIterationsPerRun = maxIterationsPerRun;
this.dedicatedTaskRunner = dedicatedTaskRunner;
this.maxThreadPoolSize = maxThreadPoolSize;
+ this.virtualThreadTaskRunner = virtualThreadTaskRunner;
}
public void init() {
@@ -90,7 +106,9 @@ public class TaskRunnerFactory implements Executor {
synchronized(this) {
//need to recheck if initDone is true under the lock
if (!initDone.get()) {
- if (dedicatedTaskRunner ||
"true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner")))
{
+ if (virtualThreadTaskRunner ||
"true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseVirtualThreadTaskRunner"))
) {
+ executorRef.compareAndSet(null,
createVirtualThreadExecutor());
+ } else if (dedicatedTaskRunner ||
"true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner")))
{
executorRef.set(null);
} else {
executorRef.compareAndSet(null,
createDefaultExecutor());
@@ -154,7 +172,11 @@ public class TaskRunnerFactory implements Executor {
init();
ExecutorService executor = executorRef.get();
if (executor != null) {
- return new PooledTaskRunner(executor, task, maxIterationsPerRun);
+ if(isVirtualThreadTaskRunner()) {
+ return createVirtualThreadTaskRunner(executor, task,
maxIterationsPerRun);
+ } else {
+ return new PooledTaskRunner(executor, task,
maxIterationsPerRun);
+ }
} else {
return new DedicatedTaskRunner(task, name, priority, daemon);
}
@@ -217,6 +239,48 @@ public class TaskRunnerFactory implements Executor {
return rc;
}
+ protected ExecutorService createVirtualThreadExecutor() {
+ if(!(Runtime.version().feature() >= 21)) {
+ LOG.error("Virtual Thread support requires JDK 21 or higher");
+ throw new IllegalStateException("Virtual Thread support requires
JDK 21 or higher");
+ }
+
+ try {
+ Class<?> virtualThreadExecutorClass =
Class.forName("org.apache.activemq.thread.VirtualThreadExecutor", false,
threadClassLoader);
+ Method method =
virtualThreadExecutorClass.getMethod("createVirtualThreadExecutorService",
String.class, AtomicLong.class, Logger.class);
+ Object result = method.invoke(null, name, id, LOG);
+ if(!ExecutorService.class.isAssignableFrom(result.getClass())) {
+ throw new IllegalStateException("VirtualThreadExecutor not
returned");
+ }
+ LOG.info("VirtualThreadExecutor initialized name:{}", name);
+ return ExecutorService.class.cast(result);
+ } catch (ClassNotFoundException | NoSuchMethodException |
SecurityException | IllegalAccessException | InvocationTargetException e) {
+ LOG.error("VirtualThreadExecutor class failed to load", e);
+ throw new IllegalStateException(e);
+ }
+ }
+
+ protected TaskRunner createVirtualThreadTaskRunner(Executor executor, Task
task, int maxIterations) {
+ if(!(Runtime.version().feature() >= 21)) {
+ LOG.error("Virtual Thread support requires JDK 21 or higher");
+ throw new IllegalStateException("Virtual Thread support requires
JDK 21 or higher");
+ }
+
+ try {
+ Class<?> virtualThreadTaskRunnerClass =
Class.forName("org.apache.activemq.thread.VirtualThreadTaskRunner", false,
threadClassLoader);
+ Constructor<?> constructor =
virtualThreadTaskRunnerClass.getConstructor(Executor.class, Task.class,
Integer.TYPE);
+ Object result = constructor.newInstance(executor, task,
maxIterations);
+ if(!TaskRunner.class.isAssignableFrom(result.getClass())) {
+ throw new IllegalStateException("VirtualThreadTaskRunner not
returned");
+ }
+ return TaskRunner.class.cast(result);
+ } catch (ClassNotFoundException | NoSuchMethodException |
SecurityException | IllegalAccessException | InvocationTargetException |
InstantiationException | IllegalArgumentException e) {
+ LOG.error("VirtualThreadTaskRunner class failed to load", e);
+ throw new IllegalStateException(e);
+ }
+ }
+
+
public ExecutorService getExecutor() {
return executorRef.get();
}
@@ -265,6 +329,14 @@ public class TaskRunnerFactory implements Executor {
this.dedicatedTaskRunner = dedicatedTaskRunner;
}
+ public boolean isVirtualThreadTaskRunner() {
+ return virtualThreadTaskRunner;
+ }
+
+ public void setVirtualThreadTaskRunner(boolean virtualThreadTaskRunner) {
+ this.virtualThreadTaskRunner = virtualThreadTaskRunner;
+ }
+
public int getMaxThreadPoolSize() {
return maxThreadPoolSize;
}
diff --git
a/activemq-client/src/test/java/org/apache/activemq/thread/TaskRunnerTest.java
b/activemq-client/src/test/java/org/apache/activemq/thread/TaskRunnerTest.java
index 9cfd1d404e..04f6086fbc 100644
---
a/activemq-client/src/test/java/org/apache/activemq/thread/TaskRunnerTest.java
+++
b/activemq-client/src/test/java/org/apache/activemq/thread/TaskRunnerTest.java
@@ -3,8 +3,6 @@
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 03ca26e6f8..d87127ed82 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -41,6 +41,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>activemq-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>activemq-client-jdk21</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-openwire-legacy</artifactId>
diff --git a/assembly/src/main/descriptors/common-bin.xml
b/assembly/src/main/descriptors/common-bin.xml
index 4574d21bc0..b916a0c194 100644
--- a/assembly/src/main/descriptors/common-bin.xml
+++ b/assembly/src/main/descriptors/common-bin.xml
@@ -158,6 +158,16 @@
<fileMode>0644</fileMode>
<directoryMode>0755</directoryMode>
</dependencySet>
+ <dependencySet>
+ <outputDirectory>lib/jdk21</outputDirectory>
+ <unpack>false</unpack>
+ <scope>runtime</scope>
+ <includes>
+ <include>${pom.groupId}:activemq-client-jdk21</include>
+ </includes>
+ <fileMode>0644</fileMode>
+ <directoryMode>0755</directoryMode>
+ </dependencySet>
<dependencySet>
<outputDirectory>lib/camel</outputDirectory>
<unpack>false</unpack>
diff --git a/pom.xml b/pom.xml
index 7facafd4c8..0472e17ecd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -275,6 +275,11 @@
<artifactId>activemq-client</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-client-jdk21</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
@@ -1452,5 +1457,48 @@
<javadoc.options>-Xdoclint:none</javadoc.options>
</properties>
</profile>
+ <profile>
+ <id>jdk-21</id>
+ <activation>
+ <jdk>[21,)</jdk>
+ </activation>
+ <modules>
+ <module>activemq-openwire-generator</module>
+ <module>activemq-client</module>
+ <module>activemq-client-jdk21</module>
+ <module>activemq-client-jdk21-test</module>
+ <module>activemq-openwire-legacy</module>
+ <module>activemq-broker</module>
+ <module>activemq-stomp</module>
+ <module>activemq-mqtt</module>
+ <module>activemq-amqp</module>
+ <module>activemq-kahadb-store</module>
+ <module>activemq-jdbc-store</module>
+ <module>activemq-unit-tests</module>
+ <module>activemq-all</module>
+ <module>activemq-console</module>
+ <module>activemq-jaas</module>
+ <module>activemq-jms-pool</module>
+ <module>activemq-pool</module>
+ <module>activemq-cf</module>
+ <module>activemq-ra</module>
+ <module>activemq-rar</module>
+ <module>activemq-run</module>
+ <module>activemq-shiro</module>
+ <module>activemq-spring</module>
+ <module>activemq-runtime-config</module>
+ <module>activemq-tooling</module>
+ <module>activemq-web</module>
+ <module>activemq-web-demo</module>
+ <module>activemq-web-console</module>
+ <module>activemq-karaf</module>
+ <module>activemq-osgi</module>
+ <module>activemq-blueprint</module>
+ <module>activemq-karaf-itest</module>
+ <module>assembly</module>
+ <module>activemq-log4j-appender</module>
+ <module>activemq-http</module>
+ </modules>
+ </profile>
</profiles>
</project>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact