[ 
https://issues.apache.org/jira/browse/ZOOKEEPER-2930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372624#comment-16372624
 ] 

ASF GitHub Bot commented on ZOOKEEPER-2930:
-------------------------------------------

Github user JonathanO commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/456#discussion_r169910520
  
    --- Diff: 
src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java ---
    @@ -318,76 +318,167 @@ public Thread newThread(Runnable r) {
          */
         public void testInitiateConnection(long sid) throws Exception {
             LOG.debug("Opening channel to server " + sid);
    -        Socket sock = new Socket();
    -        setSockOpts(sock);
    -        sock.connect(self.getVotingView().get(sid).electionAddr, cnxTO);
    -        initiateConnection(sock, sid);
    +        initiateConnection(sid, 
self.getVotingView().get(sid).electionAddr);
    +    }
    +
    +    private Socket openChannel(long sid, InetSocketAddress electionAddr) {
    +        LOG.debug("Opening channel to server " + sid);
    +        try {
    +            final Socket sock = new Socket();
    +            setSockOpts(sock);
    +            sock.connect(electionAddr, cnxTO);
    +            LOG.debug("Connected to server " + sid);
    +            return sock;
    +        } catch (UnresolvedAddressException e) {
    +            // Sun doesn't include the address that causes this
    +            // exception to be thrown, also UAE cannot be wrapped cleanly
    +            // so we log the exception in order to capture this critical
    +            // detail.
    +            LOG.warn("Cannot open channel to " + sid
    +                    + " at election address " + electionAddr, e);
    +            throw e;
    +        } catch (IOException e) {
    +            LOG.warn("Cannot open channel to " + sid
    +                            + " at election address " + electionAddr,
    +                    e);
    +            return null;
    +        }
         }
     
         /**
          * If this server has initiated the connection, then it gives up on the
          * connection if it loses challenge. Otherwise, it keeps the 
connection.
          */
    -    public void initiateConnection(final Socket sock, final Long sid) {
    +    public boolean initiateConnection(final Long sid, InetSocketAddress 
electionAddr) {
             try {
    -            startConnection(sock, sid);
    -        } catch (IOException e) {
    -            LOG.error("Exception while connecting, id: {}, addr: {}, 
closing learner connection",
    -                    new Object[] { sid, sock.getRemoteSocketAddress() }, 
e);
    -            closeSocket(sock);
    -            return;
    +            Socket sock = openChannel(sid, electionAddr);
    +            if (sock != null) {
    +                try {
    +                    startConnection(sock, sid);
    +                } catch (IOException e) {
    +                    LOG.error("Exception while connecting, id: {}, addr: 
{}, closing learner connection",
    +                            new Object[]{sid, 
sock.getRemoteSocketAddress()}, e);
    +                    closeSocket(sock);
    +                }
    +                return true;
    +            } else {
    +                return false;
    +            }
    +        } finally {
    +            inprogressConnections.remove(sid);
             }
         }
     
    -    /**
    -     * Server will initiate the connection request to its peer server
    -     * asynchronously via separate connection thread.
    -     */
    -    public void initiateConnectionAsync(final Socket sock, final Long sid) 
{
    +    synchronized private void connectOneAsync(final Long sid, final 
ZooKeeperThread connectorThread) {
    +        if (senderWorkerMap.get(sid) != null) {
    +            LOG.debug("There is a connection already for server " + sid);
    +            return;
    +        }
             if(!inprogressConnections.add(sid)){
                 // simply return as there is a connection request to
                 // server 'sid' already in progress.
                 LOG.debug("Connection request to server id: {} is already in 
progress, so skipping this request",
                         sid);
    -            closeSocket(sock);
                 return;
             }
             try {
    -            connectionExecutor.execute(
    -                    new QuorumConnectionReqThread(sock, sid));
    +            connectionExecutor.execute(connectorThread);
                 connectionThreadCnt.incrementAndGet();
             } catch (Throwable e) {
                 // Imp: Safer side catching all type of exceptions and remove 
'sid'
                 // from inprogress connections. This is to avoid blocking 
further
                 // connection requests from this 'sid' in case of errors.
                 inprogressConnections.remove(sid);
                 LOG.error("Exception while submitting quorum connection 
request", e);
    -            closeSocket(sock);
             }
         }
     
    +    /**
    +     * Try to establish a connection to server with id sid using its 
electionAddr.
    +     *
    +     * Server will initiate the connection request to its peer server
    +     * asynchronously via separate connection thread.
    +     *
    +     *  @param sid  server id
    +     *  @param electionAddr election address
    +     */
    +    private void connectOne(final Long sid, InetSocketAddress 
electionAddr) {
    +        connectOneAsync(sid, new QuorumConnectionReqThread(sid, 
electionAddr));
    +    }
    +
    +    /**
    +     * Try to establish a connection to server with id sid.
    +     *
    +     * Server will initiate the connection request to its peer server
    +     * asynchronously via separate connection thread.
    +     *
    +     *  @param sid  server id
    +     */
    +    public void connectOne(final Long sid) {
    +        connectOneAsync(sid, new QuorumConnectionReqBySidThread(sid));
    +    }
    +
         /**
          * Thread to send connection request to peer server.
          */
    -    private class QuorumConnectionReqThread extends ZooKeeperThread {
    -        final Socket sock;
    +    private class QuorumConnectionReqBySidThread extends ZooKeeperThread {
             final Long sid;
    -        QuorumConnectionReqThread(final Socket sock, final Long sid) {
    +
    +        QuorumConnectionReqBySidThread(final Long sid) {
                 super("QuorumConnectionReqThread-" + sid);
    -            this.sock = sock;
                 this.sid = sid;
             }
     
             @Override
             public void run() {
    -            try{
    -                initiateConnection(sock, sid);
    -            } finally {
    -                inprogressConnections.remove(sid);
    +            synchronized (self.QV_LOCK) {
    +                boolean knownId = false;
    +                // Resolve hostname for the remote server before 
attempting to
    +                // connect in case the underlying ip address has changed.
    +                self.recreateSocketAddresses(sid);
    +                Map<Long, QuorumPeer.QuorumServer> lastCommittedView = 
self.getView();
    +                QuorumVerifier lastSeenQV = 
self.getLastSeenQuorumVerifier();
    +                Map<Long, QuorumPeer.QuorumServer> lastProposedView = 
lastSeenQV.getAllMembers();
    +                if (lastCommittedView.containsKey(sid)) {
    +                    knownId = true;
    +                    if (initiateConnection(sid, 
lastCommittedView.get(sid).electionAddr)) {
    +                        return;
    +                    }
    +                }
    +                if (lastSeenQV != null && lastProposedView.containsKey(sid)
    +                        && (!knownId || 
(lastProposedView.get(sid).electionAddr !=
    +                        lastCommittedView.get(sid).electionAddr))) {
    +                    knownId = true;
    +                    if (initiateConnection(sid, 
lastProposedView.get(sid).electionAddr)) {
    +                        return;
    +                    }
    +                }
    +                if (!knownId) {
    +                    LOG.warn("Invalid server id: " + sid);
    +                    return;
    +                }
                 }
             }
         }
     
    +    /**
    +     * Thread to send connection request to peer server.
    +     */
    +    private class QuorumConnectionReqThread extends ZooKeeperThread {
    --- End diff --
    
    I considered it, especially as the QuorumConnectionReqBySidThread case (and 
the backport to 3.4 #465) are effectively run serially. It just seemed 
consistent with the existing approach to stick with creating a thread for every 
new connection. There shouldn't be large numbers of them being created, so I 
figured it should be fine.


> Leader cannot be elected due to network timeout of some members.
> ----------------------------------------------------------------
>
>                 Key: ZOOKEEPER-2930
>                 URL: https://issues.apache.org/jira/browse/ZOOKEEPER-2930
>             Project: ZooKeeper
>          Issue Type: Bug
>          Components: leaderElection, quorum, server
>    Affects Versions: 3.4.10, 3.5.3, 3.4.11, 3.5.4, 3.4.12
>         Environment: Java 8
> ZooKeeper 3.4.11(from github)
> Centos6.5
>            Reporter: Jiafu Jiang
>            Priority: Critical
>         Attachments: zoo.cfg, zookeeper1.log, zookeeper2.log
>
>
> I deploy a cluster of ZooKeeper with three nodes:
> ofs_zk1:20.10.11.101, 30.10.11.101
> ofs_zk2:20.10.11.102, 30.10.11.102
> ofs_zk3:20.10.11.103, 30.10.11.103
> I shut down the network interfaces of ofs_zk2 using the "ifdown eth0 eth1" command.
> A new Leader is expected to be elected within a few seconds, but in fact 
> ofs_zk1 and ofs_zk3 just keep electing again and again, and neither of 
> them can become the new Leader.
> I changed the log level to DEBUG (the default is INFO) and restarted the ZooKeeper 
> servers on ofs_zk1 and ofs_zk2 again, but that did not fix the problem.
> I read the log and the ZooKeeper source code, and I think I have found the reason.
> When the potential leader (say, ofs_zk3) begins the 
> election (FastLeaderElection.lookForLeader()), it will send notifications to 
> all the servers. 
> When it fails to receive any notification during a timeout, it will resend 
> the notifications, and double the timeout. This process will repeat until any 
> notification is received or the timeout reaches a max value.
> FastLeaderElection.sendNotifications() just puts the notification message 
> into a queue and returns. The WorkerSender is responsible for sending the 
> notifications.
> The WorkerSender just processes the notifications one by one, passing each 
> notification to QuorumCnxManager. Here comes the problem: 
> QuorumCnxManager.toSend() blocks for a long time when a notification is 
> sent to ofs_zk2 (whose network is down), and some notifications (which belong 
> to ofs_zk1) will thus be blocked for a long time. The repeated notifications 
> from FastLeaderElection.sendNotifications() just make things worse.
> Here is the related source code:
> {code:java}
>     public void toSend(Long sid, ByteBuffer b) {
>         /*
>          * If sending message to myself, then simply enqueue it (loopback).
>          */
>         if (this.mySid == sid) {
>              b.position(0);
>              addToRecvQueue(new Message(b.duplicate(), sid));
>             /*
>              * Otherwise send to the corresponding thread to send.
>              */
>         } else {
>              /*
>               * Start a new connection if doesn't have one already.
>               */
>              ArrayBlockingQueue<ByteBuffer> bq = new 
> ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
>              ArrayBlockingQueue<ByteBuffer> bqExisting = 
> queueSendMap.putIfAbsent(sid, bq);
>              if (bqExisting != null) {
>                  addToSendQueue(bqExisting, b);
>              } else {
>                  addToSendQueue(bq, b);
>              }
>              
>              // This may block!!!
>              connectOne(sid);
>                 
>         }
>     }
> {code}
> Therefore, when ofs_zk3 believes that it is the leader, it begins to wait for the 
> epoch ack, but in fact ofs_zk1 has not received the notification (which 
> says the leader is ofs_zk3) because ofs_zk3 has not yet sent that 
> notification (it may still be sitting in the send queue of WorkerSender). 
> Eventually, the potential leader ofs_zk3 fails to receive the epoch ack before the 
> timeout, so it gives up leadership and begins a new election. 
> The log files of ofs_zk1 and ofs_zk3 are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to