[ https://issues.apache.org/jira/browse/GEODE-4096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16312080#comment-16312080 ]
ASF GitHub Bot commented on GEODE-4096: --------------------------------------- nabarunnag closed pull request #1186: GEODE-4096: Fixed race condition for connection global variable URL: https://github.com/apache/geode/pull/1186 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java index 3a15342675..782d7c0d27 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java @@ -48,7 +48,7 @@ private static final Logger logger = LogService.getLogger(); - private final AbstractGatewaySenderEventProcessor processor; + protected final AbstractGatewaySenderEventProcessor processor; private volatile Connection connection; @@ -67,6 +67,10 @@ */ private int failedConnectCount = 0; + void setAckReaderThread(AckReaderThread ackReaderThread) { + this.ackReaderThread = ackReaderThread; + } + public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor eventProcessor) { this.processor = eventProcessor; this.sender = eventProcessor.getSender(); @@ -77,9 +81,17 @@ public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor ev if (e.getCause() instanceof GemFireSecurityException) { throw e; } + } } + GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor processor, + Connection connection) { + this.processor = processor; + this.sender = processor.getSender(); + this.connection = connection; + } + protected GatewayAck readAcknowledgement() { SenderProxy sp = new 
SenderProxy(this.processor.getSender().getProxy()); GatewayAck ack = null; @@ -299,6 +311,7 @@ private boolean _dispatchBatch(List events, boolean isRetry) { */ public Connection getConnection(boolean startAckReaderThread) throws GatewaySenderException { if (this.processor.isStopped()) { + stop(); return null; } // IF the connection is null @@ -364,7 +377,7 @@ public void destroyConnection() { */ private void initializeConnection() throws GatewaySenderException, GemFireSecurityException { if (ackReaderThread != null) { - ackReaderThread.shutDownAckReaderConnection(); + ackReaderThread.shutDownAckReaderConnection(connection); } this.connectionLifeCycleLock.writeLock().lock(); try { @@ -560,6 +573,10 @@ public AckReaderThread(GatewaySender sender, AbstractGatewaySenderEventProcessor this(sender, processor.getName()); } + boolean isShutdown() { + return shutdown; + } + public AckReaderThread(GatewaySender sender, String name) { super("AckReaderThread for : " + name); this.setDaemon(true); @@ -751,7 +768,7 @@ public void shutdown() { // get chance to destroy unless that returns. 
Connection conn = connection; if (conn != null) { - shutDownAckReaderConnection(); + shutDownAckReaderConnection(conn); if (!conn.isDestroyed()) { conn.destroy(); sender.getProxy().returnConnection(conn); @@ -774,7 +791,7 @@ public void shutdown() { } } - private void shutDownAckReaderConnection() { + private void shutDownAckReaderConnection(Connection connection) { Connection conn = connection; // attempt to unblock the ackReader thread by shutting down the inputStream, if it was stuck // on a read @@ -808,7 +825,7 @@ public boolean isConnectedToRemote() { public void shutDownAckReaderConnection() { if (ackReaderThread != null) { - ackReaderThread.shutDownAckReaderConnection(); + ackReaderThread.shutDownAckReaderConnection(connection); } } diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java new file mode 100644 index 0000000000..6480d85813 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache.wan; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class GatewaySenderEventRemoteDispatcherJUnitTest { + @Test + public void getConnectionShouldShutdownTheAckThreadReaderWhenEventProcessorIsShutDown() { + AbstractGatewaySender sender = mock(AbstractGatewaySender.class); + AbstractGatewaySenderEventProcessor eventProcessor = + mock(AbstractGatewaySenderEventProcessor.class); + GatewaySenderEventRemoteDispatcher dispatcher = + new GatewaySenderEventRemoteDispatcher(eventProcessor, null); + GatewaySenderEventRemoteDispatcher.AckReaderThread ackReaderThread = + dispatcher.new AckReaderThread(sender, "AckReaderThread"); + dispatcher.setAckReaderThread(ackReaderThread); + assertFalse(ackReaderThread.isShutdown()); + when(eventProcessor.isStopped()).thenReturn(true); + assertNull(dispatcher.getConnection(false)); + assertTrue(ackReaderThread.isShutdown()); + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Race Condition between ConcurrentSerialGatewaySenderEventProcessor stopper > thread and the _dispatchBatch method for the connection global variable. 
> --------------------------------------------------------------------------------------------------------------------------------------------------- > > Key: GEODE-4096 > URL: https://issues.apache.org/jira/browse/GEODE-4096 > Project: Geode > Issue Type: Bug > Components: wan > Reporter: nabarun > Assignee: nabarun > > *+Order of execution for this race condition to occur+*. > # _dispatchBatch is trying to dispatch a batch of events but was somehow > unsuccessful. > # It silently decides that the remote server may not be ready, so it wants to > retry. > # At the same time we decide to stop the SerialGatewaySenderEventProcessor, hence we > call the Stopper Thread. > # Before the threads are started on all the senders / dispatchers, it sets the > isStopped flag for the SerialGatewaySenderEventProcessor to true. > # Then the _dispatchBatch method, which was in retry mode, makes a > getConnection call to get the connection. This method does a check on the > SerialGatewaySenderEventProcessor's isStopped flag. It sees that the flag is > set and thus returns null. > # This null is stored in the global variable connection for the dispatcher. > # Now that the _dispatchBatch method call sees that the connection is null, > it should raise an exception and call destroyConnection. > # Meanwhile there was an AckReaderThread that was running and the stopper > thread for the event processor wants to stop it, but the connection > global variable was already set to null by the getConnection method call made by > _dispatchBatch. > # Hence shutDownAckReaderConnection is executed on null, and hence > the AckReaderThread continues to keep running - being stuck on socketRead0. > # But the problem is that the AckReaderThread acquires a > connectionLifeCycleLock.readLock 
to readAcknowledgement, but the > destroyConnection calls from the stopper thread and _dispatchBatch's > exception handling code need a connectionLifeCycleLock.writeLock, which they > can't acquire because the readLock is held by the AckReaderThread, causing a deadlock. -- This message was sent by Atlassian JIRA (v6.4.14#64029)