This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-5401
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-5401 by this
push:
new 40bcdbb Fix review comments. Use
ExpireDisconnectedClientTransactionsMessage instead of
TXManagerImpl.TXRemovalMessage when expire client transactions. Only send new
message to Geode 1.7.0 and later servers - assuming old version servers will be
rolled soon. Handle differently when server receives this new message based on
sender version.
40bcdbb is described below
commit 40bcdbb35f50b4ffc5b8d1e51b97418ca40f6ce8
Author: eshu <[email protected]>
AuthorDate: Thu Jul 26 16:34:09 2018 -0700
Fix review comments.
Use ExpireDisconnectedClientTransactionsMessage instead of
TXManagerImpl.TXRemovalMessage when expire client transactions.
Only send new message to Geode 1.7.0 and later servers - assuming old
version servers will be rolled soon.
Handle differently when server receives this new message based on sender
version.
---
...ntServerTransactionFailoverDistributedTest.java | 18 +--
...overWithMixedVersionServersDistributedTest.java | 33 +----
.../org/apache/geode/internal/DSFIDFactory.java | 6 +-
.../geode/internal/DataSerializableFixedID.java | 4 +-
...xpireDisconnectedClientTransactionsMessage.java | 85 +++++++++++++
.../apache/geode/internal/cache/TXManagerImpl.java | 140 +--------------------
...eDisconnectedClientTransactionsMessageTest.java | 67 ++++++++++
.../geode/internal/cache/TXManagerImplTest.java | 42 ++-----
8 files changed, 185 insertions(+), 210 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
index 1d6f413..b8908ea 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
@@ -98,7 +98,7 @@ public class ClientServerTransactionFailoverDistributedTest
implements Serializa
int numOfOpertions = 5;
createClientRegion(true, port2, port1);
- TransactionId txId = suspendTransaction(numOfOpertions);
+ TransactionId txId = beginAndSuspendTransaction(numOfOpertions);
server2.invoke(() -> {
cacheRule.getCache().close();
});
@@ -162,7 +162,7 @@ public class ClientServerTransactionFailoverDistributedTest
implements Serializa
return (PoolImpl) factory.create(uniqueName);
}
- private TransactionId suspendTransaction(int numOfOperations) {
+ private TransactionId beginAndSuspendTransaction(int numOfOperations) {
Region region = clientCacheRule.getClientCache().getRegion(regionName);
TXManagerImpl txManager =
(TXManagerImpl)
clientCacheRule.getClientCache().getCacheTransactionManager();
@@ -237,8 +237,9 @@ public class ClientServerTransactionFailoverDistributedTest
implements Serializa
FutureTask<TransactionId>[] futureTasks = new
FutureTask[numOfTransactions];
TransactionId[] txIds = new TransactionId[numOfTransactions];
- // suspend transactions
- suspendTransactions(numOfTransactions, numOfOperationsInTransaction,
threads, futureTasks,
+ // begin and suspend transactions
+ beginAndSuspendTransactions(numOfTransactions,
numOfOperationsInTransaction, threads,
+ futureTasks,
txIds);
// unregister client on 2 of the servers
@@ -264,8 +265,8 @@ public class ClientServerTransactionFailoverDistributedTest
implements Serializa
FutureTask<TransactionId>[] futureTasks = new
FutureTask[numOfTransactions];
TransactionId[] txIds = new TransactionId[numOfTransactions];
- // suspend transactions
- suspendTransactions(numOfTransactions, numOfOperations, threads,
futureTasks, txIds);
+ // begin and suspend transactions
+ beginAndSuspendTransactions(numOfTransactions, numOfOperations, threads,
futureTasks, txIds);
// unregister client multiple times
ClientProxyMembershipID clientProxyMembershipID = getClientId();
@@ -284,12 +285,13 @@ public class
ClientServerTransactionFailoverDistributedTest implements Serializa
}
}
- private void suspendTransactions(int numOfTransactions, int numOfOperations,
Thread[] threads,
+ private void beginAndSuspendTransactions(int numOfTransactions, int
numOfOperations,
+ Thread[] threads,
FutureTask<TransactionId>[] futureTasks, TransactionId[] txIds)
throws InterruptedException, java.util.concurrent.ExecutionException {
for (int i = 0; i < numOfTransactions; i++) {
FutureTask<TransactionId> futureTask =
- new FutureTask<>(() -> suspendTransaction(numOfOperations));
+ new FutureTask<>(() -> beginAndSuspendTransaction(numOfOperations));
futureTasks[i] = futureTask;
Thread thread = new Thread(futureTask);
threads[i] = thread;
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java
index 90e378c..6a96176 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java
@@ -137,8 +137,8 @@ public class
ClientServerTransactionFailoverWithMixedVersionServersDistributedTe
Thread[] threads = new Thread[numOfTransactions];
FutureTask<TransactionId>[] futureTasks = new
FutureTask[numOfTransactions];
TransactionId[] txIds = new TransactionId[numOfTransactions];
- // suspend transactions
- suspendTransactions(numOfTransactions, numOfOperations, threads,
futureTasks, txIds);
+ // begin and suspend transactions
+ beginAndSuspendTransactions(numOfTransactions, numOfOperations, threads,
futureTasks, txIds);
// resume transactions
resumeTransactions(numOfTransactions, numOfOperations, threads, txIds);
waitForResumeTransactionsToComplete(numOfTransactions, threads);
@@ -235,12 +235,13 @@ public class
ClientServerTransactionFailoverWithMixedVersionServersDistributedTe
crf.create(regionName);
}
- private void suspendTransactions(int numOfTransactions, int numOfOperations,
Thread[] threads,
+ private void beginAndSuspendTransactions(int numOfTransactions, int
numOfOperations,
+ Thread[] threads,
FutureTask<TransactionId>[] futureTasks, TransactionId[] txIds)
throws InterruptedException, java.util.concurrent.ExecutionException {
for (int i = 0; i < numOfTransactions; i++) {
FutureTask<TransactionId> futureTask =
- new FutureTask<>(() -> suspendTransaction(numOfOperations));
+ new FutureTask<>(() -> beginAndSuspendTransaction(numOfOperations));
futureTasks[i] = futureTask;
Thread thread = new Thread(futureTask);
threads[i] = thread;
@@ -252,7 +253,7 @@ public class
ClientServerTransactionFailoverWithMixedVersionServersDistributedTe
}
}
- private TransactionId suspendTransaction(int numOfOperations) {
+ private TransactionId beginAndSuspendTransaction(int numOfOperations) {
Region region = clientCacheRule.getClientCache().getRegion(regionName);
TXManagerImpl txManager =
(TXManagerImpl)
clientCacheRule.getClientCache().getCacheTransactionManager();
@@ -418,26 +419,4 @@ public class
ClientServerTransactionFailoverWithMixedVersionServersDistributedTe
.until(() ->
assertThat(txManager.hostedTransactionsInProgressForTest()).isEqualTo(0));
}
- @Test
- public void clientTransactionExpiredAreRemovedOnNotYetRolledServer() throws
Exception {
- setupPartiallyRolledVersion();
-
- server1.invoke(() -> createServerRegion(1, true));
- server2.invoke(() -> createServerRegion(1, true));
- server3.invoke(() -> createServerRegion(1, true));
- server4.invoke(() -> createServerRegion(1, false));
- client.invoke(() -> createClientRegion());
-
- ClientProxyMembershipID clientProxyMembershipID = client.invoke(() ->
getClientId());
-
- int numOfTransactions = 12;
- int numOfOperations = 1;
- client.invokeAsync(() -> doUnfinishedTransactions(numOfTransactions,
numOfOperations));
-
- server4.invoke(() -> verifyTransactionAreStarted(numOfTransactions));
-
- unregisterClientMultipleTimes(clientProxyMembershipID);
-
- server4.invoke(() -> verifyTransactionAreExpired(numOfTransactions));
- }
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index a6ee477..96313b8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -215,6 +215,7 @@ import
org.apache.geode.internal.cache.DistributedRemoveAllOperation.RemoveAllMe
import
org.apache.geode.internal.cache.DistributedTombstoneOperation.TombstoneMessage;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EventID;
+import
org.apache.geode.internal.cache.ExpireDisconnectedClientTransactionsMessage;
import org.apache.geode.internal.cache.FilterProfile;
import
org.apache.geode.internal.cache.FindDurableQueueProcessor.FindDurableQueueMessage;
import
org.apache.geode.internal.cache.FindDurableQueueProcessor.FindDurableQueueReply;
@@ -256,7 +257,6 @@ import
org.apache.geode.internal.cache.TXCommitMessage.CommitProcessQueryMessage
import
org.apache.geode.internal.cache.TXCommitMessage.CommitProcessQueryReplyMessage;
import org.apache.geode.internal.cache.TXEntryState;
import org.apache.geode.internal.cache.TXId;
-import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.TXRemoteCommitMessage;
import
org.apache.geode.internal.cache.TXRemoteCommitMessage.TXRemoteCommitReplyMessage;
import org.apache.geode.internal.cache.TXRemoteRollbackMessage;
@@ -699,7 +699,7 @@ public class DSFIDFactory implements
DataSerializableFixedID {
registerDSFID(PR_BECOME_PRIMARY_BUCKET_MESSAGE,
BecomePrimaryBucketMessage.class);
registerDSFID(PR_BECOME_PRIMARY_BUCKET_REPLY,
BecomePrimaryBucketReplyMessage.class);
registerDSFID(PR_REMOVE_BUCKET_MESSAGE, RemoveBucketMessage.class);
- registerDSFID(TX_MANAGER_REMOVE_TRANSACTIONS,
TXManagerImpl.TXRemovalMessage.class);
+ registerDSFID(EXPIRE_CLIENT_TRANSACTIONS,
ExpireDisconnectedClientTransactionsMessage.class);
registerDSFID(PR_REMOVE_BUCKET_REPLY, RemoveBucketReplyMessage.class);
registerDSFID(PR_MOVE_BUCKET_MESSAGE, MoveBucketMessage.class);
registerDSFID(PR_MOVE_BUCKET_REPLY, MoveBucketReplyMessage.class);
@@ -937,8 +937,6 @@ public class DSFIDFactory implements
DataSerializableFixedID {
registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY,
GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class);
registerDSFID(ABORT_BACKUP_REQUEST, AbortBackupRequest.class);
- registerDSFID(EXPIRE_CLIENT_TRANSACTIONS,
- TXManagerImpl.ExpireDisconnectedClientTransactionsMessage.class);
}
/**
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
index c2a40d3..f1f6be1 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
@@ -569,7 +569,8 @@ public interface DataSerializableFixedID extends
SerializationVersions {
short PR_REMOVE_BUCKET_REPLY = 135;
short PR_MOVE_BUCKET_MESSAGE = 136;
short PR_MOVE_BUCKET_REPLY = 137;
- short TX_MANAGER_REMOVE_TRANSACTIONS = 138;
+ // Geode-5401, message changed from remove transaction to expire
transactions.
+ short EXPIRE_CLIENT_TRANSACTIONS = 138;
short REGION_VERSION_VECTOR = 139;
@@ -823,7 +824,6 @@ public interface DataSerializableFixedID extends
SerializationVersions {
short GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_MESSAGE = 2181;
short GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY = 2182;
short ABORT_BACKUP_REQUEST = 2183;
- short EXPIRE_CLIENT_TRANSACTIONS = 2184;
// NOTE, codes > 65535 will take 4 bytes to serialize
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessage.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessage.java
new file mode 100644
index 0000000..ece29e1
--- /dev/null
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessage.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.HighPriorityDistributionMessage;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Version;
+
+public class ExpireDisconnectedClientTransactionsMessage
+ extends HighPriorityDistributionMessage {
+ Set<TXId> txIds;
+
+ /** for deserialization */
+ public ExpireDisconnectedClientTransactionsMessage() {}
+
+ // only send to geode 1.7.0 and later servers
+ // for prior geode 1.7.0 servers, message won't be sent
+ // assuming these servers will be rolled to new version soon.
+ static void send(DistributionManager dm, Set<InternalDistributedMember>
recipients,
+ Set<TXId> txIds) {
+ ExpireDisconnectedClientTransactionsMessage msg =
+ new ExpireDisconnectedClientTransactionsMessage();
+ msg.txIds = txIds;
+ Set newVersionRecipients = new HashSet();
+ for (InternalDistributedMember recipient : recipients) {
+ // to geode 1.7.0 and later version servers
+ if (recipient.getVersionObject().compareTo(Version.GEODE_170) >= 0) {
+ newVersionRecipients.add(recipient);
+ }
+ }
+ msg.setRecipients(newVersionRecipients);
+ dm.putOutgoing(msg);
+ }
+
+ @Override
+ public void toData(DataOutput out) throws IOException {
+ DataSerializer.writeHashSet((HashSet<TXId>) this.txIds, out);
+ }
+
+ @Override
+ public void fromData(DataInput in) throws IOException,
ClassNotFoundException {
+ this.txIds = DataSerializer.readHashSet(in);
+ }
+
+ public int getDSFID() {
+ return EXPIRE_CLIENT_TRANSACTIONS;
+ }
+
+ @Override
+ protected void process(ClusterDistributionManager dm) {
+ InternalCache cache = dm.getCache();
+ InternalDistributedMember sender = getSender();
+ if (cache != null) {
+ TXManagerImpl mgr = cache.getTXMgr();
+ if (sender.getVersionObject().compareTo(Version.GEODE_170) >= 0) {
+ // schedule to expire disconnected client transaction.
+ mgr.expireDisconnectedClientTransactions(this.txIds, false);
+ } else {
+ // check if transaction has been updated before remove it
+ mgr.removeExpiredClientTransactions(this.txIds);
+ }
+ }
+ }
+}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index 18cf5ef..9f6b87b 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -14,9 +14,6 @@
*/
package org.apache.geode.internal.cache;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -39,10 +36,10 @@ import java.util.concurrent.locks.LockSupport;
import org.apache.logging.log4j.Logger;
-import org.apache.geode.DataSerializer;
import org.apache.geode.GemFireException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.SystemFailure;
+import org.apache.geode.annotations.TestingOnly;
import org.apache.geode.cache.CacheTransactionManager;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.TransactionDataRebalancedException;
@@ -52,15 +49,12 @@ import org.apache.geode.cache.TransactionListener;
import org.apache.geode.cache.TransactionWriter;
import org.apache.geode.cache.UnsupportedOperationInTransactionException;
import org.apache.geode.distributed.TXManagerCancelledException;
-import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.HighPriorityDistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.MembershipListener;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.SystemTimer.SystemTimerTask;
-import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.entries.AbstractRegionEntry;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.concurrent.ConcurrentHashSet;
@@ -120,6 +114,7 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
private final Map<TXId, TXStateProxy> hostedTXStates;
+ // Used for testing only.
private final Set<TXId> scheduledToBeRemovedTx =
Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX +
"trackScheduledToBeRemovedTx")
? new ConcurrentHashSet<TXId>() : null;
@@ -1196,11 +1191,12 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
synchronized (this.hostedTXStates) {
for (TXId txId : txIds) {
// only expire client transaction if no activity for the given
transactionTimeToLive
- scheduleToRemoveExpiredClientTransction(txId);
+ scheduleToRemoveExpiredClientTransaction(txId);
}
}
}
+ @TestingOnly
/** remove the given TXStates for test */
public void removeTransactions(Set<TXId> txIds, boolean distribute) {
synchronized (this.hostedTXStates) {
@@ -1334,132 +1330,8 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
throw new InternalGemFireError("the parameter TXCommitMessage is not an
exception token");
}
- public static class TXRemovalMessage extends HighPriorityDistributionMessage
{
-
- Set<TXId> txIds;
-
- /** for deserialization */
- public TXRemovalMessage() {}
-
- static void send(DistributionManager dm, Set<InternalDistributedMember>
recipients,
- Set<TXId> txIds) {
- TXRemovalMessage msg = new TXRemovalMessage();
- msg.txIds = txIds;
- // only send to servers with version earlier than geode 1.7.0
- // newer version use ExpireDisconnectedClientTransactionsMessage
- Set oldVersionRecipients = new HashSet();
- for (InternalDistributedMember recipient : recipients) {
- if (recipient.getVersionObject().compareTo(Version.GEODE_170) < 0) {
- oldVersionRecipients.add(recipient);
- }
- }
- msg.setRecipients(oldVersionRecipients);
- dm.putOutgoing(msg);
- }
-
- @Override
- public void toData(DataOutput out) throws IOException {
- DataSerializer.writeHashSet((HashSet<TXId>) this.txIds, out);
- }
-
- @Override
- public void fromData(DataInput in) throws IOException,
ClassNotFoundException {
- this.txIds = DataSerializer.readHashSet(in);
- }
-
- public int getDSFID() {
- return TX_MANAGER_REMOVE_TRANSACTIONS;
- }
-
- @Override
- protected void process(ClusterDistributionManager dm) {
- InternalCache cache = dm.getCache();
- if (cache != null) {
- TXManagerImpl mgr = cache.getTXMgr();
- // check if transaction has been updated before remove it
- mgr.removeExpiredClientTransactions(this.txIds);
- }
- }
- }
-
- public static class ExpireDisconnectedClientTransactionsMessage
- extends HighPriorityDistributionMessage {
- Set<TXId> txIds;
-
- /** for deserialization */
- public ExpireDisconnectedClientTransactionsMessage() {}
-
- // only send to geode 1.7.0 and later servers
- static void send(DistributionManager dm, Set<InternalDistributedMember>
recipients,
- Set<TXId> txIds) {
- ExpireDisconnectedClientTransactionsMessage msg =
- new ExpireDisconnectedClientTransactionsMessage();
- msg.txIds = txIds;
- Set newVersionRecipients = new HashSet();
- for (InternalDistributedMember recipient : recipients) {
- // to geode 1.7.0 and later version servers
- if (recipient.getVersionObject().compareTo(Version.GEODE_170) >= 0) {
- newVersionRecipients.add(recipient);
- }
- }
- msg.setRecipients(newVersionRecipients);
- dm.putOutgoing(msg);
- }
-
- @Override
- public void toData(DataOutput out) throws IOException {
- DataSerializer.writeHashSet((HashSet<TXId>) this.txIds, out);
- }
-
- @Override
- public void fromData(DataInput in) throws IOException,
ClassNotFoundException {
- this.txIds = DataSerializer.readHashSet(in);
- }
-
- public int getDSFID() {
- return EXPIRE_CLIENT_TRANSACTIONS;
- }
-
- @Override
- protected void process(ClusterDistributionManager dm) {
- InternalCache cache = dm.getCache();
- if (cache != null) {
- TXManagerImpl mgr = cache.getTXMgr();
- mgr.expireDisconnectedClientTransactions(this.txIds, false);
- }
- }
- }
-
/** timer task for expiring the given TXStates */
public void expireDisconnectedClientTransactions(Set<TXId> txIds, boolean
distribute) {
- long timeout = TimeUnit.SECONDS.toMillis(getTransactionTimeToLive());
- if (distribute) {
- if (timeout <= 0) {
- removeClientTransactionsOnRemoteServer(txIds);
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("expiring the following transactions: {}",
Arrays.toString(txIds.toArray()));
- }
- // schedule to send remove message to server with version earlier than
geode 1.7.0
- SystemTimerTask task = new SystemTimerTask() {
- @Override
- public void run2() {
- removeClientTransactionsOnRemoteServer(txIds);
- }
- };
- getCache().getCCPTimer().schedule(task, timeout);
- }
- }
- // schedule to expire client transactions on server with version geode
1.7.0 and after.
- scheduleToExpireDisconnectedClientTransactions(txIds, distribute);
- }
-
- void removeClientTransactionsOnRemoteServer(Set<TXId> txIds) {
- TXRemovalMessage.send(this.dm, this.dm.getOtherDistributionManagerIds(),
txIds);
- }
-
- /** timer task for expiring the given TXStates */
- public void scheduleToExpireDisconnectedClientTransactions(Set<TXId> txIds,
boolean distribute) {
// increase the client transaction timeout setting to avoid a late
in-flight client operation
// preventing the expiration of the client transaction.
long timeout = (long)
(TimeUnit.SECONDS.toMillis(getTransactionTimeToLive()) * 1.1);
@@ -1500,7 +1372,7 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
SystemTimerTask task = new SystemTimerTask() {
@Override
public void run2() {
- scheduleToRemoveExpiredClientTransction(txId);
+ scheduleToRemoveExpiredClientTransaction(txId);
if (scheduledToBeRemovedTx != null) {
scheduledToBeRemovedTx.remove(txId);
}
@@ -1510,7 +1382,7 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
}
}
- void scheduleToRemoveExpiredClientTransction(TXId txId) {
+ void scheduleToRemoveExpiredClientTransaction(TXId txId) {
synchronized (this.hostedTXStates) {
TXStateProxy result = hostedTXStates.get(txId);
if (result != null) {
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessageTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessageTest.java
new file mode 100644
index 0000000..3c32620
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessageTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Version;
+
+public class ExpireDisconnectedClientTransactionsMessageTest {
+ private final ClusterDistributionManager dm =
mock(ClusterDistributionManager.class);
+ private final InternalCache cache = mock(InternalCache.class);
+ private final TXManagerImpl txManager = mock(TXManagerImpl.class);
+ private final InternalDistributedMember sender =
mock(InternalDistributedMember.class);
+ private final ExpireDisconnectedClientTransactionsMessage message =
+ spy(new ExpireDisconnectedClientTransactionsMessage());
+ private Version version = mock(Version.class);
+
+ @Before
+ public void setup() {
+ when(dm.getCache()).thenReturn(cache);
+ when(cache.getTXMgr()).thenReturn(txManager);
+ doReturn(sender).when(message).getSender();
+ when(sender.getVersionObject()).thenReturn(version);
+ }
+
+ @Test
+ public void
processMessageFromServerOfGeode170AndLaterVersionWillExpireDisconnectedClientTransactions()
{
+ when(version.compareTo(Version.GEODE_170)).thenReturn(1);
+
+ message.process(dm);
+
+ verify(txManager, times(1)).expireDisconnectedClientTransactions(any(),
eq(false));
+ }
+
+ @Test
+ public void
processMessageFromServerOfPriorGeode170VersionWillRemoveExpiredClientTransactions()
{
+ when(version.compareTo(Version.GEODE_170)).thenReturn(-1);
+
+ message.process(dm);
+
+ verify(txManager, times(1)).removeExpiredClientTransactions(any());
+ }
+}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
index cf42bab..77c67e5 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
@@ -395,27 +395,12 @@ public class TXManagerImplTest {
TXStateProxyImpl tx = spy((TXStateProxyImpl)
txMgr.getOrSetHostedTXState(txid, msg));
doReturn(true).when(tx).isOverTransactionTimeoutLimit();
- txMgr.scheduleToRemoveExpiredClientTransction(txid);
+ txMgr.scheduleToRemoveExpiredClientTransaction(txid);
assertTrue(txMgr.isHostedTXStatesEmpty());
}
@Test
- public void
processExpireDisconnectedClientTransactionsMessageWillExpireDisconnectedClientTransactions()
{
- TXManagerImpl.ExpireDisconnectedClientTransactionsMessage message =
- new TXManagerImpl.ExpireDisconnectedClientTransactionsMessage();
-
- InternalCache cache = mock(InternalCache.class);
- TXManagerImpl txManager = mock(TXManagerImpl.class);
- when(dm.getCache()).thenReturn(cache);
- when(cache.getTXMgr()).thenReturn(txManager);
-
- message.process(dm);
-
- verify(txManager, times(1)).expireDisconnectedClientTransactions(any(),
eq(false));
- }
-
- @Test
public void
clientTransactionsToBeRemovedAndDistributedAreSentToRemoveServerIfWithNoTimeout()
{
Set<TXId> txIds = (Set<TXId>) mock(Set.class);
doReturn(0).when(spyTxMgr).getTransactionTimeToLive();
@@ -428,8 +413,7 @@ public class TXManagerImplTest {
spyTxMgr.expireDisconnectedClientTransactions(txIds, true);
- verify(spyTxMgr,
times(1)).removeClientTransactionsOnRemoteServer(eq(txIds));
- verify(spyTxMgr,
times(1)).scheduleToExpireDisconnectedClientTransactions(eq(txIds), eq(true));
+ verify(spyTxMgr,
times(1)).expireClientTransactionsOnRemoteServer(eq(txIds));
}
@Test
@@ -453,8 +437,7 @@ public class TXManagerImplTest {
spyTxMgr.expireDisconnectedClientTransactions(txIds, false);
- verify(spyTxMgr,
never()).removeClientTransactionsOnRemoteServer(eq(txIds));
- verify(spyTxMgr,
times(1)).scheduleToExpireDisconnectedClientTransactions(eq(txIds), eq(false));
+ verify(spyTxMgr,
never()).expireClientTransactionsOnRemoteServer(eq(txIds));
verify(spyTxMgr, times(1)).removeHostedTXState(eq(txIds));
verify(spyTxMgr, times(1)).removeHostedTXState(eq(txId1));
verify(spyTxMgr, times(1)).removeHostedTXState(eq(txId3));
@@ -463,21 +446,10 @@ public class TXManagerImplTest {
}
@Test
- public void
clientTransactionsToBeRemovedAndDistributedAreScheduledToSentToRemoveServerIfWithTimeout()
{
- Set<TXId> txIds = mock(Set.class);
- doReturn(1).when(spyTxMgr).getTransactionTimeToLive();
-
- spyTxMgr.expireDisconnectedClientTransactions(txIds, true);
-
- verify(timer, times(1)).schedule(any(), eq(1000L));
- verify(spyTxMgr,
times(1)).scheduleToExpireDisconnectedClientTransactions(eq(txIds), eq(true));
- }
-
- @Test
public void
clientTransactionsToBeExpiredAndDistributedAreSentToRemoveServer() {
Set<TXId> txIds = mock(Set.class);
- spyTxMgr.scheduleToExpireDisconnectedClientTransactions(txIds, true);
+ spyTxMgr.expireDisconnectedClientTransactions(txIds, true);
verify(spyTxMgr,
times(1)).expireClientTransactionsOnRemoteServer(eq(txIds));
}
@@ -486,7 +458,7 @@ public class TXManagerImplTest {
public void clientTransactionsNotToBeDistributedAreNotSentToRemoveServer() {
Set<TXId> txIds = mock(Set.class);
- spyTxMgr.scheduleToExpireDisconnectedClientTransactions(txIds, false);
+ spyTxMgr.expireDisconnectedClientTransactions(txIds, false);
verify(spyTxMgr,
never()).expireClientTransactionsOnRemoteServer(eq(txIds));
}
@@ -503,7 +475,7 @@ public class TXManagerImplTest {
set.add(txId1);
set.add(txId2);
- spyTxMgr.scheduleToExpireDisconnectedClientTransactions(set, false);
+ spyTxMgr.expireDisconnectedClientTransactions(set, false);
verify(spyTxMgr, times(1)).scheduleToRemoveClientTransaction(eq(txId1),
eq(1100L));
verify(spyTxMgr, times(1)).scheduleToRemoveClientTransaction(eq(txId2),
eq(1100L));
@@ -518,7 +490,7 @@ public class TXManagerImplTest {
}
@Test
- public void clientTransactionIsScheduledToBeReIfWithNoTimeout() {
+ public void clientTransactionIsScheduledToBeRemovedIfWithTimeout() {
spyTxMgr.scheduleToRemoveClientTransaction(txid, 1000);
verify(timer, times(1)).schedule(any(), eq(1000L));