This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-7087
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-7087 by this
push:
new cd2a9f2 GEODE-7087: Do not always clean up transaction if not hosted.
cd2a9f2 is described below
commit cd2a9f2007932ecd1477a79863b52b0dcfc5fe5c
Author: eshu <[email protected]>
AuthorDate: Thu Sep 5 14:51:38 2019 -0700
GEODE-7087: Do not always clean up transaction if not hosted.
* Only clean up the transaction if it is no longer hosted because of a failover.
* unmasquerade should not clean up the transaction in other cases, e.g. when
the transaction has already been committed.
---
.../geode/internal/cache/FindRemoteTXMessage.java | 2 +-
.../apache/geode/internal/cache/TXManagerImpl.java | 15 ++++++--
.../geode/internal/cache/TXStateProxyImpl.java | 10 +++++
.../tier/sockets/command/TXFailoverCommand.java | 2 +-
.../geode/internal/cache/TXManagerImplTest.java | 44 +++++++++++++++++++++-
5 files changed, 65 insertions(+), 8 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java
index 4c8cd34..fa46821 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/FindRemoteTXMessage.java
@@ -110,7 +110,7 @@ public class FindRemoteTXMessage extends
HighPriorityDistributionMessage
reply.isPartialCommitMessage = true;
}
// cleanup the local txStateProxy fixes bug 43069
- mgr.removeHostedTXState(txId);
+ mgr.removeHostedTXState(txId, true);
}
}
reply.setRecipient(getSender());
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index e545d9f..925fd92 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -1047,7 +1047,7 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
updateLastOperationTime(tx);
}
try {
- cleanupTransactionIfNoLongerHost(tx);
+ cleanupTransactionIfNoLongerHostCausedByFailover(tx);
} finally {
setTXState(null);
tx.getLock().unlock();
@@ -1055,12 +1055,12 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
}
}
- void cleanupTransactionIfNoLongerHost(TXStateProxy tx) {
+ void cleanupTransactionIfNoLongerHostCausedByFailover(TXStateProxy tx) {
synchronized (hostedTXStates) {
if (!hostedTXStates.containsKey(tx.getTxId())) {
// clean up the transaction if no longer the host of the transaction
- // this could occur when a failover command removed the transaction.
- if (tx.isRealDealLocal()) {
+ // caused by a failover command removed the transaction.
+ if (tx.isRealDealLocal() && ((TXStateProxyImpl)
tx).isRemovedCausedByFailover()) {
((TXStateProxyImpl) tx).getLocalRealDeal().cleanup();
}
}
@@ -1077,10 +1077,17 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
* @return the TXStateProxy
*/
public TXStateProxy removeHostedTXState(TXId txId) {
+ return removeHostedTXState(txId, false);
+ }
+
+ public TXStateProxy removeHostedTXState(TXId txId, boolean causedByFailover)
{
synchronized (this.hostedTXStates) {
TXStateProxy result = this.hostedTXStates.remove(txId);
if (result != null) {
result.close();
+ if (causedByFailover) {
+ ((TXStateProxyImpl) result).setRemovedCausedByFailover(true);
+ }
}
return result;
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
index 86ac1a6..f9712df 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
@@ -80,6 +80,8 @@ public class TXStateProxyImpl implements TXStateProxy {
private long lastOperationTimeFromClient;
private final StatisticsClock statisticsClock;
+ private boolean removedCausedByFailover = false;
+
public TXStateProxyImpl(InternalCache cache, TXManagerImpl managerImpl, TXId
id,
InternalDistributedMember clientMember, StatisticsClock statisticsClock)
{
this.cache = cache;
@@ -205,6 +207,14 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
+ boolean isRemovedCausedByFailover() {
+ return removedCausedByFailover;
+ }
+
+ void setRemovedCausedByFailover(boolean removedCausedByFailover) {
+ this.removedCausedByFailover = removedCausedByFailover;
+ }
+
@Override
public void precommit()
throws CommitConflictException,
UnsupportedOperationInTransactionException {
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
index 5231dfc..891f923 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
@@ -126,7 +126,7 @@ public class TXFailoverCommand extends BaseCommand {
writeException(clientMessage, new
TransactionDataNodeHasDepartedException(
"Could not find transaction host for " + txId), false,
serverConnection);
serverConnection.setAsTrue(RESPONDED);
- mgr.removeHostedTXState(txId);
+ mgr.removeHostedTXState(txId, true);
return;
}
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
index 345872c..868c08b 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -382,11 +383,12 @@ public class TXManagerImplTest {
}
@Test
- public void txStateCleanedUpIfRemovedFromHostedTxStatesMap() {
+ public void txStateCleanedUpIfRemovedFromHostedTxStatesMapCausedByFailover()
{
tx1 = txMgr.getOrSetHostedTXState(txid, msg);
TXStateProxyImpl txStateProxy = (TXStateProxyImpl) tx1;
assertNotNull(txStateProxy);
assertFalse(txStateProxy.getLocalRealDeal().isClosed());
+ txStateProxy.setRemovedCausedByFailover(true);
txMgr.masqueradeAs(tx1);
// during TX failover, tx can be removed from the hostedTXStates map by
FindRemoteTXMessage
@@ -396,6 +398,20 @@ public class TXManagerImplTest {
}
@Test
+ public void
txStateDoesNotCleanUpIfRemovedFromHostedTxStatesMapNotCausedByFailover() {
+ tx1 = txMgr.getOrSetHostedTXState(txid, msg);
+ TXStateProxyImpl txStateProxy = (TXStateProxyImpl) tx1;
+ assertNotNull(txStateProxy);
+ assertFalse(txStateProxy.getLocalRealDeal().isClosed());
+
+ txMgr.masqueradeAs(tx1);
+ // during TX failover, tx can be removed from the hostedTXStates map by
FindRemoteTXMessage
+ txMgr.getHostedTXStates().remove(txid);
+ txMgr.unmasquerade(tx1);
+ assertFalse(txStateProxy.getLocalRealDeal().isClosed());
+ }
+
+ @Test
public void
clientTransactionWithIdleTimeLongerThanTransactionTimeoutIsRemoved()
throws Exception {
when(msg.getTXOriginatorClient()).thenReturn(mock(InternalDistributedMember.class));
@@ -519,7 +535,8 @@ public class TXManagerImplTest {
tx1 = mock(TXStateProxyImpl.class);
ReentrantLock lock = mock(ReentrantLock.class);
when(tx1.getLock()).thenReturn(lock);
- doThrow(new
RuntimeException()).when(spyTxMgr).cleanupTransactionIfNoLongerHost(tx1);
+ doThrow(new RuntimeException()).when(spyTxMgr)
+ .cleanupTransactionIfNoLongerHostCausedByFailover(tx1);
spyTxMgr.unmasquerade(tx1);
@@ -533,4 +550,27 @@ public class TXManagerImplTest {
tx = txMgr.masqueradeAs(msg);
assertNotNull(tx.getTarget());
}
+
+ @Test
+ public void removeHostedTXStateSetFlagIfCausedByFailover() {
+ Map<TXId, TXStateProxy> hostedTXStates = txMgr.getHostedTXStates();
+ TXStateProxyImpl txStateProxy = mock(TXStateProxyImpl.class);
+ hostedTXStates.put(txid, txStateProxy);
+
+ txMgr.removeHostedTXState(txid, true);
+
+ verify(txStateProxy).setRemovedCausedByFailover(eq(true));
+ }
+
+ @Test
+ public void removeHostedTXStateDoesNotSetFlagIfNotCausedByFailover() {
+ Map<TXId, TXStateProxy> hostedTXStates = txMgr.getHostedTXStates();
+ TXStateProxyImpl txStateProxy = mock(TXStateProxyImpl.class);
+ hostedTXStates.put(txid, txStateProxy);
+
+ txMgr.removeHostedTXState(txid);
+
+ verify(txStateProxy, never()).setRemovedCausedByFailover(eq(true));
+ }
+
}