This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new d73bcd9a2a GEODE-10335: TXManagerImpl.close resets currentInstance (#7844)
d73bcd9a2a is described below
commit d73bcd9a2aa0eeb94fad0e9d225a87b9af024000
Author: Mario Ivanac <[email protected]>
AuthorDate: Thu Sep 8 08:10:05 2022 +0200
GEODE-10335: TXManagerImpl.close resets currentInstance (#7844)
* GEODE-10335: TXManagerImpl.close resets currentInstance
* GEODE-10335: added test
---
.../apache/geode/internal/cache/TXManagerImpl.java | 32 ++--
.../geode/internal/cache/TXManagerImplTest.java | 164 +++++++++++++--------
2 files changed, 122 insertions(+), 74 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index 96b8a2a7d5..1dacdcd06d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import org.apache.logging.log4j.Logger;
@@ -88,7 +89,7 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
private final ThreadLocal<Boolean> pauseJTA;
@MakeNotStatic
- private static TXManagerImpl currentInstance = null;
+ private static final AtomicReference<TXManagerImpl> currentInstance = new
AtomicReference<>();
// The unique transaction ID for this Manager
private final AtomicInteger uniqId;
@@ -202,16 +203,16 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
isTXDistributed = new ThreadLocal<>();
transactionTimeToLive = Integer
.getInteger(GeodeGlossary.GEMFIRE_PREFIX +
"cacheServer.transactionTimeToLive", 180);
- currentInstance = this;
+ currentInstance.set(this);
this.statisticsClock = statisticsClock;
}
public static TXManagerImpl getCurrentInstanceForTest() {
- return currentInstance;
+ return currentInstance.get();
}
public static void setCurrentInstanceForTest(TXManagerImpl instance) {
- currentInstance = instance;
+ currentInstance.set(instance);
}
InternalCache getCache() {
@@ -687,6 +688,10 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
for (final TransactionListener listener : listeners) {
closeListener(listener);
}
+ TXManagerImpl instance = currentInstance.get();
+ if (instance != null) {
+ currentInstance.set(null);
+ }
}
private void closeListener(TransactionListener tl) {
@@ -855,17 +860,19 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
}
public static int getCurrentTXUniqueId() {
- if (currentInstance == null) {
+ TXManagerImpl instance = currentInstance.get();
+ if (instance == null) {
return NOTX;
}
- return currentInstance.getMyTXUniqueId();
+ return instance.getMyTXUniqueId();
}
public static TXStateProxy getCurrentTXState() {
- if (currentInstance == null) {
+ TXManagerImpl instance = currentInstance.get();
+ if (instance == null) {
return null;
}
- return currentInstance.getTXState();
+ return instance.getTXState();
}
public int getMyTXUniqueId() {
@@ -1633,7 +1640,10 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
@Override
public void run2() {
- TXManagerImpl mgr = TXManagerImpl.currentInstance;
+ TXManagerImpl mgr = TXManagerImpl.currentInstance.get();
+ if (mgr == null) {
+ return;
+ }
TXStateProxy tx = mgr.suspendedTXs.remove(txId);
if (tx != null) {
try {
@@ -1781,7 +1791,7 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
};
public static void incRefCount(AbstractRegionEntry re) {
- TXManagerImpl mgr = currentInstance;
+ TXManagerImpl mgr = currentInstance.get();
if (mgr != null) {
mgr.refCountMap.create(re, incCallback, null, null, true);
}
@@ -1791,7 +1801,7 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
* Return true if refCount went to zero.
*/
public static boolean decRefCount(AbstractRegionEntry re) {
- TXManagerImpl mgr = currentInstance;
+ TXManagerImpl mgr = currentInstance.get();
if (mgr != null) {
return mgr.refCountMap.removeConditionally(re, decCallback, null, null)
!= null;
} else {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
index 1d3e4fd6da..24b1af2ccb 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
@@ -17,12 +17,7 @@ package org.apache.geode.internal.cache;
import static
org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
@@ -105,9 +100,12 @@ public class TXManagerImplTest {
public void getOrSetHostedTXStateAbleToSetTXStateAndGetLock() {
TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
- assertNotNull(tx);
- assertEquals(tx, txMgr.getHostedTXState(txid));
- assertTrue(txMgr.getLock(tx, txid));
+ assertThat(tx).isNotNull();
+ assertThat(txMgr.getHostedTXState(txid)).isEqualTo(tx);
+ assertThat(txMgr.getLock(tx, txid)).isTrue();
+
+ txMgr.close();
+ assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull();
}
@Test
@@ -116,9 +114,13 @@ public class TXManagerImplTest {
TXManagerImpl txManager = new TXManagerImpl(mock(CachePerfStats.class),
cache, disabledClock());
txManager.setDistributed(false);
TXStateProxy proxy = txManager.beginJTA();
- assertEquals(1, proxy.getTxId().getUniqId());
- assertNotNull(txManager);
+ assertThat(proxy.getTxId().getUniqId()).isEqualTo(1);
+ assertThat(txManager).isNotNull();
TXManagerImpl.INITIAL_UNIQUE_ID_VALUE = 0;
+ assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNotNull();
+
+ txManager.close();
+ assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull();
}
@Test
@@ -126,22 +128,26 @@ public class TXManagerImplTest {
TXManagerImpl txManager = new TXManagerImpl(mock(CachePerfStats.class),
cache, disabledClock());
txManager.setDistributed(false);
TXStateProxy proxy = txManager.beginJTA();
- assertEquals(1, proxy.getTxId().getUniqId());
- assertNotNull(txManager);
+ assertThat(proxy.getTxId().getUniqId()).isEqualTo(1);
+ assertThat(txManager).isNotNull();
+ assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNotNull();
+
+ txManager.close();
+ assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull();
}
@Test
public void getLockAfterTXStateRemoved() throws InterruptedException {
TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
- assertEquals(tx, txMgr.getHostedTXState(txid));
- assertTrue(txMgr.getLock(tx, txid));
- assertNotNull(tx);
- assertTrue(txMgr.getLock(tx, txid));
+ assertThat(txMgr.getHostedTXState(txid)).isEqualTo(tx);
+ assertThat(txMgr.getLock(tx, txid)).isTrue();
+ assertThat(tx).isNotNull();
+ assertThat(txMgr.getLock(tx, txid)).isTrue();
tx.getLock().unlock();
TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg);
- assertEquals(tx, oldtx);
+ assertThat(oldtx).isEqualTo(tx);
Thread t1 = new Thread(() -> txMgr.removeHostedTXState(txid));
t1.start();
@@ -149,12 +155,12 @@ public class TXManagerImplTest {
t1.join();
TXStateProxy curTx = txMgr.getHostedTXState(txid);
- assertNull(curTx);
+ assertThat(curTx).isNull();
// after failover command removed the txid from hostedTXState,
// getLock should put back the original TXStateProxy
- assertTrue(txMgr.getLock(tx, txid));
- assertEquals(tx, txMgr.getHostedTXState(txid));
+ assertThat(txMgr.getLock(tx, txid)).isTrue();
+ assertThat(txMgr.getHostedTXState(txid)).isEqualTo(tx);
tx.getLock().unlock();
}
@@ -163,13 +169,13 @@ public class TXManagerImplTest {
public void getLockAfterTXStateReplaced() throws InterruptedException {
TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg);
- assertEquals(oldtx, txMgr.getHostedTXState(txid));
- assertTrue(txMgr.getLock(oldtx, txid));
- assertNotNull(oldtx);
+ assertThat(txMgr.getHostedTXState(txid)).isEqualTo(oldtx);
+ assertThat(txMgr.getLock(oldtx, txid)).isTrue();
+ assertThat(oldtx).isNotNull();
oldtx.getLock().unlock();
TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
- assertEquals(tx, oldtx);
+ assertThat(oldtx).isEqualTo(tx);
Thread t1 = new Thread(() -> {
txMgr.removeHostedTXState(txid);
@@ -181,12 +187,12 @@ public class TXManagerImplTest {
t1.join();
TXStateProxy curTx = txMgr.getHostedTXState(txid);
- assertNotNull(curTx);
+ assertThat(curTx).isNotNull();
// replaced
- assertNotEquals(tx, curTx);
+ assertThat(curTx).isNotEqualTo(tx);
// after TXStateProxy replaced, getLock will not get
- assertFalse(txMgr.getLock(tx, txid));
+ assertThat(txMgr.getLock(tx, txid)).isFalse();
}
@@ -194,13 +200,13 @@ public class TXManagerImplTest {
public void getLockAfterTXStateCommitted() throws InterruptedException {
TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg);
- assertEquals(oldtx, txMgr.getHostedTXState(txid));
- assertTrue(txMgr.getLock(oldtx, txid));
- assertNotNull(oldtx);
+ assertThat(txMgr.getHostedTXState(txid)).isEqualTo(oldtx);
+ assertThat(txMgr.getLock(oldtx, txid)).isTrue();
+ assertThat(oldtx).isNotNull();
oldtx.getLock().unlock();
TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg);
- assertEquals(tx, oldtx);
+ assertThat(oldtx).isEqualTo(tx);
Thread t1 = new Thread(() -> {
when(msg.getTXOriginatorClient()).thenReturn(mock(InternalDistributedMember.class));
@@ -225,12 +231,12 @@ public class TXManagerImplTest {
t1.join();
TXStateProxy curTx = txMgr.getHostedTXState(txid);
- assertNull(curTx);
+ assertThat(curTx).isNull();
- assertFalse(tx.isInProgress());
+ assertThat(tx.isInProgress()).isFalse();
// after TXStateProxy committed, getLock will get the lock for the oldtx
// but caller should not perform ops on this TXStateProxy
- assertTrue(txMgr.getLock(tx, txid));
+ assertThat(txMgr.getLock(tx, txid)).isTrue();
}
@Test
@@ -238,7 +244,10 @@ public class TXManagerImplTest {
TXStateProxy tx;
tx = txMgr.masqueradeAs(msg);
- assertNotNull(tx);
+ assertThat(tx).isNotNull();
+
+ txMgr.close();
+ assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull();
}
@Test
@@ -247,10 +256,10 @@ public class TXManagerImplTest {
Thread t1 = new Thread(() -> {
tx1 = txMgr.getHostedTXState(txid);
- assertNull(tx1);
+ assertThat(tx1).isNull();
tx1 = txMgr.getOrSetHostedTXState(txid, msg);
- assertNotNull(tx1);
- assertTrue(txMgr.getLock(tx1, txid));
+ assertThat(tx1).isNotNull();
+ assertThat(txMgr.getLock(tx1, txid)).isTrue();
latch.countDown();
@@ -260,19 +269,19 @@ public class TXManagerImplTest {
txMgr.removeHostedTXState(txid);
tx2 = txMgr.getOrSetHostedTXState(txid, msg);
- assertNotNull(tx2);
- assertTrue(txMgr.getLock(tx2, txid));
+ assertThat(tx2).isNotNull();
+ assertThat(txMgr.getLock(tx2, txid)).isTrue();
tx2.getLock().unlock();
tx1.getLock().unlock();
});
t1.start();
- assertTrue(latch.await(60, TimeUnit.SECONDS));
+ assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
tx = txMgr.masqueradeAs(msg);
- assertNotNull(tx);
- assertEquals(tx, tx2);
+ assertThat(tx).isNotNull();
+ assertThat(tx2).isEqualTo(tx);
tx.getLock().unlock();
t1.join();
@@ -282,7 +291,10 @@ public class TXManagerImplTest {
@Test
public void testTxStateWithNotFinishedTx() {
TXStateProxy tx = txMgr.getOrSetHostedTXState(notCompletedTxid, msg);
- assertTrue(tx.isInProgress());
+ assertThat(tx.isInProgress()).isTrue();
+
+ txMgr.close();
+ assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull();
}
@Test
@@ -296,7 +308,10 @@ public class TXManagerImplTest {
} finally {
txMgr.unmasquerade(tx);
}
- assertFalse(tx.isInProgress());
+ assertThat(tx.isInProgress()).isFalse();
+
+ txMgr.close();
+ assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull();
}
@Test
@@ -310,7 +325,10 @@ public class TXManagerImplTest {
} finally {
txMgr.unmasquerade(tx);
}
- assertFalse(tx.isInProgress());
+ assertThat(tx.isInProgress()).isFalse();
+
+ txMgr.close();
+ assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull();
}
private void setupTx() throws InterruptedException {
@@ -342,10 +360,10 @@ public class TXManagerImplTest {
});
t1.start();
- assertTrue(latch.await(60, TimeUnit.SECONDS));
+ assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
TXStateProxy tx = txMgr.masqueradeAs(rollbackMsg);
- assertEquals(tx, tx1);
+ assertThat(tx1).isEqualTo(tx);
t1.join();
rollbackTransaction(tx);
}
@@ -375,42 +393,50 @@ public class TXManagerImplTest {
public void txStateNotCleanedupIfNotRemovedFromHostedTxStatesMap() {
tx1 = txMgr.getOrSetHostedTXState(txid, msg);
TXStateProxyImpl txStateProxy = (TXStateProxyImpl) tx1;
- assertNotNull(txStateProxy);
- assertFalse(txStateProxy.getLocalRealDeal().isClosed());
+ assertThat(txStateProxy).isNotNull();
+ assertThat(txStateProxy.getLocalRealDeal().isClosed()).isFalse();
txMgr.masqueradeAs(tx1);
txMgr.unmasquerade(tx1);
- assertFalse(txStateProxy.getLocalRealDeal().isClosed());
+ assertThat(txStateProxy.getLocalRealDeal().isClosed()).isFalse();
+ txMgr.close();
+ assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull();
}
@Test
public void txStateCleanedUpIfRemovedFromHostedTxStatesMapCausedByFailover()
{
tx1 = txMgr.getOrSetHostedTXState(txid, msg);
TXStateProxyImpl txStateProxy = (TXStateProxyImpl) tx1;
- assertNotNull(txStateProxy);
- assertFalse(txStateProxy.getLocalRealDeal().isClosed());
+ assertThat(txStateProxy).isNotNull();
+ assertThat(txStateProxy.getLocalRealDeal().isClosed()).isFalse();
txStateProxy.setRemovedCausedByFailover(true);
txMgr.masqueradeAs(tx1);
// during TX failover, tx can be removed from the hostedTXStates map by
FindRemoteTXMessage
txMgr.getHostedTXStates().remove(txid);
txMgr.unmasquerade(tx1);
- assertTrue(txStateProxy.getLocalRealDeal().isClosed());
+ assertThat(txStateProxy.getLocalRealDeal().isClosed()).isTrue();
+
+ txMgr.close();
+ assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull();
}
@Test
public void
txStateDoesNotCleanUpIfRemovedFromHostedTxStatesMapNotCausedByFailover() {
tx1 = txMgr.getOrSetHostedTXState(txid, msg);
TXStateProxyImpl txStateProxy = (TXStateProxyImpl) tx1;
- assertNotNull(txStateProxy);
- assertFalse(txStateProxy.getLocalRealDeal().isClosed());
+ assertThat(txStateProxy).isNotNull();
+ assertThat(txStateProxy.getLocalRealDeal().isClosed()).isFalse();
txMgr.masqueradeAs(tx1);
// during TX failover, tx can be removed from the hostedTXStates map by
FindRemoteTXMessage
txMgr.getHostedTXStates().remove(txid);
txMgr.unmasquerade(tx1);
- assertFalse(txStateProxy.getLocalRealDeal().isClosed());
+ assertThat(txStateProxy.getLocalRealDeal().isClosed()).isFalse();
+
+ txMgr.close();
+ assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull();
}
@Test
@@ -422,7 +448,10 @@ public class TXManagerImplTest {
txMgr.scheduleToRemoveExpiredClientTransaction(txid);
- assertTrue(txMgr.isHostedTXStatesEmpty());
+ assertThat(txMgr.isHostedTXStatesEmpty()).isTrue();
+
+ txMgr.close();
+ assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull();
}
@Test
@@ -450,7 +479,7 @@ public class TXManagerImplTest {
doReturn(0).when(spyTxMgr).getTransactionTimeToLive();
when(txIds.iterator()).thenAnswer(
(Answer<Iterator<TXId>>) invocation -> Arrays.asList(txId1,
txId3).iterator());
- assertEquals(2, spyTxMgr.getHostedTXStates().size());
+ assertThat(spyTxMgr.getHostedTXStates()).hasSize(2);
spyTxMgr.expireDisconnectedClientTransactions(txIds, false);
@@ -458,8 +487,8 @@ public class TXManagerImplTest {
verify(spyTxMgr, times(1)).removeHostedTXState(eq(txIds));
verify(spyTxMgr, times(1)).removeHostedTXState(eq(txId1));
verify(spyTxMgr, times(1)).removeHostedTXState(eq(txId3));
- assertEquals(tx2, spyTxMgr.getHostedTXStates().get(txId2));
- assertEquals(1, spyTxMgr.getHostedTXStates().size());
+ assertThat(spyTxMgr.getHostedTXStates().get(txId2)).isEqualTo(tx2);
+ assertThat(spyTxMgr.getHostedTXStates()).hasSize(1);
}
@Test
@@ -542,7 +571,10 @@ public class TXManagerImplTest {
TXStateProxy tx;
tx = txMgr.masqueradeAs(msg);
- assertNotNull(tx.getTarget());
+ assertThat(tx.getTarget()).isNotNull();
+
+ txMgr.close();
+ assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull();
}
@Test
@@ -554,6 +586,9 @@ public class TXManagerImplTest {
txMgr.removeHostedTXState(txid, true);
verify(txStateProxy).setRemovedCausedByFailover(eq(true));
+
+ txMgr.close();
+ assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull();
}
@Test
@@ -565,6 +600,9 @@ public class TXManagerImplTest {
txMgr.removeHostedTXState(txid);
verify(txStateProxy, never()).setRemovedCausedByFailover(eq(true));
+
+ txMgr.close();
+ assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull();
}
}