[
https://issues.apache.org/jira/browse/ZOOKEEPER-2930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16366332#comment-16366332
]
ASF GitHub Bot commented on ZOOKEEPER-2930:
-------------------------------------------
Github user fpj commented on a diff in the pull request:
https://github.com/apache/zookeeper/pull/456#discussion_r168620297
--- Diff:
src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java ---
@@ -318,76 +318,167 @@ public Thread newThread(Runnable r) {
*/
public void testInitiateConnection(long sid) throws Exception {
LOG.debug("Opening channel to server " + sid);
- Socket sock = new Socket();
- setSockOpts(sock);
- sock.connect(self.getVotingView().get(sid).electionAddr, cnxTO);
- initiateConnection(sock, sid);
+ initiateConnection(sid,
self.getVotingView().get(sid).electionAddr);
+ }
+
+ private Socket openChannel(long sid, InetSocketAddress electionAddr) {
+ LOG.debug("Opening channel to server " + sid);
+ try {
+ final Socket sock = new Socket();
+ setSockOpts(sock);
+ sock.connect(electionAddr, cnxTO);
+ LOG.debug("Connected to server " + sid);
+ return sock;
+ } catch (UnresolvedAddressException e) {
+ // Sun doesn't include the address that causes this
+ // exception to be thrown, also UAE cannot be wrapped cleanly
+ // so we log the exception in order to capture this critical
+ // detail.
+ LOG.warn("Cannot open channel to " + sid
+ + " at election address " + electionAddr, e);
+ throw e;
+ } catch (IOException e) {
+ LOG.warn("Cannot open channel to " + sid
+ + " at election address " + electionAddr,
+ e);
+ return null;
+ }
}
/**
* If this server has initiated the connection, then it gives up on the
* connection if it loses challenge. Otherwise, it keeps the
connection.
*/
- public void initiateConnection(final Socket sock, final Long sid) {
+ public boolean initiateConnection(final Long sid, InetSocketAddress
electionAddr) {
try {
- startConnection(sock, sid);
- } catch (IOException e) {
- LOG.error("Exception while connecting, id: {}, addr: {},
closing learner connection",
- new Object[] { sid, sock.getRemoteSocketAddress() },
e);
- closeSocket(sock);
- return;
+ Socket sock = openChannel(sid, electionAddr);
+ if (sock != null) {
+ try {
+ startConnection(sock, sid);
+ } catch (IOException e) {
+ LOG.error("Exception while connecting, id: {}, addr:
{}, closing learner connection",
+ new Object[]{sid,
sock.getRemoteSocketAddress()}, e);
+ closeSocket(sock);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ } finally {
+ inprogressConnections.remove(sid);
}
}
- /**
- * Server will initiate the connection request to its peer server
- * asynchronously via separate connection thread.
- */
- public void initiateConnectionAsync(final Socket sock, final Long sid)
{
+ synchronized private void connectOneAsync(final Long sid, final
ZooKeeperThread connectorThread) {
+ if (senderWorkerMap.get(sid) != null) {
+ LOG.debug("There is a connection already for server " + sid);
+ return;
+ }
if(!inprogressConnections.add(sid)){
// simply return as there is a connection request to
// server 'sid' already in progress.
LOG.debug("Connection request to server id: {} is already in
progress, so skipping this request",
sid);
- closeSocket(sock);
return;
}
try {
- connectionExecutor.execute(
- new QuorumConnectionReqThread(sock, sid));
+ connectionExecutor.execute(connectorThread);
connectionThreadCnt.incrementAndGet();
} catch (Throwable e) {
// Imp: Safer side catching all type of exceptions and remove
'sid'
// from inprogress connections. This is to avoid blocking
further
// connection requests from this 'sid' in case of errors.
inprogressConnections.remove(sid);
LOG.error("Exception while submitting quorum connection
request", e);
- closeSocket(sock);
}
}
+ /**
+ * Try to establish a connection to server with id sid using its
electionAddr.
+ *
+ * Server will initiate the connection request to its peer server
+ * asynchronously via separate connection thread.
+ *
+ * @param sid server id
+ * @param electionAddr election address
+ */
+ private void connectOne(final Long sid, InetSocketAddress
electionAddr) {
+ connectOneAsync(sid, new QuorumConnectionReqThread(sid,
electionAddr));
+ }
+
+ /**
+ * Try to establish a connection to server with id sid.
+ *
+ * Server will initiate the connection request to its peer server
+ * asynchronously via separate connection thread.
+ *
+ * @param sid server id
+ */
+ public void connectOne(final Long sid) {
+ connectOneAsync(sid, new QuorumConnectionReqBySidThread(sid));
+ }
+
/**
* Thread to send connection request to peer server.
*/
- private class QuorumConnectionReqThread extends ZooKeeperThread {
- final Socket sock;
+ private class QuorumConnectionReqBySidThread extends ZooKeeperThread {
final Long sid;
- QuorumConnectionReqThread(final Socket sock, final Long sid) {
+
+ QuorumConnectionReqBySidThread(final Long sid) {
super("QuorumConnectionReqThread-" + sid);
- this.sock = sock;
this.sid = sid;
}
@Override
public void run() {
- try{
- initiateConnection(sock, sid);
- } finally {
- inprogressConnections.remove(sid);
+ synchronized (self.QV_LOCK) {
+ boolean knownId = false;
+ // Resolve hostname for the remote server before
attempting to
+ // connect in case the underlying ip address has changed.
+ self.recreateSocketAddresses(sid);
+ Map<Long, QuorumPeer.QuorumServer> lastCommittedView =
self.getView();
+ QuorumVerifier lastSeenQV =
self.getLastSeenQuorumVerifier();
+ Map<Long, QuorumPeer.QuorumServer> lastProposedView =
lastSeenQV.getAllMembers();
+ if (lastCommittedView.containsKey(sid)) {
+ knownId = true;
+ if (initiateConnection(sid,
lastCommittedView.get(sid).electionAddr)) {
+ return;
+ }
+ }
+ if (lastSeenQV != null && lastProposedView.containsKey(sid)
+ && (!knownId ||
(lastProposedView.get(sid).electionAddr !=
+ lastCommittedView.get(sid).electionAddr))) {
+ knownId = true;
+ if (initiateConnection(sid,
lastProposedView.get(sid).electionAddr)) {
+ return;
+ }
+ }
+ if (!knownId) {
+ LOG.warn("Invalid server id: " + sid);
+ return;
+ }
}
}
}
+ /**
+ * Thread to send connection request to peer server.
+ */
+ private class QuorumConnectionReqThread extends ZooKeeperThread {
--- End diff --
Since we are making this change to run the connection establishment on a
separate thread, I wonder if it would make sense to use an executor rather than
instantiate a thread each time. What do you think?
> Leader cannot be elected due to network timeout of some members.
> ----------------------------------------------------------------
>
> Key: ZOOKEEPER-2930
> URL: https://issues.apache.org/jira/browse/ZOOKEEPER-2930
> Project: ZooKeeper
> Issue Type: Bug
> Components: leaderElection, quorum, server
> Affects Versions: 3.4.10, 3.5.3, 3.4.11, 3.5.4, 3.4.12
> Environment: Java 8
> ZooKeeper 3.4.11(from github)
> Centos6.5
> Reporter: Jiafu Jiang
> Priority: Critical
> Attachments: zoo.cfg, zookeeper1.log, zookeeper2.log
>
>
> I deploy a cluster of ZooKeeper with three nodes:
> ofs_zk1:20.10.11.101, 30.10.11.101
> ofs_zk2:20.10.11.102, 30.10.11.102
> ofs_zk3:20.10.11.103, 30.10.11.103
> I shut down the network interfaces of ofs_zk2 using the "ifdown eth0 eth1"
> command. A new Leader should be elected within a few seconds, but in fact
> ofs_zk1 and ofs_zk3 just keep electing again and again, and neither of them
> can become the new Leader.
> I changed the log level to DEBUG (the default is INFO) and restarted the
> ZooKeeper servers on ofs_zk1 and ofs_zk2 again, but that did not fix the problem.
> I read the log and the ZooKeeper source code, and I think I have found the reason.
> When the potential leader (say, ofs_zk3) begins the
> election (FastLeaderElection.lookForLeader()), it sends notifications to
> all the servers.
> When it fails to receive any notification during a timeout, it will resend
> the notifications, and double the timeout. This process will repeat until any
> notification is received or the timeout reaches a max value.
> FastLeaderElection.sendNotifications() just puts the notification messages
> into a queue and returns. The WorkerSender is responsible for sending the
> notifications.
> The WorkerSender just processes the notifications one by one, passing each
> notification to QuorumCnxManager. Here comes the problem:
> QuorumCnxManager.toSend() blocks for a long time when a notification is
> sent to ofs_zk2 (whose network is down), so the notifications destined for
> ofs_zk1 are blocked for a long time as well. The repeated notifications
> from FastLeaderElection.sendNotifications() just make things worse.
> Here is the related source code:
> {code:java}
> public void toSend(Long sid, ByteBuffer b) {
> /*
> * If sending message to myself, then simply enqueue it (loopback).
> */
> if (this.mySid == sid) {
> b.position(0);
> addToRecvQueue(new Message(b.duplicate(), sid));
> /*
> * Otherwise send to the corresponding thread to send.
> */
> } else {
> /*
> * Start a new connection if doesn't have one already.
> */
> ArrayBlockingQueue<ByteBuffer> bq = new
> ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
> ArrayBlockingQueue<ByteBuffer> bqExisting =
> queueSendMap.putIfAbsent(sid, bq);
> if (bqExisting != null) {
> addToSendQueue(bqExisting, b);
> } else {
> addToSendQueue(bq, b);
> }
>
> // This may block!!!
> connectOne(sid);
>
> }
> }
> {code}
> Therefore, when ofs_zk3 believes that it is the leader, it begins to wait for
> the epoch ack, but in fact ofs_zk1 has not received the notification (which
> says the leader is ofs_zk3) because ofs_zk3 has not sent it yet
> (it may still be sitting in the send queue of the WorkerSender). In the
> end, the potential leader ofs_zk3 fails to receive the epoch ack before the
> timeout, so it gives up leadership and begins a new election.
> The log files of ofs_zk1 and ofs_zk3 are attached.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)