Author: breed
Date: Wed Mar 16 18:49:18 2011
New Revision: 1082260
URL: http://svn.apache.org/viewvc?rev=1082260&view=rev
Log:
ZOOKEEPER-880. QuorumCnxManager$SendWorker grows without bounds
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
Modified: zookeeper/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1082260&r1=1082259&r2=1082260&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Wed Mar 16 18:49:18 2011
@@ -192,6 +192,8 @@ BUGFIXES:
ZOOKEEPER-1012. support distinct JVMFLAGS for zookeeper server in
zkServer.sh and zookeeper client in zkCli.sh (Eugene Koontz via breed)
+ ZOOKEEPER-880. QuorumCnxManager$SendWorker grows without bounds (vishal via breed)
+
IMPROVEMENTS:
ZOOKEEPER-724. Improve junit test integration - log harness information
(phunt via mahadev)
Modified:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1082260&r1=1082259&r2=1082260&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Wed Mar 16 18:49:18 2011
@@ -32,6 +32,8 @@ import java.util.Enumeration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Date;
import org.apache.log4j.Logger;
@@ -110,6 +112,11 @@ public class QuorumCnxManager {
*/
public final Listener listener;
+ /*
+ * Counter to count worker threads
+ */
+ private AtomicInteger threadCnt = new AtomicInteger(0);
+
static public class Message {
Message(ByteBuffer buffer, long sid) {
this.buffer = buffer;
@@ -177,7 +184,7 @@ public class QuorumCnxManager {
// Otherwise proceed with the connection
} else {
SendWorker sw = new SendWorker(sock, sid);
- RecvWorker rw = new RecvWorker(sock, sid);
+ RecvWorker rw = new RecvWorker(sock, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
@@ -253,7 +260,7 @@ public class QuorumCnxManager {
// Otherwise start worker threads to receive data.
} else {
SendWorker sw = new SendWorker(sock, sid);
- RecvWorker rw = new RecvWorker(sock, sid);
+ RecvWorker rw = new RecvWorker(sock, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
@@ -448,6 +455,19 @@ public class QuorumCnxManager {
}
/**
+ * Return number of worker threads
+ */
+ public long getThreadCount() {
+ return threadCnt.get();
+ }
+ /**
+ * Return reference to QuorumPeer
+ */
+ public QuorumPeer getQuorumPeer() {
+ return self;
+ }
+
+ /**
* Thread to listen on some port
*/
public class Listener extends Thread {
@@ -591,6 +611,7 @@ public class QuorumCnxManager {
LOG.debug("Removing entry from senderWorkerMap sid=" + sid);
}
senderWorkerMap.remove(sid, this);
+ threadCnt.decrementAndGet();
return running;
}
@@ -610,6 +631,7 @@ public class QuorumCnxManager {
@Override
public void run() {
+ threadCnt.incrementAndGet();
try {
ByteBuffer b = lastMessageSent.get(sid);
if (b != null) {
@@ -662,10 +684,12 @@ public class QuorumCnxManager {
Socket sock;
volatile boolean running = true;
DataInputStream din;
+ final SendWorker sw;
- RecvWorker(Socket sock, Long sid) {
+ RecvWorker(Socket sock, Long sid, SendWorker sw) {
this.sid = sid;
this.sock = sock;
+ this.sw = sw;
try {
din = new DataInputStream(sock.getInputStream());
// OK to wait until socket disconnects while reading.
@@ -692,11 +716,13 @@ public class QuorumCnxManager {
running = false;
this.interrupt();
+ threadCnt.decrementAndGet();
return running;
}
@Override
public void run() {
+ threadCnt.incrementAndGet();
try {
while (running && !shutdown && sock != null) {
/**
@@ -719,8 +745,10 @@ public class QuorumCnxManager {
}
} catch (Exception e) {
LOG.warn("Connection broken for id " + sid + ", my id = " +
- self.getId() + ", error = " + e);
+ self.getId() + ", error = " , e);
} finally {
+ LOG.warn("Interrupting SendWorker");
+ sw.finish();
if (sock != null) {
closeSocket(sock);
}
Modified:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1082260&r1=1082259&r2=1082260&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Wed Mar 16 18:49:18 2011
@@ -74,6 +74,7 @@ public class QuorumPeer extends Thread i
QuorumBean jmxQuorumBean;
LocalPeerBean jmxLocalPeerBean;
LeaderElectionBean jmxLeaderElectionBean;
+ QuorumCnxManager qcm;
/* ZKDatabase is a top level member of quorumpeer
* which will be used in all the zookeeperservers
@@ -524,11 +525,11 @@ public class QuorumPeer extends Thread i
le = new AuthFastLeaderElection(this, true);
break;
case 3:
- QuorumCnxManager mng = new QuorumCnxManager(this);
- QuorumCnxManager.Listener listener = mng.listener;
+ qcm = new QuorumCnxManager(this);
+ QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
- le = new FastLeaderElection(this,mng);
+ le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
@@ -957,4 +958,11 @@ public class QuorumPeer extends Thread i
public boolean isRunning() {
return running;
}
+
+ /**
+ * get reference to QuorumCnxManager
+ */
+ public QuorumCnxManager getQuorumCnxManager() {
+ return qcm;
+}
}
Modified:
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1082260&r1=1082259&r2=1082260&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Wed Mar 16 18:49:18 2011
@@ -22,18 +22,19 @@ import java.io.File;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.TimeUnit;
-import java.io.*;
+import java.net.Socket;
import org.apache.log4j.Logger;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.server.quorum.QuorumCnxManager;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.junit.Assert;
@@ -46,25 +47,26 @@ public class CnxManagerTest extends ZKTe
int count;
HashMap<Long,QuorumServer> peers;
- File tmpdir[];
- int port[];
-
+ File peerTmpdir[];
+ int peerQuorumPort[];
+ int peerClientPort[];
@Before
public void setUp() throws Exception {
this.count = 3;
- this.peers = new HashMap<Long,QuorumServer>(count);
- tmpdir = new File[count];
- port = new int[count];
-
+ this.peers = new HashMap<Long,QuorumServer>(count);
+ peerTmpdir = new File[count];
+ peerQuorumPort = new int[count];
+ peerClientPort = new int[count];
+
for(int i = 0; i < count; i++) {
- int clientport = PortAssignment.unique();
+ peerQuorumPort[i] = PortAssignment.unique();
+ peerClientPort[i] = PortAssignment.unique();
peers.put(Long.valueOf(i),
new QuorumServer(i,
- new InetSocketAddress(clientport),
+ new InetSocketAddress(peerQuorumPort[i]),
new InetSocketAddress(PortAssignment.unique())));
- tmpdir[i] = ClientBase.createTmpDir();
- port[i] = clientport;
+ peerTmpdir[i] = ClientBase.createTmpDir();
}
}
@@ -94,7 +96,7 @@ public class CnxManagerTest extends ZKTe
public void run(){
try {
- QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 2, 2, 2);
+ QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0], peerClientPort[0], 3, 0, 2, 2, 2);
QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
QuorumCnxManager.Listener listener = cnxManager.listener;
if(listener != null){
@@ -137,8 +139,8 @@ public class CnxManagerTest extends ZKTe
CnxManagerThread thread = new CnxManagerThread();
thread.start();
-
- QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+
+ QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2, 2, 2);
QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
QuorumCnxManager.Listener listener = cnxManager.listener;
if(listener != null){
@@ -155,7 +157,7 @@ public class CnxManagerTest extends ZKTe
m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) cnxManager.connectAll();
}
-
+
Assert.assertTrue("Exceeded number of retries", numRetries <=
THRESHOLD);
thread.join(5000);
@@ -181,10 +183,9 @@ public class CnxManagerTest extends ZKTe
new QuorumServer(2,
new InetSocketAddress(deadAddress, deadPort),
new InetSocketAddress(deadAddress, PortAssignment.unique())));
- tmpdir[2] = ClientBase.createTmpDir();
- port[2] = deadPort;
-
- QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+ peerTmpdir[2] = ClientBase.createTmpDir();
+
+ QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2, 2, 2);
QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
QuorumCnxManager.Listener listener = cnxManager.listener;
if(listener != null){
@@ -211,7 +212,7 @@ public class CnxManagerTest extends ZKTe
*/
@Test
public void testCnxManagerSpinLock() throws Exception {
- QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+ QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2, 2, 2);
QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
QuorumCnxManager.Listener listener = cnxManager.listener;
if(listener != null){
@@ -258,14 +259,16 @@ public class CnxManagerTest extends ZKTe
} catch (Exception e) {
LOG.info("Socket has been closed as expected");
}
- }
+ peer.shutdown();
+ cnxManager.halt();
+ }
/*
* Test if a receiveConnection is able to timeout on socket errors
*/
@Test
public void testSocketTimeout() throws Exception {
- QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2000, 2, 2);
+ QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2000, 2, 2);
QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
QuorumCnxManager.Listener listener = cnxManager.listener;
if(listener != null){
@@ -286,4 +289,80 @@ public class CnxManagerTest extends ZKTe
long end = System.currentTimeMillis();
if((end - begin) > ((peer.getSyncLimit() * peer.getTickTime()) + 500))
Assert.fail("Waited more than necessary");
}
+
+ /*
+ * Test if Worker threads are getting killed after connection loss
+ */
+ @Test
+ public void testWorkerThreads() throws Exception {
+ ArrayList<QuorumPeer> peerList = new ArrayList<QuorumPeer>();
+
+ for (int sid = 0; sid < 3; sid++) {
+ QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[sid], peerTmpdir[sid],
+ peerClientPort[sid], 3, sid, 1000, 2, 2);
+ LOG.info("Starting peer " + peer.getId());
+ peer.start();
+ peerList.add(sid, peer);
+ }
+ String failure = verifyThreadCount(peerList, 4);
+ if (failure != null) {
+ Assert.fail(failure);
+ }
+ for (int myid = 0; myid < 3; myid++) {
+ for (int i = 0; i < 5; i++) {
+ // halt one of the listeners and verify count
+ QuorumPeer peer = peerList.get(myid);
+ LOG.info("Round " + i + ", halting peer " + peer.getId());
+ peer.shutdown();
+ peerList.remove(myid);
+ failure = verifyThreadCount(peerList, 2);
+ if (failure != null) {
+ Assert.fail(failure);
+ }
+
+ // Restart halted node and verify count
+ peer = new QuorumPeer(peers, peerTmpdir[myid], peerTmpdir[myid],
+ peerClientPort[myid], 3, myid, 1000, 2, 2);
+ LOG.info("Round " + i + ", restarting peer " + peer.getId());
+ peer.start();
+ peerList.add(myid, peer);
+ failure = verifyThreadCount(peerList, 4);
+ if (failure != null) {
+ Assert.fail(failure);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns null on success, otw the message assoc with the failure
+ * @throws InterruptedException
+ */
+ public String verifyThreadCount(ArrayList<QuorumPeer> peerList, long ecnt)
+ throws InterruptedException
+ {
+ String failure = null;
+ for (int i = 0; i < 120; i++) {
+ Thread.sleep(500);
+
+ failure = _verifyThreadCount(peerList, ecnt);
+ if (failure == null) {
+ return null;
+ }
+ }
+ return failure;
+ }
+ public String _verifyThreadCount(ArrayList<QuorumPeer> peerList, long ecnt) {
+ for (int myid = 0; myid < peerList.size(); myid++) {
+ QuorumPeer peer = peerList.get(myid);
+ QuorumCnxManager cnxManager = peer.getQuorumCnxManager();
+ long cnt = cnxManager.getThreadCount();
+ if (cnt != ecnt) {
+ return new String(new Date()
+ + " Incorrect number of Worker threads for sid=" + myid
+ + " expected " + ecnt + " found " + cnt);
+ }
+ }
+ return null;
+ }
}