[ 
https://issues.apache.org/jira/browse/CAMEL-9813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15226769#comment-15226769
 ] 

Jose Luis Pedrosa commented on CAMEL-9813:
------------------------------------------

Hi, 
I've been made a working implementation using curator. It would need a litlte 
bit of javadoc and maybe polish, but functional without loosing messages. 



> Zookeeper route policy requires first message to check if online.
> -----------------------------------------------------------------
>
>                 Key: CAMEL-9813
>                 URL: https://issues.apache.org/jira/browse/CAMEL-9813
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-zookeeper
>    Affects Versions: 2.17.0
>         Environment: Java 1.8
> wildfly 10.0.0
> camel 2.17.0
>            Reporter: Jose Luis Pedrosa
>
> Hi All
> i've been investigaten after I found the same issue as described in the email 
> chain 
> (http://camel.465427.n5.nabble.com/Zookeeper-Route-Policy-not-respected-on-route-with-sftp-consumer-td5771610.html)
> The issue lies in the fact that ZookeeperRoutePolicy (ZRP from now on) waits 
> to check if that node is master after the first message have been recived 
> (which causes exception in the logs, and of course attempts to process 
> messages).
> What it does is simply throw an exception if we are not master... which stops 
> the route, sounds like we should not even the start the route in the first 
> place if we are not master. 
> I've been playing around, trying to implement the first check in the onStart 
> or onInit, the problem is that ZRP depends on a internal route injected on 
> the policy, I guess that is why the original developer did it in 
> ExchangeBegin instead of in start or onInit, to overcome the issue.
> {code:java}
>    private class ElectoralMonitorRoute extends RouteBuilder {
> ...
> ...
>             from(zep).id("election-route-" + candidateName).sort(body(), 
> comparator).process(new Processor() {
>                 @Override
>                 public void process(Exchange e) throws Exception {
>                     @SuppressWarnings("unchecked")
>                     List<String> candidates = 
> e.getIn().getMandatoryBody(List.class);
>                     // we cannot use the binary search here and the 
> candidates a not sorted in the normal way
>                     /**
>                      * check if the item at this location starts with this 
> nodes
>                      * candidate name
>                      */
>                     int location = 
> findCandidateLocationInCandidatesList(candidates, candidateName); 
>                     if (location != -1) {
>                         // set the nodes
>                         masterNode.set(location <= enabledCount);
>                         LOG.debug("This node is number '{}' on the candidate 
> list, election is configured for the top '{}'. this node will be {}",
>                                 new Object[]{location, enabledCount, 
> masterNode.get() ? "enabled" : "disabled"}
>                         );
>                     }
>                     electionComplete.countDown();
>                     notifyElectionWatchers();
>                 }
> {code}
> which makes a route dependant on another to start, which is hard to solve in 
> a clean way.
> Also i found that for some routes (my case) it also does not start them 
> automatically because they are stopped and not suspended:
> {code}
>   public static boolean resumeService(Object service) throws Exception {
>         if (service instanceof SuspendableService) {
>             SuspendableService ss = (SuspendableService) service;
>             if (ss.isSuspended()) {
>                 LOG.debug("Resuming service {}", service);
>                 ss.resume();
>                 return true;
>             } else {
>                 return false;
>             }
>         } else {
>             startService(service);
>             return true;
>         }
>     }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to