Repository: geode
Updated Branches:
  refs/heads/develop 36665aef4 -> 3acb47e60


GEODE-3286: Failing to cleanup connections from ConnectionTable receiver table

- prevent adding a closed connection to the connection table's receivers
- add a new unit test for connection table
- adding connection factory object for creating receiving connections
- have the idle connection timeout ensure connections are removed from 
connection
  table receivers
- modify tcpConduit stat accesses to allow easier mocking

Signed-off-by: Hitesh Khamesra <hitesh...@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/8aed2684
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/8aed2684
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/8aed2684

Branch: refs/heads/develop
Commit: 8aed26846de6e9ff1c123acae98a7b5ce6d82a83
Parents: e7515f5
Author: Brian Rowe <br...@pivotal.io>
Authored: Tue Jul 25 15:43:35 2017 -0700
Committer: Hitesh Khamesra <hitesh...@yahoo.com>
Committed: Tue Jul 25 15:43:35 2017 -0700

----------------------------------------------------------------------
 .../apache/geode/internal/tcp/Connection.java   | 118 +++++++++++--------
 .../geode/internal/tcp/ConnectionTable.java     |  52 ++++----
 .../apache/geode/internal/tcp/MsgReader.java    |   2 +-
 .../internal/tcp/PeerConnectionFactory.java     |  32 +++++
 .../apache/geode/internal/tcp/TCPConduit.java   |  30 +++--
 .../geode/internal/tcp/ConnectionTableTest.java |  66 +++++++++++
 6 files changed, 213 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 1afe6ff..9b1a10a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -510,22 +510,22 @@ public class Connection implements Runnable {
    * creates a connection that we accepted (it was initiated by an explicit 
connect being done on
    * the other side). We will only receive data on this socket; never send.
    */
-  protected static Connection createReceiver(ConnectionTable t, Socket s)
+  protected static Connection createReceiver(ConnectionTable table, Socket 
socket)
       throws IOException, ConnectionException {
-    Connection c = new Connection(t, s);
+    Connection connection = new Connection(table, socket);
     boolean readerStarted = false;
     try {
-      c.startReader(t);
+      connection.startReader(table);
       readerStarted = true;
     } finally {
       if (!readerStarted) {
-        c.closeForReconnect(
+        connection.closeForReconnect(
             
LocalizedStrings.Connection_COULD_NOT_START_READER_THREAD.toLocalizedString());
       }
     }
-    c.waitForHandshake();
-    c.finishedConnecting = true;
-    return c;
+    connection.waitForHandshake();
+    connection.finishedConnecting = true;
+    return connection;
   }
 
   /**
@@ -568,6 +568,12 @@ public class Connection implements Runnable {
     }
   }
 
+  protected void initRecevier() {
+    this.startReader(owner);
+    this.waitForHandshake();
+    this.finishedConnecting = true;
+  }
+
   void setIdleTimeoutTask(SystemTimerTask task) {
     this.idleTask = task;
   }
@@ -591,7 +597,7 @@ public class Connection implements Runnable {
     this.accessed = false;
     if (isIdle) {
       this.timedOut = true;
-      this.owner.getConduit().stats.incLostLease();
+      this.owner.getConduit().getStats().incLostLease();
       if (logger.isDebugEnabled()) {
         logger.debug("Closing idle connection {} shared={} ordered={}", this, 
this.sharedResource,
             this.preserveOrder);
@@ -1059,7 +1065,7 @@ public class Connection implements Runnable {
                 
LocalizedStrings.Connection_CONNECTION_ATTEMPTING_RECONNECT_TO_PEER__0,
                 remoteAddr));
           }
-          t.getConduit().stats.incReconnectAttempts();
+          t.getConduit().getStats().incReconnectAttempts();
         }
         // create connection
         try {
@@ -1086,7 +1092,7 @@ public class Connection implements Runnable {
         } // IOException
         finally {
           if (conn == null) {
-            t.getConduit().stats.incFailedConnect();
+            t.getConduit().getStats().incFailedConnect();
           }
         }
         if (conn != null) {
@@ -1322,6 +1328,14 @@ public class Connection implements Runnable {
     this.batchFlusher.start();
   }
 
+  public void onIdleCancel() {
+    // Make sure receivers are removed from the connection table, this should 
always be a noop, but
+    // is done here as a failsafe.
+    if (isReceiver) {
+      owner.removeReceiver(this);
+    }
+  }
+
   private class BatchBufferFlusher extends Thread {
     private volatile boolean flushNeeded = false;
     private volatile boolean timeToStop = false;
@@ -1330,7 +1344,7 @@ public class Connection implements Runnable {
 
     public BatchBufferFlusher() {
       setDaemon(true);
-      this.stats = owner.getConduit().stats;
+      this.stats = owner.getConduit().getStats();
     }
 
     /**
@@ -1367,7 +1381,7 @@ public class Connection implements Runnable {
           } // while
         }
       } finally {
-        owner.getConduit().stats.incBatchWaitTime(start);
+        owner.getConduit().getStats().incBatchWaitTime(start);
       }
     }
 
@@ -1456,7 +1470,7 @@ public class Connection implements Runnable {
           if (src.remaining() <= dst.remaining()) {
             final long copyStart = DistributionStats.getStatTime();
             dst.put(src);
-            this.owner.getConduit().stats.incBatchCopyTime(copyStart);
+            this.owner.getConduit().getStats().incBatchCopyTime(copyStart);
             return;
           }
         }
@@ -1465,7 +1479,7 @@ public class Connection implements Runnable {
         this.batchFlusher.flushBuffer(dst);
       } while (true);
     } finally {
-      this.owner.getConduit().stats.incBatchSendTime(start);
+      this.owner.getConduit().getStats().incBatchSendTime(start);
     }
   }
 
@@ -1537,7 +1551,7 @@ public class Connection implements Runnable {
           this.connected = false;
           closeSenderSem();
           {
-            final DMStats stats = this.owner.getConduit().stats;
+            final DMStats stats = this.owner.getConduit().getStats();
             if (this.finishedConnecting) {
               if (this.isReceiver) {
                 stats.decReceivers();
@@ -1684,7 +1698,7 @@ public class Connection implements Runnable {
       initiateSuspicionIfSharedUnordered();
       if (this.isReceiver) {
         if (!this.sharedResource) {
-          this.conduit.stats.incThreadOwnedReceivers(-1L, dominoCount.get());
+          this.conduit.getStats().incThreadOwnedReceivers(-1L, 
dominoCount.get());
         }
         asyncClose(false);
         this.owner.removeAndCloseThreadOwnedSockets();
@@ -1692,7 +1706,7 @@ public class Connection implements Runnable {
       ByteBuffer tmp = this.nioInputBuffer;
       if (tmp != null) {
         this.nioInputBuffer = null;
-        final DMStats stats = this.owner.getConduit().stats;
+        final DMStats stats = this.owner.getConduit().getStats();
         Buffers.releaseReceiveBuffer(tmp, stats);
       }
       // make sure that if the reader thread exits we notify a thread waiting
@@ -1960,7 +1974,7 @@ public class Connection implements Runnable {
         if (result != null) {
           this.idleMsgDestreamer = null;
         } else {
-          result = new MsgDestreamer(this.owner.getConduit().stats,
+          result = new MsgDestreamer(this.owner.getConduit().getStats(),
               this.conduit.getCancelCriterion(), v);
         }
         result.setName(p2pReaderName() + " msgId=" + msgId);
@@ -2019,7 +2033,7 @@ public class Connection implements Runnable {
       }
     }
 
-    byte[] lenbytes = new byte[MSG_HEADER_BYTES];
+    byte[] headerBytes = new byte[MSG_HEADER_BYTES];
 
     final ByteArrayDataInput dis = new ByteArrayDataInput();
     while (!stopped) {
@@ -2040,20 +2054,20 @@ public class Connection implements Runnable {
           break;
         }
         int len = 0;
-        if (readFully(input, lenbytes, lenbytes.length) < 0) {
+        if (readFully(input, headerBytes, headerBytes.length) < 0) {
           stopped = true;
           continue;
         }
         // long recvNanos = DistributionStats.getStatTime();
-        len = ((lenbytes[MSG_HEADER_SIZE_OFFSET] & 0xff) * 0x1000000)
-            + ((lenbytes[MSG_HEADER_SIZE_OFFSET + 1] & 0xff) * 0x10000)
-            + ((lenbytes[MSG_HEADER_SIZE_OFFSET + 2] & 0xff) * 0x100)
-            + (lenbytes[MSG_HEADER_SIZE_OFFSET + 3] & 0xff);
+        len = ((headerBytes[MSG_HEADER_SIZE_OFFSET] & 0xff) * 0x1000000)
+            + ((headerBytes[MSG_HEADER_SIZE_OFFSET + 1] & 0xff) * 0x10000)
+            + ((headerBytes[MSG_HEADER_SIZE_OFFSET + 2] & 0xff) * 0x100)
+            + (headerBytes[MSG_HEADER_SIZE_OFFSET + 3] & 0xff);
         /* byte msgHdrVersion = */ calcHdrVersion(len);
         len = calcMsgByteSize(len);
-        int msgType = lenbytes[MSG_HEADER_TYPE_OFFSET];
-        short msgId = (short) (((lenbytes[MSG_HEADER_ID_OFFSET] & 0xff) << 8)
-            + (lenbytes[MSG_HEADER_ID_OFFSET + 1] & 0xff));
+        int msgType = headerBytes[MSG_HEADER_TYPE_OFFSET];
+        short msgId = (short) (((headerBytes[MSG_HEADER_ID_OFFSET] & 0xff) << 
8)
+            + (headerBytes[MSG_HEADER_ID_OFFSET + 1] & 0xff));
         boolean myDirectAck = (msgType & DIRECT_ACK_BIT) != 0;
         if (myDirectAck) {
           msgType &= ~DIRECT_ACK_BIT; // clear the bit
@@ -2080,14 +2094,14 @@ public class Connection implements Runnable {
             if (msgType == NORMAL_MSG_TYPE) {
               // DMStats stats = this.owner.getConduit().stats;
               // long start = DistributionStats.getStatTime();
-              this.owner.getConduit().stats.incMessagesBeingReceived(true, 
len);
+              
this.owner.getConduit().getStats().incMessagesBeingReceived(true, len);
               dis.initialize(bytes, this.remoteVersion);
               DistributionMessage msg = null;
               try {
                 ReplyProcessor21.initMessageRPId();
-                long startSer = 
this.owner.getConduit().stats.startMsgDeserialization();
+                long startSer = 
this.owner.getConduit().getStats().startMsgDeserialization();
                 msg = (DistributionMessage) 
InternalDataSerializer.readDSFID(dis);
-                this.owner.getConduit().stats.endMsgDeserialization(startSer);
+                
this.owner.getConduit().getStats().endMsgDeserialization(startSer);
                 if (dis.available() != 0) {
                   logger.warn(LocalizedMessage.create(
                       
LocalizedStrings.Connection_MESSAGE_DESERIALIZATION_OF_0_DID_NOT_READ_1_BYTES,
@@ -2142,7 +2156,7 @@ public class Connection implements Runnable {
               }
             } else if (msgType == CHUNKED_MSG_TYPE) {
               MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion);
-              this.owner.getConduit().stats.incMessagesBeingReceived(md.size() 
== 0, len);
+              
this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0, 
len);
               try {
                 md.addChunk(bytes);
               } catch (IOException ex) {
@@ -2151,7 +2165,7 @@ public class Connection implements Runnable {
               }
             } else /* (messageType == END_CHUNKED_MSG_TYPE) */ {
               MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion);
-              this.owner.getConduit().stats.incMessagesBeingReceived(md.size() 
== 0, len);
+              
this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0, 
len);
               try {
                 md.addChunk(bytes);
               } catch (IOException ex) {
@@ -2166,13 +2180,13 @@ public class Connection implements Runnable {
               try {
                 msg = md.getMessage();
               } catch (ClassNotFoundException ex) {
-                
this.owner.getConduit().stats.decMessagesBeingReceived(md.size());
+                
this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
                 failureEx = ex;
                 rpId = md.getRPid();
                 logger.warn(LocalizedMessage
                     
.create(LocalizedStrings.Connection_CLASSNOTFOUND_DESERIALIZING_MESSAGE_0, ex));
               } catch (IOException ex) {
-                
this.owner.getConduit().stats.decMessagesBeingReceived(md.size());
+                
this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
                 failureMsg = 
LocalizedStrings.Connection_IOEXCEPTION_DESERIALIZING_MESSAGE
                     .toLocalizedString();
                 failureEx = ex;
@@ -2194,7 +2208,7 @@ public class Connection implements Runnable {
                 // error condition, so you also need to check to see if the JVM
                 // is still usable:
                 SystemFailure.checkFailure();
-                
this.owner.getConduit().stats.decMessagesBeingReceived(md.size());
+                
this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
                 failureMsg = 
LocalizedStrings.Connection_UNEXPECTED_FAILURE_DESERIALIZING_MESSAGE
                     .toLocalizedString();
                 failureEx = ex;
@@ -2333,7 +2347,7 @@ public class Connection implements Runnable {
                   // logger.fine("thread-owned receiver with domino count of " 
+ dominoNumber + "
                   // will prefer shared sockets");
                 }
-                this.conduit.stats.incThreadOwnedReceivers(1L, dominoNumber);
+                this.conduit.getStats().incThreadOwnedReceivers(1L, 
dominoNumber);
               }
 
               if (logger.isDebugEnabled()) {
@@ -2704,7 +2718,7 @@ public class Connection implements Runnable {
 
   private boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, 
boolean force)
       throws ConnectionException {
-    final DMStats stats = this.owner.getConduit().stats;
+    final DMStats stats = this.owner.getConduit().getStats();
     long start = DistributionStats.getStatTime();
     try {
       ConflationKey ck = null;
@@ -2864,7 +2878,7 @@ public class Connection implements Runnable {
 
   private ByteBuffer takeFromOutgoingQueue() throws InterruptedException {
     ByteBuffer result = null;
-    final DMStats stats = this.owner.getConduit().stats;
+    final DMStats stats = this.owner.getConduit().getStats();
     long start = DistributionStats.getStatTime();
     try {
       synchronized (this.outgoingQueue) {
@@ -2965,7 +2979,7 @@ public class Connection implements Runnable {
    */
   protected void runNioPusher() {
     try {
-      final DMStats stats = this.owner.getConduit().stats;
+      final DMStats stats = this.owner.getConduit().getStats();
       final long threadStart = stats.startAsyncThread();
       try {
         stats.incAsyncQueues(1);
@@ -3279,7 +3293,7 @@ public class Connection implements Runnable {
    */
   protected void nioWriteFully(SocketChannel channel, ByteBuffer buffer, 
boolean forceAsync,
       DistributionMessage msg) throws IOException, ConnectionException {
-    final DMStats stats = this.owner.getConduit().stats;
+    final DMStats stats = this.owner.getConduit().getStats();
     if (!this.sharedResource) {
       stats.incTOSentMsg();
     }
@@ -3318,7 +3332,7 @@ public class Connection implements Runnable {
 
   /** gets the buffer for receiving message length bytes */
   protected ByteBuffer getNIOBuffer() {
-    final DMStats stats = this.owner.getConduit().stats;
+    final DMStats stats = this.owner.getConduit().getStats();
     if (nioInputBuffer == null) {
       int allocSize = this.recvBufferSize;
       if (allocSize == -1) {
@@ -3374,7 +3388,7 @@ public class Connection implements Runnable {
     boolean origSocketInUse = this.socketInUse;
     this.socketInUse = true;
     MsgReader msgReader = null;
-    DMStats stats = owner.getConduit().stats;
+    DMStats stats = owner.getConduit().getStats();
     final Version version = getRemoteVersion();
     try {
       if (useNIO()) {
@@ -3518,7 +3532,7 @@ public class Connection implements Runnable {
           nioInputBuffer.limit(startPos + nioMessageLength);
           if (this.handshakeRead) {
             if (nioMessageType == NORMAL_MSG_TYPE) {
-              this.owner.getConduit().stats.incMessagesBeingReceived(true, 
nioMessageLength);
+              
this.owner.getConduit().getStats().incMessagesBeingReceived(true, 
nioMessageLength);
               ByteBufferInputStream bbis =
                   remoteVersion == null ? new 
ByteBufferInputStream(nioInputBuffer)
                       : new VersionedByteBufferInputStream(nioInputBuffer, 
remoteVersion);
@@ -3526,9 +3540,9 @@ public class Connection implements Runnable {
               try {
                 ReplyProcessor21.initMessageRPId();
                 // add serialization stats
-                long startSer = 
this.owner.getConduit().stats.startMsgDeserialization();
+                long startSer = 
this.owner.getConduit().getStats().startMsgDeserialization();
                 msg = (DistributionMessage) 
InternalDataSerializer.readDSFID(bbis);
-                this.owner.getConduit().stats.endMsgDeserialization(startSer);
+                
this.owner.getConduit().getStats().endMsgDeserialization(startSer);
                 if (bbis.available() != 0) {
                   logger.warn(LocalizedMessage.create(
                       
LocalizedStrings.Connection_MESSAGE_DESERIALIZATION_OF_0_DID_NOT_READ_1_BYTES,
@@ -3593,7 +3607,7 @@ public class Connection implements Runnable {
               }
             } else if (nioMessageType == CHUNKED_MSG_TYPE) {
               MsgDestreamer md = obtainMsgDestreamer(nioMsgId, remoteVersion);
-              this.owner.getConduit().stats.incMessagesBeingReceived(md.size() 
== 0,
+              
this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
                   nioMessageLength);
               try {
                 md.addChunk(nioInputBuffer, nioMessageLength);
@@ -3604,7 +3618,7 @@ public class Connection implements Runnable {
             } else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ {
               // logger.info("END_CHUNK msgId="+nioMsgId);
               MsgDestreamer md = obtainMsgDestreamer(nioMsgId, remoteVersion);
-              this.owner.getConduit().stats.incMessagesBeingReceived(md.size() 
== 0,
+              
this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
                   nioMessageLength);
               try {
                 md.addChunk(nioInputBuffer, nioMessageLength);
@@ -3621,7 +3635,7 @@ public class Connection implements Runnable {
               try {
                 msg = md.getMessage();
               } catch (ClassNotFoundException ex) {
-                
this.owner.getConduit().stats.decMessagesBeingReceived(md.size());
+                
this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
                 failureMsg = 
LocalizedStrings.Connection_CLASSNOTFOUND_DESERIALIZING_MESSAGE
                     .toLocalizedString();
                 failureEx = ex;
@@ -3629,7 +3643,7 @@ public class Connection implements Runnable {
                 logger.fatal(LocalizedMessage
                     
.create(LocalizedStrings.Connection_CLASSNOTFOUND_DESERIALIZING_MESSAGE_0, ex));
               } catch (IOException ex) {
-                
this.owner.getConduit().stats.decMessagesBeingReceived(md.size());
+                
this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
                 failureMsg = 
LocalizedStrings.Connection_IOEXCEPTION_DESERIALIZING_MESSAGE
                     .toLocalizedString();
                 failureEx = ex;
@@ -3652,7 +3666,7 @@ public class Connection implements Runnable {
                 // is still usable:
                 SystemFailure.checkFailure();
                 
this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
-                
this.owner.getConduit().stats.decMessagesBeingReceived(md.size());
+                
this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
                 failureMsg = 
LocalizedStrings.Connection_UNEXPECTED_FAILURE_DESERIALIZING_MESSAGE
                     .toLocalizedString();
                 failureEx = ex;
@@ -3822,7 +3836,7 @@ public class Connection implements Runnable {
                     // } else {
                     // ConnectionTable.threadWantsSharedResources();
                   }
-                  this.conduit.stats.incThreadOwnedReceivers(1L, dominoNumber);
+                  this.conduit.getStats().incThreadOwnedReceivers(1L, 
dominoNumber);
                   // Because this thread is not shared resource, it will be 
used for direct
                   // ack. Direct ack messages can be large. This call will 
resize the send
                   // buffer.
@@ -3915,7 +3929,7 @@ public class Connection implements Runnable {
 
   private void compactOrResizeBuffer(int messageLength) {
     final int oldBufferSize = nioInputBuffer.capacity();
-    final DMStats stats = this.owner.getConduit().stats;
+    final DMStats stats = this.owner.getConduit().getStats();
     int allocSize = messageLength + MSG_HEADER_BYTES;
     if (oldBufferSize < allocSize) {
       // need a bigger buffer

http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index c55af82..affe5cd 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -114,6 +114,7 @@ public class ConnectionTable {
    * acks, will be put in this map.
    */
   protected final Map unorderedConnectionMap = new ConcurrentHashMap();
+
   /**
    * Used for all accepted connections. These connections are read only; we 
never send messages,
    * except for acks; only receive.
@@ -201,14 +202,14 @@ public class ConnectionTable {
   }
 
 
-  private ConnectionTable(TCPConduit c) throws IOException {
-    this.owner = c;
+  private ConnectionTable(TCPConduit conduit) throws IOException {
+    this.owner = conduit;
     this.idleConnTimer = (this.owner.idleConnectionTimeout != 0)
-        ? new SystemTimer(c.getDM().getSystem(), true) : null;
+        ? new SystemTimer(conduit.getDM().getSystem(), true) : null;
     this.threadOrderedConnMap = new ThreadLocal();
     this.threadConnMaps = new ArrayList();
     this.threadConnectionMap = new ConcurrentHashMap();
-    this.p2pReaderThreadPool = 
createThreadPoolForIO(c.getDM().getSystem().isShareSockets());
+    this.p2pReaderThreadPool = 
createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets());
     this.socketCloser = new SocketCloser();
   }
 
@@ -248,14 +249,14 @@ public class ConnectionTable {
   // }
 
   /** conduit calls acceptConnection after an accept */
-  protected void acceptConnection(Socket sock) throws IOException, 
ConnectionException {
-    Connection connection = null;
+  protected void acceptConnection(Socket sock, PeerConnectionFactory 
peerConnectionFactory)
+      throws IOException, ConnectionException, InterruptedException {
     InetAddress connAddress = sock.getInetAddress(); // for bug 44736
     boolean finishedConnecting = false;
-    Connection conn = null;
+    Connection connection = null;
     // boolean exceptionLogged = false;
     try {
-      conn = Connection.createReceiver(this, sock);
+      connection = peerConnectionFactory.createReceiver(this, sock);
 
       // check for shutdown (so it doesn't get missed in the finally block)
       this.owner.getCancelCriterion().checkCancelInProgress(null);
@@ -279,26 +280,29 @@ public class ConnectionTable {
       // in our caller.
       // no need to log error here since caller will log warning
 
-      if (conn != null && !finishedConnecting) {
+      if (connection != null && !finishedConnecting) {
         // we must be throwing from checkCancelInProgress so close the 
connection
-        
closeCon(LocalizedStrings.ConnectionTable_CANCEL_AFTER_ACCEPT.toLocalizedString(),
 conn);
-        conn = null;
+        
closeCon(LocalizedStrings.ConnectionTable_CANCEL_AFTER_ACCEPT.toLocalizedString(),
+            connection);
+        connection = null;
       }
     }
 
-    if (conn != null) {
+    if (connection != null) {
       synchronized (this.receivers) {
-        this.owner.stats.incReceivers();
+        this.owner.getStats().incReceivers();
         if (this.closed) {
           
closeCon(LocalizedStrings.ConnectionTable_CONNECTION_TABLE_NO_LONGER_IN_USE
-              .toLocalizedString(), conn);
+              .toLocalizedString(), connection);
           return;
         }
-        this.receivers.add(conn);
+        if (!connection.isSocketClosed()) {
+          this.receivers.add(connection);
+        }
       }
       if (logger.isDebugEnabled()) {
-        logger.debug("Accepted {} myAddr={} theirAddr={}", conn, 
getConduit().getMemberId(),
-            conn.remoteAddr);
+        logger.debug("Accepted {} myAddr={} theirAddr={}", connection, 
getConduit().getMemberId(),
+            connection.remoteAddr);
       }
     }
   }
@@ -328,11 +332,11 @@ public class ConnectionTable {
     try {
       con = Connection.createSender(owner.getMembershipManager(), this, 
preserveOrder, id,
           sharedResource, startTime, ackThreshold, ackSAThreshold);
-      this.owner.stats.incSenders(sharedResource, preserveOrder);
+      this.owner.getStats().incSenders(sharedResource, preserveOrder);
     } finally {
       // our connection failed to notify anyone waiting for our pending con
       if (con == null) {
-        this.owner.stats.incFailedConnect();
+        this.owner.getStats().incFailedConnect();
         synchronized (m) {
           Object rmObj = m.remove(id);
           if (rmObj != pc && rmObj != null) {
@@ -521,7 +525,7 @@ public class ConnectionTable {
     if (logger.isDebugEnabled()) {
       logger.debug("ConnectionTable: created an ordered connection: {}", 
result);
     }
-    this.owner.stats.incSenders(false/* shared */, true /* preserveOrder */);
+    this.owner.getStats().incSenders(false/* shared */, true /* preserveOrder 
*/);
 
     // Update the list of connections owned by this thread....
 
@@ -1309,6 +1313,10 @@ public class ConnectionTable {
 
     @Override
     public boolean cancel() {
+      Connection con = this.c;
+      if (con != null) {
+        con.onIdleCancel();
+      }
       this.c = null;
       return super.cancel();
     }
@@ -1355,5 +1363,7 @@ public class ConnectionTable {
     }
   }
 
-
+  public int getNumberOfReceivers() {
+    return receivers.size();
+  }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index be1f533..1bbfd08 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -94,7 +94,7 @@ public abstract class MsgReader {
   public abstract ByteBuffer readAtLeast(int bytes) throws IOException;
 
   protected DMStats getStats() {
-    return conn.getConduit().stats;
+    return conn.getConduit().getStats();
   }
 
   public static class Header {

http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java
 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java
new file mode 100644
index 0000000..7bf9638
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.tcp;
+
+import java.io.IOException;
+import java.net.Socket;
+
+public class PeerConnectionFactory {
+  /**
+   * creates a connection that we accepted (it was initiated by an explicit 
connect being done on
+   * the other side). We will only receive data on this socket; never send.
+   */
+  public Connection createReceiver(ConnectionTable table, Socket socket)
+      throws IOException, ConnectionException {
+    Connection connection = new Connection(table, socket);
+    connection.initRecevier();
+    return connection;
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 991aaf7..c52a676 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -20,7 +20,6 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
-import java.net.SocketAddress;
 import java.net.SocketException;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.ClosedChannelException;
@@ -189,10 +188,8 @@ public class TCPConduit implements Runnable {
    * the object that receives DistributionMessage messages received by this 
conduit.
    */
   private final DirectChannel directChannel;
-  /**
-   * Stats from the delegate
-   */
-  DMStats stats;
+
+  private DMStats stats;
 
   /**
    * Config from the delegate
@@ -260,7 +257,7 @@ public class TCPConduit implements Runnable {
       this.stats = directChannel.getDMStats();
       this.config = directChannel.getDMConfig();
     }
-    if (this.stats == null) {
+    if (this.getStats() == null) {
       this.stats = new LonerDistributionManager.DummyDMStats();
     }
 
@@ -642,7 +639,7 @@ public class TCPConduit implements Runnable {
     if (directChannel != null) {
       this.stats = directChannel.getDMStats();
     }
-    if (this.stats == null) {
+    if (this.getStats() == null) {
       this.stats = new LonerDistributionManager.DummyDMStats();
     }
     try {
@@ -745,7 +742,7 @@ public class TCPConduit implements Runnable {
               }
             }
           } else {
-            this.stats.incFailedAccept();
+            this.getStats().incFailedAccept();
             if (e instanceof IOException && "Too many open 
files".equals(e.getMessage())) {
               getConTable().fileDescriptorsExhausted();
             } else {
@@ -800,16 +797,16 @@ public class TCPConduit implements Runnable {
 
   protected void basicAcceptConnection(Socket othersock) {
     try {
-      getConTable().acceptConnection(othersock);
+      getConTable().acceptConnection(othersock, new PeerConnectionFactory());
     } catch (IOException io) {
       // exception is logged by the Connection
       if (!stopped) {
-        this.stats.incFailedAccept();
+        this.getStats().incFailedAccept();
       }
     } catch (ConnectionException ex) {
       // exception is logged by the Connection
       if (!stopped) {
-        this.stats.incFailedAccept();
+        this.getStats().incFailedAccept();
       }
     } catch (CancelException e) {
     } catch (Exception e) {
@@ -820,7 +817,7 @@ public class TCPConduit implements Runnable {
         // }
         // else
         {
-          this.stats.incFailedAccept();
+          this.getStats().incFailedAccept();
           logger.warn(LocalizedMessage.create(
               
LocalizedStrings.TCPConduit_FAILED_TO_ACCEPT_CONNECTION_FROM_0_BECAUSE_1,
               new Object[] {othersock.getInetAddress(), e}), e);
@@ -977,7 +974,7 @@ public class TCPConduit implements Runnable {
           }
 
           // Close the connection (it will get rebuilt later).
-          this.stats.incReconnectAttempts();
+          this.getStats().incReconnectAttempts();
           if (conn != null) {
             try {
               if (logger.isDebugEnabled()) {
@@ -1172,6 +1169,13 @@ public class TCPConduit implements Runnable {
     return (ct != null) && ct.hasReceiversFor(endPoint);
   }
 
+  /**
+   * Stats from the delegate
+   */
+  public DMStats getStats() {
+    return stats;
+  }
+
   protected class Stopper extends CancelCriterion {
 
     /*

http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
new file mode 100644
index 0000000..312c64d
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.tcp;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.net.Socket;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+@Category(UnitTest.class)
+public class ConnectionTableTest {
+
+  @Test
+  public void testConnectionsClosedDuringCreateAreNotAddedAsReceivers() throws 
Exception {
+    InternalDistributedSystem system = mock(InternalDistributedSystem.class);
+    when(system.isShareSockets()).thenReturn(false);
+
+    DM dm = mock(DM.class);
+    when(dm.getSystem()).thenReturn(system);
+
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+    DMStats dmStats = mock(DMStats.class);
+
+    TCPConduit tcpConduit = mock(TCPConduit.class);
+    when(tcpConduit.getDM()).thenReturn(dm);
+    when(tcpConduit.getCancelCriterion()).thenReturn(cancelCriterion);
+    when(tcpConduit.getStats()).thenReturn(dmStats);
+
+    Connection connection = mock(Connection.class);
+    when(connection.isSocketClosed()).thenReturn(true); // Pretend this closed 
as soon at it was
+                                                        // created
+
+    Socket socket = mock(Socket.class);
+
+    ConnectionTable table = ConnectionTable.create(tcpConduit);
+
+    PeerConnectionFactory factory = mock(PeerConnectionFactory.class);
+    when(factory.createReceiver(table, socket)).thenReturn(connection);
+
+    table.acceptConnection(socket, factory);
+    assertEquals(0, table.getNumberOfReceivers());
+  }
+}

Reply via email to