This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch context-value-scoped-value-support in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8576658e6586e32bbe63801f52f9cd7dabcb20c9 Author: Guillaume Nodet <[email protected]> AuthorDate: Wed Jan 7 09:29:49 2026 +0100 Add ContextValue abstraction for ScopedValue support on JDK 25+ This commit introduces a new ContextValue abstraction that provides a unified API for thread-scoped data sharing, with implementations using either ThreadLocal (JDK 17+) or ScopedValue (JDK 25+ with virtual threads). Key changes: - Add ContextValue interface in camel-util with factory methods for creating context values and executing operations within scoped contexts - Add ContextValueFactory with ThreadLocal implementation for base JDK - Add Java 25 multi-release JAR variant using ScopedValue when available - Deprecate NamedThreadLocal in favor of ContextValue.newThreadLocal() - Add new scoped API methods to ExtendedCamelContext: - setupRoutes(Runnable) and setupRoutes(Callable) - createRoute(String, Runnable) and createRoute(String, Callable) - createProcessor(String, Runnable) and createProcessor(String, Callable) - Deprecate the old boolean/void signaling methods (setupRoutes(boolean), createRoute(String), createProcessor(String)) - Update DefaultCamelContextExtension to use ContextValue.where() for scoped execution, enabling proper ScopedValue support on virtual threads - Update DefaultReactiveExecutor to use ContextValue instead of NamedThreadLocal - Simplify Worker class by removing cached stats field The ContextValue abstraction allows Camel to leverage ScopedValue on JDK 25+ when virtual threads are enabled, providing better performance characteristics for virtual thread workloads while maintaining backward compatibility with ThreadLocal on older JDK versions. Documentation added to ContextValue explaining that ThreadLocal variants should hold lightweight objects to avoid memory leaks with pooled threads. --- .../camel/component/kamelet/KameletReifier.java | 10 +- .../org/apache/camel/ExtendedCamelContext.java | 89 ++++++++- .../camel/impl/engine/AbstractCamelContext.java | 11 +- .../impl/engine/DefaultCamelContextExtension.java | 74 ++++++- .../camel/impl/engine/DefaultReactiveExecutor.java | 27 +-- .../org/apache/camel/impl/DefaultCamelContext.java | 26 ++- .../org/apache/camel/reifier/ProcessorReifier.java | 18 +- .../core/xml/AbstractCamelContextFactoryBean.java | 14 +- core/camel-util/pom.xml | 29 +++ .../apache/camel/util/concurrent/ContextValue.java | 188 ++++++++++++++++++ .../camel/util/concurrent/ContextValueFactory.java | 140 +++++++++++++ .../camel/util/concurrent/NamedThreadLocal.java | 5 + .../camel/util/concurrent/ContextValueFactory.java | 220 +++++++++++++++++++++ .../camel/util/concurrent/ContextValueTest.java | 135 +++++++++++++ 14 files changed, 922 insertions(+), 64 deletions(-) diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java index fd378851f230..dd544eacc95c 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java @@ -39,17 +39,15 @@ public class KameletReifier extends ProcessorReifier<KameletDefinition> { } // wrap in uow String outputId = definition.idOrCreate(camelContext.getCamelContextExtension().getContextPlugin(NodeIdFactory.class)); - camelContext.getCamelContextExtension().createProcessor(outputId); - try { - Processor answer = new KameletProcessor(camelContext, parseString(definition.getName()), processor); + final Processor childProcessor = processor; + return camelContext.getCamelContextExtension().createProcessor(outputId, () -> { + Processor answer = new KameletProcessor(camelContext, parseString(definition.getName()), childProcessor); if (answer instanceof DisabledAware da) { da.setDisabled(isDisabled(camelContext, definition)); } answer = PluginHelper.getInternalProcessorFactory(camelContext) .addUnitOfWorkProcessorAdvice(camelContext, answer, null); return answer; - } finally { - camelContext.getCamelContextExtension().createProcessor(null); - } + }); } } diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java index 855f963a2ffd..a05f4a5f3486 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java @@ -19,6 +19,7 @@ package org.apache.camel; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import java.util.function.Supplier; import org.apache.camel.catalog.RuntimeCamelCatalog; @@ -108,9 +109,12 @@ public interface ExtendedCamelContext { /** * Method to signal to {@link CamelContext} that the process to initialize setup routes is in progress. * - * @param done <tt>false</tt> to start the process, call again with <tt>true</tt> to signal its done. - * @see #isSetupRoutes() + * @param done <tt>false</tt> to start the process, call again with <tt>true</tt> to signal its done. + * @see #isSetupRoutes() + * @deprecated use {@link #setupRoutes(Runnable)} or {@link #setupRoutes(Callable)} for ScopedValue + * compatibility */ + @Deprecated(since = "4.17.0") void setupRoutes(boolean done); /** @@ -127,12 +131,36 @@ public interface ExtendedCamelContext { */ boolean isSetupRoutes(); + /** + * Executes the given operation within a "setup routes" context. + * <p/> + * This is the preferred method for ScopedValue compatibility on virtual threads. + * + * @param operation the operation to execute + */ + void setupRoutes(Runnable operation); + + /** + * Executes the given callable within a "setup routes" context and returns its result. + * <p/> + * This is the preferred method for ScopedValue compatibility on virtual threads. + * + * @param <T> the return type + * @param callable the callable to execute + * @return the result of the callable + * @throws Exception if the callable throws + */ + <T> T setupRoutes(Callable<T> callable) throws Exception; + /** * Method to signal to {@link CamelContext} that the process to create routes is in progress. * - * @param routeId the current id of the route being created - * @see #getCreateRoute() + * @param routeId the current id of the route being created + * @see #getCreateRoute() + * @deprecated use {@link #createRoute(String, Runnable)} or {@link #createRoute(String, Callable)} for + * ScopedValue compatibility */ + @Deprecated(since = "4.17.0") void createRoute(String routeId); /** @@ -145,12 +173,38 @@ public interface ExtendedCamelContext { */ String getCreateRoute(); + /** + * Executes the given operation within a "create route" context. + * <p/> + * This is the preferred method for ScopedValue compatibility on virtual threads. + * + * @param routeId the id of the route being created + * @param operation the operation to execute + */ + void createRoute(String routeId, Runnable operation); + + /** + * Executes the given callable within a "create route" context and returns its result. + * <p/> + * This is the preferred method for ScopedValue compatibility on virtual threads. + * + * @param <T> the return type + * @param routeId the id of the route being created + * @param callable the callable to execute + * @return the result of the callable + * @throws Exception if the callable throws + */ + <T> T createRoute(String routeId, Callable<T> callable) throws Exception; + /** * Method to signal to {@link CamelContext} that creation of a given processor is in progress. * - * @param processorId the current id of the processor being created - * @see #getCreateProcessor() + * @param processorId the current id of the processor being created + * @see #getCreateProcessor() + * @deprecated use {@link #createProcessor(String, Runnable)} or + * {@link #createProcessor(String, Callable)} for ScopedValue compatibility */ + @Deprecated(since = "4.17.0") void createProcessor(String processorId); /** @@ -163,6 +217,29 @@ public interface ExtendedCamelContext { */ String getCreateProcessor(); + /** + * Executes the given operation within a "create processor" context. + * <p/> + * This is the preferred method for ScopedValue compatibility on virtual threads. + * + * @param processorId the id of the processor being created + * @param operation the operation to execute + */ + void createProcessor(String processorId, Runnable operation); + + /** + * Executes the given callable within a "create processor" context and returns its result. + * <p/> + * This is the preferred method for ScopedValue compatibility on virtual threads. + * + * @param <T> the return type + * @param processorId the id of the processor being created + * @param callable the callable to execute + * @return the result of the callable + * @throws Exception if the callable throws + */ + <T> T createProcessor(String processorId, Callable<T> callable) throws Exception; + /** * Registers a {@link org.apache.camel.spi.EndpointStrategy callback} to allow you to do custom logic when an * {@link Endpoint} is about to be registered to the {@link org.apache.camel.spi.EndpointRegistry}. diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index 5b1e7da577c3..a10f58a4b4c3 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -205,6 +205,7 @@ import org.apache.camel.util.StopWatch; import org.apache.camel.util.StringHelper; import org.apache.camel.util.TimeUtils; import org.apache.camel.util.URISupport; +import org.apache.camel.util.concurrent.ContextValue; import org.apache.camel.vault.VaultConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -233,8 +234,8 @@ public abstract class AbstractCamelContext extends BaseService private final Map<String, Language> languages = new ConcurrentHashMap<>(); private final Map<String, DataFormat> dataformats = new ConcurrentHashMap<>(); private final List<LifecycleStrategy> lifecycleStrategies = new CopyOnWriteArrayList<>(); - private final ThreadLocal<Boolean> isStartingRoutes = new ThreadLocal<>(); - private final ThreadLocal<Boolean> isLockModel = new ThreadLocal<>(); + private final ContextValue<Boolean> isStartingRoutes = ContextValue.newInstance("isStartingRoutes"); + private final ContextValue<Boolean> isLockModel = ContextValue.newInstance("isLockModel"); private final Map<String, RouteService> routeServices = new LinkedHashMap<>(); private final Map<String, RouteService> suspendedRouteServices = new LinkedHashMap<>(); private final InternalRouteStartupManager internalRouteStartupManager = new InternalRouteStartupManager(); @@ -1121,8 +1122,7 @@ public abstract class AbstractCamelContext extends BaseService } public boolean isStartingRoutes() { - Boolean answer = isStartingRoutes.get(); - return answer != null && answer; + return Boolean.TRUE.equals(isStartingRoutes.orElse(false)); } public void setStartingRoutes(boolean starting) { @@ -1134,8 +1134,7 @@ public abstract class AbstractCamelContext extends BaseService } public boolean isLockModel() { - Boolean answer = isLockModel.get(); - return answer != null && answer; + return Boolean.TRUE.equals(isLockModel.orElse(false)); } public void setLockModel(boolean lockModel) { diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java index 6e3cb2a5953c..7c44e5c62cbd 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java @@ -21,6 +21,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.Lock; @@ -87,15 +88,16 @@ import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.support.startup.DefaultStartupStepRecorder; import org.apache.camel.util.StringHelper; import org.apache.camel.util.URISupport; +import org.apache.camel.util.concurrent.ContextValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; class DefaultCamelContextExtension implements ExtendedCamelContext { private final AbstractCamelContext camelContext; - private final ThreadLocal<String> isCreateRoute = new ThreadLocal<>(); - private final ThreadLocal<String> isCreateProcessor = new ThreadLocal<>(); - private final ThreadLocal<Boolean> isSetupRoutes = new ThreadLocal<>(); + private final ContextValue<String> isCreateRoute = ContextValue.newInstance("isCreateRoute"); + private final ContextValue<String> isCreateProcessor = ContextValue.newInstance("isCreateProcessor"); + private final ContextValue<Boolean> isSetupRoutes = ContextValue.newInstance("isSetupRoutes"); private final List<InterceptStrategy> interceptStrategies = new ArrayList<>(); private final Map<String, FactoryFinder> factories = new ConcurrentHashMap<>(); private final Map<String, FactoryFinder> bootstrapFactories = new ConcurrentHashMap<>(); @@ -318,18 +320,17 @@ class DefaultCamelContextExtension implements ExtendedCamelContext { @Override public boolean isSetupRoutes() { - Boolean answer = isSetupRoutes.get(); - return answer != null && answer; + return Boolean.TRUE.equals(isSetupRoutes.orElse(false)); } @Override public String getCreateRoute() { - return isCreateRoute.get(); + return isCreateRoute.orElse(null); } @Override public String getCreateProcessor() { - return isCreateProcessor.get(); + return isCreateProcessor.orElse(null); } @Override @@ -419,15 +420,35 @@ class DefaultCamelContextExtension implements ExtendedCamelContext { } @Override + @Deprecated public void createRoute(String routeId) { if (routeId != null) { isCreateRoute.set(routeId); } else { - isSetupRoutes.remove(); + isCreateRoute.remove(); } } @Override + public void createRoute(String routeId, Runnable operation) { + ContextValue.where(isCreateRoute, routeId, operation); + } + + @Override + public <T> T createRoute(String routeId, Callable<T> callable) throws Exception { + return ContextValue.where(isCreateRoute, routeId, () -> { + try { + return callable.call(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + @Deprecated public void createProcessor(String processorId) { if (processorId != null) { isCreateProcessor.set(processorId); @@ -437,6 +458,25 @@ class DefaultCamelContextExtension implements ExtendedCamelContext { } @Override + public void createProcessor(String processorId, Runnable operation) { + ContextValue.where(isCreateProcessor, processorId, operation); + } + + @Override + public <T> T createProcessor(String processorId, Callable<T> callable) throws Exception { + return ContextValue.where(isCreateProcessor, processorId, () -> { + try { + return callable.call(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + @Deprecated public void setupRoutes(boolean done) { if (done) { isSetupRoutes.remove(); @@ -445,6 +485,24 @@ class DefaultCamelContextExtension implements ExtendedCamelContext { } } + @Override + public void setupRoutes(Runnable operation) { + ContextValue.where(isSetupRoutes, true, operation); + } + + @Override + public <T> T setupRoutes(Callable<T> callable) throws Exception { + return ContextValue.where(isSetupRoutes, true, () -> { + try { + return callable.call(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + @Override public List<InterceptStrategy> getInterceptStrategies() { return interceptStrategies; diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java index 596565f46510..939c553c7991 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java @@ -26,7 +26,7 @@ import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.support.service.ServiceSupport; -import org.apache.camel.util.concurrent.NamedThreadLocal; +import org.apache.camel.util.concurrent.ContextValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +44,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE private final LongAdder runningWorkers = new LongAdder(); private final LongAdder pendingTasks = new LongAdder(); - private final NamedThreadLocal<Worker> workers = new NamedThreadLocal<>("CamelReactiveWorker", () -> { + private final ContextValue<Worker> workers = ContextValue.newThreadLocal("CamelReactiveWorker", () -> { int number = createdWorkers.incrementAndGet(); return new Worker(number, DefaultReactiveExecutor.this); }); @@ -66,10 +66,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE @Override public void scheduleQueue(Runnable runnable) { - if (LOG.isTraceEnabled()) { - LOG.trace("ScheduleQueue: {}", runnable); - } - workers.get().queue.add(runnable); + workers.get().scheduleQueue(runnable); } @Override @@ -120,7 +117,6 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE private final int number; private final DefaultReactiveExecutor executor; - private final boolean stats; private volatile Deque<Runnable> queue = new ArrayDeque<>(); private volatile Deque<Deque<Runnable>> back; private volatile boolean running; @@ -128,7 +124,6 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE public Worker(int number, DefaultReactiveExecutor executor) { this.number = number; this.executor = executor; - this.stats = executor != null && executor.isStatisticsEnabled(); } void schedule(Runnable runnable, boolean first, boolean main, boolean sync) { @@ -148,6 +143,14 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE tryExecuteReactiveWork(runnable, sync); } + void scheduleQueue(Runnable runnable) { + if (LOG.isTraceEnabled()) { + LOG.trace("ScheduleQueue: {}", runnable); + } + queue.add(runnable); + incrementPendingTasks(); + } + private void executeMainFlow() { if (!queue.isEmpty()) { if (back == null) { @@ -204,25 +207,25 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE } private void decrementRunningWorkers() { - if (stats) { + if (executor.statisticsEnabled) { executor.runningWorkers.decrement(); } } private void incrementRunningWorkers() { - if (stats) { + if (executor.statisticsEnabled) { executor.runningWorkers.increment(); } } private void incrementPendingTasks() { - if (stats) { + if (executor.statisticsEnabled) { executor.pendingTasks.increment(); } } private void decrementPendingTasks() { - if (stats) { + if (executor.statisticsEnabled) { executor.pendingTasks.decrement(); } } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index 2a9ede673ad8..de77eb7e8a2e 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -84,7 +84,7 @@ import org.apache.camel.support.scan.InvertingPackageScanFilter; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.OrderedLocationProperties; import org.apache.camel.util.StopWatch; -import org.apache.camel.util.concurrent.NamedThreadLocal; +import org.apache.camel.util.concurrent.ContextValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,7 +96,8 @@ public class DefaultCamelContext extends SimpleCamelContext implements ModelCame // global options that can be set on CamelContext as part of concurrent testing // which means options should be isolated via thread-locals and not a static instance // use a HashMap to store only JDK classes in the thread-local so there will not be any Camel classes leaking - private static final ThreadLocal<Map<String, Object>> OPTIONS = new NamedThreadLocal<>("CamelContextOptions", HashMap::new); + private static final ContextValue<Map<String, Object>> OPTIONS + = ContextValue.newThreadLocal("CamelContextOptions", HashMap::new); private static final String OPTION_NO_START = "OptionNoStart"; private static final String OPTION_DISABLE_JMX = "OptionDisableJMX"; private static final String OPTION_EXCLUDE_ROUTES = "OptionExcludeRoutes"; @@ -760,13 +761,19 @@ public class DefaultCamelContext extends SimpleCamelContext implements ModelCame = getCamelContextReference().getCamelContextExtension().getStartupStepRecorder(); StartupStep step = recorder.beginStep(Route.class, routeDefinition.getRouteId(), "Create Route"); - getCamelContextExtension().createRoute(routeDefinition.getRouteId()); - - Route route = model.getModelReifierFactory().createRoute(this, routeDefinition); - recorder.endStep(step); - - RouteService routeService = new RouteService(route); - startRouteService(routeService, true); + getCamelContextExtension().createRoute(routeDefinition.getRouteId(), () -> { + try { + Route route = model.getModelReifierFactory().createRoute(this, routeDefinition); + recorder.endStep(step); + + RouteService routeService = new RouteService(route); + startRouteService(routeService, true); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } else { // Add the definition to the list of definitions to remove as the route is excluded if (routeDefinitionsToRemove == null) { @@ -790,7 +797,6 @@ public class DefaultCamelContext extends SimpleCamelContext implements ModelCame if (!alreadyStartingRoutes) { setStartingRoutes(false); } - getCamelContextExtension().createRoute(null); } } diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java index bc4fa9ade67b..7cc330798202 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java @@ -809,9 +809,8 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends StartupStep step = camelContext.getCamelContextExtension().getStartupStepRecorder().beginStep(ProcessorReifier.class, outputId, "Create processor"); - camelContext.getCamelContextExtension().createProcessor(outputId); - Processor processor = null; - try { + return camelContext.getCamelContextExtension().createProcessor(outputId, () -> { + Processor processor = null; // at first use custom factory final ProcessorFactory processorFactory = PluginHelper.getProcessorFactory(camelContext); if (processorFactory != null) { @@ -822,10 +821,8 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends processor = reifier(route, output).createProcessor(); } camelContext.getCamelContextExtension().getStartupStepRecorder().endStep(step); - } finally { - camelContext.getCamelContextExtension().createProcessor(null); - } - return processor; + return processor; + }); } /** @@ -833,8 +830,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends */ protected Channel makeProcessor() throws Exception { String outputId = definition.idOrCreate(camelContext.getCamelContextExtension().getContextPlugin(NodeIdFactory.class)); - camelContext.getCamelContextExtension().createProcessor(outputId); - try { + return camelContext.getCamelContextExtension().createProcessor(outputId, () -> { Processor processor = null; // allow any custom logic before we create the processor @@ -865,9 +861,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends return null; } return wrapProcessor(processor); - } finally { - camelContext.getCamelContextExtension().createProcessor(null); - } + }); } /** diff --git a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java index 0c9cd0154383..6e1ec0844dd7 100644 --- a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java +++ b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java @@ -597,8 +597,15 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex LOG.debug("Setting up routes"); // mark that we are setting up routes - getContext().getCamelContextExtension().setupRoutes(false); + getContext().getCamelContextExtension().setupRoutes(this::doSetupRoutes); + } + } + /** + * Internal method to do the actual route setup within the setupRoutes context. + */ + private void doSetupRoutes() { + try { // add route configurations initRouteConfigurationRefs(); getContext().addRouteConfigurations(getRouteConfigurations()); @@ -663,9 +670,8 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex findRouteBuilders(); installRoutes(); - - // and we are now finished setting up the routes - getContext().getCamelContextExtension().setupRoutes(true); + } catch (Exception e) { + throw new RuntimeException(e); } } diff --git a/core/camel-util/pom.xml b/core/camel-util/pom.xml index afde05060a6d..e46a39b5528b 100644 --- a/core/camel-util/pom.xml +++ b/core/camel-util/pom.xml @@ -319,5 +319,34 @@ </plugins> </build> </profile> + <profile> + <id>java-25-sources</id> + <activation> + <jdk>[25,)</jdk> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>${maven-compiler-plugin-version}</version> + <executions> + <execution> + <id>compile-java-25</id> + <phase>compile</phase> + <goals> + <goal>compile</goal> + </goals> + <configuration> + <release>25</release> + <compileSourceRoots>${project.basedir}/src/main/java25</compileSourceRoots> + <multiReleaseOutput>true</multiReleaseOutput> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> </profiles> </project> diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ContextValue.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ContextValue.java new file mode 100644 index 000000000000..208a3cfda259 --- /dev/null +++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ContextValue.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.concurrent; + +import java.util.NoSuchElementException; +import java.util.function.Supplier; + +/** + * A context value abstraction that provides thread-scoped data sharing. + * <p> + * This interface provides a unified API for sharing data within a thread context, with implementations that use either + * {@link ThreadLocal} (for JDK 17+) or {@link java.lang.ScopedValue} (for JDK 21+ with virtual threads). + * <p> + * The implementation is chosen automatically based on the JDK version and whether virtual threads are enabled via the + * {@code camel.threads.virtual.enabled} system property. + * <p> + * <b>Usage patterns:</b> + * <ul> + * <li><b>Read-only context passing:</b> Use {@link #where(ContextValue, Object, Runnable)} to bind a value for the + * duration of a code block</li> + * <li><b>Mutable state:</b> Use {@link #newThreadLocal(String)} for state that needs to be modified after + * initialization</li> + * </ul> + * <p> + * <b>Important:</b> When using {@link #newThreadLocal(String, Supplier)}, the values should be <b>lightweight + * objects</b>. Heavy objects stored in ThreadLocal can lead to memory leaks (if threads are pooled) and increased + * memory consumption (one instance per thread). Consider whether the object truly needs per-thread state, or if it can + * be shared or passed as a parameter instead. + * <p> + * <b>Example:</b> + * + * <pre>{@code + * private static final ContextValue<String> ROUTE_ID = ContextValue.newInstance("routeId"); + * + * // Bind a value for a scope + * ContextValue.where(ROUTE_ID, "myRoute", () -> { + * // Code here can access ROUTE_ID.get() + * processRoute(); + * }); + * }</pre> + * + * @param <T> the type of value stored in this context + * @see java.lang.ThreadLocal + * @see java.lang.ScopedValue + */ +public interface ContextValue<T> { + + /** + * Returns the value of this context variable for the current thread. + * <p> + * For ScopedValue-based implementations (JDK 21+), this will throw {@link NoSuchElementException} if called outside + * a binding scope. For ThreadLocal-based implementations, this returns the value set via {@link #set(Object)} or + * {@code null} if not set. + * + * @return the current value + * @throws NoSuchElementException if no value is bound (ScopedValue implementation only) + */ + T get(); + + /** + * Returns the value of this context variable for the current thread, or the given default value if no value is + * bound. + * + * @param defaultValue the value to return if no value is bound + * @return the current value, or {@code defaultValue} if not bound + */ + T orElse(T defaultValue); + + /** + * Returns whether a value is currently bound for this context variable. + * + * @return {@code true} if a value is bound, {@code false} otherwise + */ + boolean isBound(); + + /** + * Sets the value for this context variable (ThreadLocal-based implementations only). + * <p> + * This method is only supported by ThreadLocal-based implementations. For ScopedValue-based implementations, use + * {@link #where(ContextValue, Object, Runnable)} instead. + * + * @param value the value to set + * @throws UnsupportedOperationException if called on a ScopedValue-based implementation + */ + void set(T value); + + /** + * Removes the value for this context variable (ThreadLocal-based implementations only). + * <p> + * This method is only supported by ThreadLocal-based implementations. + * + * @throws UnsupportedOperationException if called on a ScopedValue-based implementation + */ + void remove(); + + /** + * Returns the name of this context value (for debugging purposes). + * + * @return the name + */ + String name(); + + /** + * Creates a new context value with the given name. + * <p> + * The implementation will use ScopedValue on JDK 21+ when virtual threads are enabled, otherwise it will use + * ThreadLocal. + * + * @param <T> the type of value + * @param name the name for debugging purposes + * @return a new context value + */ + static <T> ContextValue<T> newInstance(String name) { + return ContextValueFactory.newInstance(name); + } + + /** + * Creates a new ThreadLocal-based context value with the given name. + * <p> + * This always uses ThreadLocal, regardless of JDK version or virtual thread settings. Use this when you need + * mutable state that can be modified after initialization. + * + * @param <T> the type of value + * @param name the name for debugging purposes + * @return a new ThreadLocal-based context value + */ + static <T> ContextValue<T> newThreadLocal(String name) { + return ContextValueFactory.newThreadLocal(name); + } + + /** + * Creates a new ThreadLocal-based context value with the given name and initial value supplier. + * <p> + * This always uses ThreadLocal regardless of JDK version or virtual thread settings. The supplier is called to + * provide the initial value when {@link #get()} is called and no value has been set. + * + * @param <T> the type of value + * @param name the name for debugging purposes + * @param supplier the supplier for the initial value + * @return a new ThreadLocal-based context value with initial value support + */ + static <T> ContextValue<T> newThreadLocal(String name, Supplier<T> supplier) { + return ContextValueFactory.newThreadLocal(name, supplier); + } + + /** + * Executes the given operation with the context value bound to the specified value. + * <p> + * The binding is only visible to the current thread and threads created within the operation (for ScopedValue + * implementations). + * + * @param <T> the type of value + * @param <R> the return type + * @param key the context value to bind + * @param value the value to bind + * @param operation the operation to execute + * @return the result of the operation + */ + static <T, R> R where(ContextValue<T> key, T value, Supplier<R> operation) { + return ContextValueFactory.where(key, value, operation); + } + + /** + * Executes the given operation with the context value bound to the specified value. + * + * @param <T> the type of value + * @param key the context value to bind + * @param value the value to bind + * @param operation the operation to execute + */ + static <T> void where(ContextValue<T> key, T value, Runnable operation) { + ContextValueFactory.where(key, value, operation); + } +} diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ContextValueFactory.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ContextValueFactory.java new file mode 100644 index 000000000000..f3b991c29158 --- /dev/null +++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ContextValueFactory.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.concurrent; + +import java.util.function.Supplier; + +/** + * Factory for creating {@link ContextValue} instances. + * <p> + * This class is package-private and used internally by {@link ContextValue}. The implementation is overridden in Java + * 21+ to use ScopedValue when appropriate. + */ +class ContextValueFactory { + + /** + * Creates a new context value with the given name. + * <p> + * This base implementation always uses ThreadLocal. The Java 21+ version may use ScopedValue when virtual threads + * are enabled. + */ + static <T> ContextValue<T> newInstance(String name) { + return new ThreadLocalContextValue<>(name); + } + + /** + * Creates a new ThreadLocal-based context value with the given name. + * <p> + * This always uses ThreadLocal, regardless of JDK version. + */ + static <T> ContextValue<T> newThreadLocal(String name) { + return new ThreadLocalContextValue<>(name); + } + + /** + * Creates a new ThreadLocal-based context value with the given name and initial value supplier. + * <p> + * This always uses ThreadLocal, regardless of JDK version. + */ + static <T> ContextValue<T> newThreadLocal(String name, Supplier<T> supplier) { + return new ThreadLocalContextValue<>(name, supplier); + } + + /** + * Executes the given operation with the context value bound to the specified value. + */ + static <T, R> R where(ContextValue<T> key, T value, Supplier<R> operation) { + if (key instanceof ThreadLocalContextValue<T> tlKey) { + T oldValue = tlKey.get(); + try { + tlKey.set(value); + return operation.get(); + } finally { + if (oldValue != null) { + tlKey.set(oldValue); + } else { + tlKey.remove(); + } + } + } + throw new IllegalArgumentException("Unsupported ContextValue type: " + key.getClass()); + } + + /** + * Executes the given operation with the context value bound to the specified value. + */ + static <T> void where(ContextValue<T> key, T value, Runnable operation) { + where(key, value, () -> { + operation.run(); + return null; + }); + } + + /** + * ThreadLocal-based implementation of ContextValue. + */ + static class ThreadLocalContextValue<T> implements ContextValue<T> { + private final String name; + private final ThreadLocal<T> threadLocal; + + ThreadLocalContextValue(String name) { + this.name = name; + this.threadLocal = new ThreadLocal<>(); + } + + ThreadLocalContextValue(String name, Supplier<T> supplier) { + this.name = name; + this.threadLocal = ThreadLocal.withInitial(supplier); + } + + @Override + public T get() { + return threadLocal.get(); + } + + @Override + public T orElse(T defaultValue) { + T value = threadLocal.get(); + return value != null ? value : defaultValue; + } + + @Override + public boolean isBound() { + return threadLocal.get() != null; + } + + @Override + public void set(T value) { + threadLocal.set(value); + } + + @Override + public void remove() { + threadLocal.remove(); + } + + @Override + public String name() { + return name; + } + + @Override + public String toString() { + return "ContextValue[" + name + "]"; + } + } +} diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/NamedThreadLocal.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/NamedThreadLocal.java index 8ca41278f89a..7a29362d603e 100644 --- a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/NamedThreadLocal.java +++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/NamedThreadLocal.java @@ -20,7 +20,12 @@ import java.util.function.Supplier; /** * A {@link ThreadLocal} with an assigned name that makes introspection and debugging easier. + * + * @deprecated Use {@link ContextValue#newThreadLocal(String)} or {@link ContextValue#newThreadLocal(String, Supplier)} + * instead. The ContextValue API provides better abstraction and will automatically use ScopedValue when + * running with virtual threads on Java 21+. */ +@Deprecated(since = "4.17.0") public final class NamedThreadLocal<T> extends ThreadLocal<T> { private final String name; diff --git a/core/camel-util/src/main/java25/org/apache/camel/util/concurrent/ContextValueFactory.java b/core/camel-util/src/main/java25/org/apache/camel/util/concurrent/ContextValueFactory.java new file mode 100644 index 000000000000..d6ce66128e38 --- /dev/null +++ b/core/camel-util/src/main/java25/org/apache/camel/util/concurrent/ContextValueFactory.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.concurrent; + +import java.util.NoSuchElementException; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory for creating {@link ContextValue} instances. + * <p> + * This Java 25+ version uses ScopedValue when virtual threads are enabled, otherwise falls back to ThreadLocal. + */ +class ContextValueFactory { + + private static final Logger LOG = LoggerFactory.getLogger(ContextValueFactory.class); + private static final boolean USE_SCOPED_VALUES = shouldUseScopedValues(); + + static { + if (USE_SCOPED_VALUES) { + LOG.info("ContextValue will use ScopedValue for virtual thread optimization"); + } else { + LOG.debug("ContextValue will use ThreadLocal"); + } + } + + private static boolean shouldUseScopedValues() { + // Only use ScopedValue when virtual threads are enabled + // ScopedValue is immutable and designed for the "pass context through call chain" pattern + return ThreadType.current() == ThreadType.VIRTUAL; + } + + /** + * Creates a new context value with the given name. + * <p> + * Uses ScopedValue when virtual threads are enabled, otherwise ThreadLocal. + */ + static <T> ContextValue<T> newInstance(String name) { + if (USE_SCOPED_VALUES) { + return new ScopedValueContextValue<>(name); + } + return new ThreadLocalContextValue<>(name); + } + + /** + * Creates a new ThreadLocal-based context value with the given name. + * <p> + * This always uses ThreadLocal, regardless of virtual thread settings. + */ + static <T> ContextValue<T> newThreadLocal(String name) { + return new ThreadLocalContextValue<>(name); + } + + /** + * Creates a new ThreadLocal-based context value with the given name and initial value supplier. + * <p> + * This always uses ThreadLocal, regardless of virtual thread settings. + */ + static <T> ContextValue<T> newThreadLocal(String name, Supplier<T> supplier) { + return new ThreadLocalContextValue<>(name, supplier); + } + + /** + * Executes the given operation with the context value bound to the specified value. + */ + static <T, R> R where(ContextValue<T> key, T value, Supplier<R> operation) { + if (key instanceof ScopedValueContextValue<T> svKey) { + // In JDK 25+, ScopedValue.where() returns a Carrier that has get() method + return ScopedValue.where(svKey.scopedValue, value).get(operation); + } else if (key instanceof ThreadLocalContextValue<T> tlKey) { + T oldValue = tlKey.get(); + try { + tlKey.set(value); + return operation.get(); + } finally { + if (oldValue != null) { + tlKey.set(oldValue); + } else { + tlKey.remove(); + } + } + } + throw new IllegalArgumentException("Unsupported ContextValue type: " + key.getClass()); + } + + /** + * Executes the given operation with the context value bound to the specified value. + */ + static <T> void where(ContextValue<T> key, T value, Runnable operation) { + if (key instanceof ScopedValueContextValue<T> svKey) { + // In JDK 25+, ScopedValue.where() returns a Carrier that has run() method + ScopedValue.where(svKey.scopedValue, value).run(operation); + } else { + where(key, value, () -> { + operation.run(); + return null; + }); + } + } + + /** + * ScopedValue-based implementation of ContextValue (JDK 25+). + */ + static class ScopedValueContextValue<T> implements ContextValue<T> { + private final String name; + final ScopedValue<T> scopedValue; + + ScopedValueContextValue(String name) { + this.name = name; + this.scopedValue = ScopedValue.newInstance(); + } + + @Override + public T get() { + return scopedValue.get(); + } + + @Override + public T orElse(T defaultValue) { + return scopedValue.orElse(defaultValue); + } + + @Override + public boolean isBound() { + return scopedValue.isBound(); + } + + @Override + public void set(T value) { + throw new UnsupportedOperationException( + "ScopedValue is immutable. Use ContextValue.where() to bind values."); + } + + @Override + public void remove() { + throw new UnsupportedOperationException( + "ScopedValue is immutable. Values are automatically unbound when leaving the scope."); + } + + @Override + public String name() { + return name; + } + + @Override + public String toString() { + return "ContextValue[" + name + ",ScopedValue]"; + } + } + + /** + * ThreadLocal-based implementation of ContextValue. + */ + static class ThreadLocalContextValue<T> implements ContextValue<T> { + private final String name; + private final ThreadLocal<T> threadLocal; + + ThreadLocalContextValue(String name) { + this.name = name; + this.threadLocal = new ThreadLocal<>(); + } + + ThreadLocalContextValue(String name, Supplier<T> supplier) { + this.name = name; + this.threadLocal = ThreadLocal.withInitial(supplier); + } + + @Override + public T get() { + return threadLocal.get(); + } + + @Override + public T orElse(T defaultValue) { + T value = threadLocal.get(); + return value != null ? value : defaultValue; + } + + @Override + public boolean isBound() { + return threadLocal.get() != null; + } + + @Override + public void set(T value) { + threadLocal.set(value); + } + + @Override + public void remove() { + threadLocal.remove(); + } + + @Override + public String name() { + return name; + } + + @Override + public String toString() { + return "ContextValue[" + name + ",ThreadLocal]"; + } + } +} diff --git a/core/camel-util/src/test/java/org/apache/camel/util/concurrent/ContextValueTest.java b/core/camel-util/src/test/java/org/apache/camel/util/concurrent/ContextValueTest.java new file mode 100644 index 000000000000..e7d55c1f83e2 --- /dev/null +++ b/core/camel-util/src/test/java/org/apache/camel/util/concurrent/ContextValueTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.concurrent; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +public class ContextValueTest { + + @Test + public void testBasicUsage() { + ContextValue<String> routeId = ContextValue.newInstance("routeId"); + + // Initially not bound + assertFalse(routeId.isBound()); + assertNull(routeId.orElse(null)); + assertEquals("default", routeId.orElse("default")); + + // Bind a value + ContextValue.where(routeId, "myRoute", () -> { + assertTrue(routeId.isBound()); + assertEquals("myRoute", routeId.get()); + assertEquals("myRoute", routeId.orElse("default")); + }); + + // After scope, not bound anymore (for ScopedValue) or still bound (for ThreadLocal) + // We can't assert this reliably as it depends on the implementation + } + + @Test + public void testNestedScopes() { + ContextValue<String> routeId = ContextValue.newInstance("routeId"); + + ContextValue.where(routeId, "outer", () -> { + assertEquals("outer", routeId.get()); + + ContextValue.where(routeId, "inner", () -> { + assertEquals("inner", routeId.get()); + }); + + // After inner scope, should be back to outer + assertEquals("outer", routeId.get()); + }); + } + + @Test + public void testMultipleContextValues() { + ContextValue<String> routeId = ContextValue.newInstance("routeId"); + ContextValue<String> exchangeId = ContextValue.newInstance("exchangeId"); + + ContextValue.where(routeId, "route1", () -> { + ContextValue.where(exchangeId, "exchange1", () -> { + assertEquals("route1", routeId.get()); + assertEquals("exchange1", exchangeId.get()); + }); + }); + } + + @Test + public void testThreadLocalContextValue() { + // ThreadLocal-based context values support mutation + ContextValue<String> mutableValue = ContextValue.newThreadLocal("mutable"); + + assertFalse(mutableValue.isBound()); + + mutableValue.set("value1"); + assertTrue(mutableValue.isBound()); + assertEquals("value1", mutableValue.get()); + + mutableValue.set("value2"); + assertEquals("value2", mutableValue.get()); + + mutableValue.remove(); + assertFalse(mutableValue.isBound()); + } + + @Test + public void testWhereWithSupplier() { + ContextValue<String> routeId = ContextValue.newInstance("routeId"); + + String result = ContextValue.where(routeId, "myRoute", () -> { + return "Result: " + routeId.get(); + }); + + assertEquals("Result: myRoute", result); + } + + @Test + public void testThreadIsolation() throws Exception { + ContextValue<String> routeId = ContextValue.newInstance("routeId"); + + // Set value in main thread + ContextValue.where(routeId, "mainThread", () -> { + assertEquals("mainThread", routeId.get()); + + // Create a new thread - it should not see the value (for ScopedValue) + // or see null (for ThreadLocal without inheritance) + Thread thread = new Thread(() -> { + // The value should not be visible in the new thread + assertNull(routeId.orElse(null)); + }); + thread.start(); + try { + thread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Main thread should still see its value + assertEquals("mainThread", routeId.get()); + }); + } + + @Test + public void testName() { + ContextValue<String> value = ContextValue.newInstance("testName"); + assertEquals("testName", value.name()); + assertTrue(value.toString().contains("testName")); + } +}
