Repository: geode Updated Branches: refs/heads/develop 36665aef4 -> 3acb47e60
GEODE-3286: Failing to cleanup connections from ConnectionTable receiver table - prevent adding a closed connection to the connection table's receivers - add a new unit test for connection table - adding connection factory object for creating receiving connections - have the idle connection timeout ensure connections are removed from connection table receivers - modify tcpConduit stat accesses to allow easier mocking Signed-off-by: Hitesh Khamesra <hitesh...@yahoo.com> Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/8aed2684 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/8aed2684 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/8aed2684 Branch: refs/heads/develop Commit: 8aed26846de6e9ff1c123acae98a7b5ce6d82a83 Parents: e7515f5 Author: Brian Rowe <br...@pivotal.io> Authored: Tue Jul 25 15:43:35 2017 -0700 Committer: Hitesh Khamesra <hitesh...@yahoo.com> Committed: Tue Jul 25 15:43:35 2017 -0700 ---------------------------------------------------------------------- .../apache/geode/internal/tcp/Connection.java | 118 +++++++++++-------- .../geode/internal/tcp/ConnectionTable.java | 52 ++++---- .../apache/geode/internal/tcp/MsgReader.java | 2 +- .../internal/tcp/PeerConnectionFactory.java | 32 +++++ .../apache/geode/internal/tcp/TCPConduit.java | 30 +++-- .../geode/internal/tcp/ConnectionTableTest.java | 66 +++++++++++ 6 files changed, 213 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index 1afe6ff..9b1a10a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -510,22 +510,22 @@ public class Connection implements Runnable { * creates a connection that we accepted (it was initiated by an explicit connect being done on * the other side). We will only receive data on this socket; never send. */ - protected static Connection createReceiver(ConnectionTable t, Socket s) + protected static Connection createReceiver(ConnectionTable table, Socket socket) throws IOException, ConnectionException { - Connection c = new Connection(t, s); + Connection connection = new Connection(table, socket); boolean readerStarted = false; try { - c.startReader(t); + connection.startReader(table); readerStarted = true; } finally { if (!readerStarted) { - c.closeForReconnect( + connection.closeForReconnect( LocalizedStrings.Connection_COULD_NOT_START_READER_THREAD.toLocalizedString()); } } - c.waitForHandshake(); - c.finishedConnecting = true; - return c; + connection.waitForHandshake(); + connection.finishedConnecting = true; + return connection; } /** @@ -568,6 +568,12 @@ public class Connection implements Runnable { } } + protected void initRecevier() { + this.startReader(owner); + this.waitForHandshake(); + this.finishedConnecting = true; + } + void setIdleTimeoutTask(SystemTimerTask task) { this.idleTask = task; } @@ -591,7 +597,7 @@ public class Connection implements Runnable { this.accessed = false; if (isIdle) { this.timedOut = true; - this.owner.getConduit().stats.incLostLease(); + this.owner.getConduit().getStats().incLostLease(); if (logger.isDebugEnabled()) { logger.debug("Closing idle connection {} shared={} ordered={}", this, this.sharedResource, this.preserveOrder); @@ -1059,7 +1065,7 @@ public class Connection implements Runnable { LocalizedStrings.Connection_CONNECTION_ATTEMPTING_RECONNECT_TO_PEER__0, remoteAddr)); } - t.getConduit().stats.incReconnectAttempts(); + t.getConduit().getStats().incReconnectAttempts(); } // create connection try { @@ -1086,7 +1092,7 @@ public class Connection implements Runnable { } // IOException finally { if (conn == null) { - t.getConduit().stats.incFailedConnect(); + t.getConduit().getStats().incFailedConnect(); } } if (conn != null) { @@ -1322,6 +1328,14 @@ public class Connection implements Runnable { this.batchFlusher.start(); } + public void onIdleCancel() { + // Make sure receivers are removed from the connection table, this should always be a noop, but + // is done here as a failsafe. + if (isReceiver) { + owner.removeReceiver(this); + } + } + private class BatchBufferFlusher extends Thread { private volatile boolean flushNeeded = false; private volatile boolean timeToStop = false; @@ -1330,7 +1344,7 @@ public class Connection implements Runnable { public BatchBufferFlusher() { setDaemon(true); - this.stats = owner.getConduit().stats; + this.stats = owner.getConduit().getStats(); } /** @@ -1367,7 +1381,7 @@ public class Connection implements Runnable { } // while } } finally { - owner.getConduit().stats.incBatchWaitTime(start); + owner.getConduit().getStats().incBatchWaitTime(start); } } @@ -1456,7 +1470,7 @@ public class Connection implements Runnable { if (src.remaining() <= dst.remaining()) { final long copyStart = DistributionStats.getStatTime(); dst.put(src); - this.owner.getConduit().stats.incBatchCopyTime(copyStart); + this.owner.getConduit().getStats().incBatchCopyTime(copyStart); return; } } @@ -1465,7 +1479,7 @@ public class Connection implements Runnable { this.batchFlusher.flushBuffer(dst); } while (true); } finally { - this.owner.getConduit().stats.incBatchSendTime(start); + this.owner.getConduit().getStats().incBatchSendTime(start); } } @@ -1537,7 +1551,7 @@ public class Connection implements Runnable { this.connected = false; closeSenderSem(); { - final DMStats stats = this.owner.getConduit().stats; + final DMStats stats = this.owner.getConduit().getStats(); if (this.finishedConnecting) { if (this.isReceiver) { stats.decReceivers(); @@ -1684,7 +1698,7 @@ public class Connection implements Runnable { initiateSuspicionIfSharedUnordered(); if (this.isReceiver) { if (!this.sharedResource) { - this.conduit.stats.incThreadOwnedReceivers(-1L, dominoCount.get()); + this.conduit.getStats().incThreadOwnedReceivers(-1L, dominoCount.get()); } asyncClose(false); this.owner.removeAndCloseThreadOwnedSockets(); @@ -1692,7 +1706,7 @@ public class Connection implements Runnable { ByteBuffer tmp = this.nioInputBuffer; if (tmp != null) { this.nioInputBuffer = null; - final DMStats stats = this.owner.getConduit().stats; + final DMStats stats = this.owner.getConduit().getStats(); Buffers.releaseReceiveBuffer(tmp, stats); } // make sure that if the reader thread exits we notify a thread waiting @@ -1960,7 +1974,7 @@ public class Connection implements Runnable { if (result != null) { this.idleMsgDestreamer = null; } else { - result = new MsgDestreamer(this.owner.getConduit().stats, + result = new MsgDestreamer(this.owner.getConduit().getStats(), this.conduit.getCancelCriterion(), v); } result.setName(p2pReaderName() + " msgId=" + msgId); @@ -2019,7 +2033,7 @@ public class Connection implements Runnable { } } - byte[] lenbytes = new byte[MSG_HEADER_BYTES]; + byte[] headerBytes = new byte[MSG_HEADER_BYTES]; final ByteArrayDataInput dis = new ByteArrayDataInput(); while (!stopped) { @@ -2040,20 +2054,20 @@ public class Connection implements Runnable { break; } int len = 0; - if (readFully(input, lenbytes, lenbytes.length) < 0) { + if (readFully(input, headerBytes, headerBytes.length) < 0) { stopped = true; continue; } // long recvNanos = DistributionStats.getStatTime(); - len = ((lenbytes[MSG_HEADER_SIZE_OFFSET] & 0xff) * 0x1000000) - + ((lenbytes[MSG_HEADER_SIZE_OFFSET + 1] & 0xff) * 0x10000) - + ((lenbytes[MSG_HEADER_SIZE_OFFSET + 2] & 0xff) * 0x100) - + (lenbytes[MSG_HEADER_SIZE_OFFSET + 3] & 0xff); + len = ((headerBytes[MSG_HEADER_SIZE_OFFSET] & 0xff) * 0x1000000) + + ((headerBytes[MSG_HEADER_SIZE_OFFSET + 1] & 0xff) * 0x10000) + + ((headerBytes[MSG_HEADER_SIZE_OFFSET + 2] & 0xff) * 0x100) + + (headerBytes[MSG_HEADER_SIZE_OFFSET + 3] & 0xff); /* byte msgHdrVersion = */ calcHdrVersion(len); len = calcMsgByteSize(len); - int msgType = lenbytes[MSG_HEADER_TYPE_OFFSET]; - short msgId = (short) (((lenbytes[MSG_HEADER_ID_OFFSET] & 0xff) << 8) - + (lenbytes[MSG_HEADER_ID_OFFSET + 1] & 0xff)); + int msgType = headerBytes[MSG_HEADER_TYPE_OFFSET]; + short msgId = (short) (((headerBytes[MSG_HEADER_ID_OFFSET] & 0xff) << 8) + + (headerBytes[MSG_HEADER_ID_OFFSET + 1] & 0xff)); boolean myDirectAck = (msgType & DIRECT_ACK_BIT) != 0; if (myDirectAck) { msgType &= ~DIRECT_ACK_BIT; // clear the bit @@ -2080,14 +2094,14 @@ public class Connection implements Runnable { if (msgType == NORMAL_MSG_TYPE) { // DMStats stats = this.owner.getConduit().stats; // long start = DistributionStats.getStatTime(); - this.owner.getConduit().stats.incMessagesBeingReceived(true, len); + this.owner.getConduit().getStats().incMessagesBeingReceived(true, len); dis.initialize(bytes, this.remoteVersion); DistributionMessage msg = null; try { ReplyProcessor21.initMessageRPId(); - long startSer = this.owner.getConduit().stats.startMsgDeserialization(); + long startSer = this.owner.getConduit().getStats().startMsgDeserialization(); msg = (DistributionMessage) InternalDataSerializer.readDSFID(dis); - this.owner.getConduit().stats.endMsgDeserialization(startSer); + this.owner.getConduit().getStats().endMsgDeserialization(startSer); if (dis.available() != 0) { logger.warn(LocalizedMessage.create( LocalizedStrings.Connection_MESSAGE_DESERIALIZATION_OF_0_DID_NOT_READ_1_BYTES, @@ -2142,7 +2156,7 @@ public class Connection implements Runnable { } } else if (msgType == CHUNKED_MSG_TYPE) { MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion); - this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, len); + this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0, len); try { md.addChunk(bytes); } catch (IOException ex) { @@ -2151,7 +2165,7 @@ public class Connection implements Runnable { } } else /* (messageType == END_CHUNKED_MSG_TYPE) */ { MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion); - this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, len); + this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0, len); try { md.addChunk(bytes); } catch (IOException ex) { @@ -2166,13 +2180,13 @@ public class Connection implements Runnable { try { msg = md.getMessage(); } catch (ClassNotFoundException ex) { - this.owner.getConduit().stats.decMessagesBeingReceived(md.size()); + this.owner.getConduit().getStats().decMessagesBeingReceived(md.size()); failureEx = ex; rpId = md.getRPid(); logger.warn(LocalizedMessage .create(LocalizedStrings.Connection_CLASSNOTFOUND_DESERIALIZING_MESSAGE_0, ex)); } catch (IOException ex) { - this.owner.getConduit().stats.decMessagesBeingReceived(md.size()); + this.owner.getConduit().getStats().decMessagesBeingReceived(md.size()); failureMsg = LocalizedStrings.Connection_IOEXCEPTION_DESERIALIZING_MESSAGE .toLocalizedString(); failureEx = ex; @@ -2194,7 +2208,7 @@ public class Connection implements Runnable { // error condition, so you also need to check to see if the JVM // is still usable: SystemFailure.checkFailure(); - this.owner.getConduit().stats.decMessagesBeingReceived(md.size()); + this.owner.getConduit().getStats().decMessagesBeingReceived(md.size()); failureMsg = LocalizedStrings.Connection_UNEXPECTED_FAILURE_DESERIALIZING_MESSAGE .toLocalizedString(); failureEx = ex; @@ -2333,7 +2347,7 @@ public class Connection implements Runnable { // logger.fine("thread-owned receiver with domino count of " + dominoNumber + " // will prefer shared sockets"); } - this.conduit.stats.incThreadOwnedReceivers(1L, dominoNumber); + this.conduit.getStats().incThreadOwnedReceivers(1L, dominoNumber); } if (logger.isDebugEnabled()) { @@ -2704,7 +2718,7 @@ public class Connection implements Runnable { private boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, boolean force) throws ConnectionException { - final DMStats stats = this.owner.getConduit().stats; + final DMStats stats = this.owner.getConduit().getStats(); long start = DistributionStats.getStatTime(); try { ConflationKey ck = null; @@ -2864,7 +2878,7 @@ public class Connection implements Runnable { private ByteBuffer takeFromOutgoingQueue() throws InterruptedException { ByteBuffer result = null; - final DMStats stats = this.owner.getConduit().stats; + final DMStats stats = this.owner.getConduit().getStats(); long start = DistributionStats.getStatTime(); try { synchronized (this.outgoingQueue) { @@ -2965,7 +2979,7 @@ public class Connection implements Runnable { */ protected void runNioPusher() { try { - final DMStats stats = this.owner.getConduit().stats; + final DMStats stats = this.owner.getConduit().getStats(); final long threadStart = stats.startAsyncThread(); try { stats.incAsyncQueues(1); @@ -3279,7 +3293,7 @@ public class Connection implements Runnable { */ protected void nioWriteFully(SocketChannel channel, ByteBuffer buffer, boolean forceAsync, DistributionMessage msg) throws IOException, ConnectionException { - final DMStats stats = this.owner.getConduit().stats; + final DMStats stats = this.owner.getConduit().getStats(); if (!this.sharedResource) { stats.incTOSentMsg(); } @@ -3318,7 +3332,7 @@ public class Connection implements Runnable { /** gets the buffer for receiving message length bytes */ protected ByteBuffer getNIOBuffer() { - final DMStats stats = this.owner.getConduit().stats; + final DMStats stats = this.owner.getConduit().getStats(); if (nioInputBuffer == null) { int allocSize = this.recvBufferSize; if (allocSize == -1) { @@ -3374,7 +3388,7 @@ public class Connection implements Runnable { boolean origSocketInUse = this.socketInUse; this.socketInUse = true; MsgReader msgReader = null; - DMStats stats = owner.getConduit().stats; + DMStats stats = owner.getConduit().getStats(); final Version version = getRemoteVersion(); try { if (useNIO()) { @@ -3518,7 +3532,7 @@ public class Connection implements Runnable { nioInputBuffer.limit(startPos + nioMessageLength); if (this.handshakeRead) { if (nioMessageType == NORMAL_MSG_TYPE) { - this.owner.getConduit().stats.incMessagesBeingReceived(true, nioMessageLength); + this.owner.getConduit().getStats().incMessagesBeingReceived(true, nioMessageLength); ByteBufferInputStream bbis = remoteVersion == null ? new ByteBufferInputStream(nioInputBuffer) : new VersionedByteBufferInputStream(nioInputBuffer, remoteVersion); @@ -3526,9 +3540,9 @@ public class Connection implements Runnable { try { ReplyProcessor21.initMessageRPId(); // add serialization stats - long startSer = this.owner.getConduit().stats.startMsgDeserialization(); + long startSer = this.owner.getConduit().getStats().startMsgDeserialization(); msg = (DistributionMessage) InternalDataSerializer.readDSFID(bbis); - this.owner.getConduit().stats.endMsgDeserialization(startSer); + this.owner.getConduit().getStats().endMsgDeserialization(startSer); if (bbis.available() != 0) { logger.warn(LocalizedMessage.create( LocalizedStrings.Connection_MESSAGE_DESERIALIZATION_OF_0_DID_NOT_READ_1_BYTES, @@ -3593,7 +3607,7 @@ public class Connection implements Runnable { } } else if (nioMessageType == CHUNKED_MSG_TYPE) { MsgDestreamer md = obtainMsgDestreamer(nioMsgId, remoteVersion); - this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, + this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0, nioMessageLength); try { md.addChunk(nioInputBuffer, nioMessageLength); @@ -3604,7 +3618,7 @@ public class Connection implements Runnable { } else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ { // logger.info("END_CHUNK msgId="+nioMsgId); MsgDestreamer md = obtainMsgDestreamer(nioMsgId, remoteVersion); - this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, + this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0, nioMessageLength); try { md.addChunk(nioInputBuffer, nioMessageLength); @@ -3621,7 +3635,7 @@ public class Connection implements Runnable { try { msg = md.getMessage(); } catch (ClassNotFoundException ex) { - this.owner.getConduit().stats.decMessagesBeingReceived(md.size()); + this.owner.getConduit().getStats().decMessagesBeingReceived(md.size()); failureMsg = LocalizedStrings.Connection_CLASSNOTFOUND_DESERIALIZING_MESSAGE .toLocalizedString(); failureEx = ex; @@ -3629,7 +3643,7 @@ public class Connection implements Runnable { logger.fatal(LocalizedMessage .create(LocalizedStrings.Connection_CLASSNOTFOUND_DESERIALIZING_MESSAGE_0, ex)); } catch (IOException ex) { - this.owner.getConduit().stats.decMessagesBeingReceived(md.size()); + this.owner.getConduit().getStats().decMessagesBeingReceived(md.size()); failureMsg = LocalizedStrings.Connection_IOEXCEPTION_DESERIALIZING_MESSAGE .toLocalizedString(); failureEx = ex; @@ -3652,7 +3666,7 @@ public class Connection implements Runnable { // is still usable: SystemFailure.checkFailure(); this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex); - this.owner.getConduit().stats.decMessagesBeingReceived(md.size()); + this.owner.getConduit().getStats().decMessagesBeingReceived(md.size()); failureMsg = LocalizedStrings.Connection_UNEXPECTED_FAILURE_DESERIALIZING_MESSAGE .toLocalizedString(); failureEx = ex; @@ -3822,7 +3836,7 @@ public class Connection implements Runnable { // } else { // ConnectionTable.threadWantsSharedResources(); } - this.conduit.stats.incThreadOwnedReceivers(1L, dominoNumber); + this.conduit.getStats().incThreadOwnedReceivers(1L, dominoNumber); // Because this thread is not shared resource, it will be used for direct // ack. Direct ack messages can be large. This call will resize the send // buffer. @@ -3915,7 +3929,7 @@ public class Connection implements Runnable { private void compactOrResizeBuffer(int messageLength) { final int oldBufferSize = nioInputBuffer.capacity(); - final DMStats stats = this.owner.getConduit().stats; + final DMStats stats = this.owner.getConduit().getStats(); int allocSize = messageLength + MSG_HEADER_BYTES; if (oldBufferSize < allocSize) { // need a bigger buffer http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java index c55af82..affe5cd 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java @@ -114,6 +114,7 @@ public class ConnectionTable { * acks, will be put in this map. */ protected final Map unorderedConnectionMap = new ConcurrentHashMap(); + /** * Used for all accepted connections. These connections are read only; we never send messages, * except for acks; only receive. @@ -201,14 +202,14 @@ public class ConnectionTable { } - private ConnectionTable(TCPConduit c) throws IOException { - this.owner = c; + private ConnectionTable(TCPConduit conduit) throws IOException { + this.owner = conduit; this.idleConnTimer = (this.owner.idleConnectionTimeout != 0) - ? new SystemTimer(c.getDM().getSystem(), true) : null; + ? new SystemTimer(conduit.getDM().getSystem(), true) : null; this.threadOrderedConnMap = new ThreadLocal(); this.threadConnMaps = new ArrayList(); this.threadConnectionMap = new ConcurrentHashMap(); - this.p2pReaderThreadPool = createThreadPoolForIO(c.getDM().getSystem().isShareSockets()); + this.p2pReaderThreadPool = createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets()); this.socketCloser = new SocketCloser(); } @@ -248,14 +249,14 @@ public class ConnectionTable { // } /** conduit calls acceptConnection after an accept */ - protected void acceptConnection(Socket sock) throws IOException, ConnectionException { - Connection connection = null; + protected void acceptConnection(Socket sock, PeerConnectionFactory peerConnectionFactory) + throws IOException, ConnectionException, InterruptedException { InetAddress connAddress = sock.getInetAddress(); // for bug 44736 boolean finishedConnecting = false; - Connection conn = null; + Connection connection = null; // boolean exceptionLogged = false; try { - conn = Connection.createReceiver(this, sock); + connection = peerConnectionFactory.createReceiver(this, sock); // check for shutdown (so it doesn't get missed in the finally block) this.owner.getCancelCriterion().checkCancelInProgress(null); @@ -279,26 +280,29 @@ public class ConnectionTable { // in our caller. // no need to log error here since caller will log warning - if (conn != null && !finishedConnecting) { + if (connection != null && !finishedConnecting) { // we must be throwing from checkCancelInProgress so close the connection - closeCon(LocalizedStrings.ConnectionTable_CANCEL_AFTER_ACCEPT.toLocalizedString(), conn); - conn = null; + closeCon(LocalizedStrings.ConnectionTable_CANCEL_AFTER_ACCEPT.toLocalizedString(), + connection); + connection = null; } } - if (conn != null) { + if (connection != null) { synchronized (this.receivers) { - this.owner.stats.incReceivers(); + this.owner.getStats().incReceivers(); if (this.closed) { closeCon(LocalizedStrings.ConnectionTable_CONNECTION_TABLE_NO_LONGER_IN_USE - .toLocalizedString(), conn); + .toLocalizedString(), connection); return; } - this.receivers.add(conn); + if (!connection.isSocketClosed()) { + this.receivers.add(connection); + } } if (logger.isDebugEnabled()) { - logger.debug("Accepted {} myAddr={} theirAddr={}", conn, getConduit().getMemberId(), - conn.remoteAddr); + logger.debug("Accepted {} myAddr={} theirAddr={}", connection, getConduit().getMemberId(), + connection.remoteAddr); } } } @@ -328,11 +332,11 @@ public class ConnectionTable { try { con = Connection.createSender(owner.getMembershipManager(), this, preserveOrder, id, sharedResource, startTime, ackThreshold, ackSAThreshold); - this.owner.stats.incSenders(sharedResource, preserveOrder); + this.owner.getStats().incSenders(sharedResource, preserveOrder); } finally { // our connection failed to notify anyone waiting for our pending con if (con == null) { - this.owner.stats.incFailedConnect(); + this.owner.getStats().incFailedConnect(); synchronized (m) { Object rmObj = m.remove(id); if (rmObj != pc && rmObj != null) { @@ -521,7 +525,7 @@ public class ConnectionTable { if (logger.isDebugEnabled()) { logger.debug("ConnectionTable: created an ordered connection: {}", result); } - this.owner.stats.incSenders(false/* shared */, true /* preserveOrder */); + this.owner.getStats().incSenders(false/* shared */, true /* preserveOrder */); // Update the list of connections owned by this thread.... @@ -1309,6 +1313,10 @@ public class ConnectionTable { @Override public boolean cancel() { + Connection con = this.c; + if (con != null) { + con.onIdleCancel(); + } this.c = null; return super.cancel(); } @@ -1355,5 +1363,7 @@ public class ConnectionTable { } } - + public int getNumberOfReceivers() { + return receivers.size(); + } } http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java index be1f533..1bbfd08 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java @@ -94,7 +94,7 @@ public abstract class MsgReader { public abstract ByteBuffer readAtLeast(int bytes) throws IOException; protected DMStats getStats() { - return conn.getConduit().stats; + return conn.getConduit().getStats(); } public static class Header { http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java new file mode 100644 index 0000000..7bf9638 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.tcp; + +import java.io.IOException; +import java.net.Socket; + +public class PeerConnectionFactory { + /** + * creates a connection that we accepted (it was initiated by an explicit connect being done on + * the other side). We will only receive data on this socket; never send. + */ + public Connection createReceiver(ConnectionTable table, Socket socket) + throws IOException, ConnectionException { + Connection connection = new Connection(table, socket); + connection.initRecevier(); + return connection; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java index 991aaf7..c52a676 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java @@ -20,7 +20,6 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; -import java.net.SocketAddress; import java.net.SocketException; import java.nio.channels.ClosedByInterruptException; import java.nio.channels.ClosedChannelException; @@ -189,10 +188,8 @@ public class TCPConduit implements Runnable { * the object that receives DistributionMessage messages received by this conduit. */ private final DirectChannel directChannel; - /** - * Stats from the delegate - */ - DMStats stats; + + private DMStats stats; /** * Config from the delegate @@ -260,7 +257,7 @@ public class TCPConduit implements Runnable { this.stats = directChannel.getDMStats(); this.config = directChannel.getDMConfig(); } - if (this.stats == null) { + if (this.getStats() == null) { this.stats = new LonerDistributionManager.DummyDMStats(); } @@ -642,7 +639,7 @@ public class TCPConduit implements Runnable { if (directChannel != null) { this.stats = directChannel.getDMStats(); } - if (this.stats == null) { + if (this.getStats() == null) { this.stats = new LonerDistributionManager.DummyDMStats(); } try { @@ -745,7 +742,7 @@ public class TCPConduit implements Runnable { } } } else { - this.stats.incFailedAccept(); + this.getStats().incFailedAccept(); if (e instanceof IOException && "Too many open files".equals(e.getMessage())) { getConTable().fileDescriptorsExhausted(); } else { @@ -800,16 +797,16 @@ public class TCPConduit implements Runnable { protected void basicAcceptConnection(Socket othersock) { try { - getConTable().acceptConnection(othersock); + getConTable().acceptConnection(othersock, new PeerConnectionFactory()); } catch (IOException io) { // exception is logged by the Connection if (!stopped) { - this.stats.incFailedAccept(); + this.getStats().incFailedAccept(); } } catch (ConnectionException ex) { // exception is logged by the Connection if (!stopped) { - this.stats.incFailedAccept(); + this.getStats().incFailedAccept(); } } catch (CancelException e) { } catch (Exception e) { @@ -820,7 +817,7 @@ public class TCPConduit implements Runnable { // } // else { - this.stats.incFailedAccept(); + this.getStats().incFailedAccept(); logger.warn(LocalizedMessage.create( LocalizedStrings.TCPConduit_FAILED_TO_ACCEPT_CONNECTION_FROM_0_BECAUSE_1, new Object[] {othersock.getInetAddress(), e}), e); @@ -977,7 +974,7 @@ public class TCPConduit implements Runnable { } // Close the connection (it will get rebuilt later). - this.stats.incReconnectAttempts(); + this.getStats().incReconnectAttempts(); if (conn != null) { try { if (logger.isDebugEnabled()) { @@ -1172,6 +1169,13 @@ public class TCPConduit implements Runnable { return (ct != null) && ct.hasReceiversFor(endPoint); } + /** + * Stats from the delegate + */ + public DMStats getStats() { + return stats; + } + protected class Stopper extends CancelCriterion { /* http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java new file mode 100644 index 0000000..312c64d --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.tcp; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.distributed.internal.DM; +import org.apache.geode.distributed.internal.DMStats; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.test.junit.categories.UnitTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.net.Socket; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +@Category(UnitTest.class) +public class ConnectionTableTest { + + @Test + public void testConnectionsClosedDuringCreateAreNotAddedAsReceivers() throws Exception { + InternalDistributedSystem system = mock(InternalDistributedSystem.class); + when(system.isShareSockets()).thenReturn(false); + + DM dm = mock(DM.class); + when(dm.getSystem()).thenReturn(system); + + CancelCriterion cancelCriterion = mock(CancelCriterion.class); + DMStats dmStats = mock(DMStats.class); + + TCPConduit tcpConduit = mock(TCPConduit.class); + when(tcpConduit.getDM()).thenReturn(dm); + when(tcpConduit.getCancelCriterion()).thenReturn(cancelCriterion); + when(tcpConduit.getStats()).thenReturn(dmStats); + + Connection connection = mock(Connection.class); + when(connection.isSocketClosed()).thenReturn(true); // Pretend this closed as soon at it was + // created + + Socket socket = mock(Socket.class); + + ConnectionTable table = ConnectionTable.create(tcpConduit); + + PeerConnectionFactory factory = mock(PeerConnectionFactory.class); + when(factory.createReceiver(table, socket)).thenReturn(connection); + + table.acceptConnection(socket, factory); + assertEquals(0, table.getNumberOfReceivers()); + } +}