Author: davsclaus
Date: Wed Aug 29 12:03:33 2012
New Revision: 1378511
URL: http://svn.apache.org/viewvc?rev=1378511&view=rev
Log:
CAMEL-5544: Polished zookeeper route policy. Thanks to Andrew Wheat for patch.
Modified:
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java
Modified:
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java?rev=1378511&r1=1378510&r2=1378511&view=diff
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java (original)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java Wed Aug 29 12:03:33 2012
@@ -17,6 +17,7 @@
package org.apache.camel.component.zookeeper.policy;
import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -26,8 +27,6 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import static java.lang.String.format;
-
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
@@ -68,41 +67,40 @@ import org.apache.zookeeper.CreateMode;
*/
public class ZooKeeperRoutePolicy extends RoutePolicySupport {
- private String uri;
-
- private int enabledCount;
-
+ private final String uri;
+ private final int enabledCount;
private String candidateName;
-
private final Lock lock = new ReentrantLock();
-
private final CountDownLatch electionComplete = new CountDownLatch(1);
-
- private Set<Route> suspendedRoutes = new CopyOnWriteArraySet<Route>();
-
- private AtomicBoolean shouldProcessExchanges = new AtomicBoolean();
-
+ private final Set<Route> suspendedRoutes = new CopyOnWriteArraySet<Route>();
+ private final AtomicBoolean shouldProcessExchanges = new AtomicBoolean();
private ProducerTemplate template;
+ private volatile boolean shouldStopConsumer = true;
+ private final UuidGenerator uuidGenerator = new JavaUuidGenerator();
+ private volatile boolean isCandidateCreated;
- private boolean shouldStopConsumer = true;
-
- private UuidGenerator uuidGenerator = new JavaUuidGenerator();
-
- private boolean isCandidateCreated;
-
- public ZooKeeperRoutePolicy(String uri, int enabledCount) throws Exception {
+ public ZooKeeperRoutePolicy(String uri, int enabledCount) {
this.uri = uri;
this.enabledCount = enabledCount;
createCandidateName();
}
- private void createCandidateName() throws Exception {
+ private void createCandidateName() {
/** UUID would be enough, also using hostname for human readability */
- StringBuilder b = new StringBuilder(InetAddress.getLocalHost().getCanonicalHostName());
+ StringBuilder b = new StringBuilder(fetchHostname());
b.append("-").append(uuidGenerator.generateUuid());
this.candidateName = b.toString();
}
+ private String fetchHostname() {
+ try {
+ return InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException ex) {
+ log.warn("Unable to determine the local hostname, using a default.", ex);
+ return "default";
+ }
+ }
+
@Override
public void onExchangeBegin(Route route, Exchange exchange) {
testAndCreateCandidateNode(route);
@@ -180,7 +178,7 @@ public class ZooKeeperRoutePolicy extend
lock.lock();
if (!suspendedRoutes.isEmpty()) {
if (log.isDebugEnabled()) {
- log.debug(format("'%d' have been stopped previously by poilcy, restarting.", suspendedRoutes.size()));
+ log.debug("{} have been stopped previously by policy, restarting.", suspendedRoutes.size());
}
for (Route suspended : suspendedRoutes) {
startConsumer(suspended.getConsumer());
@@ -205,9 +203,8 @@ public class ZooKeeperRoutePolicy extend
private ZooKeeperEndpoint createCandidateNode(CamelContext camelContext) {
this.template = camelContext.createProducerTemplate();
- if (log.isInfoEnabled()) {
- log.info(format("Initializing ZookeeperRoutePolicy with uri '%s'", uri));
- }
+ log.info("Initializing ZookeeperRoutePolicy with uri {}", uri);
+
ZooKeeperEndpoint zep = camelContext.getEndpoint(uri, ZooKeeperEndpoint.class);
zep.getConfiguration().setCreate(true);
String fullpath = createFullPathToCandidate(zep);
@@ -218,17 +215,13 @@ public class ZooKeeperRoutePolicy extend
template.send(zep, e);
if (e.isFailed()) {
- log.error("Error setting up election node " + fullpath, e.getException());
+ log.warn("Error setting up election node " + fullpath, e.getException());
} else {
- if (log.isInfoEnabled()) {
- log.info(format("Candidate node '%s' has been created", fullpath));
- }
+ log.info("Candidate node {} has been created", fullpath);
try {
- if (zep != null) {
- camelContext.addRoutes(new ElectoralMonitorRoute(zep));
- }
+ camelContext.addRoutes(new ElectoralMonitorRoute(zep));
} catch (Exception ex) {
- log.error("Error configuring ZookeeperRoutePolicy", ex);
+ log.warn("Error configuring ZookeeperRoutePolicy. This exception is ignored.", ex);
}
}
return zep;
@@ -287,8 +280,8 @@ public class ZooKeeperRoutePolicy extend
shouldProcessExchanges.set(location <= enabledCount);
if (log.isDebugEnabled()) {
- log.debug(format("This node is number '%d' on the candidate list, route is configured for the top '%d'. Exchange processing will be %s", location,
- enabledCount, shouldProcessExchanges.get() ? "enabled" : "disabled"));
+ log.debug("This node is number {} on the candidate list, route is configured for the top {}. Exchange processing will be {}",
+ new Object[]{location, enabledCount, shouldProcessExchanges.get() ? "enabled" : "disabled"});
}
startAllStoppedConsumers();
}