Repository: incubator-geode
Updated Branches:
  refs/heads/develop 58d7fca30 -> 8899fc8d7


GEODE-1588: AckReader and Dispatching thread are shut down before sending 
gateway sender close connection messages

* There was an issue where the gateway sender thread was reading off the same 
socket as the ack reader thread.
  To fix this, we now force the ack reader thread to stop first, and shut down 
the socket's input stream, to prevent reading garbled data.
* Another issue was that the ack reader thread was being spun up again after 
being shut down.  Now we prevent the dispatching thread
  from doing so by checking whether its processor has already been stopped.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8899fc8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8899fc8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8899fc8d

Branch: refs/heads/develop
Commit: 8899fc8d744bfd3060bafd17b1b33e02c7db9e5f
Parents: 58d7fca
Author: Jason Huynh <[email protected]>
Authored: Wed Jun 22 10:42:53 2016 -0700
Committer: Jason Huynh <[email protected]>
Committed: Fri Jul 1 10:58:07 2016 -0700

----------------------------------------------------------------------
 .../AbstractGatewaySenderEventProcessor.java    |  5 ++-
 ...rentParallelGatewaySenderEventProcessor.java |  8 +++--
 ...urrentSerialGatewaySenderEventProcessor.java |  5 +--
 .../wan/GatewaySenderEventRemoteDispatcher.java | 34 ++++++++++++++++----
 4 files changed, 37 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8899fc8d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index ce08e8d..e3e1a9e 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -1150,10 +1150,9 @@ public abstract class 
AbstractGatewaySenderEventProcessor extends Thread {
         }
       }
     }
-   
-    dispatcher.stop();
-    //set isStopped to true
+
     setIsStopped(true);
+    dispatcher.stop();
 
     if (this.isAlive()) {
       this.interrupt();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8899fc8d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index 07a3be5..82a53d3 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -237,6 +237,9 @@ public class ConcurrentParallelGatewaySenderEventProcessor 
extends AbstractGatew
     if (!this.isAlive()) {
       return;
     }
+
+    setIsStopped(true);
+
     final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup
         .createThreadGroup("ConcurrentParallelGatewaySenderEventProcessor 
Logger Group", logger);
 
@@ -248,12 +251,12 @@ public class 
ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
         return thread;
       }
     };
-    
+
     List<SenderStopperCallable> stopperCallables = new 
ArrayList<SenderStopperCallable>();
     for (ParallelGatewaySenderEventProcessor parallelProcessor : 
this.processors) {
       stopperCallables.add(new SenderStopperCallable(parallelProcessor));
     }
-    
+
     ExecutorService stopperService = 
Executors.newFixedThreadPool(processors.length, threadFactory);
     try {
       List<Future<Boolean>> futures = 
stopperService.invokeAll(stopperCallables);
@@ -275,7 +278,6 @@ public class ConcurrentParallelGatewaySenderEventProcessor 
extends AbstractGatew
       throw rejectedExecutionEx;
     }
     
-    setIsStopped(true);
     stopperService.shutdown();
     closeProcessor();
     if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8899fc8d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index ff810ec..a557ce1 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -268,6 +268,8 @@ public class ConcurrentSerialGatewaySenderEventProcessor 
extends
       return;
     }
 
+    setIsStopped(true);
+
     final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup
         .createThreadGroup(
             "ConcurrentSerialGatewaySenderEventProcessor Logger Group",
@@ -312,8 +314,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor 
extends
     }
     //shutdown the stopperService. This will release all the stopper threads
     stopperService.shutdown();
-    setIsStopped(true);
-    
+
     closeProcessor();
     
     if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8899fc8d/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
 
b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index b178192..3948484 100644
--- 
a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ 
b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -301,6 +301,9 @@ public class GatewaySenderEventRemoteDispatcher implements
    * @throws GatewaySenderException
    */
   public Connection getConnection(boolean startAckReaderThread) throws 
GatewaySenderException{
+    if (this.processor.isStopped()) {
+      return null;
+    }
     // IF the connection is null 
     // OR the connection's ServerLocation doesn't match with the one stored in 
sender
     // THEN initialize the connection
@@ -343,7 +346,7 @@ public class GatewaySenderEventRemoteDispatcher implements
       if (con != null) {
         if (!con.isDestroyed()) {
           con.destroy();
-         this.sender.getProxy().returnConnection(con);
+          this.sender.getProxy().returnConnection(con);
         }
         
         // Reset the connection so the next time through a new one will be
@@ -364,6 +367,9 @@ public class GatewaySenderEventRemoteDispatcher implements
    */
   private void initializeConnection() throws GatewaySenderException,
       GemFireSecurityException {
+    if (this.processor.isStopped()) {
+      return;
+    }
     this.connectionLifeCycleLock.writeLock().lock(); 
     try {
       // Attempt to acquire a connection
@@ -625,9 +631,9 @@ public class GatewaySenderEventRemoteDispatcher implements
             }
           } else {
             // If we have received IOException.
-           // if (logger.isDebugEnabled()) {
+            if (logger.isDebugEnabled()) {
               logger.debug("{}: Received null ack from remote site.", 
processor.getSender());
-            //}
+            }
             processor.handleException();
             try { // This wait is before trying to getting new connection to
                   // receive ack. Without this there will be continuous call to
@@ -723,9 +729,11 @@ public class GatewaySenderEventRemoteDispatcher implements
       // not. No need to take lock as the reader thread may be blocked and we 
might not
       // get chance to destroy unless that returns.
       if (connection != null) {
-        if (!connection.isDestroyed()) {
-          connection.destroy();
-          sender.getProxy().returnConnection(connection);
+        Connection conn = connection;
+        shutDownAckReaderConnection();
+        if (!conn.isDestroyed()) {
+          conn.destroy();
+          sender.getProxy().returnConnection(conn);
         }
       }
       this.shutdown = true;
@@ -743,12 +751,24 @@ public class GatewaySenderEventRemoteDispatcher implements
         
logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_ACKREADERTHREAD_IGNORED_CANCELLATION));
       }
     }
+
+    private void shutDownAckReaderConnection() {
+      Connection conn = connection;
+      //attempt to unblock the ackreader thread by shutting down the 
inputStream, if it was stuck on a read
+      try {
+        if (conn != null && conn.getSocket() != null) {
+          conn.getSocket().shutdownInput();
+        }
+      } catch (IOException e) {
+        logger.warn("Unable to shutdown AckReaderThread Connection");
+      }
+    }
   }
     
   public void stopAckReaderThread() {
     if (this.ackReaderThread != null) {
       this.ackReaderThread.shutdown();
-    }    
+    }
   }
   
   @Override

Reply via email to