This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5e5c68000470fa3f1b0ec5f6f33b58c5894875b7 Author: Claus Ibsen <[email protected]> AuthorDate: Sat May 9 11:53:40 2020 +0200 CAMEL-15034: SupervisingRouteController - Allow to easily filter routes --- .../camel/spi/SupervisingRouteController.java | 30 +++ .../engine/DefaultSupervisingRouteController.java | 205 +++++++++++---------- .../engine/SupervisingRouteControllerFilters.java | 55 ------ .../camel/main/DefaultConfigurationConfigurer.java | 6 + .../camel/main/DefaultConfigurationProperties.java | 40 ++++ ...gRouteControllerFilterFailToStartRouteTest.java | 108 +++++++++++ 6 files changed, 295 insertions(+), 149 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/SupervisingRouteController.java b/core/camel-api/src/main/java/org/apache/camel/spi/SupervisingRouteController.java index 2f6b50d..0e16f68 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/SupervisingRouteController.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/SupervisingRouteController.java @@ -24,6 +24,36 @@ package org.apache.camel.spi; */ public interface SupervisingRouteController extends RouteController { + String getIncludeRoutes(); + + /** + * Pattern for filtering routes to be included as supervised. + * + * The pattern is matching on route id, and endpoint uri for the route. + * Multiple patterns can be separated by comma. + * + * For example to include all kafka routes, you can say <tt>kafka:*</tt>. + * And to include routes with specific route ids <tt>myRoute,myOtherRoute</tt>. + * The pattern supports wildcards and uses the matcher from + * org.apache.camel.support.PatternHelper#matchPattern. + */ + void setIncludeRoutes(String includeRoutes); + + String getExcludeRoutes(); + + /** + * Pattern for filtering routes to be excluded as supervised. + * + * The pattern is matching on route id, and endpoint uri for the route. + * Multiple patterns can be separated by comma. 
+ * + * For example to exclude all JMS routes, you can say <tt>jms:*</tt>. + * And to exclude routes with specific route ids <tt>mySpecialRoute,myOtherSpecialRoute</tt>. + * The pattern supports wildcards and uses the matcher from + * org.apache.camel.support.PatternHelper#matchPattern. + */ + void setExcludeRoutes(String excludeRoutes); + int getThreadPoolSize(); /** diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java index cfdca87..7c1c0e5 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java @@ -17,9 +17,8 @@ package org.apache.camel.impl.engine; import java.time.Duration; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -31,23 +30,22 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; import java.util.stream.Collectors; import org.apache.camel.CamelContext; +import org.apache.camel.ExtendedStartupListener; +import org.apache.camel.FailedToCreateRouteException; +import org.apache.camel.FailedToStartRouteException; import org.apache.camel.NamedNode; import org.apache.camel.Route; import org.apache.camel.RuntimeCamelException; import org.apache.camel.ServiceStatus; -import org.apache.camel.StartupListener; -import org.apache.camel.spi.CamelEvent; -import org.apache.camel.spi.CamelEvent.CamelContextStartedEvent; import org.apache.camel.spi.HasId; import org.apache.camel.spi.RouteController; import org.apache.camel.spi.RoutePolicy; 
import org.apache.camel.spi.RoutePolicyFactory; import org.apache.camel.spi.SupervisingRouteController; -import org.apache.camel.support.EventNotifierSupport; +import org.apache.camel.support.PatternHelper; import org.apache.camel.support.RoutePolicySupport; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.backoff.BackOff; @@ -68,13 +66,15 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im private final Object lock; private final AtomicBoolean contextStarted; private final AtomicInteger routeCount; - private final List<Filter> filters; private final Set<RouteHolder> routes; + private final Set<String> nonSupervisedRoutes; private final RouteManager routeManager; private volatile CamelContextStartupListener listener; private volatile BackOffTimer timer; private volatile ScheduledExecutorService executorService; private volatile BackOff backOff; + private String includeRoutes; + private String excludeRoutes; private int threadPoolSize = 1; private long initialDelay; private long backOffDelay = 2000; @@ -86,9 +86,9 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im public DefaultSupervisingRouteController() { this.lock = new Object(); this.contextStarted = new AtomicBoolean(false); - this.filters = new ArrayList<>(); this.routeCount = new AtomicInteger(0); this.routes = new TreeSet<>(); + this.nonSupervisedRoutes = new HashSet<>(); this.routeManager = new RouteManager(); } @@ -96,12 +96,26 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im // Properties // ********************************* - @Override + public String getIncludeRoutes() { + return includeRoutes; + } + + public void setIncludeRoutes(String includeRoutes) { + this.includeRoutes = includeRoutes; + } + + public String getExcludeRoutes() { + return excludeRoutes; + } + + public void setExcludeRoutes(String excludeRoutes) { + this.excludeRoutes = excludeRoutes; + } + public int 
getThreadPoolSize() { return threadPoolSize; } - @Override public void setThreadPoolSize(int threadPoolSize) { this.threadPoolSize = threadPoolSize; } @@ -154,28 +168,6 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im this.backOffMultiplier = backOffMultiplier; } - /** - * Add a filter used to determine the routes to supervise. - */ - @Deprecated - public void addFilter(Filter filter) { - this.filters.add(filter); - } - - /** - * Sets the filters user to determine the routes to supervise. - */ - @Deprecated - public void setFilters(Collection<Filter> filters) { - this.filters.clear(); - this.filters.addAll(filters); - } - - @Deprecated - public Collection<Filter> getFilters() { - return Collections.unmodifiableList(filters); - } - protected BackOff getBackOff(String id) { // currently all routes use the same backoff return backOff; @@ -196,16 +188,10 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im this.listener = new CamelContextStartupListener(); CamelContext context = getCamelContext(); + // prevent routes from automatic being started by default context.setAutoStartup(false); context.addRoutePolicyFactory(new ManagedRoutePolicyFactory()); context.addStartupListener(this.listener); - context.getManagementStrategy().addEventNotifier(this.listener); - - try { - this.listener.start(); - } catch (Exception e) { - throw RuntimeCamelException.wrapRuntimeException(e); - } } @Override @@ -228,13 +214,6 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im } } - @Override - protected void doShutdown() throws Exception { - if (getCamelContext() != null) { - getCamelContext().getManagementStrategy().removeEventNotifier(listener); - } - } - // ********************************* // Route management // ********************************* @@ -388,7 +367,33 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im } } - private void startRoutes() { + private 
void startNonSupervisedRoutes() throws Exception { + if (!isRunAllowed()) { + return; + } + + final List<String> routeList; + + synchronized (lock) { + routeList = routes.stream() + .filter(r -> r.getStatus() == ServiceStatus.Stopped) + .filter(r -> !isSupervised(r.route)) + .map(RouteHolder::getId) + .collect(Collectors.toList()); + } + + for (String route : routeList) { + try { + // let non supervising controller start the route by calling super + LOG.debug("Starting non-supervised route {}", route); + super.startRoute(route); + } catch (Exception e) { + throw new FailedToStartRouteException(route, e.getMessage(), e); + } + } + } + + private void startSupervisedRoutes() { if (!isRunAllowed()) { return; } @@ -398,10 +403,12 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im synchronized (lock) { routeList = routes.stream() .filter(r -> r.getStatus() == ServiceStatus.Stopped) + .filter(r -> isSupervised(r.route)) .map(RouteHolder::getId) .collect(Collectors.toList()); } + LOG.debug("Starting {} supervised routes", routeList.size()); for (String route: routeList) { try { startRoute(route); @@ -417,6 +424,10 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im ); } + private boolean isSupervised(Route route) { + return !nonSupervisedRoutes.contains(route.getId()); + } + // ********************************* // RouteChecker // ********************************* @@ -602,11 +613,44 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im return; } - for (Filter filter : filters) { - FilterResult result = filter.apply(route); - - if (!result.supervised()) { - LOG.info("Route: {} will not be supervised (Reason: {})", route.getId(), result.reason()); + // exclude takes precedence + if (excludeRoutes != null) { + for (String part : excludeRoutes.split(",")) { + String id = route.getRouteId(); + String uri = route.getEndpoint().getEndpointUri(); + boolean exclude = 
PatternHelper.matchPattern(id, part) || PatternHelper.matchPattern(uri, part); + if (exclude) { + LOG.debug("Route: {} excluded from being supervised", route.getId()); + RouteHolder holder = new RouteHolder(route, routeCount.incrementAndGet()); + if (routes.add(holder)) { + nonSupervisedRoutes.add(route.getId()); + holder.get().setRouteController(DefaultSupervisingRouteController.this); + // this route should be started + holder.get().setAutoStartup(true); + } + return; + } + } + } + if (includeRoutes != null) { + boolean include = false; + for (String part : includeRoutes.split(",")) { + String id = route.getRouteId(); + String uri = route.getEndpoint().getEndpointUri(); + include = PatternHelper.matchPattern(id, part) || PatternHelper.matchPattern(uri, part); + if (include) { + break; + } + } + if (!include) { + LOG.debug("Route: {} excluded from being supervised", route.getId()); + RouteHolder holder = new RouteHolder(route, routeCount.incrementAndGet()); + if (routes.add(holder)) { + nonSupervisedRoutes.add(route.getId()); + holder.get().setRouteController(DefaultSupervisingRouteController.this); + // this route should be started + holder.get().setAutoStartup(true); + } return; } } @@ -643,19 +687,20 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im } - private class CamelContextStartupListener extends EventNotifierSupport implements StartupListener { + private class CamelContextStartupListener implements ExtendedStartupListener { + @Override - public void notify(CamelEvent event) throws Exception { - onCamelContextStarted(); + public void onCamelContextStarting(CamelContext context, boolean alreadyStarted) throws Exception { + // noop } @Override - public boolean isEnabled(CamelEvent event) { - return event instanceof CamelContextStartedEvent; + public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception { + // noop } @Override - public void onCamelContextStarted(CamelContext context, 
boolean alreadyStarted) throws Exception { + public void onCamelContextFullyStarted(CamelContext context, boolean alreadyStarted) throws Exception { if (alreadyStarted) { // Invoke it only if the context was already started as this // method is not invoked at last event as documented but after @@ -671,52 +716,24 @@ public class DefaultSupervisingRouteController extends DefaultRouteController im } } - private void onCamelContextStarted() { + private void onCamelContextStarted() throws Exception { // Start managing the routes only when the camel context is started // so start/stop of managed routes do not clash with CamelContext // startup if (contextStarted.compareAndSet(false, true)) { + // start non supervised routes first as if they fail then + // camel context fails to start which is the behaviour of non-supervised routes + startNonSupervisedRoutes(); // Eventually delay the startup of the routes a later time if (initialDelay > 0) { - LOG.debug("Routes will be started in {} millis", initialDelay); - executorService.schedule(DefaultSupervisingRouteController.this::startRoutes, initialDelay, TimeUnit.MILLISECONDS); + LOG.debug("Supervised routes will be started in {} millis", initialDelay); + executorService.schedule(DefaultSupervisingRouteController.this::startSupervisedRoutes, initialDelay, TimeUnit.MILLISECONDS); } else { - startRoutes(); + startSupervisedRoutes(); } } } } - // ********************************* - // Filter - // ********************************* - - public static class FilterResult { - public static final FilterResult SUPERVISED = new FilterResult(true, null); - - private final boolean controlled; - private final String reason; - - public FilterResult(boolean controlled, String reason) { - this.controlled = controlled; - this.reason = reason; - } - - public FilterResult(boolean controlled, String format, Object... 
args) { - this(controlled, String.format(format, args)); - } - - public boolean supervised() { - return controlled; - } - - public String reason() { - return reason; - } - } - - public interface Filter extends Function<Route, FilterResult> { - } - } diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SupervisingRouteControllerFilters.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SupervisingRouteControllerFilters.java deleted file mode 100644 index e531636..0000000 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SupervisingRouteControllerFilters.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.impl.engine; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -import org.apache.camel.Route; - -/** - * A {@link DefaultSupervisingRouteController.Filter} which blacklists routes. 
- */ -public final class SupervisingRouteControllerFilters { - - private SupervisingRouteControllerFilters() { - } - - public static final class BlackList implements DefaultSupervisingRouteController.Filter { - private final Set<String> names; - - public BlackList(String name) { - this(Collections.singletonList(name)); - } - - public BlackList(Collection<String> names) { - this.names = new HashSet<>(names); - } - - @Override - public DefaultSupervisingRouteController.FilterResult apply(Route route) { - boolean supervised = !names.contains(route.getId()); - - return new DefaultSupervisingRouteController.FilterResult( - supervised, - supervised ? null : "black-listed" - ); - } - } -} diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java index 24dc774..38fda13 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java @@ -182,6 +182,12 @@ public final class DefaultConfigurationConfigurer { if (config.isRouteControllerEnabled()) { SupervisingRouteController src = camelContext.adapt(ExtendedCamelContext.class).getSupervisingRouteController(); src.setCamelContext(camelContext); + if (config.getRouteControllerIncludeRoutes() != null) { + src.setIncludeRoutes(config.getRouteControllerIncludeRoutes()); + } + if (config.getRouteControllerExcludeRoutes() != null) { + src.setExcludeRoutes(config.getRouteControllerExcludeRoutes()); + } if (config.getRouteControllerThreadPoolSize() > 0) { src.setThreadPoolSize(config.getRouteControllerThreadPoolSize()); } diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java index d6597e2..42e80aa 100644 --- 
a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java @@ -82,6 +82,8 @@ public abstract class DefaultConfigurationProperties<T> { private String xmlRests = "classpath:camel-rest/*.xml"; private boolean lightweight; private boolean routeControllerEnabled; + private String routeControllerIncludeRoutes; + private String routeControllerExcludeRoutes; private int routeControllerThreadPoolSize; private long routeControllerInitialDelay; private long routeControllerBackOffDelay; @@ -916,6 +918,44 @@ public abstract class DefaultConfigurationProperties<T> { this.routeControllerEnabled = routeControllerEnabled; } + public String getRouteControllerIncludeRoutes() { + return routeControllerIncludeRoutes; + } + + /** + * Pattern for filtering routes to be included as supervised. + * + * The pattern is matching on route id, and endpoint uri for the route. + * Multiple patterns can be separated by comma. + * + * For example to include all kafka routes, you can say <tt>kafka:*</tt>. + * And to include routes with specific route ids <tt>myRoute,myOtherRoute</tt>. + * The pattern supports wildcards and uses the matcher from + * org.apache.camel.support.PatternHelper#matchPattern. + */ + public void setRouteControllerIncludeRoutes(String routeControllerIncludeRoutes) { + this.routeControllerIncludeRoutes = routeControllerIncludeRoutes; + } + + public String getRouteControllerExcludeRoutes() { + return routeControllerExcludeRoutes; + } + + /** + * Pattern for filtering routes to be excluded as supervised. + * + * The pattern is matching on route id, and endpoint uri for the route. + * Multiple patterns can be separated by comma. + * + * For example to exclude all JMS routes, you can say <tt>jms:*</tt>. + * And to exclude routes with specific route ids <tt>mySpecialRoute,myOtherSpecialRoute</tt>. 
+ * The pattern supports wildcards and uses the matcher from + * org.apache.camel.support.PatternHelper#matchPattern. + */ + public void setRouteControllerExcludeRoutes(String routeControllerExcludeRoutes) { + this.routeControllerExcludeRoutes = routeControllerExcludeRoutes; + } + public int getRouteControllerThreadPoolSize() { return routeControllerThreadPoolSize; } diff --git a/core/camel-main/src/test/java/org/apache/camel/main/MainSupervisingRouteControllerFilterFailToStartRouteTest.java b/core/camel-main/src/test/java/org/apache/camel/main/MainSupervisingRouteControllerFilterFailToStartRouteTest.java new file mode 100644 index 0000000..c1fae8f --- /dev/null +++ b/core/camel-main/src/test/java/org/apache/camel/main/MainSupervisingRouteControllerFilterFailToStartRouteTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.main; + +import java.util.Map; + +import org.apache.camel.Consumer; +import org.apache.camel.Endpoint; +import org.apache.camel.FailedToStartRouteException; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.seda.SedaComponent; +import org.apache.camel.component.seda.SedaConsumer; +import org.apache.camel.component.seda.SedaEndpoint; +import org.junit.Assert; +import org.junit.Test; + +public class MainSupervisingRouteControllerFilterFailToStartRouteTest extends Assert { + + @Test + public void testMain() throws Exception { + // lets make a simple route + Main main = new Main(); + main.configure().addRoutesBuilder(new MyRoute()); + main.configure().setRouteControllerEnabled(true); + main.configure().setRouteControllerBackOffDelay(250); + main.configure().setRouteControllerBackOffMaxAttempts(3); + main.configure().setRouteControllerInitialDelay(1000); + main.configure().setRouteControllerThreadPoolSize(2); + main.configure().setRouteControllerExcludeRoutes("inbox"); + + try { + main.start(); + fail("Should fail"); + } catch (FailedToStartRouteException e) { + assertEquals("inbox", e.getRouteId()); + } + + main.stop(); + } + + private class MyRoute extends RouteBuilder { + @Override + public void configure() throws Exception { + getContext().addComponent("jms", new MyJmsComponent()); + + from("timer:foo").to("mock:foo").routeId("foo"); + + from("jms:cheese").to("mock:cheese").routeId("cheese"); + + from("jms:cake").to("mock:cake").routeId("cake"); + + from("jms:inbox").routeId("inbox").to("mock:inbox"); + } + } + + private class MyJmsComponent extends SedaComponent { + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + return new MyJmsEndpoint(); + } + } + + private class MyJmsEndpoint extends SedaEndpoint { + + public MyJmsEndpoint() { + super(); + } + + @Override + public Consumer 
createConsumer(Processor processor) throws Exception { + return new MyJmsConsumer(this, processor); + } + + @Override + protected String createEndpointUri() { + return "jms:cheese"; + } + } + + private class MyJmsConsumer extends SedaConsumer { + + public MyJmsConsumer(SedaEndpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + @Override + protected void doStart() throws Exception { + throw new IllegalArgumentException("Cannot start"); + } + } + +}
