Repository: camel Updated Branches: refs/heads/master 33063a86c -> 34fdcb3ca
CAMEL-10426 - Added CuratorMultiMasterLeaderRoutePolicy Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/34fdcb3c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/34fdcb3c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/34fdcb3c Branch: refs/heads/master Commit: 34fdcb3ca67aebad9af2a05ee737f32f8d5f5982 Parents: 33063a8 Author: Paolo Antinori <[email protected]> Authored: Wed Nov 9 15:07:32 2016 +0100 Committer: Claus Ibsen <[email protected]> Committed: Wed Nov 9 22:24:35 2016 +0100 ---------------------------------------------------------------------- components/camel-zookeeper/pom.xml | 16 + .../src/main/docs/zookeeper-component.adoc | 9 +- .../CuratorMultiMasterLeaderElection.java | 171 +++++++ .../CuratorMultiMasterLeaderRoutePolicy.java | 191 ++++++++ ...MultiMasterCuratorLeaderRoutePolicyTest.java | 460 +++++++++++++++++++ .../src/test/resources/log4j2.properties | 10 +- 6 files changed, 854 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/pom.xml b/components/camel-zookeeper/pom.xml index 2021d2c..08dc904 100644 --- a/components/camel-zookeeper/pom.xml +++ b/components/camel-zookeeper/pom.xml @@ -105,4 +105,20 @@ </dependencies> + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <childDelegation>false</childDelegation> + <useFile>true</useFile> + <forkCount>1</forkCount> + <reuseForks>true</reuseForks> + <forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds> + </configuration> + </plugin> + </plugins> + </build> + + </project> http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc b/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc index e569ea2..5f8d5cf 100644 --- a/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc +++ b/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc @@ -269,18 +269,23 @@ from("direct:policy-controlled") .to("mock:controlled"); ---- -There are currently 2 policies defined in the component, with different SLAs: +There are currently 3 policies defined in the component, with different SLAs: * `ZooKeeperRoutePolicy` * `CuratorLeaderRoutePolicy` (since *2.19*) +* `MultiMasterCuratorLeaderRoutePolicy` (since *2.19*) *ZooKeeperRoutePolicy* supports multiple active nodes, but it's activation kicks in only after a Camel component and its correspondent Consumer have already been started, this introduces, depending on your routes definition, the risk that you component can already start consuming events and producing `Exchange`s, before the policy could estabilish that the node should not be activated. - + *CuratorLeaderRoutePolicy* supports only a single active node, but it's bound to a different `CamelContext` lifecycle method; this Policy kicks in before any route or consumer is started thus you can be sure that no even is processed before the Policy takes its decision. +*MultiMasterCuratorLeaderRoutePolicy* support multiple active nodes, and it's bound to the same lifecycle method as `CuratorLeaderRoutePolicy`; this Policy kicks in before any route or consumer is started + thus you can be sure that no even is processed before the Policy takes its decision. + + [[Zookeeper-SeeAlso]] See Also ^^^^^^^^ http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java new file mode 100644 index 0000000..9c91b73 --- /dev/null +++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeeper.policy; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.camel.CamelContext; +import org.apache.camel.StatefulService; +import org.apache.camel.impl.JavaUuidGenerator; +import org.apache.camel.spi.UuidGenerator; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2; +import org.apache.curator.framework.recipes.locks.Lease; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <code>CuratorMultiMasterLeaderElection</code> uses the leader election capabilities of a + * ZooKeeper cluster to control which nodes are enabled. It is typically used in + * fail-over scenarios controlling identical instances of an application across + * a cluster of Camel based servers. <p> The election is configured providing the number of instances that are required + * to be active.. + * <p> All instances of the election must also be configured with the same path on the ZooKeeper + * cluster where the election will be carried out. It is good practice for this + * to indicate the application e.g. <tt>/someapplication/someroute/</tt> note + * that these nodes should exist before using the election. <p> See <a + * href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection"> + * for more on how Leader election</a> is archived with ZooKeeper. + */ +public class CuratorMultiMasterLeaderElection implements ConnectionStateListener { + + private static final Logger LOG = LoggerFactory.getLogger(CuratorMultiMasterLeaderElection.class); + + private final String candidateName; + private final List<ElectionWatcher> watchers = new ArrayList<ElectionWatcher>(); + private final int desiredActiveNodes; + private AtomicBoolean activeNode = new AtomicBoolean(false); + private UuidGenerator uuidGenerator = new JavaUuidGenerator(); + private InterProcessSemaphoreV2 leaderSelector; + private CuratorFramework client; + private Lease lease; + + public CuratorMultiMasterLeaderElection(String uri, int desiredActiveNodes) { + this.candidateName = createCandidateName(); + this.desiredActiveNodes = desiredActiveNodes; + + String connectionString = uri.substring(1 + uri.indexOf(':')).split("/")[0]; + String protocol = uri.substring(0, uri.indexOf(':')); + String path = uri.replace(protocol + ":" + connectionString, ""); + client = CuratorFrameworkFactory.newClient(connectionString, new ExponentialBackoffRetry(1000, 3)); + client.getConnectionStateListenable().addListener(this); + leaderSelector = new InterProcessSemaphoreV2(client, path, this.desiredActiveNodes); + client.start(); + + + } + + // stolen from org/apache/camel/processor/CamelInternalProcessor + public static boolean isCamelStopping(CamelContext context) { + if (context instanceof StatefulService) { + StatefulService ss = (StatefulService) context; + return ss.isStopping() || ss.isStopped(); + } + return false; + } + + public void shutdownClients() { + try { + leaderSelector.returnLease(lease); + } finally { + client.close(); + } + } + + /* + * Blocking method + */ + public void requestResource() { + LOG.info("Requested to become active from {}", candidateName); + try { + lease = leaderSelector.acquire(); + } catch (Exception e) { + throw new RuntimeException("Unable to obtain access to become a leader node."); + } + LOG.info("{} is now active", candidateName); + activeNode.set(true); + notifyElectionWatchers(); + } + + public boolean isMaster() { + return activeNode.get(); + } + + private String createCandidateName() { + StringBuilder builder = new StringBuilder(); + try { + /* UUID would be enough, also using hostname for human readability */ + builder.append(InetAddress.getLocalHost().getCanonicalHostName()); + } catch (UnknownHostException ex) { + LOG.warn("Failed to get the local hostname.", ex); + builder.append("unknown-host"); + } + builder.append("-").append(uuidGenerator.generateUuid()); + return builder.toString(); + } + + public String getCandidateName() { + return candidateName; + } + + private void notifyElectionWatchers() { + for (ElectionWatcher watcher : watchers) { + try { + watcher.electionResultChanged(); + } catch (Exception e) { + LOG.warn("Election watcher " + watcher + " of type " + watcher.getClass() + " threw an exception.", e); + } + } + } + + public boolean addElectionWatcher(ElectionWatcher e) { + return watchers.add(e); + } + + @Override + public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { + switch (connectionState) { + case SUSPENDED: + case LOST: + LOG.info("Received {} state from connection. Giving up lock.", connectionState); + + try { + leaderSelector.returnLease(lease); + } finally { + this.activeNode.set(false); + notifyElectionWatchers(); + } + + break; + default: + LOG.info("Connection state changed: {}", connectionState); + requestResource(); + + } + } + +} + + http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java new file mode 100644 index 0000000..b220ee4 --- /dev/null +++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeeper.policy; + +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.camel.NonManagedService; +import org.apache.camel.Route; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.support.RoutePolicySupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + + +/** + * <code>CuratorMultiMasterLeaderRoutePolicy</code> uses Apache Curator InterProcessSemaphoreV2 receipe to implement the behavior of having + * at multiple active instances of a route, controlled by a specific policy, running. It is typically used in + * fail-over scenarios controlling identical instances of a route across a cluster of Camel based servers. + * <p> + * The policy affects the normal startup lifecycle of CamelContext and Routes, automatically set autoStart property of + * routes controlled by this policy to false. + * After Curator receipe identifies the current Policy instance as the Leader between a set of clients that are + * competing for the role, it will start the route, and only at that moment the route will start its business. + * This specific behavior is designed to avoid scenarios where such a policy would kick in only after a route had + * already been started, with the risk, for consumers for example, that some source event might have already been + * consumed. + * <p> + * All instances of the policy must also be configured with the same path on the + * ZooKeeper cluster where the election will be carried out. It is good practice + * for this to indicate the application e.g. <tt>/someapplication/someroute/</tt> note + * that these nodes should exist before using the policy. + * <p> + * See <a href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection"> + * for more on how Leader election</a> is archived with ZooKeeper. + */ +public class CuratorMultiMasterLeaderRoutePolicy extends RoutePolicySupport implements ElectionWatcher, NonManagedService { + + private static final Logger LOG = LoggerFactory.getLogger(CuratorMultiMasterLeaderRoutePolicy.class); + private final String uri; + private final Lock lock = new ReentrantLock(); + private final Set<Route> suspendedRoutes = new CopyOnWriteArraySet<Route>(); + private final AtomicBoolean shouldProcessExchanges = new AtomicBoolean(); + private volatile boolean shouldStopRoute = true; + private final int enabledCount; + + + private final Lock electionLock = new ReentrantLock(); + + private CuratorMultiMasterLeaderElection election; + + public CuratorMultiMasterLeaderRoutePolicy(String uri, int enabledCount) { + this.uri = uri; + this.enabledCount = enabledCount; + } + public CuratorMultiMasterLeaderRoutePolicy(String uri) { + this(uri, 1); + } + + @Override + public void onInit(Route route) { + ensureElectionIsCreated(); + LOG.info("Route managed by {}. Setting route [{}] AutoStartup flag to false.", this.getClass(), route.getId()); + route.getRouteContext().getRoute().setAutoStartup("false"); + + + if (election.isMaster()) { + if (shouldStopRoute) { + startManagedRoute(route); + } + } else { + if (shouldStopRoute) { + stopManagedRoute(route); + } + } + + } + + private void ensureElectionIsCreated() { + if (election == null) { + electionLock.lock(); + try { + if (election == null) { // re-test + election = new CuratorMultiMasterLeaderElection(uri, enabledCount); + election.addElectionWatcher(this); + + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + electionLock.unlock(); + } + } + } + + private void startManagedRoute(Route route) { + try { + lock.lock(); + if (suspendedRoutes.contains(route)) { + startRoute(route); + suspendedRoutes.remove(route); + } + } catch (Exception e) { + handleException(e); + } finally { + lock.unlock(); + } + } + + private void stopManagedRoute(Route route) { + try { + lock.lock(); + // check that we should still suspend once the lock is acquired + if (!suspendedRoutes.contains(route) && !shouldProcessExchanges.get()) { + stopRoute(route); + suspendedRoutes.add(route); + } + } catch (Exception e) { + handleException(e); + } finally { + lock.unlock(); + } + } + + @Override + public void electionResultChanged() { + if (election.isMaster()) { + startAllStoppedRoutes(); + } + } + + private void startAllStoppedRoutes() { + try { + lock.lock(); + + if (!suspendedRoutes.isEmpty()) { + if (log.isDebugEnabled()) { + log.info("{} route(s) have been stopped previously by policy, restarting.", suspendedRoutes.size()); + } + for (Route suspended : suspendedRoutes) { + DefaultCamelContext ctx = (DefaultCamelContext)suspended.getRouteContext().getCamelContext(); + while (!ctx.isStarted()) { + log.info("Context {} is not started yet. Sleeping for a bit.", ctx.getName()); + Thread.sleep(5000); + } + log.info("Starting route [{}] defined in context [{}].", suspended.getId(), ctx.getName()); + startRoute(suspended); + } + suspendedRoutes.clear(); + } + + } catch (Exception e) { + handleException(e); + } finally { + lock.unlock(); + } + } + + @Override + protected void doShutdown() throws Exception { + try { + electionLock.lock(); + election.shutdownClients(); + election = null; + } finally { + electionLock.unlock(); + } + } + + public CuratorMultiMasterLeaderElection getElection() { + return election; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java new file mode 100644 index 0000000..a6c9946 --- /dev/null +++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeeper.policy; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.ExchangePattern; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.zookeeper.ZooKeeperTestSupport; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.commons.logging.LogFactory; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; + +public class MultiMasterCuratorLeaderRoutePolicyTest extends ZooKeeperTestSupport { + public static final String ZNODE = "/multimaster"; + public static final String BASE_ZNODE = "/someapp"; + private static final Logger LOG = LoggerFactory.getLogger(MultiMasterCuratorLeaderRoutePolicyTest.class); + + + protected CamelContext createCamelContext() throws Exception { + disableJMX(); + return super.createCamelContext(); + } + + + @Test + public void ensureRoutesDoNotStartAutomatically() throws Exception { + DefaultCamelContext context = new DefaultCamelContext(); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + CuratorMultiMasterLeaderRoutePolicy policy = new CuratorMultiMasterLeaderRoutePolicy("zookeeper:localhost:" + getServerPort() + BASE_ZNODE + ZNODE + 2); + from("timer://foo?fixedRate=true&period=5").routePolicy(policy).id("single_route").autoStartup(true).to("mock:controlled"); + } + }); + context.start(); + // this check verifies that a route marked as autostartable is not started automatically. It will be the policy responsibility to eventually start it. + assertThat(context.getRouteStatus("single_route").isStarted(), is(false)); + assertThat(context.getRouteStatus("single_route").isStarting(), is(false)); + try { + context.shutdown(); + } catch (Exception e) { + //concurrency can raise some InterruptedException but we don't really care in this scenario. + } + } + + @Test + public void oneMasterOneSlaveScenarioContolledByPolicy() throws Exception { + final String path = "oneMasterOneSlaveScenarioContolledByPolicy"; + final String firstDestination = "first" + System.currentTimeMillis(); + final String secondDestination = "second" + System.currentTimeMillis(); + final CountDownLatch waitForSecondRouteCompletedLatch = new CountDownLatch(1); + final int activeNodesDesired = 1; + + MultiMasterZookeeperPolicyEnforcedContext first = createEnforcedContext(firstDestination, activeNodesDesired, path); + DefaultCamelContext controlledContext = (DefaultCamelContext) first.controlledContext; + // get reference to the Policy object to check if it's already a master + CuratorMultiMasterLeaderRoutePolicy routePolicy = (CuratorMultiMasterLeaderRoutePolicy) controlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0); + + assertWeHaveMasters(routePolicy); + + LOG.info("Starting first CamelContext"); + final MultiMasterZookeeperPolicyEnforcedContext[] arr = new MultiMasterZookeeperPolicyEnforcedContext[1]; + + new Thread() { + @Override + public void run() { + MultiMasterZookeeperPolicyEnforcedContext second = null; + try { + LOG.info("Starting second CamelContext in a separate thread"); + second = createEnforcedContext(secondDestination, activeNodesDesired, path); + arr[0] = second; + second.sendMessageToEnforcedRoute("message for second", 0); + waitForSecondRouteCompletedLatch.countDown(); + } catch (Exception e) { + LOG.error("Error in the thread controlling the second context", e); + fail("Error in the thread controlling the second context: " + e.getMessage()); + } + + + } + }.start(); + + first.sendMessageToEnforcedRoute("message for first", 1); + + waitForSecondRouteCompletedLatch.await(2, TimeUnit.MINUTES); + LOG.info("Explicitly shutting down the first camel context."); + + LOG.info("Shutting down first con"); + first.shutdown(); + + MultiMasterZookeeperPolicyEnforcedContext second = arr[0]; + + DefaultCamelContext secondCamelContext = (DefaultCamelContext) second.controlledContext; + assertWeHaveMasters((CuratorMultiMasterLeaderRoutePolicy)secondCamelContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0)); + + //second.mock = secondCamelContext.getEndpoint("mock:controlled", MockEndpoint.class); + second.sendMessageToEnforcedRoute("message for slave", 1); + second.shutdown(); + } + + + @Test + public void oneMasterOneSlaveAndFlippedAgainScenarioContolledByPolicy() throws Exception { + final String path = "oneMasterOneSlaveScenarioContolledByPolicy"; + final String firstDestination = "first" + System.currentTimeMillis(); + final String secondDestination = "second" + System.currentTimeMillis(); + final CountDownLatch waitForSecondRouteCompletedLatch = new CountDownLatch(1); + final int activeNodeDesired = 1; + + MultiMasterZookeeperPolicyEnforcedContext first = createEnforcedContext(firstDestination, activeNodeDesired, path); + DefaultCamelContext controlledContext = (DefaultCamelContext) first.controlledContext; + // get reference to the Policy object to check if it's already a master + CuratorMultiMasterLeaderRoutePolicy routePolicy = (CuratorMultiMasterLeaderRoutePolicy) controlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0); + + assertWeHaveMasters(routePolicy); + + LOG.info("Starting first CamelContext"); + final MultiMasterZookeeperPolicyEnforcedContext[] arr = new MultiMasterZookeeperPolicyEnforcedContext[1]; + + new Thread() { + @Override + public void run() { + MultiMasterZookeeperPolicyEnforcedContext slave = null; + try { + LOG.info("Starting second CamelContext in a separate thread"); + slave = createEnforcedContext(secondDestination, activeNodeDesired, path); + arr[0] = slave; + slave.sendMessageToEnforcedRoute("message for second", 0); + waitForSecondRouteCompletedLatch.countDown(); + } catch (Exception e) { + LOG.error("Error in the thread controlling the second context", e); + fail("Error in the thread controlling the second context: " + e.getMessage()); + } + + + } + }.start(); + + first.sendMessageToEnforcedRoute("message for first", 1); + + waitForSecondRouteCompletedLatch.await(2, TimeUnit.MINUTES); + MultiMasterZookeeperPolicyEnforcedContext second = arr[0]; + + LOG.info("Explicitly shutting down the first camel context."); + first.shutdown(); + + DefaultCamelContext secondCamelContext = (DefaultCamelContext) second.controlledContext; + assertWeHaveMasters((CuratorMultiMasterLeaderRoutePolicy)secondCamelContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0)); + + CountDownLatch restartFirstLatch = new CountDownLatch(1); + LOG.info("Start back first context"); + new Thread() { + @Override + public void run() { + try { + first.startup(); + restartFirstLatch.countDown(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }.start(); + restartFirstLatch.await(); + second.sendMessageToEnforcedRoute("message for second", 1); + first.mock.reset(); + first.sendMessageToEnforcedRoute("message for first", 0); + second.shutdown(); + controlledContext = (DefaultCamelContext) first.controlledContext; + // get reference to the Policy object to check if it's already a master + routePolicy = (CuratorMultiMasterLeaderRoutePolicy) controlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0); + log.info("Asserting route is up. context: [{}]", controlledContext.getName()); + assertWeHaveMasters(routePolicy); + first.controlledContext.setTracing(true); + first.mock = controlledContext.getEndpoint("mock:controlled", MockEndpoint.class); + first.sendMessageToEnforcedRoute("message for first", 1); + first.shutdown(); + } + + + + + @Test + public void oneMasterTwoSlavesScenarioContolledByPolicy() throws Exception { + final String path = "oneMasterTwoSlavesScenarioContolledByPolicy"; + final String master = "master" + System.currentTimeMillis(); + final String secondDestination = "second" + System.currentTimeMillis(); + final String thirdDestination = "third" + System.currentTimeMillis(); + final CountDownLatch waitForNonActiveRoutesLatch = new CountDownLatch(2); + final int activeNodesDesired = 1; + + LOG.info("Starting first CamelContext"); + MultiMasterZookeeperPolicyEnforcedContext first = createEnforcedContext(master, activeNodesDesired, path); + DefaultCamelContext controlledContext = (DefaultCamelContext) first.controlledContext; + // get reference to the Policy object to check if it's already a master + CuratorMultiMasterLeaderRoutePolicy routePolicy = (CuratorMultiMasterLeaderRoutePolicy) controlledContext.getRouteDefinition(master).getRoutePolicies().get(0); + + assertWeHaveMasters(routePolicy); + + final MultiMasterZookeeperPolicyEnforcedContext[] arr = new MultiMasterZookeeperPolicyEnforcedContext[2]; + + new Thread() { + @Override + public void run() { + MultiMasterZookeeperPolicyEnforcedContext second = null; + try { + LOG.info("Starting second CamelContext"); + second = createEnforcedContext(secondDestination, activeNodesDesired, path); + arr[0] = second; + second.sendMessageToEnforcedRoute("message for second", 0); + waitForNonActiveRoutesLatch.countDown(); + } catch (Exception e) { + LOG.error("Error in the thread controlling the second context", e); + fail("Error in the thread controlling the second context: " + e.getMessage()); + } + + + } + }.start(); + + new Thread() { + @Override + public void run() { + MultiMasterZookeeperPolicyEnforcedContext third = null; + try { + LOG.info("Starting third CamelContext"); + third = createEnforcedContext(thirdDestination, activeNodesDesired, path); + arr[1] = third; + third.sendMessageToEnforcedRoute("message for third", 0); + waitForNonActiveRoutesLatch.countDown(); + } catch (Exception e) { + LOG.error("Error in the thread controlling the third context", e); + fail("Error in the thread controlling the third context: " + e.getMessage()); + } + + + } + }.start(); + + // Send messages to the master and the slave. + // The route is enabled in the master and gets through, but that sent to + // the slave context is rejected. + first.sendMessageToEnforcedRoute("message for master", 1); + + waitForNonActiveRoutesLatch.await(); + LOG.info("Explicitly shutting down the first camel context."); + // trigger failover by killing the master.. + first.shutdown(); + // let's find out who's active now: + + CuratorMultiMasterLeaderRoutePolicy routePolicySecond = (CuratorMultiMasterLeaderRoutePolicy) arr[0].controlledContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0); + CuratorMultiMasterLeaderRoutePolicy routePolicyThird = (CuratorMultiMasterLeaderRoutePolicy) arr[1].controlledContext.getRouteDefinition(thirdDestination).getRoutePolicies().get(0); + + MultiMasterZookeeperPolicyEnforcedContext newMaster = null; + MultiMasterZookeeperPolicyEnforcedContext slave = null; + + final int maxWait = 20; + for (int i = 0; i < maxWait; i++) { + if (routePolicySecond.getElection().isMaster()) { + newMaster = arr[0]; + slave = arr[1]; + LOG.info("[second] is the new master"); + break; + } else if (routePolicyThird.getElection().isMaster()) { + newMaster = arr[1]; + slave = arr[0]; + LOG.info("[third] is the new master"); + break; + } else { + Thread.sleep(2000); + LOG.info("waiting for a new master to be elected"); + } + } + assertThat(newMaster, is(notNullValue())); + + newMaster.sendMessageToEnforcedRoute("message for second", 1); + slave.sendMessageToEnforcedRoute("message for third", 0); + slave.shutdown(); + newMaster.shutdown(); + } + + + @Test + public void twoMasterOneSlavesScenarioContolledByPolicy() throws Exception { + final String path = "twoMasterOneSlavesScenarioContolledByPolicy"; + final String firstDestination = "first" + System.currentTimeMillis(); + final String secondDestination = "second" + System.currentTimeMillis(); + final String thirdDestination = "third" + System.currentTimeMillis(); + final CountDownLatch waitForThirdRouteCompletedLatch = new CountDownLatch(1); + final int activeNodeDesired = 2; + + MultiMasterZookeeperPolicyEnforcedContext first = createEnforcedContext(firstDestination, activeNodeDesired, path); + DefaultCamelContext firstControlledContext = (DefaultCamelContext) first.controlledContext; + CuratorMultiMasterLeaderRoutePolicy firstRoutePolicy = (CuratorMultiMasterLeaderRoutePolicy) firstControlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0); + + MultiMasterZookeeperPolicyEnforcedContext second = createEnforcedContext(secondDestination, activeNodeDesired, path); + DefaultCamelContext secondControlledContext = (DefaultCamelContext) second.controlledContext; + CuratorMultiMasterLeaderRoutePolicy secondRoutePolicy = (CuratorMultiMasterLeaderRoutePolicy) secondControlledContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0); + + assertWeHaveMasters(firstRoutePolicy, secondRoutePolicy); + + final MultiMasterZookeeperPolicyEnforcedContext[] arr = new MultiMasterZookeeperPolicyEnforcedContext[1]; + + + new Thread() { + @Override + public void run() { + MultiMasterZookeeperPolicyEnforcedContext third = null; + try { + LOG.info("Starting third CamelContext"); + third = createEnforcedContext(thirdDestination, activeNodeDesired, path); + arr[0] = third; + third.sendMessageToEnforcedRoute("message for third", 0); + waitForThirdRouteCompletedLatch.countDown(); + } catch (Exception e) { + LOG.error("Error in the thread controlling the third context", e); + fail("Error in the thread controlling the third context: " + e.getMessage()); + } + + + } + }.start(); + + first.sendMessageToEnforcedRoute("message for first", 1); + second.sendMessageToEnforcedRoute("message for second", 1); + + + waitForThirdRouteCompletedLatch.await(); + + LOG.info("Explicitly shutting down the first camel context."); + first.shutdown(); + + + arr[0].sendMessageToEnforcedRoute("message for third", 1); + second.shutdown(); + arr[0].shutdown(); + } + + void assertWeHaveMasters(CuratorMultiMasterLeaderRoutePolicy... routePolicies) throws InterruptedException { + final int maxWait = 20; + boolean global = false; + for (int i = 0; i < maxWait; i++) { + boolean iteration = true; + for (CuratorMultiMasterLeaderRoutePolicy policy : routePolicies) { + log.info("Policy: {}, master: {}", policy, policy.getElection().isMaster()); + iteration = iteration & policy.getElection().isMaster(); + } + if (iteration) { + LOG.info("the number of required active routes is available"); + global = true; + break; + } else { + Thread.sleep(2000); + LOG.info("waiting routes to become leader and be activated."); + } + } + if (!global) { + fail("The expected number of route never became master"); + } + } + + + private class MultiMasterZookeeperPolicyEnforcedContext { + CamelContext controlledContext; + ProducerTemplate template; + MockEndpoint mock; + String routename; + String path; + + MultiMasterZookeeperPolicyEnforcedContext(String name, int activeNodesDesired, String path) throws Exception { + controlledContext = new DefaultCamelContext(); + routename = name; + this.path = path; + template = controlledContext.createProducerTemplate(); + mock = controlledContext.getEndpoint("mock:controlled", MockEndpoint.class); + controlledContext.addRoutes(new FailoverRoute(name, activeNodesDesired, path)); + controlledContext.start(); + } + + public void sendMessageToEnforcedRoute(String message, int expected) throws InterruptedException { + mock.expectedMessageCount(expected); + try { + LOG.info("Sending message to: {}", "vm:" + routename); + template.sendBody("vm:" + routename, ExchangePattern.InOut, message); + } catch (Exception e) { + if (expected > 0) { + LOG.error(e.getMessage(), e); + fail("Expected messages..."); + } + } + mock.await(2, TimeUnit.SECONDS); + mock.assertIsSatisfied(2000); + } + + public void shutdown() throws Exception { + LogFactory.getLog(getClass()).debug("stopping"); + controlledContext.stop(); + LogFactory.getLog(getClass()).debug("stopped"); + } + + + public void startup() throws Exception { + LogFactory.getLog(getClass()).debug("starting"); + controlledContext.start(); + LogFactory.getLog(getClass()).debug("started"); + } + } + + private MultiMasterZookeeperPolicyEnforcedContext createEnforcedContext(String name, int activeNodesDesired, String path) throws Exception, InterruptedException { + MultiMasterZookeeperPolicyEnforcedContext context = new MultiMasterZookeeperPolicyEnforcedContext(name, activeNodesDesired, path); + delay(1000); + return context; + } + + public class FailoverRoute extends RouteBuilder { + + private String path; + private String routename; + private int activeNodesDesired; + + public FailoverRoute(String routename, int activeNodesDesired, String path) { + // need names as if we use the same direct ep name in two contexts + // in the same vm shutting down one context shuts the endpoint for + // both. + this.routename = routename; + this.activeNodesDesired = activeNodesDesired; + this.path = path; + } + + public void configure() throws Exception { + CuratorMultiMasterLeaderRoutePolicy policy = new CuratorMultiMasterLeaderRoutePolicy("zookeeper:localhost:" + getServerPort() + BASE_ZNODE + ZNODE + "/" + path, this.activeNodesDesired); + from("vm:" + routename).routePolicy(policy).id(routename).to("mock:controlled"); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper/src/test/resources/log4j2.properties b/components/camel-zookeeper/src/test/resources/log4j2.properties index 81447cc..7536ead 100644 --- a/components/camel-zookeeper/src/test/resources/log4j2.properties +++ b/components/camel-zookeeper/src/test/resources/log4j2.properties @@ -23,18 +23,26 @@ appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n appender.out.type = Console appender.out.name = out appender.out.layout.type = PatternLayout +# appender.out.layout.pattern = %highlight{%d [%t] %-5level: %msg%n%throwable}{FATAL=red, ERROR=red, WARN=blue, INFO=black, DEBUG=grey, TRACE=blue} appender.out.layout.pattern = [%t] %c{1} %-5p %m%n + logger.zookeeper.name = org.apache.zookeeper logger.zookeeper.level = INFO logger.camel-zookeeper.name = org.apache.camel.component.zookeeper logger.camel-zookeeper.level = INFO +logger.camel-zookeeper-policy.name = org.apache.camel.component.zookeeper.policy +logger.camel-zookeeper-policy.level = INFO logger.camel-support.name = org.apache.camel.support logger.camel-support.level = INFO +logger.camel.name = org.apache.camel +logger.camel.level = INFO + + logger.springframework.name = org.springframework logger.springframework.level = WARN rootLogger.level = INFO -# rootLogger.appenderRef.stdout.ref = out +#rootLogger.appenderRef.stdout.ref = out rootLogger.appenderRef.file.ref = file
