This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch feature/GEODE-5401 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-5401 by this push: new 40bcdbb Fix review comments. Use ExpireDisconnectedClientTransactionsMessage instead of TXManagerImpl.TXRemovalMessage when expire client transactions. Only send new message to Geode 1.7.0 and later servers - assuming old version servers will be rolled soon. Handle differently when server receives this new message based on sender version. 40bcdbb is described below commit 40bcdbb35f50b4ffc5b8d1e51b97418ca40f6ce8 Author: eshu <e...@pivotal.io> AuthorDate: Thu Jul 26 16:34:09 2018 -0700 Fix review comments. Use ExpireDisconnectedClientTransactionsMessage instead of TXManagerImpl.TXRemovalMessage when expire client transactions. Only send new message to Geode 1.7.0 and later servers - assuming old version servers will be rolled soon. Handle differently when server receives this new message based on sender version. --- ...ntServerTransactionFailoverDistributedTest.java | 18 +-- ...overWithMixedVersionServersDistributedTest.java | 33 +---- .../org/apache/geode/internal/DSFIDFactory.java | 6 +- .../geode/internal/DataSerializableFixedID.java | 4 +- ...xpireDisconnectedClientTransactionsMessage.java | 85 +++++++++++++ .../apache/geode/internal/cache/TXManagerImpl.java | 140 +-------------------- ...eDisconnectedClientTransactionsMessageTest.java | 67 ++++++++++ .../geode/internal/cache/TXManagerImplTest.java | 42 ++----- 8 files changed, 185 insertions(+), 210 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java index 1d6f413..b8908ea 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java @@ -98,7 +98,7 @@ public class ClientServerTransactionFailoverDistributedTest implements Serializa int numOfOpertions = 5; createClientRegion(true, port2, port1); - TransactionId txId = suspendTransaction(numOfOpertions); + TransactionId txId = beginAndSuspendTransaction(numOfOpertions); server2.invoke(() -> { cacheRule.getCache().close(); }); @@ -162,7 +162,7 @@ public class ClientServerTransactionFailoverDistributedTest implements Serializa return (PoolImpl) factory.create(uniqueName); } - private TransactionId suspendTransaction(int numOfOperations) { + private TransactionId beginAndSuspendTransaction(int numOfOperations) { Region region = clientCacheRule.getClientCache().getRegion(regionName); TXManagerImpl txManager = (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager(); @@ -237,8 +237,9 @@ public class ClientServerTransactionFailoverDistributedTest implements Serializa FutureTask<TransactionId>[] futureTasks = new FutureTask[numOfTransactions]; TransactionId[] txIds = new TransactionId[numOfTransactions]; - // suspend transactions - suspendTransactions(numOfTransactions, numOfOperationsInTransaction, threads, futureTasks, + // begin and suspend transactions + beginAndSuspendTransactions(numOfTransactions, numOfOperationsInTransaction, threads, + futureTasks, txIds); // unregister client on 2 of the servers @@ -264,8 +265,8 @@ public class ClientServerTransactionFailoverDistributedTest implements Serializa FutureTask<TransactionId>[] futureTasks = new FutureTask[numOfTransactions]; TransactionId[] txIds = new TransactionId[numOfTransactions]; - // suspend transactions - suspendTransactions(numOfTransactions, numOfOperations, threads, futureTasks, txIds); + // begin and suspend transactions + beginAndSuspendTransactions(numOfTransactions, numOfOperations, threads, futureTasks, txIds); // unregister client multiple times ClientProxyMembershipID clientProxyMembershipID = getClientId(); @@ -284,12 +285,13 @@ public class ClientServerTransactionFailoverDistributedTest implements Serializa } } - private void suspendTransactions(int numOfTransactions, int numOfOperations, Thread[] threads, + private void beginAndSuspendTransactions(int numOfTransactions, int numOfOperations, + Thread[] threads, FutureTask<TransactionId>[] futureTasks, TransactionId[] txIds) throws InterruptedException, java.util.concurrent.ExecutionException { for (int i = 0; i < numOfTransactions; i++) { FutureTask<TransactionId> futureTask = - new FutureTask<>(() -> suspendTransaction(numOfOperations)); + new FutureTask<>(() -> beginAndSuspendTransaction(numOfOperations)); futureTasks[i] = futureTask; Thread thread = new Thread(futureTask); threads[i] = thread; diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java index 90e378c..6a96176 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java @@ -137,8 +137,8 @@ public class ClientServerTransactionFailoverWithMixedVersionServersDistributedTe Thread[] threads = new Thread[numOfTransactions]; FutureTask<TransactionId>[] futureTasks = new FutureTask[numOfTransactions]; TransactionId[] txIds = new TransactionId[numOfTransactions]; - // suspend transactions - suspendTransactions(numOfTransactions, numOfOperations, threads, futureTasks, txIds); + // begin and suspend transactions + beginAndSuspendTransactions(numOfTransactions, numOfOperations, threads, futureTasks, txIds); // resume transactions resumeTransactions(numOfTransactions, numOfOperations, threads, txIds); waitForResumeTransactionsToComplete(numOfTransactions, threads); @@ -235,12 +235,13 @@ public class ClientServerTransactionFailoverWithMixedVersionServersDistributedTe crf.create(regionName); } - private void suspendTransactions(int numOfTransactions, int numOfOperations, Thread[] threads, + private void beginAndSuspendTransactions(int numOfTransactions, int numOfOperations, + Thread[] threads, FutureTask<TransactionId>[] futureTasks, TransactionId[] txIds) throws InterruptedException, java.util.concurrent.ExecutionException { for (int i = 0; i < numOfTransactions; i++) { FutureTask<TransactionId> futureTask = - new FutureTask<>(() -> suspendTransaction(numOfOperations)); + new FutureTask<>(() -> beginAndSuspendTransaction(numOfOperations)); futureTasks[i] = futureTask; Thread thread = new Thread(futureTask); threads[i] = thread; @@ -252,7 +253,7 @@ public class ClientServerTransactionFailoverWithMixedVersionServersDistributedTe } } - private TransactionId suspendTransaction(int numOfOperations) { + private TransactionId beginAndSuspendTransaction(int numOfOperations) { Region region = clientCacheRule.getClientCache().getRegion(regionName); TXManagerImpl txManager = (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager(); @@ -418,26 +419,4 @@ public class ClientServerTransactionFailoverWithMixedVersionServersDistributedTe .until(() -> assertThat(txManager.hostedTransactionsInProgressForTest()).isEqualTo(0)); } - @Test - public void clientTransactionExpiredAreRemovedOnNotYetRolledServer() throws Exception { - setupPartiallyRolledVersion(); - - server1.invoke(() -> createServerRegion(1, true)); - server2.invoke(() -> createServerRegion(1, true)); - server3.invoke(() -> createServerRegion(1, true)); - server4.invoke(() -> createServerRegion(1, false)); - client.invoke(() -> createClientRegion()); - - ClientProxyMembershipID clientProxyMembershipID = client.invoke(() -> getClientId()); - - int numOfTransactions = 12; - int numOfOperations = 1; - client.invokeAsync(() -> doUnfinishedTransactions(numOfTransactions, numOfOperations)); - - server4.invoke(() -> verifyTransactionAreStarted(numOfTransactions)); - - unregisterClientMultipleTimes(clientProxyMembershipID); - - server4.invoke(() -> verifyTransactionAreExpired(numOfTransactions)); - } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java index a6ee477..96313b8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java @@ -215,6 +215,7 @@ import org.apache.geode.internal.cache.DistributedRemoveAllOperation.RemoveAllMe import org.apache.geode.internal.cache.DistributedTombstoneOperation.TombstoneMessage; import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.ExpireDisconnectedClientTransactionsMessage; import org.apache.geode.internal.cache.FilterProfile; import org.apache.geode.internal.cache.FindDurableQueueProcessor.FindDurableQueueMessage; import org.apache.geode.internal.cache.FindDurableQueueProcessor.FindDurableQueueReply; @@ -256,7 +257,6 @@ import org.apache.geode.internal.cache.TXCommitMessage.CommitProcessQueryMessage import org.apache.geode.internal.cache.TXCommitMessage.CommitProcessQueryReplyMessage; import org.apache.geode.internal.cache.TXEntryState; import org.apache.geode.internal.cache.TXId; -import org.apache.geode.internal.cache.TXManagerImpl; import org.apache.geode.internal.cache.TXRemoteCommitMessage; import org.apache.geode.internal.cache.TXRemoteCommitMessage.TXRemoteCommitReplyMessage; import org.apache.geode.internal.cache.TXRemoteRollbackMessage; @@ -699,7 +699,7 @@ public class DSFIDFactory implements DataSerializableFixedID { registerDSFID(PR_BECOME_PRIMARY_BUCKET_MESSAGE, BecomePrimaryBucketMessage.class); registerDSFID(PR_BECOME_PRIMARY_BUCKET_REPLY, BecomePrimaryBucketReplyMessage.class); registerDSFID(PR_REMOVE_BUCKET_MESSAGE, RemoveBucketMessage.class); - registerDSFID(TX_MANAGER_REMOVE_TRANSACTIONS, TXManagerImpl.TXRemovalMessage.class); + registerDSFID(EXPIRE_CLIENT_TRANSACTIONS, ExpireDisconnectedClientTransactionsMessage.class); registerDSFID(PR_REMOVE_BUCKET_REPLY, RemoveBucketReplyMessage.class); registerDSFID(PR_MOVE_BUCKET_MESSAGE, MoveBucketMessage.class); registerDSFID(PR_MOVE_BUCKET_REPLY, MoveBucketReplyMessage.class); @@ -937,8 +937,6 @@ public class DSFIDFactory implements DataSerializableFixedID { registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY, GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class); registerDSFID(ABORT_BACKUP_REQUEST, AbortBackupRequest.class); - registerDSFID(EXPIRE_CLIENT_TRANSACTIONS, - TXManagerImpl.ExpireDisconnectedClientTransactionsMessage.class); } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java index c2a40d3..f1f6be1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java +++ b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java @@ -569,7 +569,8 @@ public interface DataSerializableFixedID extends SerializationVersions { short PR_REMOVE_BUCKET_REPLY = 135; short PR_MOVE_BUCKET_MESSAGE = 136; short PR_MOVE_BUCKET_REPLY = 137; - short TX_MANAGER_REMOVE_TRANSACTIONS = 138; + // Geode-5401, message changed from remove transaction to expire transactions. + short EXPIRE_CLIENT_TRANSACTIONS = 138; short REGION_VERSION_VECTOR = 139; @@ -823,7 +824,6 @@ public interface DataSerializableFixedID extends SerializationVersions { short GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_MESSAGE = 2181; short GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY = 2182; short ABORT_BACKUP_REQUEST = 2183; - short EXPIRE_CLIENT_TRANSACTIONS = 2184; // NOTE, codes > 65535 will take 4 bytes to serialize diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessage.java new file mode 100644 index 0000000..ece29e1 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessage.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.geode.DataSerializer; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.HighPriorityDistributionMessage; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.Version; + +public class ExpireDisconnectedClientTransactionsMessage + extends HighPriorityDistributionMessage { + Set<TXId> txIds; + + /** for deserialization */ + public ExpireDisconnectedClientTransactionsMessage() {} + + // only send to geode 1.7.0 and later servers + // for prior geode 1.7.0 servers, message won't be sent + // assuming these servers will be rolled to new version soon. + static void send(DistributionManager dm, Set<InternalDistributedMember> recipients, + Set<TXId> txIds) { + ExpireDisconnectedClientTransactionsMessage msg = + new ExpireDisconnectedClientTransactionsMessage(); + msg.txIds = txIds; + Set newVersionRecipients = new HashSet(); + for (InternalDistributedMember recipient : recipients) { + // to geode 1.7.0 and later version servers + if (recipient.getVersionObject().compareTo(Version.GEODE_170) >= 0) { + newVersionRecipients.add(recipient); + } + } + msg.setRecipients(newVersionRecipients); + dm.putOutgoing(msg); + } + + @Override + public void toData(DataOutput out) throws IOException { + DataSerializer.writeHashSet((HashSet<TXId>) this.txIds, out); + } + + @Override + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + this.txIds = DataSerializer.readHashSet(in); + } + + public int getDSFID() { + return EXPIRE_CLIENT_TRANSACTIONS; + } + + @Override + protected void process(ClusterDistributionManager dm) { + InternalCache cache = dm.getCache(); + InternalDistributedMember sender = getSender(); + if (cache != null) { + TXManagerImpl mgr = cache.getTXMgr(); + if (sender.getVersionObject().compareTo(Version.GEODE_170) >= 0) { + // schedule to expire disconnected client transaction. + mgr.expireDisconnectedClientTransactions(this.txIds, false); + } else { + // check if transaction has been updated before remove it + mgr.removeExpiredClientTransactions(this.txIds); + } + } + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java index 18cf5ef..9f6b87b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java @@ -14,9 +14,6 @@ */ package org.apache.geode.internal.cache; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -39,10 +36,10 @@ import java.util.concurrent.locks.LockSupport; import org.apache.logging.log4j.Logger; -import org.apache.geode.DataSerializer; import org.apache.geode.GemFireException; import org.apache.geode.InternalGemFireError; import org.apache.geode.SystemFailure; +import org.apache.geode.annotations.TestingOnly; import org.apache.geode.cache.CacheTransactionManager; import org.apache.geode.cache.CommitConflictException; import org.apache.geode.cache.TransactionDataRebalancedException; @@ -52,15 +49,12 @@ import org.apache.geode.cache.TransactionListener; import org.apache.geode.cache.TransactionWriter; import org.apache.geode.cache.UnsupportedOperationInTransactionException; import org.apache.geode.distributed.TXManagerCancelledException; -import org.apache.geode.distributed.internal.ClusterDistributionManager; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.DistributionManager; -import org.apache.geode.distributed.internal.HighPriorityDistributionMessage; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.MembershipListener; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.SystemTimer.SystemTimerTask; -import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.entries.AbstractRegionEntry; import org.apache.geode.internal.cache.tier.sockets.Message; import org.apache.geode.internal.concurrent.ConcurrentHashSet; @@ -120,6 +114,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene private final Map<TXId, TXStateProxy> hostedTXStates; + // Used for testing only. private final Set<TXId> scheduledToBeRemovedTx = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "trackScheduledToBeRemovedTx") ? new ConcurrentHashSet<TXId>() : null; @@ -1196,11 +1191,12 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene synchronized (this.hostedTXStates) { for (TXId txId : txIds) { // only expire client transaction if no activity for the given transactionTimeToLive - scheduleToRemoveExpiredClientTransction(txId); + scheduleToRemoveExpiredClientTransaction(txId); } } } + @TestingOnly /** remove the given TXStates for test */ public void removeTransactions(Set<TXId> txIds, boolean distribute) { synchronized (this.hostedTXStates) { @@ -1334,132 +1330,8 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene throw new InternalGemFireError("the parameter TXCommitMessage is not an exception token"); } - public static class TXRemovalMessage extends HighPriorityDistributionMessage { - - Set<TXId> txIds; - - /** for deserialization */ - public TXRemovalMessage() {} - - static void send(DistributionManager dm, Set<InternalDistributedMember> recipients, - Set<TXId> txIds) { - TXRemovalMessage msg = new TXRemovalMessage(); - msg.txIds = txIds; - // only send to servers with version earlier than geode 1.7.0 - // newer version use ExpireDisconnectedClientTransactionsMessage - Set oldVersionRecipients = new HashSet(); - for (InternalDistributedMember recipient : recipients) { - if (recipient.getVersionObject().compareTo(Version.GEODE_170) < 0) { - oldVersionRecipients.add(recipient); - } - } - msg.setRecipients(oldVersionRecipients); - dm.putOutgoing(msg); - } - - @Override - public void toData(DataOutput out) throws IOException { - DataSerializer.writeHashSet((HashSet<TXId>) this.txIds, out); - } - - @Override - public void fromData(DataInput in) throws IOException, ClassNotFoundException { - this.txIds = DataSerializer.readHashSet(in); - } - - public int getDSFID() { - return TX_MANAGER_REMOVE_TRANSACTIONS; - } - - @Override - protected void process(ClusterDistributionManager dm) { - InternalCache cache = dm.getCache(); - if (cache != null) { - TXManagerImpl mgr = cache.getTXMgr(); - // check if transaction has been updated before remove it - mgr.removeExpiredClientTransactions(this.txIds); - } - } - } - - public static class ExpireDisconnectedClientTransactionsMessage - extends HighPriorityDistributionMessage { - Set<TXId> txIds; - - /** for deserialization */ - public ExpireDisconnectedClientTransactionsMessage() {} - - // only send to geode 1.7.0 and later servers - static void send(DistributionManager dm, Set<InternalDistributedMember> recipients, - Set<TXId> txIds) { - ExpireDisconnectedClientTransactionsMessage msg = - new ExpireDisconnectedClientTransactionsMessage(); - msg.txIds = txIds; - Set newVersionRecipients = new HashSet(); - for (InternalDistributedMember recipient : recipients) { - // to geode 1.7.0 and later version servers - if (recipient.getVersionObject().compareTo(Version.GEODE_170) >= 0) { - newVersionRecipients.add(recipient); - } - } - msg.setRecipients(newVersionRecipients); - dm.putOutgoing(msg); - } - - @Override - public void toData(DataOutput out) throws IOException { - DataSerializer.writeHashSet((HashSet<TXId>) this.txIds, out); - } - - @Override - public void fromData(DataInput in) throws IOException, ClassNotFoundException { - this.txIds = DataSerializer.readHashSet(in); - } - - public int getDSFID() { - return EXPIRE_CLIENT_TRANSACTIONS; - } - - @Override - protected void process(ClusterDistributionManager dm) { - InternalCache cache = dm.getCache(); - if (cache != null) { - TXManagerImpl mgr = cache.getTXMgr(); - mgr.expireDisconnectedClientTransactions(this.txIds, false); - } - } - } - /** timer task for expiring the given TXStates */ public void expireDisconnectedClientTransactions(Set<TXId> txIds, boolean distribute) { - long timeout = TimeUnit.SECONDS.toMillis(getTransactionTimeToLive()); - if (distribute) { - if (timeout <= 0) { - removeClientTransactionsOnRemoteServer(txIds); - } else { - if (logger.isDebugEnabled()) { - logger.debug("expiring the following transactions: {}", Arrays.toString(txIds.toArray())); - } - // schedule to send remove message to server with version earlier than geode 1.7.0 - SystemTimerTask task = new SystemTimerTask() { - @Override - public void run2() { - removeClientTransactionsOnRemoteServer(txIds); - } - }; - getCache().getCCPTimer().schedule(task, timeout); - } - } - // schedule to expire client transactions on server with version geode 1.7.0 and after. - scheduleToExpireDisconnectedClientTransactions(txIds, distribute); - } - - void removeClientTransactionsOnRemoteServer(Set<TXId> txIds) { - TXRemovalMessage.send(this.dm, this.dm.getOtherDistributionManagerIds(), txIds); - } - - /** timer task for expiring the given TXStates */ - public void scheduleToExpireDisconnectedClientTransactions(Set<TXId> txIds, boolean distribute) { // increase the client transaction timeout setting to avoid a late in-flight client operation // preventing the expiration of the client transaction. long timeout = (long) (TimeUnit.SECONDS.toMillis(getTransactionTimeToLive()) * 1.1); @@ -1500,7 +1372,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene SystemTimerTask task = new SystemTimerTask() { @Override public void run2() { - scheduleToRemoveExpiredClientTransction(txId); + scheduleToRemoveExpiredClientTransaction(txId); if (scheduledToBeRemovedTx != null) { scheduledToBeRemovedTx.remove(txId); } @@ -1510,7 +1382,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene } } - void scheduleToRemoveExpiredClientTransction(TXId txId) { + void scheduleToRemoveExpiredClientTransaction(TXId txId) { synchronized (this.hostedTXStates) { TXStateProxy result = hostedTXStates.get(txId); if (result != null) { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessageTest.java new file mode 100644 index 0000000..3c32620 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessageTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.Version; + +public class ExpireDisconnectedClientTransactionsMessageTest { + private final ClusterDistributionManager dm = mock(ClusterDistributionManager.class); + private final InternalCache cache = mock(InternalCache.class); + private final TXManagerImpl txManager = mock(TXManagerImpl.class); + private final InternalDistributedMember sender = mock(InternalDistributedMember.class); + private final ExpireDisconnectedClientTransactionsMessage message = + spy(new ExpireDisconnectedClientTransactionsMessage()); + private Version version = mock(Version.class); + + @Before + public void setup() { + when(dm.getCache()).thenReturn(cache); + when(cache.getTXMgr()).thenReturn(txManager); + doReturn(sender).when(message).getSender(); + when(sender.getVersionObject()).thenReturn(version); + } + + @Test + public void processMessageFromServerOfGeode170AndLaterVersionWillExpireDisconnectedClientTransactions() { + when(version.compareTo(Version.GEODE_170)).thenReturn(1); + + message.process(dm); + + verify(txManager, times(1)).expireDisconnectedClientTransactions(any(), eq(false)); + } + + @Test + public void processMessageFromServerOfPriorGeode170VersionWillRemoveExpiredClientTransactions() { + when(version.compareTo(Version.GEODE_170)).thenReturn(-1); + + message.process(dm); + + verify(txManager, times(1)).removeExpiredClientTransactions(any()); + } +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java index cf42bab..77c67e5 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java @@ -395,27 +395,12 @@ public class TXManagerImplTest { TXStateProxyImpl tx = spy((TXStateProxyImpl) txMgr.getOrSetHostedTXState(txid, msg)); doReturn(true).when(tx).isOverTransactionTimeoutLimit(); - txMgr.scheduleToRemoveExpiredClientTransction(txid); + txMgr.scheduleToRemoveExpiredClientTransaction(txid); assertTrue(txMgr.isHostedTXStatesEmpty()); } @Test - public void processExpireDisconnectedClientTransactionsMessageWillExpireDisconnectedClientTransactions() { - TXManagerImpl.ExpireDisconnectedClientTransactionsMessage message = - new TXManagerImpl.ExpireDisconnectedClientTransactionsMessage(); - - InternalCache cache = mock(InternalCache.class); - TXManagerImpl txManager = mock(TXManagerImpl.class); - when(dm.getCache()).thenReturn(cache); - when(cache.getTXMgr()).thenReturn(txManager); - - message.process(dm); - - verify(txManager, times(1)).expireDisconnectedClientTransactions(any(), eq(false)); - } - - @Test public void clientTransactionsToBeRemovedAndDistributedAreSentToRemoveServerIfWithNoTimeout() { Set<TXId> txIds = (Set<TXId>) mock(Set.class); doReturn(0).when(spyTxMgr).getTransactionTimeToLive(); @@ -428,8 +413,7 @@ public class TXManagerImplTest { spyTxMgr.expireDisconnectedClientTransactions(txIds, true); - verify(spyTxMgr, times(1)).removeClientTransactionsOnRemoteServer(eq(txIds)); - verify(spyTxMgr, times(1)).scheduleToExpireDisconnectedClientTransactions(eq(txIds), eq(true)); + verify(spyTxMgr, times(1)).expireClientTransactionsOnRemoteServer(eq(txIds)); } @Test @@ -453,8 +437,7 @@ public class TXManagerImplTest { spyTxMgr.expireDisconnectedClientTransactions(txIds, false); - verify(spyTxMgr, never()).removeClientTransactionsOnRemoteServer(eq(txIds)); - verify(spyTxMgr, times(1)).scheduleToExpireDisconnectedClientTransactions(eq(txIds), eq(false)); + verify(spyTxMgr, never()).expireClientTransactionsOnRemoteServer(eq(txIds)); verify(spyTxMgr, times(1)).removeHostedTXState(eq(txIds)); verify(spyTxMgr, times(1)).removeHostedTXState(eq(txId1)); verify(spyTxMgr, times(1)).removeHostedTXState(eq(txId3)); @@ -463,21 +446,10 @@ public class TXManagerImplTest { } @Test - public void clientTransactionsToBeRemovedAndDistributedAreScheduledToSentToRemoveServerIfWithTimeout() { - Set<TXId> txIds = mock(Set.class); - doReturn(1).when(spyTxMgr).getTransactionTimeToLive(); - - spyTxMgr.expireDisconnectedClientTransactions(txIds, true); - - verify(timer, times(1)).schedule(any(), eq(1000L)); - verify(spyTxMgr, times(1)).scheduleToExpireDisconnectedClientTransactions(eq(txIds), eq(true)); - } - - @Test public void clientTransactionsToBeExpiredAndDistributedAreSentToRemoveServer() { Set<TXId> txIds = mock(Set.class); - spyTxMgr.scheduleToExpireDisconnectedClientTransactions(txIds, true); + spyTxMgr.expireDisconnectedClientTransactions(txIds, true); verify(spyTxMgr, times(1)).expireClientTransactionsOnRemoteServer(eq(txIds)); } @@ -486,7 +458,7 @@ public class TXManagerImplTest { public void clientTransactionsNotToBeDistributedAreNotSentToRemoveServer() { Set<TXId> txIds = mock(Set.class); - spyTxMgr.scheduleToExpireDisconnectedClientTransactions(txIds, false); + spyTxMgr.expireDisconnectedClientTransactions(txIds, false); verify(spyTxMgr, never()).expireClientTransactionsOnRemoteServer(eq(txIds)); } @@ -503,7 +475,7 @@ public class TXManagerImplTest { set.add(txId1); set.add(txId2); - spyTxMgr.scheduleToExpireDisconnectedClientTransactions(set, false); + spyTxMgr.expireDisconnectedClientTransactions(set, false); verify(spyTxMgr, times(1)).scheduleToRemoveClientTransaction(eq(txId1), eq(1100L)); verify(spyTxMgr, times(1)).scheduleToRemoveClientTransaction(eq(txId2), eq(1100L)); @@ -518,7 +490,7 @@ public class TXManagerImplTest { } @Test - public void clientTransactionIsScheduledToBeReIfWithNoTimeout() { + public void clientTransactionIsScheduledToBeRemovedIfWithTimeout() { spyTxMgr.scheduleToRemoveClientTransaction(txid, 1000); verify(timer, times(1)).schedule(any(), eq(1000L));