Raul, what's the status on this? It looks like the fix was committed to trunk but not to the 3.5 branch. We don't want to lose track of it.
Patrick On Wed, Jun 8, 2016 at 8:43 AM, <[email protected]> wrote: > Author: rgs > Date: Wed Jun 8 15:43:15 2016 > New Revision: 1747408 > > URL: http://svn.apache.org/viewvc?rev=1747408&view=rev > Log: > ZOOKEEPER-2442: Socket leak in QuorumCnxManager connectOne > (Michael Han via rgs) > > Modified: > zookeeper/trunk/CHANGES.txt > > zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java > > Modified: zookeeper/trunk/CHANGES.txt > URL: > http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1747408&r1=1747407&r2=1747408&view=diff > > ============================================================================== > --- zookeeper/trunk/CHANGES.txt (original) > +++ zookeeper/trunk/CHANGES.txt Wed Jun 8 15:43:15 2016 > @@ -308,6 +308,9 @@ BUGFIXES: > ZOOKEEPER-2405: getTGT() in Login.java mishandles confidential > information (Michael Han via phunt) > > + ZOOKEEPER-2442: Socket leak in QuorumCnxManager connectOne > + (Michael Han via rgs) > + > IMPROVEMENTS: > ZOOKEEPER-2024 Major throughput improvement with mixed workloads (Kfir > Lev-Ari via shralex) > > > Modified: > zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java > URL: > http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1747408&r1=1747407&r2=1747408&view=diff > > ============================================================================== > --- > zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java > (original) > +++ > zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java > Wed Jun 8 15:43:15 2016 > @@ -237,9 +237,7 @@ public class QuorumCnxManager { > * @param sid > */ > public void testInitiateConnection(long sid) throws Exception { > - if (LOG.isDebugEnabled()) { > - LOG.debug("Opening channel to server " + sid); > - } > + LOG.debug("Opening channel to server " + sid); > Socket sock = new Socket(); 
> setSockOpts(sock); > sock.connect(self.getVotingView().get(sid).electionAddr, cnxTO); > @@ -434,17 +432,14 @@ public class QuorumCnxManager { > LOG.debug("There is a connection already for server " + sid); > return true; > } > - try { > > - if (LOG.isDebugEnabled()) { > - LOG.debug("Opening channel to server " + sid); > - } > - Socket sock = new Socket(); > + Socket sock = null; > + try { > + LOG.debug("Opening channel to server " + sid); > + sock = new Socket(); > setSockOpts(sock); > sock.connect(electionAddr, cnxTO); > - if (LOG.isDebugEnabled()) { > - LOG.debug("Connected to server " + sid); > - } > + LOG.debug("Connected to server " + sid); > initiateConnection(sock, sid); > return true; > } catch (UnresolvedAddressException e) { > @@ -454,11 +449,13 @@ public class QuorumCnxManager { > // detail. > LOG.warn("Cannot open channel to " + sid > + " at election address " + electionAddr, e); > + closeSocket(sock); > throw e; > } catch (IOException e) { > LOG.warn("Cannot open channel to " + sid > + " at election address " + electionAddr, > e); > + closeSocket(sock); > return false; > } > > @@ -574,6 +571,10 @@ public class QuorumCnxManager { > * Reference to socket > */ > private void closeSocket(Socket sock) { > + if (sock == null) { > + return; > + } > + > try { > sock.close(); > } catch (IOException ie) { > @@ -614,7 +615,7 @@ public class QuorumCnxManager { > public void run() { > int numRetries = 0; > InetSocketAddress addr; > - > + Socket client = null; > while((!shutdown) && (numRetries < 3)){ > try { > ss = new ServerSocket(); > @@ -632,7 +633,7 @@ public class QuorumCnxManager { > setName(addr.toString()); > ss.bind(addr); > while (!shutdown) { > - Socket client = ss.accept(); > + client = ss.accept(); > setSockOpts(client); > LOG.info("Received connection request " > + client.getRemoteSocketAddress()); > @@ -654,6 +655,7 @@ public class QuorumCnxManager { > LOG.error("Interrupted while sleeping. 
" + > "Ignoring exception", ie); > } > + closeSocket(client); > } > } > LOG.info("Leaving listener"); > @@ -739,9 +741,7 @@ public class QuorumCnxManager { > } > > synchronized boolean finish() { > - if (LOG.isDebugEnabled()) { > - LOG.debug("Calling finish for " + sid); > - } > + LOG.debug("Calling finish for " + sid); > > if(!running){ > /* > @@ -752,16 +752,14 @@ public class QuorumCnxManager { > > running = false; > closeSocket(sock); > - // channel = null; > > this.interrupt(); > if (recvWorker != null) { > recvWorker.finish(); > } > > - if (LOG.isDebugEnabled()) { > - LOG.debug("Removing entry from senderWorkerMap sid=" + > sid); > - } > + LOG.debug("Removing entry from senderWorkerMap sid=" + sid); > + > senderWorkerMap.remove(sid, this); > threadCnt.decrementAndGet(); > return running; > @@ -919,9 +917,7 @@ public class QuorumCnxManager { > } finally { > LOG.warn("Interrupting SendWorker"); > sw.finish(); > - if (sock != null) { > - closeSocket(sock); > - } > + closeSocket(sock); > } > } > } > > >
