This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit bb9d1a6fb2b86670ecdae7c23ad2128113b71319 Author: Claus Ibsen <[email protected]> AuthorDate: Fri Jan 22 13:29:31 2021 +0100 CAMEL-15844: camel-core - Optimize Route to move its setup (init) logic to the init phase of CamelContext, so they are initialized together. --- .../apache/camel/component/ref/RefEndpoint.java | 19 ++-- .../src/main/java/org/apache/camel/Route.java | 5 +- .../camel/impl/engine/AbstractCamelContext.java | 2 + .../org/apache/camel/impl/engine/DefaultRoute.java | 26 ++++- .../impl/engine/InternalRouteStartupManager.java | 37 ++++++- .../org/apache/camel/impl/engine/RouteService.java | 113 +++++++++++++-------- 6 files changed, 140 insertions(+), 62 deletions(-) diff --git a/components/camel-ref/src/main/java/org/apache/camel/component/ref/RefEndpoint.java b/components/camel-ref/src/main/java/org/apache/camel/component/ref/RefEndpoint.java index 912d65d..4a8645d 100644 --- a/components/camel-ref/src/main/java/org/apache/camel/component/ref/RefEndpoint.java +++ b/components/camel-ref/src/main/java/org/apache/camel/component/ref/RefEndpoint.java @@ -69,22 +69,17 @@ public class RefEndpoint extends DefaultEndpoint implements DelegateEndpoint { @Override public Endpoint getEndpoint() { - if (endpoint == null) { - endpoint = CamelContextHelper.mandatoryLookup(getCamelContext(), name, Endpoint.class); - } return endpoint; } @Override - protected void doStart() throws Exception { - // add the endpoint to the endpoint registry - getCamelContext().addEndpoint(getEndpoint().getEndpointUri(), getEndpoint()); - super.doStart(); + protected void doInit() throws Exception { + if (endpoint == null) { + // endpoint is mandatory + endpoint = CamelContextHelper.mandatoryLookup(getCamelContext(), name, Endpoint.class); + getCamelContext().addEndpoint(getEndpoint().getEndpointUri(), endpoint); + } + super.doInit(); } - @Override - protected void doStop() throws Exception { - super.doStop(); - // noop - } } diff --git a/core/camel-api/src/main/java/org/apache/camel/Route.java b/core/camel-api/src/main/java/org/apache/camel/Route.java index 9f14a1c..808dc35 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Route.java +++ b/core/camel-api/src/main/java/org/apache/camel/Route.java @@ -130,12 +130,11 @@ public interface Route extends RuntimeConfiguration { Endpoint getEndpoint(); /** - * A strategy callback allowing special initialization when services are starting. + * A strategy callback allowing special initialization when services are initializing. * - * @param services the service * @throws Exception is thrown in case of error */ - void onStartingServices(List<Service> services) throws Exception; + void initializeServices() throws Exception; /** * Returns the services for this particular route diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index 3971cc5..036e98b 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -2712,6 +2712,8 @@ public abstract class AbstractCamelContext extends BaseService StartupStep subStep = startupStepRecorder.beginStep(CamelContext.class, getName(), "Initializing routes"); // the method is called start but at this point it will only initialize (as context is starting up) startRouteDefinitions(); + // this will init route definitions and populate as route services which we can then initialize now + internalRouteStartupManager.doInitRoutes(routeServices); startupStepRecorder.endStep(subStep); if (!lifecycleStrategies.isEmpty()) { diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java index 9d55f0f..fd877b0 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java @@ -48,6 +48,7 @@ import org.apache.camel.spi.RouteError; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.support.PatternHelper; +import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.support.service.ServiceSupport; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.TimeUtils; @@ -156,8 +157,9 @@ public class DefaultRoute extends ServiceSupport implements Route { } @Override - public void onStartingServices(List<Service> services) throws Exception { - addServices(services); + public void initializeServices() throws Exception { + // gather all the services for this route + gatherServices(services); } @Override @@ -174,7 +176,7 @@ public class DefaultRoute extends ServiceSupport implements Route { @Override public void warmUp() { - getServices().clear(); + // noop } /** @@ -570,7 +572,23 @@ public class DefaultRoute extends ServiceSupport implements Route { * Factory method to lazily create the complete list of services required for this route such as adding the * processor or consumer */ - protected void addServices(List<Service> services) throws Exception { + protected void gatherServices(List<Service> services) throws Exception { + // first gather the root services + gatherRootServices(services); + // and then all the child services + List<Service> children = new ArrayList<>(); + for (Service service : services) { + Set<Service> extra = ServiceHelper.getChildServices(service); + children.addAll(extra); + } + for (Service extra : children) { + if (!services.contains(extra)) { + services.add(extra); + } + } + } + + protected void gatherRootServices(List<Service> services) throws Exception { Endpoint endpoint = getEndpoint(); consumer = endpoint.createConsumer(processor); if (consumer != null) { diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java index fecd08b1..f3dbf02 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/InternalRouteStartupManager.java @@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory; /** * Internal route startup manager used by {@link AbstractCamelContext} to safely start internal route services during * starting routes. - * + * <p> * This code has been refactored out of {@link AbstractCamelContext} to its own class. */ class InternalRouteStartupManager { @@ -70,6 +70,37 @@ class InternalRouteStartupManager { } /** + * Initializes the routes + * + * @param routeServices the routes to initialize + * @throws Exception is thrown if error initializing routes + */ + protected void doInitRoutes(Map<String, RouteService> routeServices) + throws Exception { + + abstractCamelContext.setStartingRoutes(true); + try { + for (RouteService routeService : routeServices.values()) { + StartupStep step = abstractCamelContext.getStartupStepRecorder().beginStep(Route.class, routeService.getId(), + "Initializing Route"); + try { + LOG.debug("Initializing route id: {}", routeService.getId()); + setupRoute.set(routeService.getRoute()); + // initializing route is called doSetup as we do not want to change the service state on the RouteService + // so it can remain as stopped, when Camel is booting as this was the previous behavior - otherwise its state + // would be initialized + routeService.setUp(); + } finally { + setupRoute.remove(); + abstractCamelContext.getStartupStepRecorder().endStep(step); + } + } + } finally { + abstractCamelContext.setStartingRoutes(false); + } + } + + /** * Starts or resumes the routes * * @param routeServices the routes to start (will only start a route if its not already started) @@ -200,7 +231,7 @@ class InternalRouteStartupManager { } /** - * @see #safelyStartRouteServices(boolean,boolean,boolean,boolean,Collection) + * @see #safelyStartRouteServices(boolean, boolean, boolean, boolean, Collection) */ protected synchronized void safelyStartRouteServices( boolean forceAutoStart, boolean checkClash, boolean startConsumer, boolean resumeConsumer, boolean addingRoutes, @@ -270,6 +301,8 @@ class InternalRouteStartupManager { try { LOG.debug("Warming up route id: {} having autoStartup={}", routeService.getId(), autoStartup); setupRoute.set(routeService.getRoute()); + // ensure we setup before warmup + routeService.setUp(); routeService.warmUp(); } finally { setupRoute.remove(); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java index c55c272..2b9fc9f 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java @@ -57,6 +57,7 @@ public class RouteService extends ChildServiceSupport { private final Route route; private boolean removingRoutes; private Consumer input; + private final AtomicBoolean setUpDone = new AtomicBoolean(); private final AtomicBoolean warmUpDone = new AtomicBoolean(); private final AtomicBoolean endpointDone = new AtomicBoolean(); @@ -117,6 +118,16 @@ public class RouteService extends ChildServiceSupport { } } + public void setUp() throws FailedToStartRouteException { + if (setUpDone.compareAndSet(false, true)) { + try { + doSetup(); + } catch (Exception e) { + throw new FailedToStartRouteException(getId(), route.getDescription(), e); + } + } + } + public boolean isAutoStartup() { if (!getCamelContext().isAutoStartup()) { return false; @@ -124,6 +135,43 @@ public class RouteService extends ChildServiceSupport { return getRoute().isAutoStartup(); } + protected synchronized void doSetup() throws Exception { + // to setup we initialize the services + ServiceHelper.initService(route.getEndpoint()); + + try (MDCHelper mdcHelper = new MDCHelper(route.getId())) { + + // ensure services are initialized first + route.initializeServices(); + List<Service> services = route.getServices(); + + // split into consumers and child services as we need to start the consumers + // afterwards to avoid them being active while the others start + List<Service> list = new ArrayList<>(); + for (Service service : services) { + + // inject the route + if (service instanceof RouteAware) { + ((RouteAware) service).setRoute(route); + } + if (service instanceof RouteIdAware) { + ((RouteIdAware) service).setRouteId(route.getId()); + } + // inject camel context + if (service instanceof CamelContextAware) { + ((CamelContextAware) service).setCamelContext(camelContext); + } + + if (service instanceof Consumer) { + this.input = (Consumer) service; + } else { + list.add(service); + } + } + initChildServices(route, list); + } + } + protected synchronized void doWarmUp() throws Exception { if (endpointDone.compareAndSet(false, true)) { // endpoints should only be started once as they can be reused on other routes @@ -138,41 +186,7 @@ public class RouteService extends ChildServiceSupport { // warm up the route first route.warmUp(); - List<Service> services = route.getServices(); - - // callback that we are staring these services - route.onStartingServices(services); - - // gather list of services to start as we need to start child services as well - Set<Service> list = new LinkedHashSet<>(); - for (Service service : services) { - list.addAll(ServiceHelper.getChildServices(service)); - } - - // split into consumers and child services as we need to start the consumers - // afterwards to avoid them being active while the others start - List<Service> childServices = new ArrayList<>(); - for (Service service : list) { - - // inject the route - if (service instanceof RouteAware) { - ((RouteAware) service).setRoute(route); - } - if (service instanceof RouteIdAware) { - ((RouteIdAware) service).setRouteId(route.getId()); - } - // inject camel context - if (service instanceof CamelContextAware) { - ((CamelContextAware) service).setCamelContext(camelContext); - } - - if (service instanceof Consumer) { - this.input = (Consumer) service; - } else { - childServices.add(service); - } - } - startChildService(route, childServices); + startChildServices(route, childServices); // fire event EventHelper.notifyRouteAdded(camelContext, route); @@ -199,6 +213,7 @@ public class RouteService extends ChildServiceSupport { } try { + // ensure we are warmed up warmUp(); } catch (FailedToStartRouteException e) { throw RuntimeCamelException.wrapRuntimeException(e); @@ -234,11 +249,12 @@ public class RouteService extends ChildServiceSupport { } try (MDCHelper mdcHelper = new MDCHelper(route.getId())) { + // TODO: childrenService + some more // gather list of services to stop as we need to start child services as well Set<Service> services = gatherChildServices(); // stop services - stopChildService(route, services, isShutdownCamelContext); + stopChildServices(route, services, isShutdownCamelContext); // stop the route itself if (isShutdownCamelContext) { @@ -256,18 +272,25 @@ public class RouteService extends ChildServiceSupport { if (isRemovingRoutes()) { camelContext.adapt(ExtendedCamelContext.class).removeRoute(route); } - // need to warm up again + // need to redo if we start again after being stopped + input = null; + childServices = null; + warmUpDone.set(false); + setUpDone.set(false); + endpointDone.set(false); + setUpDone.set(false); warmUpDone.set(false); } @Override protected void doShutdown() { try (MDCHelper mdcHelper = new MDCHelper(route.getId())) { + // TODO: childrenService + some more // gather list of services to stop as we need to start child services as well Set<Service> services = gatherChildServices(); // shutdown services - stopChildService(route, services, true); + stopChildServices(route, services, true); // shutdown the route itself ServiceHelper.stopAndShutdownServices(route); @@ -296,7 +319,9 @@ public class RouteService extends ChildServiceSupport { // clear inputs on shutdown input = null; + childServices = null; warmUpDone.set(false); + setUpDone.set(false); endpointDone.set(false); } @@ -326,17 +351,23 @@ public class RouteService extends ChildServiceSupport { } } - protected void startChildService(Route route, List<Service> services) { + protected void initChildServices(Route route, List<Service> services) { + for (Service service : services) { + ServiceHelper.initService(service); + addChildService(service); + } + } + + protected void startChildServices(Route route, List<Service> services) { for (Service service : services) { for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { strategy.onServiceAdd(camelContext, service, route); } ServiceHelper.startService(service); - addChildService(service); } } - protected void stopChildService(Route route, Set<Service> services, boolean shutdown) { + protected void stopChildServices(Route route, Set<Service> services, boolean shutdown) { for (Service service : services) { for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { strategy.onServiceRemove(camelContext, service, route);
