[ 
https://issues.apache.org/jira/browse/GEODE-4096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16312080#comment-16312080
 ] 

ASF GitHub Bot commented on GEODE-4096:
---------------------------------------

nabarunnag closed pull request #1186: GEODE-4096: Fixed race condition for 
connection global variable
URL: https://github.com/apache/geode/pull/1186
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 3a15342675..782d7c0d27 100644
--- 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -48,7 +48,7 @@
 
   private static final Logger logger = LogService.getLogger();
 
-  private final AbstractGatewaySenderEventProcessor processor;
+  protected final AbstractGatewaySenderEventProcessor processor;
 
   private volatile Connection connection;
 
@@ -67,6 +67,10 @@
    */
   private int failedConnectCount = 0;
 
+  void setAckReaderThread(AckReaderThread ackReaderThread) {
+    this.ackReaderThread = ackReaderThread;
+  }
+
   public 
GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor 
eventProcessor) {
     this.processor = eventProcessor;
     this.sender = eventProcessor.getSender();
@@ -77,9 +81,17 @@ public 
GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor ev
       if (e.getCause() instanceof GemFireSecurityException) {
         throw e;
       }
+
     }
   }
 
+  GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor 
processor,
+      Connection connection) {
+    this.processor = processor;
+    this.sender = processor.getSender();
+    this.connection = connection;
+  }
+
   protected GatewayAck readAcknowledgement() {
     SenderProxy sp = new SenderProxy(this.processor.getSender().getProxy());
     GatewayAck ack = null;
@@ -299,6 +311,7 @@ private boolean _dispatchBatch(List events, boolean 
isRetry) {
    */
   public Connection getConnection(boolean startAckReaderThread) throws 
GatewaySenderException {
     if (this.processor.isStopped()) {
+      stop();
       return null;
     }
     // IF the connection is null
@@ -364,7 +377,7 @@ public void destroyConnection() {
    */
   private void initializeConnection() throws GatewaySenderException, 
GemFireSecurityException {
     if (ackReaderThread != null) {
-      ackReaderThread.shutDownAckReaderConnection();
+      ackReaderThread.shutDownAckReaderConnection(connection);
     }
     this.connectionLifeCycleLock.writeLock().lock();
     try {
@@ -560,6 +573,10 @@ public AckReaderThread(GatewaySender sender, 
AbstractGatewaySenderEventProcessor
       this(sender, processor.getName());
     }
 
+    boolean isShutdown() {
+      return shutdown;
+    }
+
     public AckReaderThread(GatewaySender sender, String name) {
       super("AckReaderThread for : " + name);
       this.setDaemon(true);
@@ -751,7 +768,7 @@ public void shutdown() {
       // get chance to destroy unless that returns.
       Connection conn = connection;
       if (conn != null) {
-        shutDownAckReaderConnection();
+        shutDownAckReaderConnection(conn);
         if (!conn.isDestroyed()) {
           conn.destroy();
           sender.getProxy().returnConnection(conn);
@@ -774,7 +791,7 @@ public void shutdown() {
       }
     }
 
-    private void shutDownAckReaderConnection() {
+    private void shutDownAckReaderConnection(Connection connection) {
       Connection conn = connection;
       // attempt to unblock the ackReader thread by shutting down the 
inputStream, if it was stuck
       // on a read
@@ -808,7 +825,7 @@ public boolean isConnectedToRemote() {
 
   public void shutDownAckReaderConnection() {
     if (ackReaderThread != null) {
-      ackReaderThread.shutDownAckReaderConnection();
+      ackReaderThread.shutDownAckReaderConnection(connection);
     }
   }
 
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
new file mode 100644
index 0000000000..6480d85813
--- /dev/null
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class GatewaySenderEventRemoteDispatcherJUnitTest {
+  @Test
+  public void 
getConnectionShouldShutdownTheAckThreadReaderWhenEventProcessorIsShutDown() {
+    AbstractGatewaySender sender = mock(AbstractGatewaySender.class);
+    AbstractGatewaySenderEventProcessor eventProcessor =
+        mock(AbstractGatewaySenderEventProcessor.class);
+    GatewaySenderEventRemoteDispatcher dispatcher =
+        new GatewaySenderEventRemoteDispatcher(eventProcessor, null);
+    GatewaySenderEventRemoteDispatcher.AckReaderThread ackReaderThread =
+        dispatcher.new AckReaderThread(sender, "AckReaderThread");
+    dispatcher.setAckReaderThread(ackReaderThread);
+    assertFalse(ackReaderThread.isShutdown());
+    when(eventProcessor.isStopped()).thenReturn(true);
+    assertNull(dispatcher.getConnection(false));
+    assertTrue(ackReaderThread.isShutdown());
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Race Condition between ConcurrentSerialGatewaySenderEventProcessor stopper 
> thread and the _dispatchBatch method for the connection global variable.
> ---------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: GEODE-4096
>                 URL: https://issues.apache.org/jira/browse/GEODE-4096
>             Project: Geode
>          Issue Type: Bug
>          Components: wan
>            Reporter: nabarun
>            Assignee: nabarun
>
> *+Order of execution for this race condition to occur+*.
> # _dispatchBatch tries to dispatch a batch of events but is somehow
> unsuccessful.
> # It silently decides that the remote server may not be ready, so it wants to
> retry.
> # At the same time, we decide to stop the SerialGatewaySenderEventProcessor, so we
> start the stopper thread.
> # Before the threads are started on all the senders/dispatchers, the stopper sets the
> isStopped flag of the SerialGatewaySenderEventProcessor to true.
> # Then the _dispatchBatch method, which was in retry mode, makes a
> getConnection call to obtain the connection. This method checks the
> SerialGatewaySenderEventProcessor's isStopped flag, sees that the flag is
> set, and thus returns null.
> # This null is stored in the global variable connection for the dispatcher.
> # The _dispatchBatch method now sees that the connection is null, so it
> should raise an exception and call destroyConnection.
> # Meanwhile, an AckReaderThread was running, and the stopper thread for the
> event processor wants to stop it. However, the connection global variable has
> already been set to null by the getConnection call made from
> _dispatchBatch.
> # Hence shutDownAckReaderConnection is executed on a null connection, and the
> AckReaderThread keeps running, stuck in socketRead0.
> # The problem is that the AckReaderThread acquires the
> connectionLifeCycleLock read lock in order to call readAcknowledgement, while the
> destroyConnection calls from the stopper thread and from _dispatchBatch's
> exception-handling code need the connectionLifeCycleLock write lock. They cannot
> acquire it because the read lock is held by the AckReaderThread, causing a deadlock.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to