This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch context-value-scoped-value-support in repository https://gitbox.apache.org/repos/asf/camel.git
commit 59e652d859e4f457335e2bd455ac6d837d919172 Author: Guillaume Nodet <[email protected]> AuthorDate: Wed Jan 7 12:17:18 2026 +0100 Add load tests for virtual threads performance comparison Add two disabled load test classes that can be run manually to compare performance between platform threads and virtual threads: - VirtualThreadsLoadTest: Uses SEDA with concurrent consumers to test throughput with simulated I/O delays - VirtualThreadsWithThreadsDSLLoadTest: Uses threads() DSL to exercise the ContextValue/ScopedValue code paths Tests are disabled by default and configurable via system properties: - loadtest.messages: Number of messages to process (default: 5000) - loadtest.producers: Number of producer threads (default: 50) - loadtest.consumers: Number of concurrent consumers (default: 100) - loadtest.delay: Simulated I/O delay in ms (default: 5-10) Run with: mvn test -Dtest=VirtualThreadsLoadTest \ -Djunit.jupiter.conditions.deactivate='org.junit.*DisabledCondition' \ -Dcamel.threads.virtual.enabled=true --- .../camel/processor/VirtualThreadsLoadTest.java | 163 +++++++++++++++++++++ .../VirtualThreadsWithThreadsDSLLoadTest.java | 144 ++++++++++++++++++ 2 files changed, 307 insertions(+) diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/VirtualThreadsLoadTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/VirtualThreadsLoadTest.java new file mode 100644 index 000000000000..fac56aec7290 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/VirtualThreadsLoadTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.util.StopWatch; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Load test to compare performance of platform threads vs virtual threads. + * <p> + * This test is disabled by default as it's meant to be run manually for benchmarking. + * <p> + * Run with platform threads (default): + * + * <pre> + * mvn test -Dtest=VirtualThreadsLoadTest -pl core/camel-core + * </pre> + * <p> + * Run with virtual threads (JDK 21+): + * + * <pre> + * mvn test -Dtest=VirtualThreadsLoadTest -pl core/camel-core -Dcamel.threads.virtual.enabled=true + * </pre> + */ +@Disabled("Manual load test - run explicitly for benchmarking") +public class VirtualThreadsLoadTest extends ContextTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(VirtualThreadsLoadTest.class); + + // Configuration - adjust these for your environment + // With 200 consumers and 5ms delay, theoretical max throughput = 200 * 1000/5 = 40,000 msg/sec + private static final int TOTAL_MESSAGES = Integer.getInteger("loadtest.messages", 5_000); + private static final int CONCURRENT_PRODUCERS = Integer.getInteger("loadtest.producers", 50); + private static final int CONCURRENT_CONSUMERS = Integer.getInteger("loadtest.consumers", 100); + private static final int SIMULATED_IO_DELAY_MS = Integer.getInteger("loadtest.delay", 5); + + private final LongAdder processedCount = new LongAdder(); + private CountDownLatch completionLatch; + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + // Log whether virtual threads are enabled + boolean virtualThreads = "true".equalsIgnoreCase( + System.getProperty("camel.threads.virtual.enabled", "false")); + LOG.info("Virtual threads enabled: {}", virtualThreads); + return context; + } + + @Test + public void testHighConcurrencyWithSimulatedIO() throws Exception { + completionLatch = new CountDownLatch(TOTAL_MESSAGES); + processedCount.reset(); + + System.out.println("Starting load test: " + TOTAL_MESSAGES + " messages, " + + CONCURRENT_PRODUCERS + " producers, " + CONCURRENT_CONSUMERS + " consumers, " + + SIMULATED_IO_DELAY_MS + "ms I/O delay"); + + StopWatch watch = new StopWatch(); + + // Create producer threads - use virtual threads when available for producers too + ExecutorService producerPool; + try { + producerPool = (ExecutorService) Executors.class + .getMethod("newVirtualThreadPerTaskExecutor").invoke(null); + System.out.println("Using virtual threads for producers"); + } catch (Exception e) { + producerPool = Executors.newFixedThreadPool(CONCURRENT_PRODUCERS); + System.out.println("Using platform threads for producers"); + } + + for (int i = 0; i < TOTAL_MESSAGES; i++) { + final int msgNum = i; + producerPool.submit(() -> { + try { + template.sendBody("seda:start", "Message-" + msgNum); + } catch (Exception e) { + LOG.error("Error sending message", e); + } + }); + } + + // Wait for all messages to be processed + boolean completed = completionLatch.await(5, TimeUnit.MINUTES); + + long elapsed = watch.taken(); + producerPool.shutdown(); + + // Calculate metrics + long processed = processedCount.sum(); + double throughput = (processed * 1000.0) / elapsed; + double avgLatency = elapsed / (double) processed; + + // Use System.out for guaranteed visibility in test output + System.out.println(); + System.out.println("=== Load Test Results ==="); + System.out.println("Completed: " + (completed ? "YES" : "NO (timeout)")); + System.out.println("Messages processed: " + processed); + System.out.println("Total time: " + elapsed + " ms"); + System.out.println("Throughput: " + String.format("%.2f", throughput) + " msg/sec"); + System.out.println("Average latency: " + String.format("%.2f", avgLatency) + " ms/msg"); + System.out.println("Virtual threads: " + System.getProperty("camel.threads.virtual.enabled", "false")); + System.out.println(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + // Route with concurrent consumers and simulated I/O delay + // Use larger queue size to avoid blocking + from("seda:start?concurrentConsumers=" + CONCURRENT_CONSUMERS + "&size=" + (TOTAL_MESSAGES + 1000)) + .routeId("loadTestRoute") + .process(new SimulatedIOProcessor()) + .process(exchange -> { + processedCount.increment(); + completionLatch.countDown(); + }); + } + }; + } + + /** + * Processor that simulates I/O delay (e.g., database call, HTTP request). This is where virtual threads should show + * significant improvement - platform threads block during sleep, while virtual threads yield. + */ + private static class SimulatedIOProcessor implements Processor { + @Override + public void process(Exchange exchange) throws Exception { + // Simulate blocking I/O operation + Thread.sleep(SIMULATED_IO_DELAY_MS); + } + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/VirtualThreadsWithThreadsDSLLoadTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/VirtualThreadsWithThreadsDSLLoadTest.java new file mode 100644 index 000000000000..3a3f4997ed44 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/VirtualThreadsWithThreadsDSLLoadTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.util.StopWatch; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Load test using the threads() DSL to directly exercise the thread pool creation which uses ContextValue/ScopedValue + * for the "create processor" context. + * <p> + * This test is disabled by default as it's meant to be run manually for benchmarking. + * <p> + * Run with platform threads (default): + * + * <pre> + * mvn test -Dtest=VirtualThreadsWithThreadsDSLLoadTest -pl core/camel-core + * </pre> + * <p> + * Run with virtual threads (JDK 21+): + * + * <pre> + * mvn test -Dtest=VirtualThreadsWithThreadsDSLLoadTest -pl core/camel-core -Dcamel.threads.virtual.enabled=true + * </pre> + */ +@Disabled("Manual load test - run explicitly for benchmarking") +public class VirtualThreadsWithThreadsDSLLoadTest extends ContextTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(VirtualThreadsWithThreadsDSLLoadTest.class); + + // Configuration - can be overridden via system properties + private static final int TOTAL_MESSAGES = Integer.getInteger("loadtest.messages", 5_000); + private static final int CONCURRENT_PRODUCERS = Integer.getInteger("loadtest.producers", 50); + private static final int THREAD_POOL_SIZE = Integer.getInteger("loadtest.poolSize", 20); + private static final int MAX_POOL_SIZE = Integer.getInteger("loadtest.maxPoolSize", 100); + private static final int SIMULATED_IO_DELAY_MS = Integer.getInteger("loadtest.delay", 10); + + private final LongAdder processedCount = new LongAdder(); + private CountDownLatch completionLatch; + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + boolean virtualThreads = "true".equalsIgnoreCase( + System.getProperty("camel.threads.virtual.enabled", "false")); + LOG.info("Virtual threads enabled: {}", virtualThreads); + return context; + } + + @Test + public void testThreadsDSLWithSimulatedIO() throws Exception { + completionLatch = new CountDownLatch(TOTAL_MESSAGES); + processedCount.reset(); + + LOG.info("Starting threads() DSL load test: {} messages, {} producers, pool {}-{}, {}ms I/O delay", + TOTAL_MESSAGES, CONCURRENT_PRODUCERS, THREAD_POOL_SIZE, MAX_POOL_SIZE, SIMULATED_IO_DELAY_MS); + + StopWatch watch = new StopWatch(); + + ExecutorService producerPool = Executors.newFixedThreadPool(CONCURRENT_PRODUCERS); + for (int i = 0; i < TOTAL_MESSAGES; i++) { + final int msgNum = i; + producerPool.submit(() -> { + try { + template.sendBody("direct:start", "Message-" + msgNum); + } catch (Exception e) { + LOG.error("Error sending message", e); + } + }); + } + + boolean completed = completionLatch.await(5, TimeUnit.MINUTES); + + long elapsed = watch.taken(); + producerPool.shutdown(); + + long processed = processedCount.sum(); + double throughput = (processed * 1000.0) / elapsed; + + // Use System.out for guaranteed visibility in test output + System.out.println(); + System.out.println("=== threads() DSL Load Test Results ==="); + System.out.println("Completed: " + (completed ? "YES" : "NO (timeout)")); + System.out.println("Messages processed: " + processed); + System.out.println("Total time: " + elapsed + " ms"); + System.out.println("Throughput: " + String.format("%.2f", throughput) + " msg/sec"); + System.out.println("Virtual threads: " + System.getProperty("camel.threads.virtual.enabled", "false")); + System.out.println(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + // Route using threads() DSL - this exercises ContextValue for createProcessor + from("direct:start") + .routeId("threadsDSLLoadTest") + .threads(THREAD_POOL_SIZE, MAX_POOL_SIZE) + .threadName("loadTest") + .process(new SimulatedIOProcessor()) + .process(exchange -> { + processedCount.increment(); + completionLatch.countDown(); + }); + } + }; + } + + private static class SimulatedIOProcessor implements Processor { + @Override + public void process(Exchange exchange) throws Exception { + Thread.sleep(SIMULATED_IO_DELAY_MS); + } + } +}
