GEODE-1400: An inflight transaction op could arrive later than a client failover operation
* Handle inflight p2p transaction message received later than failover message. * Add unit tests. * Move hasTxAlreadyFinished method to TXManagerImpl. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/384d379a Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/384d379a Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/384d379a Branch: refs/heads/feature/GEODE-1400 Commit: 384d379ae7040c8fe222afdeab96973f50157296 Parents: 8eac0fa Author: eshu <[email protected]> Authored: Tue May 24 12:04:52 2016 -0700 Committer: eshu <[email protected]> Committed: Tue May 31 14:39:42 2016 -0700 ---------------------------------------------------------------------- .../internal/cache/RemoteOperationMessage.java | 46 +++- .../gemfire/internal/cache/TXManagerImpl.java | 71 +++++- .../cache/partitioned/PartitionMessage.java | 54 ++++- .../cache/RemoteOperationMessageTest.java | 93 ++++++++ .../internal/cache/TXManagerImplTest.java | 236 +++++++++++++++++++ .../cache/partitioned/PartitionMessageTest.java | 100 ++++++++ 6 files changed, 566 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java index 8ffab72..19e1dea 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java @@ -187,7 +187,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme /** * check to see if the cache is closing */ - final public boolean checkCacheClosing(DistributionManager dm) { + public boolean checkCacheClosing(DistributionManager dm) { GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); // return (cache != null && cache.isClosed()); return cache == null || cache.isClosed(); @@ -197,11 +197,11 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme * check to see if the distributed system is closing * @return true if the distributed system is closing */ - final public boolean checkDSClosing(DistributionManager dm) { + public boolean checkDSClosing(DistributionManager dm) { InternalDistributedSystem ds = dm.getSystem(); return (ds == null || ds.isDisconnecting()); } - + /** * Upon receipt of the message, both process the message and send an * acknowledgement, not necessarily in that order. Note: Any hang in this @@ -222,8 +222,8 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme thr = new CacheClosedException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0.toLocalizedString(dm.getId())); return; } - GemFireCacheImpl gfc = (GemFireCacheImpl)CacheFactory.getInstance(dm.getSystem()); - r = gfc.getRegionByPathForProcessing(this.regionPath); + GemFireCacheImpl gfc = getCache(dm); + r = getRegionByPath(gfc); if (r == null && failIfRegionMissing()) { // if the distributed system is disconnecting, don't send a reply saying // the partitioned region can't be found (bug 36585) @@ -235,13 +235,19 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme thr = UNHANDLED_EXCEPTION; // [bruce] r might be null here, so we have to go to the cache instance to get the txmgr - TXManagerImpl txMgr = GemFireCacheImpl.getInstance().getTxManager(); - TXStateProxy tx = null; - try { - tx = txMgr.masqueradeAs(this); - sendReply = operateOnRegion(dm, r, startTime); - } finally { - txMgr.unmasquerade(tx); + TXManagerImpl txMgr = getTXManager(gfc); + TXStateProxy tx = txMgr.masqueradeAs(this); + if (tx == null) { + sendReply = operateOnRegion(dm, r, startTime); + } else { + try { + TXId txid = new TXId(getMemberToMasqueradeAs(), getTXUniqId()); + if (!hasTxAlreadyFinished(tx, txMgr, txid)) { + sendReply = operateOnRegion(dm, r, startTime); + } + } finally { + txMgr.unmasquerade(tx); + } } thr = null; @@ -310,6 +316,22 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme } } } + + boolean hasTxAlreadyFinished(TXStateProxy tx, TXManagerImpl txMgr, TXId txid) { + return txMgr.hasTxAlreadyFinished(tx, txid); + } + + TXManagerImpl getTXManager(GemFireCacheImpl cache) { + return cache.getTxManager(); + } + + LocalRegion getRegionByPath(GemFireCacheImpl gfc) { + return gfc.getRegionByPathForProcessing(this.regionPath); + } + + GemFireCacheImpl getCache(final DistributionManager dm) { + return (GemFireCacheImpl)CacheFactory.getInstance(dm.getSystem()); + } /** Send a generic ReplyMessage. This is in a method so that subclasses can override the reply message type * @param pr the Partitioned Region for the message whose statistics are incremented http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java index dd4653c..49926e6 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java @@ -84,7 +84,7 @@ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMa * * @see CacheTransactionManager */ -public final class TXManagerImpl implements CacheTransactionManager, +public class TXManagerImpl implements CacheTransactionManager, MembershipListener { private static final Logger logger = LogService.getLogger(); @@ -729,8 +729,26 @@ public final class TXManagerImpl implements CacheTransactionManager, return null; } TXId key = new TXId(msg.getMemberToMasqueradeAs(), msg.getTXUniqId()); - TXStateProxy val; - val = this.hostedTXStates.get(key); + TXStateProxy val = getOrSetHostedTXState(key, msg); + + if (val != null) { + boolean success = getLock(val, key); + while (!success) { + val = getOrSetHostedTXState(key, msg); + if (val != null) { + success = getLock(val, key); + } else { + break; + } + } + } + + setTXState(val); + return val; + } + + TXStateProxy getOrSetHostedTXState(TXId key, TransactionMessage msg) { + TXStateProxy val = this.hostedTXStates.get(key); if (val == null) { synchronized(this.hostedTXStates) { val = this.hostedTXStates.get(key); @@ -746,14 +764,49 @@ public final class TXManagerImpl implements CacheTransactionManager, } } } - if (val != null) { - if (!val.getLock().isHeldByCurrentThread()) { - val.getLock().lock(); + return val; + } + + boolean getLock(TXStateProxy val, TXId key) { + if (!val.getLock().isHeldByCurrentThread()) { + val.getLock().lock(); + synchronized (this.hostedTXStates) { + TXStateProxy curVal = this.hostedTXStates.get(key); + // Inflight op could be received later than TXFailover operation. + if (curVal == null) { + if (!isHostedTxRecentlyCompleted(key)) { + this.hostedTXStates.put(key, val); + // Failover op removed the val + // It is possible that the same operation can be executed + // twice by two threads, but data is consistent. + } + } else { + if (val != curVal) { + //Failover op replaced with a new TXStateProxyImpl + //Use the new one instead. + val.getLock().unlock(); + return false; + } + } } } - - setTXState(val); - return val; + return true; + } + + public boolean hasTxAlreadyFinished(TXStateProxy tx, TXId txid) { + if (tx == null) { + return false; + } + if (isHostedTxRecentlyCompleted(txid)) { + //Should only happen when handling a later arrival of transactional op from proxy, + //while the transaction has failed over and already committed or rolled back. + //Just send back reply as a success op. + //The client connection should be lost from proxy, or + //the proxy is closed for failover to occur. + logger.info("TxId {} has already finished." , txid); + return true; + } + return false; } /** http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java index db4cc59..9c54587 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java @@ -58,6 +58,7 @@ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.internal.cache.PartitionedRegion; import com.gemstone.gemfire.internal.cache.PartitionedRegionException; import com.gemstone.gemfire.internal.cache.PrimaryBucketException; +import com.gemstone.gemfire.internal.cache.TXId; import com.gemstone.gemfire.internal.cache.TXManagerImpl; import com.gemstone.gemfire.internal.cache.TXStateProxy; import com.gemstone.gemfire.internal.cache.TransactionMessage; @@ -262,8 +263,8 @@ public abstract class PartitionMessage extends DistributionMessage implements /** * check to see if the cache is closing */ - final public boolean checkCacheClosing(DistributionManager dm) { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + public boolean checkCacheClosing(DistributionManager dm) { + GemFireCacheImpl cache = getGemFireCacheImpl(); // return (cache != null && cache.isClosed()); return cache == null || cache.isClosed(); } @@ -272,11 +273,32 @@ public abstract class PartitionMessage extends DistributionMessage implements * check to see if the distributed system is closing * @return true if the distributed system is closing */ - final public boolean checkDSClosing(DistributionManager dm) { + public boolean checkDSClosing(DistributionManager dm) { InternalDistributedSystem ds = dm.getSystem(); return (ds == null || ds.isDisconnecting()); } + boolean hasTxAlreadyFinished(TXStateProxy tx, TXManagerImpl txMgr, TXId txid) { + return txMgr.hasTxAlreadyFinished(tx, txid); + } + + PartitionedRegion getPartitionedRegion() throws PRLocallyDestroyedException { + return PartitionedRegion.getPRFromId(this.regionId); + } + + GemFireCacheImpl getGemFireCacheImpl() { + return GemFireCacheImpl.getInstance(); + } + + TXManagerImpl getTXManagerImpl(GemFireCacheImpl cache) { + return cache.getTxManager(); + } + + long getStartPartitionMessageProcessingTime(PartitionedRegion pr) { + return pr.getPrStats().startPartitionMessageProcessing(); + } + + /** * Upon receipt of the message, both process the message and send an * acknowledgement, not necessarily in that order. Note: Any hang in this @@ -298,7 +320,7 @@ public abstract class PartitionMessage extends DistributionMessage implements thr = new CacheClosedException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0.toLocalizedString(dm.getId())); return; } - pr = PartitionedRegion.getPRFromId(this.regionId); + pr = getPartitionedRegion(); if (pr == null && failIfRegionMissing()) { // if the distributed system is disconnecting, don't send a reply saying // the partitioned region can't be found (bug 36585) @@ -307,21 +329,27 @@ public abstract class PartitionMessage extends DistributionMessage implements } if (pr != null) { - startTime = pr.getPrStats().startPartitionMessageProcessing(); + startTime = getStartPartitionMessageProcessingTime(pr); } thr = UNHANDLED_EXCEPTION; - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + GemFireCacheImpl cache = getGemFireCacheImpl(); if(cache==null) { throw new ForceReattemptException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0.toLocalizedString()); } - TXManagerImpl txMgr = cache.getTxManager(); - TXStateProxy tx = null; - try { - tx = txMgr.masqueradeAs(this); - sendReply = operateOnPartitionedRegion(dm, pr, startTime); - } finally { - txMgr.unmasquerade(tx); + TXManagerImpl txMgr = getTXManagerImpl(cache); + TXStateProxy tx = txMgr.masqueradeAs(this); + if (tx == null) { + sendReply = operateOnPartitionedRegion(dm, pr, startTime); + } else { + try { + TXId txid = new TXId(getMemberToMasqueradeAs(), getTXUniqId()); + if (!hasTxAlreadyFinished(tx, txMgr, txid)) { + sendReply = operateOnPartitionedRegion(dm, pr, startTime); + } + } finally { + txMgr.unmasquerade(tx); + } } thr = null; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java new file mode 100644 index 0000000..ecfc2b0 --- /dev/null +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache; + +import static org.mockito.Mockito.*; + +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.internal.stubbing.answers.CallsRealMethods; + +import com.gemstone.gemfire.distributed.internal.DistributionManager; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.TXId; +import com.gemstone.gemfire.internal.cache.TXManagerImpl; +import com.gemstone.gemfire.internal.cache.TXStateProxy; +import com.gemstone.gemfire.internal.cache.TXStateProxyImpl; +import com.gemstone.gemfire.test.fake.Fakes; +import com.gemstone.gemfire.test.junit.categories.UnitTest; + + +@Category(UnitTest.class) +public class RemoteOperationMessageTest { + private GemFireCacheImpl cache; + private RemoteOperationMessage msg; + private DistributionManager dm; + private LocalRegion r; + private TXManagerImpl txMgr; + private TXId txid; + private long startTime = 0; + TXStateProxy tx; + + @Before + public void setUp() throws InterruptedException { + cache = Fakes.cache(); + dm = mock(DistributionManager.class); + msg = mock(RemoteOperationMessage.class); + r = mock(LocalRegion.class); + txMgr = mock(TXManagerImpl.class); + txid = new TXId(null, 0); + tx = mock(TXStateProxyImpl.class); + + when(msg.checkCacheClosing(dm)).thenReturn(false); + when(msg.checkDSClosing(dm)).thenReturn(false); + when(msg.getCache(dm)).thenReturn(cache); + when(msg.getRegionByPath(cache)).thenReturn(r); + when(msg.getTXManager(cache)).thenReturn(txMgr); + when(txMgr.hasTxAlreadyFinished(tx, txid)).thenCallRealMethod(); + + doAnswer(new CallsRealMethods()).when(msg).process(dm); + } + + @Test + public void messageWithNoTXPerformsOnRegion() throws InterruptedException, RemoteOperationException { + when(txMgr.masqueradeAs(msg)).thenReturn(null); + msg.process(dm); + + verify(msg, times(1)).operateOnRegion(dm, r, startTime); + } + + @Test + public void messageForNotFinishedTXPerformsOnRegion() throws InterruptedException, RemoteOperationException { + when(txMgr.masqueradeAs(msg)).thenReturn(tx); + when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenCallRealMethod(); + msg.process(dm); + + verify(msg, times(1)).operateOnRegion(dm, r, startTime); + } + + @Test + public void messageForFinishedTXDoesNotPerformOnRegion() throws InterruptedException, RemoteOperationException { + when(txMgr.masqueradeAs(msg)).thenReturn(tx); + when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenReturn(true); + msg.process(dm); + + verify(msg, times(0)).operateOnRegion(dm, r, startTime); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java new file mode 100644 index 0000000..a4b8127 --- /dev/null +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.gemstone.gemfire.internal.cache; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; +import com.gemstone.gemfire.internal.cache.partitioned.DestroyMessage; +import com.gemstone.gemfire.test.fake.Fakes; +import com.gemstone.gemfire.test.junit.categories.UnitTest; +import com.jayway.awaitility.Awaitility; + + +@Category(UnitTest.class) +public class TXManagerImplTest { + private TXManagerImpl txMgr; + TXId txid; + DestroyMessage msg; + TXCommitMessage txCommitMsg; + TXId completedTxid; + TXId notCompletedTxid; + InternalDistributedMember member; + CountDownLatch latch = new CountDownLatch(1); + TXStateProxy tx1, tx2; + + @Before + public void setUp() { + Cache cache = Fakes.cache(); + txMgr = new TXManagerImpl(null, cache); + txid = new TXId(null, 0); + msg = mock(DestroyMessage.class); + txCommitMsg = mock(TXCommitMessage.class); + member = mock(InternalDistributedMember.class); + completedTxid = new TXId(member, 1); + notCompletedTxid = new TXId(member, 2); + + when(this.msg.canStartRemoteTransaction()).thenReturn(true); + when(this.msg.canParticipateInTransaction()).thenReturn(true); + } + + @Test + public void getOrSetHostedTXStateAbleToSetTXStateAndGetLock(){ + TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg); + + assertNotNull(tx); + assertEquals(tx, txMgr.getHostedTXState(txid)); + assertTrue(txMgr.getLock(tx, txid)); + } + + @Test + public void getLockAfterTXStateRemoved() throws InterruptedException{ + TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg); + + assertEquals(tx, txMgr.getHostedTXState(txid)); + assertTrue(txMgr.getLock(tx, txid)); + assertNotNull(tx); + assertTrue(txMgr.getLock(tx, txid)); + tx.getLock().unlock(); + + TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg); + assertEquals(tx, oldtx); + + Thread t1 = new Thread(new Runnable() { + public void run() { + txMgr.removeHostedTXState(txid); + } + }); + t1.start(); + + t1.join(); + + TXStateProxy curTx = txMgr.getHostedTXState(txid); + assertNull(curTx); + + //after failover command removed the txid from hostedTXState, + //getLock should put back the original TXStateProxy + assertTrue(txMgr.getLock(tx, txid)); + assertEquals(tx, txMgr.getHostedTXState(txid)); + + tx.getLock().unlock(); + } + + @Test + public void getLockAfterTXStateReplaced() throws InterruptedException{ + TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg); + + assertEquals(oldtx, txMgr.getHostedTXState(txid)); + assertTrue(txMgr.getLock(oldtx, txid)); + assertNotNull(oldtx); + oldtx.getLock().unlock(); + + TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg); + assertEquals(tx, oldtx); + + Thread t1 = new Thread(new Runnable() { + public void run() { + txMgr.removeHostedTXState(txid); + //replace with new TXState + txMgr.getOrSetHostedTXState(txid, msg); + } + }); + t1.start(); + + t1.join(); + + TXStateProxy curTx = txMgr.getHostedTXState(txid); + assertNotNull(curTx); + //replaced + assertNotEquals(tx, curTx); + + //after TXStateProxy replaced, getLock will not get + assertFalse(txMgr.getLock(tx, txid)); + + } + + @Test + public void getLockAfterTXStateCommitted() throws InterruptedException{ + TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg); + + assertEquals(oldtx, txMgr.getHostedTXState(txid)); + assertTrue(txMgr.getLock(oldtx, txid)); + assertNotNull(oldtx); + oldtx.getLock().unlock(); + + TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg); + assertEquals(tx, oldtx); + + Thread t1 = new Thread(new Runnable() { + public void run() { + txMgr.removeHostedTXState(txid); + txMgr.saveTXCommitMessageForClientFailover(txid, txCommitMsg); + } + }); + t1.start(); + + t1.join(); + + TXStateProxy curTx = txMgr.getHostedTXState(txid); + assertNull(curTx); + + //after TXStateProxy committed, getLock will get the lock for the oldtx + //but caller should not perform ops on this TXStateProxy + assertTrue(txMgr.getLock(tx, txid)); + } + + @Test + public void masqueradeAsCanGetLock() throws InterruptedException{ + TXStateProxy tx; + + tx = txMgr.masqueradeAs(msg); + assertNotNull(tx); + } + + @Test + public void masqueradeAsCanGetLockAfterTXStateIsReplaced() throws InterruptedException{ + TXStateProxy tx; + + Thread t1 = new Thread(new Runnable() { + public void run() { + tx1 = txMgr.getHostedTXState(txid); + assertNull(tx1); + tx1 =txMgr.getOrSetHostedTXState(txid, msg); + assertNotNull(tx1); + assertTrue(txMgr.getLock(tx1, txid)); + + latch.countDown(); + + Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS) + .atMost(30, TimeUnit.SECONDS).until(() -> tx1.getLock().hasQueuedThreads()); + + txMgr.removeHostedTXState(txid); + + tx2 =txMgr.getOrSetHostedTXState(txid, msg); + assertNotNull(tx2); + assertTrue(txMgr.getLock(tx2, txid)); + + tx2.getLock().unlock(); + tx1.getLock().unlock(); + } + }); + t1.start(); + + assertTrue(latch.await(60, TimeUnit.SECONDS)); + + tx = txMgr.masqueradeAs(msg); + assertNotNull(tx); + assertEquals(tx, tx2); + tx.getLock().unlock(); + + t1.join(); + + } + + @Test + public void hasTxAlreadyFinishedDetectsNoTx() { + assertFalse(txMgr.hasTxAlreadyFinished(null, txid)); + } + + @Test + public void hasTxAlreadyFinishedDetectsTxNotFinished() { + TXStateProxy tx = txMgr.getOrSetHostedTXState(notCompletedTxid, msg); + assertFalse(txMgr.hasTxAlreadyFinished(tx, notCompletedTxid)); + } + + @Test + public void hasTxAlreadyFinishedDetectsTxFinished() throws InterruptedException { + TXStateProxy tx = txMgr.getOrSetHostedTXState(completedTxid, msg); + txMgr.saveTXCommitMessageForClientFailover(completedTxid, txCommitMsg); + assertTrue(txMgr.hasTxAlreadyFinished(tx, completedTxid)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/384d379a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java new file mode 100644 index 0000000..bbbf714 --- /dev/null +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.partitioned; + +import static org.mockito.Mockito.*; + +import java.io.IOException; + +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.internal.stubbing.answers.CallsRealMethods; + +import com.gemstone.gemfire.cache.CacheException; +import com.gemstone.gemfire.cache.query.QueryException; +import com.gemstone.gemfire.distributed.internal.DistributionManager; +import com.gemstone.gemfire.internal.cache.DataLocationException; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; +import com.gemstone.gemfire.internal.cache.TXId; +import com.gemstone.gemfire.internal.cache.TXManagerImpl; +import com.gemstone.gemfire.internal.cache.TXStateProxy; +import com.gemstone.gemfire.internal.cache.TXStateProxyImpl; +import com.gemstone.gemfire.test.fake.Fakes; +import com.gemstone.gemfire.test.junit.categories.UnitTest; + + +@Category(UnitTest.class) +public class PartitionMessageTest { + + private GemFireCacheImpl cache; + private PartitionMessage msg; + private DistributionManager dm; + private PartitionedRegion pr; + private TXManagerImpl txMgr; + private TXId txid; + private long startTime = 1; + TXStateProxy tx; + + @Before + public void setUp() throws PRLocallyDestroyedException, InterruptedException { + cache = Fakes.cache(); + dm = mock(DistributionManager.class); + msg = mock(PartitionMessage.class); + pr = mock(PartitionedRegion.class); + txMgr = mock(TXManagerImpl.class); + tx = mock(TXStateProxyImpl.class); + txid = new TXId(null, 0); + + when(msg.checkCacheClosing(dm)).thenReturn(false); + when(msg.checkDSClosing(dm)).thenReturn(false); + when(msg.getPartitionedRegion()).thenReturn(pr); + when(msg.getGemFireCacheImpl()).thenReturn(cache); + when(msg.getStartPartitionMessageProcessingTime(pr)).thenReturn(startTime); + when(msg.getTXManagerImpl(cache)).thenReturn(txMgr); + when(msg.hasTxAlreadyFinished(null, txMgr, txid)).thenCallRealMethod(); + + doAnswer(new CallsRealMethods()).when(msg).process(dm); + } + + @Test + public void messageWithNoTXPerformsOnRegion() throws InterruptedException, CacheException, QueryException, DataLocationException, IOException { + when(txMgr.masqueradeAs(msg)).thenReturn(null); + msg.process(dm); + + verify(msg, times(1)).operateOnPartitionedRegion(dm, pr, startTime); + } + + @Test + public void messageForNotFinishedTXPerformsOnRegion() throws InterruptedException, CacheException, QueryException, DataLocationException, IOException { + when(txMgr.masqueradeAs(msg)).thenReturn(tx); + msg.process(dm); + + verify(msg, times(1)).operateOnPartitionedRegion(dm, pr, startTime); + } + + @Test + public void messageForFinishedTXDoesNotPerformOnRegion() throws InterruptedException, CacheException, QueryException, DataLocationException, IOException { + when(txMgr.masqueradeAs(msg)).thenReturn(tx); + when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenReturn(true); + msg.process(dm); + + verify(msg, times(0)).operateOnPartitionedRegion(dm, pr, startTime); + } + +}
