This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-5401
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-5401 by this
push:
new 0beb056 GEODE-5401: Check if transaction has been failed over before
expiring client transactions.
0beb056 is described below
commit 0beb0565e8221926ec43631bbbe91175009dcb11
Author: eshu <[email protected]>
AuthorDate: Wed Jul 25 15:45:35 2018 -0700
GEODE-5401: Check if transaction has been failed over before expiring
client transactions.
Add a new message type that is used when sending expire-client-transactions requests to peers.
---
...ntServerTransactionFailoverDistributedTest.java | 338 ++++++++++++++++
...overWithMixedVersionServersDistributedTest.java | 443 +++++++++++++++++++++
.../org/apache/geode/internal/DSFIDFactory.java | 2 +
.../geode/internal/DataSerializableFixedID.java | 1 +
.../apache/geode/internal/cache/TXManagerImpl.java | 207 +++++++++-
.../geode/internal/cache/TXStateProxyImpl.java | 23 ++
.../cache/tier/sockets/ClientHealthMonitor.java | 36 +-
.../geode/internal/cache/TXManagerImplTest.java | 188 ++++++++-
.../geode/internal/cache/TXStateProxyImplTest.java | 22 +
9 files changed, 1203 insertions(+), 57 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
new file mode 100644
index 0000000..1d6f413
--- /dev/null
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.concurrent.FutureTask;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
+import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+public class ClientServerTransactionFailoverDistributedTest implements
Serializable {
+ private String hostName;
+ private String uniqueName;
+ private String regionName;
+ private VM server1;
+ private VM server2;
+ private VM server3;
+ private VM server4;
+ private int port1;
+ private int port2;
+ private int port3;
+ private int port4;
+
+ private final int numOfOperationsInTransaction = 10;
+ private final int key1 = 1;
+ private final String originalValue = "originalValue";
+
+ @Rule
+ public DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+ @Rule
+ public CacheRule cacheRule = new CacheRule();
+
+ @Rule
+ public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
+
+ @Before
+ public void setUp() {
+ server1 = getVM(0);
+ server2 = getVM(1);
+ server3 = getVM(2);
+ server4 = getVM(3);
+
+ hostName = getHostName();
+ uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+ regionName = uniqueName + "_region";
+ }
+
+ @Test
+ public void
clientLongTransactionDoesNotLoseOperationsAfterFailoverDirectlyToTransactionHost()
+ throws Exception {
+ port1 = server1.invoke(() -> createServerRegion(1, false));
+ server1.invoke(() -> doPut(key1, originalValue));
+ port2 = server2.invoke(() -> createServerRegion(1, true));
+
+ int numOfOpertions = 5;
+ createClientRegion(true, port2, port1);
+ TransactionId txId = suspendTransaction(numOfOpertions);
+ server2.invoke(() -> {
+ cacheRule.getCache().close();
+ });
+ resumeTransaction(txId, numOfOpertions);
+
+ server1.invoke(() -> verifyTransactionResult(1, numOfOpertions));
+ }
+
+ private void doPut(int key, String value) {
+ cacheRule.getCache().getRegion(regionName).put(key, value);
+ }
+
+ private int createServerRegion(int totalNumBuckets, boolean isAccessor)
throws Exception {
+ PartitionAttributesFactory factory = new PartitionAttributesFactory();
+ factory.setTotalNumBuckets(totalNumBuckets);
+ if (isAccessor) {
+ factory.setLocalMaxMemory(0);
+ }
+ PartitionAttributes partitionAttributes = factory.create();
+ cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
+ .setPartitionAttributes(partitionAttributes).create(regionName);
+
+ TXManagerImpl txManager = cacheRule.getCache().getTxManager();
+ txManager.setTransactionTimeToLiveForTest(4);
+
+ CacheServer server = cacheRule.getCache().addCacheServer();
+ server.setPort(0);
+ server.start();
+ return server.getPort();
+ }
+
+ private void createClientRegion(boolean connectToFirstPort, int... ports) {
+ clientCacheRule.createClientCache();
+
+ CacheServerTestUtil.disableShufflingOfEndpoints();
+ PoolImpl pool;
+ try {
+ pool = getPool(connectToFirstPort, ports);
+ } finally {
+ CacheServerTestUtil.enableShufflingOfEndpoints();
+ }
+
+ ClientRegionFactory crf =
+
clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
+ crf.setPoolName(pool.getName());
+ crf.create(regionName);
+
+ if (ports.length > 1 && connectToFirstPort) {
+ // make the first connection to the first port in the list
+ pool.acquireConnection(new ServerLocation(hostName, ports[0]));
+ }
+ }
+
+ private PoolImpl getPool(boolean connectToFirstPort, int... ports) {
+ PoolFactory factory = PoolManager.createFactory();
+ for (int port : ports) {
+ factory.addServer(hostName, port);
+ }
+ factory.setReadTimeout(12000).setSocketBufferSize(1000);
+
+ return (PoolImpl) factory.create(uniqueName);
+ }
+
+ private TransactionId suspendTransaction(int numOfOperations) {
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ TXManagerImpl txManager =
+ (TXManagerImpl)
clientCacheRule.getClientCache().getCacheTransactionManager();
+
+ txManager.begin();
+ TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
+ int whichTransaction = ((TXId)
txStateProxy.getTransactionId()).getUniqId();
+ int key = getKey(whichTransaction, numOfOperations);
+ String value = getValue(key);
+ region.put(key, value);
+
+ return txManager.suspend();
+ }
+
+ private void resumeTransaction(TransactionId txId, int numOfOperations)
throws Exception {
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ TXManagerImpl txManager =
+ (TXManagerImpl)
clientCacheRule.getClientCache().getCacheTransactionManager();
+ txManager.resume(txId);
+ TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
+ int whichTransaction = ((TXId)
txStateProxy.getTransactionId()).getUniqId();
+
+ int initialKey = getKey(whichTransaction, numOfOperations);
+ int key = 0;
+ for (int i = 0; i < numOfOperations; i++) {
+ key = initialKey + i;
+ String value = getValue(key);
+ region.put(key, value);
+ Thread.sleep(1000);
+ }
+ txManager.commit();
+ }
+
+ private void verifyTransactionResult(int numOfTransactions, int
numOfOperations) {
+ Region region = cacheRule.getCache().getRegion(regionName);
+ int numOfEntries = numOfOperations * numOfTransactions;
+ for (int i = 1; i <= numOfEntries; i++) {
+ LogService.getLogger().info("region get key {} value {} ", i,
region.get(i));
+ }
+ for (int i = 1; i <= numOfEntries; i++) {
+ assertEquals("value" + i, region.get(i));
+ }
+ }
+
+ private ClientProxyMembershipID getClientId() {
+ DistributedMember distributedMember =
+
clientCacheRule.getClientCache().getInternalDistributedSystem().getDistributedMember();
+ return ClientProxyMembershipID.getClientId(distributedMember);
+ }
+
+ private void unregisterClient(ClientProxyMembershipID
clientProxyMembershipID) throws Exception {
+ ClientHealthMonitor clientHealthMonitor =
ClientHealthMonitor.getInstance();
+
clientHealthMonitor.removeAllConnectionsAndUnregisterClient(clientProxyMembershipID,
+ new Exception());
+ }
+
+ private int getKey(int whichTransaction, int numOfOperations) {
+ return numOfOperations * (whichTransaction - 1) + 1;
+ }
+
+ private String getValue(int key) {
+ return "value" + key;
+ }
+
+ @Test
+ public void
multipleClientLongTransactionsCanFailoverWithoutLosingOperations() throws
Exception {
+ // set up
+ setupClientAndServerForMultipleTransactions();
+
+ int numOfTransactions = 12;
+ Thread[] threads = new Thread[numOfTransactions];
+ FutureTask<TransactionId>[] futureTasks = new
FutureTask[numOfTransactions];
+ TransactionId[] txIds = new TransactionId[numOfTransactions];
+
+ // suspend transactions
+ suspendTransactions(numOfTransactions, numOfOperationsInTransaction,
threads, futureTasks,
+ txIds);
+
+ // unregister client on 2 of the servers
+ ClientProxyMembershipID clientProxyMembershipID = getClientId();
+ server1.invoke(() -> unregisterClient(clientProxyMembershipID));
+ server2.invoke(() -> unregisterClient(clientProxyMembershipID));
+
+ resumeTransactions(numOfTransactions, numOfOperationsInTransaction,
threads, txIds);
+ waitForResumeTransactionsToComplete(numOfTransactions, threads);
+
+ server4.invoke(() -> verifyTransactionResult(numOfTransactions,
numOfOperationsInTransaction));
+ }
+
+ @Test
+ public void
multipleClientLongTransactionsCanFailoverMultipleTimesWithoutLosingOperations()
+ throws Exception {
+ // set up
+ setupClientAndServerForMultipleTransactions();
+
+ int numOfTransactions = 12;
+ int numOfOperations = 12;
+ Thread[] threads = new Thread[numOfTransactions];
+ FutureTask<TransactionId>[] futureTasks = new
FutureTask[numOfTransactions];
+ TransactionId[] txIds = new TransactionId[numOfTransactions];
+
+ // suspend transactions
+ suspendTransactions(numOfTransactions, numOfOperations, threads,
futureTasks, txIds);
+
+ // get the client id; the client is unregistered multiple times after the transactions resume
+ ClientProxyMembershipID clientProxyMembershipID = getClientId();
+
+ resumeTransactions(numOfTransactions, numOfOperations, threads, txIds);
+ unregisterClientMultipleTimes(clientProxyMembershipID);
+ waitForResumeTransactionsToComplete(numOfTransactions, threads);
+
+ server4.invoke(() -> verifyTransactionResult(numOfTransactions,
numOfOperations));
+ }
+
+ private void waitForResumeTransactionsToComplete(int numOfTransactions,
Thread[] threads)
+ throws InterruptedException {
+ for (int i = 0; i < numOfTransactions; i++) {
+ threads[i].join();
+ }
+ }
+
+ private void suspendTransactions(int numOfTransactions, int numOfOperations,
Thread[] threads,
+ FutureTask<TransactionId>[] futureTasks, TransactionId[] txIds)
+ throws InterruptedException, java.util.concurrent.ExecutionException {
+ for (int i = 0; i < numOfTransactions; i++) {
+ FutureTask<TransactionId> futureTask =
+ new FutureTask<>(() -> suspendTransaction(numOfOperations));
+ futureTasks[i] = futureTask;
+ Thread thread = new Thread(futureTask);
+ threads[i] = thread;
+ thread.start();
+ }
+
+ for (int i = 0; i < numOfTransactions; i++) {
+ txIds[i] = futureTasks[i].get();
+ }
+ }
+
+ private void setupClientAndServerForMultipleTransactions() {
+ port4 = server4.invoke(() -> createServerRegion(1, false));
+ server4.invoke(() -> doPut(key1, originalValue));
+ port1 = server1.invoke(() -> createServerRegion(1, true));
+ port2 = server2.invoke(() -> createServerRegion(1, true));
+ port3 = server3.invoke(() -> createServerRegion(1, true));
+ createClientRegion(false, port1, port2, port3, port4);
+ }
+
+ private void resumeTransactions(int numOfTransactions, int numOfOperations,
Thread[] threads,
+ TransactionId[] txIds)
+ throws InterruptedException {
+ for (int i = 0; i < numOfTransactions; i++) {
+ TransactionId txId = txIds[i];
+ Thread thread = new Thread(() -> {
+ try {
+ resumeTransaction(txId, numOfOperations);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ thread.start();
+ threads[i] = thread;
+ }
+ }
+
+ private void unregisterClientMultipleTimes(ClientProxyMembershipID
clientProxyMembershipID)
+ throws Exception {
+ int numOfUnregisterClients = 4;
+ for (int i = 0; i < numOfUnregisterClients; i++) {
+ getVM(i).invoke(() -> unregisterClient(clientProxyMembershipID));
+ Thread.sleep(1000);
+ }
+ }
+}
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java
new file mode 100644
index 0000000..90e378c
--- /dev/null
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static
org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
+import static
org.apache.geode.distributed.ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static
org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.Properties;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.dunit.standalone.VersionManager;
+import
org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+public class
ClientServerTransactionFailoverWithMixedVersionServersDistributedTest
+ implements Serializable {
+ private String hostName;
+ private String uniqueName;
+ private String regionName;
+ private VM server1;
+ private VM server2;
+ private VM server3;
+ private VM server4;
+ private VM locator;
+ private VM client;
+ private int locatorPort;
+ private File locatorLog;
+ private Host host;
+ private final int transactionTimeoutSecond = 2;
+
+ @ClassRule
+ public static DistributedTestRule distributedTestRule = new
DistributedTestRule(6);
+
+ @Rule
+ public CacheRule cacheRule = new CacheRule();
+
+ @Rule
+ public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+ @Rule
+ public SerializableTemporaryFolder temporaryFolder = new
SerializableTemporaryFolder();
+
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
+
+ @Before
+ public void setup() throws Exception {
+ host = Host.getHost(0);
+ String startingVersion = "160";
+ server1 = host.getVM(startingVersion, 0);
+ server2 = host.getVM(startingVersion, 1);
+ server3 = host.getVM(startingVersion, 2);
+ server4 = host.getVM(startingVersion, 3);
+ client = host.getVM(startingVersion, 4);
+ locator = host.getVM(startingVersion, 5);
+
+ hostName = getHostName();
+ uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+ regionName = uniqueName + "_region";
+ locatorLog = new File(temporaryFolder.newFolder(uniqueName),
"locator.log");
+ }
+
+ @Test
+ public void
clientTransactionOperationsAreNotLostIfTransactionIsOnRolledServer()
+ throws Exception {
+ setupPartiallyRolledVersion();
+
+ server1.invoke(() -> createServerRegion(1, false));
+ server2.invoke(() -> createServerRegion(1, true));
+ server3.invoke(() -> createServerRegion(1, true));
+ server4.invoke(() -> createServerRegion(1, true));
+ client.invoke(() -> createClientRegion());
+
+ ClientProxyMembershipID clientProxyMembershipID = client.invoke(() ->
getClientId());
+
+ int numOfTransactions = 12;
+ int numOfOperations = 12;
+ client.invokeAsync(() -> doTransactions(numOfTransactions,
numOfOperations));
+
+ unregisterClientMultipleTimes(clientProxyMembershipID);
+
+ server1.invoke(() -> verifyTransactionResult(numOfTransactions,
numOfOperations, false));
+ client.invoke(() -> verifyTransactionResult(numOfTransactions,
numOfOperations, true));
+ }
+
+ private void doTransactions(int numOfTransactions, int numOfOperations)
+ throws InterruptedException, java.util.concurrent.ExecutionException {
+ Thread[] threads = new Thread[numOfTransactions];
+ FutureTask<TransactionId>[] futureTasks = new
FutureTask[numOfTransactions];
+ TransactionId[] txIds = new TransactionId[numOfTransactions];
+ // suspend transactions
+ suspendTransactions(numOfTransactions, numOfOperations, threads,
futureTasks, txIds);
+ // resume transactions
+ resumeTransactions(numOfTransactions, numOfOperations, threads, txIds);
+ waitForResumeTransactionsToComplete(numOfTransactions, threads);
+ }
+
+ private void setupPartiallyRolledVersion() throws Exception {
+ locatorPort = locator.invoke(() -> startLocator());
+ server1.invoke(() -> createCacheServer());
+ server2.invoke(() -> createCacheServer());
+ server3.invoke(() -> createCacheServer());
+ server4.invoke(() -> createCacheServer());
+ client.invoke(() -> createClientCache());
+
+ // roll locator
+ locator = rollLocatorToCurrent(locator);
+ // roll server1 and server2
+ server1 = rollServerToCurrent(server1);
+ server2 = rollServerToCurrent(server2);
+ }
+
+ private int startLocator() throws IOException {
+ Properties config = createLocatorConfig();
+ InetAddress bindAddress = InetAddress.getByName(hostName);
+ Locator locator = Locator.startLocatorAndDS(locatorPort, locatorLog,
bindAddress, config);
+ return locator.getPort();
+ }
+
+ private Properties createLocatorConfig() {
+ Properties config = new Properties();
+ config.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
+ config.setProperty(ENABLE_NETWORK_PARTITION_DETECTION, "false");
+ config.setProperty(USE_CLUSTER_CONFIGURATION, "false");
+ return config;
+ }
+
+ private void createCacheServer() throws Exception {
+ cacheRule.createCache(createServerConfig());
+
+ CacheServer server = cacheRule.getCache().addCacheServer();
+ server.setPort(0);
+ server.start();
+ }
+
+ private Properties createServerConfig() {
+ Properties config = createLocatorConfig();
+ config.setProperty(LOCATORS, hostName + "[" + locatorPort + "]");
+ return config;
+ }
+
+ private void createClientCache() {
+ clientCacheRule.createClientCache();
+ }
+
+ private VM rollLocatorToCurrent(VM oldLocator) throws Exception {
+ // Roll the locator
+ oldLocator.invoke(() -> stopLocator());
+ VM rollLocator = host.getVM(VersionManager.CURRENT_VERSION,
oldLocator.getId());
+ rollLocator.invoke(() -> startLocator());
+ return rollLocator;
+ }
+
+ private void stopLocator() {
+ Locator.getLocator().stop();
+ }
+
+ private VM rollServerToCurrent(VM oldServer) throws Exception {
+ // Roll the server
+ oldServer.invoke(() -> cacheRule.getCache().close());
+ VM rollServer = host.getVM(VersionManager.CURRENT_VERSION,
oldServer.getId());
+ rollServer.invoke(() -> createCacheServer());
+ return rollServer;
+ }
+
+ private void createServerRegion(int totalNumBuckets, boolean isAccessor)
throws Exception {
+ PartitionAttributesFactory factory = new PartitionAttributesFactory();
+ factory.setTotalNumBuckets(totalNumBuckets);
+ if (isAccessor) {
+ factory.setLocalMaxMemory(0);
+ }
+ PartitionAttributes partitionAttributes = factory.create();
+ cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
+ .setPartitionAttributes(partitionAttributes).create(regionName);
+
+ TXManagerImpl txManager = cacheRule.getCache().getTxManager();
+ txManager.setTransactionTimeToLiveForTest(transactionTimeoutSecond);
+ }
+
+ private void createClientRegion() throws Exception {
+ Pool pool = PoolManager.createFactory().addLocator(hostName,
locatorPort).create(uniqueName);
+
+ ClientRegionFactory crf =
+
clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
+ crf.setPoolName(pool.getName());
+ crf.create(regionName);
+ }
+
+ private void suspendTransactions(int numOfTransactions, int numOfOperations,
Thread[] threads,
+ FutureTask<TransactionId>[] futureTasks, TransactionId[] txIds)
+ throws InterruptedException, java.util.concurrent.ExecutionException {
+ for (int i = 0; i < numOfTransactions; i++) {
+ FutureTask<TransactionId> futureTask =
+ new FutureTask<>(() -> suspendTransaction(numOfOperations));
+ futureTasks[i] = futureTask;
+ Thread thread = new Thread(futureTask);
+ threads[i] = thread;
+ thread.start();
+ }
+
+ for (int i = 0; i < numOfTransactions; i++) {
+ txIds[i] = futureTasks[i].get();
+ }
+ }
+
+ private TransactionId suspendTransaction(int numOfOperations) {
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ TXManagerImpl txManager =
+ (TXManagerImpl)
clientCacheRule.getClientCache().getCacheTransactionManager();
+ startTransaction(numOfOperations, region, txManager);
+
+ return txManager.suspend();
+ }
+
+ private void startTransaction(int numOfOperations, Region region,
TXManagerImpl txManager) {
+ txManager.begin();
+ TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
+ int whichTransaction = ((TXId)
txStateProxy.getTransactionId()).getUniqId();
+ int key = getKey(whichTransaction, numOfOperations);
+ String value = getValue(key);
+ region.put(key, value);
+ }
+
+ private int getKey(int whichTransaction, int numOfOperations) {
+ return numOfOperations * (whichTransaction - 1) + 1;
+ }
+
+ private String getValue(int key) {
+ return "value" + key;
+ }
+
+ private ClientProxyMembershipID getClientId() {
+ DistributedMember distributedMember =
+
clientCacheRule.getClientCache().getInternalDistributedSystem().getDistributedMember();
+ return ClientProxyMembershipID.getClientId(distributedMember);
+ }
+
+ private void resumeTransactions(int numOfTransactions, int numOfOperations,
Thread[] threads,
+ TransactionId[] txIds)
+ throws InterruptedException {
+ for (int i = 0; i < numOfTransactions; i++) {
+ TransactionId txId = txIds[i];
+ Thread thread = new Thread(() -> {
+ try {
+ resumeTransaction(txId, numOfOperations);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ thread.start();
+ threads[i] = thread;
+ }
+ }
+
+ private void resumeTransaction(TransactionId txId, int numOfOperations)
throws Exception {
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ TXManagerImpl txManager =
+ (TXManagerImpl)
clientCacheRule.getClientCache().getCacheTransactionManager();
+ txManager.resume(txId);
+ TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
+ int whichTransaction = ((TXId)
txStateProxy.getTransactionId()).getUniqId();
+
+ int initialKey = getKey(whichTransaction, numOfOperations);
+ int key = 0;
+ for (int i = 0; i < numOfOperations; i++) {
+ key = initialKey + i;
+ String value = getValue(key);
+ region.put(key, value);
+ Thread.sleep(1000);
+ }
+ txManager.commit();
+ }
+
+ private void unregisterClientMultipleTimes(ClientProxyMembershipID
clientProxyMembershipID)
+ throws Exception {
+ int numOfUnregisterClients = 4;
+ for (int i = 0; i < numOfUnregisterClients; i++) {
+ getVM(i).invoke(() -> unregisterClient(clientProxyMembershipID));
+ Thread.sleep(1000);
+ }
+ }
+
+ private void unregisterClient(ClientProxyMembershipID
clientProxyMembershipID) throws Exception {
+ ClientHealthMonitor clientHealthMonitor =
ClientHealthMonitor.getInstance();
+
clientHealthMonitor.removeAllConnectionsAndUnregisterClient(clientProxyMembershipID,
+ new Exception());
+ }
+
+ private void waitForResumeTransactionsToComplete(int numOfTransactions,
Thread[] threads)
+ throws InterruptedException {
+ for (int i = 0; i < numOfTransactions; i++) {
+ threads[i].join();
+ }
+ }
+
+ private void verifyTransactionResult(int numOfTransactions, int
numOfOperations,
+ boolean isClient) {
+ Region region;
+ if (isClient) {
+ region = clientCacheRule.getClientCache().getRegion(regionName);
+ } else {
+ region = cacheRule.getCache().getRegion(regionName);
+ }
+ Awaitility.await().atMost(60, TimeUnit.SECONDS)
+ .until(() -> assertThat(region.get(1)).isEqualTo("value1"));
+ int numOfEntries = numOfOperations * numOfTransactions;
+ for (int i = 1; i <= numOfEntries; i++) {
+ LogService.getLogger().info("region get key {} value {} ", i,
region.get(i));
+ }
+ for (int i = 1; i <= numOfEntries; i++) {
+ assertEquals("value" + i, region.get(i));
+ }
+ }
+
+ @Test
+ public void clientTransactionExpiredAreRemovedOnRolledServer() throws
Exception {
+ setupPartiallyRolledVersion();
+
+ server1.invoke(() -> createServerRegion(1, false));
+ server2.invoke(() -> createServerRegion(1, true));
+ server3.invoke(() -> createServerRegion(1, true));
+ server4.invoke(() -> createServerRegion(1, true));
+ client.invoke(() -> createClientRegion());
+
+ ClientProxyMembershipID clientProxyMembershipID = client.invoke(() ->
getClientId());
+
+ int numOfTransactions = 12;
+ int numOfOperations = 1;
+ client.invokeAsync(() -> doUnfinishedTransactions(numOfTransactions,
numOfOperations));
+
+ server1.invoke(() -> verifyTransactionAreStarted(numOfTransactions));
+
+ unregisterClientMultipleTimes(clientProxyMembershipID);
+
+ server1.invoke(() -> verifyTransactionAreExpired(numOfTransactions));
+ }
+
+ private void doUnfinishedTransactions(int numOfTransactions, int
numOfOperations)
+ throws InterruptedException, java.util.concurrent.ExecutionException {
+ Thread[] threads = new Thread[numOfTransactions];
+ for (int i = 0; i < numOfTransactions; i++) {
+ Thread thread = new Thread(() -> startTransaction(numOfOperations));
+ threads[i] = thread;
+ thread.start();
+ }
+
+ for (int i = 0; i < numOfTransactions; i++) {
+ threads[i].join();
+ }
+ }
+
+ private void startTransaction(int numOfOperations) {
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ TXManagerImpl txManager =
+ (TXManagerImpl)
clientCacheRule.getClientCache().getCacheTransactionManager();
+ startTransaction(numOfOperations, region, txManager);
+ }
+
+ private void verifyTransactionAreStarted(int numOfTransactions) {
+ TXManagerImpl txManager = cacheRule.getCache().getTxManager();
+ Awaitility.await().atMost(60, TimeUnit.SECONDS)
+ .until(() ->
assertThat(txManager.hostedTransactionsInProgressForTest())
+ .isEqualTo(numOfTransactions));
+ }
+
+ private void verifyTransactionAreExpired(int numOfTransactions) {
+ TXManagerImpl txManager = cacheRule.getCache().getTxManager();
+ Awaitility.await().atMost(60, TimeUnit.SECONDS)
+ .until(() ->
assertThat(txManager.hostedTransactionsInProgressForTest()).isEqualTo(0));
+ }
+
+ @Test
+ public void clientTransactionExpiredAreRemovedOnNotYetRolledServer() throws
Exception {
+ setupPartiallyRolledVersion();
+
+ server1.invoke(() -> createServerRegion(1, true));
+ server2.invoke(() -> createServerRegion(1, true));
+ server3.invoke(() -> createServerRegion(1, true));
+ server4.invoke(() -> createServerRegion(1, false));
+ client.invoke(() -> createClientRegion());
+
+ ClientProxyMembershipID clientProxyMembershipID = client.invoke(() ->
getClientId());
+
+ int numOfTransactions = 12;
+ int numOfOperations = 1;
+ client.invokeAsync(() -> doUnfinishedTransactions(numOfTransactions,
numOfOperations));
+
+ server4.invoke(() -> verifyTransactionAreStarted(numOfTransactions));
+
+ unregisterClientMultipleTimes(clientProxyMembershipID);
+
+ server4.invoke(() -> verifyTransactionAreExpired(numOfTransactions));
+ }
+}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index 204459f..a6ee477 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -937,6 +937,8 @@ public class DSFIDFactory implements
DataSerializableFixedID {
registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY,
GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class);
registerDSFID(ABORT_BACKUP_REQUEST, AbortBackupRequest.class);
+ registerDSFID(EXPIRE_CLIENT_TRANSACTIONS,
+ TXManagerImpl.ExpireDisconnectedClientTransactionsMessage.class);
}
/**
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
index ecf20ab..c2a40d3 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
@@ -823,6 +823,7 @@ public interface DataSerializableFixedID extends
SerializationVersions {
short GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_MESSAGE = 2181;
short GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY = 2182;
short ABORT_BACKUP_REQUEST = 2183;
+ short EXPIRE_CLIENT_TRANSACTIONS = 2184;
// NOTE, codes > 65535 will take 4 bytes to serialize
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index 639656f..18cf5ef 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -60,6 +60,7 @@ import
org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.MembershipListener;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.SystemTimer.SystemTimerTask;
+import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.entries.AbstractRegionEntry;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.concurrent.ConcurrentHashSet;
@@ -119,6 +120,10 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
private final Map<TXId, TXStateProxy> hostedTXStates;
+ // Ids of client transactions that currently have a removal task scheduled. The set is only
+ // created when the boolean system property GEMFIRE_PREFIX + "trackScheduledToBeRemovedTx" is
+ // true; otherwise this field is null and every user must null-check it.
+ private final Set<TXId> scheduledToBeRemovedTx =
+ Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX +
"trackScheduledToBeRemovedTx")
+ ? new ConcurrentHashSet<TXId>() : null;
+
/**
* the number of client initiated transactions to store for client failover
*/
@@ -997,8 +1002,6 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
return val;
}
-
-
/**
* Associate the transactional state with this thread.
*
@@ -1017,6 +1020,9 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
*/
public void unmasquerade(TXStateProxy tx) {
if (tx != null) {
+ if (tx.isOnBehalfOfClient()) {
+ updateLastOperationTime(tx);
+ }
cleanupTransactionIfNoLongerHost(tx);
setTXState(null);
tx.getLock().unlock();
@@ -1035,8 +1041,12 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
}
}
+ void updateLastOperationTime(TXStateProxy tx) {
+ ((TXStateProxyImpl)
tx).setLastOperationTimeFromClient(System.currentTimeMillis());
+ }
+
/**
- * Cleanup the remote txState after commit and rollback
+ * Cleanup the txState
*
* @return the TXStateProxy
*/
@@ -1050,6 +1060,12 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
}
}
+ /**
+  * Removes the hosted transaction state of every id produced by the given set's iterator.
+  *
+  * @param txIds ids of the transactions whose hosted state is to be removed
+  */
+ public void removeHostedTXState(Set<TXId> txIds) {
+   final Iterator<TXId> ids = txIds.iterator();
+   while (ids.hasNext()) {
+     removeHostedTXState(ids.next());
+   }
+ }
+
/**
* Called when the CacheServer is shutdown. Removes txStates hosted on
client's behalf
*/
@@ -1167,12 +1183,27 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
return result;
}
- /** remove the given TXStates */
- public void removeTransactions(Set<TXId> txIds, boolean distribute) {
+ /**
+ * Removes those of the given client transactions that have been idle for longer than
+ * transactionTimeToLive. Per the original note, this is now only triggered by a pre Geode
+ * 1.7.0 server during a rolling upgrade: that server has already waited for
+ * transactionTimeToLive and asks this server to remove the client transactions. Each
+ * transaction is re-checked for recent client activity before it is removed, so a
+ * transaction that has seen recent activity on this server is kept.
+ *
+ * @param txIds ids of the client transactions the remote server asked to remove
+ */
+ public void removeExpiredClientTransactions(Set<TXId> txIds) {
if (logger.isDebugEnabled()) {
logger.debug("expiring the following transactions: {}", txIds);
}
synchronized (this.hostedTXStates) {
+ for (TXId txId : txIds) {
+ // only expire a client transaction that had no activity for transactionTimeToLive
+ scheduleToRemoveExpiredClientTransction(txId);
+ }
+ }
+ }
+
+ /** remove the given TXStates for test */
+ public void removeTransactions(Set<TXId> txIds, boolean distribute) {
+ synchronized (this.hostedTXStates) {
Iterator<Map.Entry<TXId, TXStateProxy>> iterator =
this.hostedTXStates.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<TXId, TXStateProxy> entry = iterator.next();
@@ -1182,10 +1213,6 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
}
}
}
- if (distribute) {
- // tell other VMs to also remove the transactions
- TXRemovalMessage.send(this.dm, this.dm.getOtherDistributionManagerIds(),
txIds);
- }
}
void saveTXStateForClientFailover(TXStateProxy tx) {
@@ -1314,10 +1341,19 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
/** for deserialization */
public TXRemovalMessage() {}
- static void send(DistributionManager dm, Set recipients, Set<TXId> txIds) {
+ /**
+ * Sends a removal request for the given transactions to those of the given recipients whose
+ * version is earlier than Geode 1.7.0; all other recipients are filtered out here.
+ *
+ * @param dm distribution manager used to put the message on the wire
+ * @param recipients candidate members; only pre-1.7.0 members are kept
+ * @param txIds ids of the transactions to remove
+ */
+ static void send(DistributionManager dm, Set<InternalDistributedMember>
recipients,
+ Set<TXId> txIds) {
TXRemovalMessage msg = new TXRemovalMessage();
msg.txIds = txIds;
- msg.setRecipients(recipients);
+ // only send to servers with version earlier than geode 1.7.0
+ // newer version use ExpireDisconnectedClientTransactionsMessage
+ Set oldVersionRecipients = new HashSet();
+ for (InternalDistributedMember recipient : recipients) {
+ if (recipient.getVersionObject().compareTo(Version.GEODE_170) < 0) {
+ oldVersionRecipients.add(recipient);
+ }
+ }
+ msg.setRecipients(oldVersionRecipients);
dm.putOutgoing(msg);
}
@@ -1340,7 +1376,148 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
InternalCache cache = dm.getCache();
if (cache != null) {
TXManagerImpl mgr = cache.getTXMgr();
- mgr.removeTransactions(this.txIds, false);
+ // check if transaction has been updated before remove it
+ mgr.removeExpiredClientTransactions(this.txIds);
+ }
+ }
+ }
+
+ /**
+  * Message asking a peer to start expiring the given client transactions. It is addressed only
+  * to members running Geode 1.7.0 or later; on receipt the peer calls
+  * {@link TXManagerImpl#expireDisconnectedClientTransactions(Set, boolean)} with
+  * {@code distribute == false}, so the request is not forwarded any further.
+  */
+ public static class ExpireDisconnectedClientTransactionsMessage
+     extends HighPriorityDistributionMessage {
+   Set<TXId> txIds;
+
+   /** for deserialization */
+   public ExpireDisconnectedClientTransactionsMessage() {}
+
+   /**
+    * Sends the expiration request to those of the given recipients whose version is Geode
+    * 1.7.0 or later.
+    *
+    * @param dm distribution manager used to put the message on the wire
+    * @param recipients candidate members; members older than 1.7.0 are filtered out
+    * @param txIds ids of the client transactions to expire
+    */
+   static void send(DistributionManager dm, Set<InternalDistributedMember> recipients,
+       Set<TXId> txIds) {
+     ExpireDisconnectedClientTransactionsMessage msg =
+         new ExpireDisconnectedClientTransactionsMessage();
+     msg.txIds = txIds;
+     Set<InternalDistributedMember> newVersionRecipients = new HashSet<>();
+     for (InternalDistributedMember recipient : recipients) {
+       // to geode 1.7.0 and later version servers
+       if (recipient.getVersionObject().compareTo(Version.GEODE_170) >= 0) {
+         newVersionRecipients.add(recipient);
+       }
+     }
+     msg.setRecipients(newVersionRecipients);
+     dm.putOutgoing(msg);
+   }
+
+   @Override
+   public void toData(DataOutput out) throws IOException {
+     // writeHashSet needs a HashSet. Copy any other Set implementation instead of casting it
+     // blindly, which would fail with a ClassCastException; the bytes written are the same.
+     HashSet<TXId> serializable = (this.txIds == null || this.txIds instanceof HashSet)
+         ? (HashSet<TXId>) this.txIds
+         : new HashSet<>(this.txIds);
+     DataSerializer.writeHashSet(serializable, out);
+   }
+
+   @Override
+   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+     this.txIds = DataSerializer.readHashSet(in);
+   }
+
+   @Override
+   public int getDSFID() {
+     return EXPIRE_CLIENT_TRANSACTIONS;
+   }
+
+   @Override
+   protected void process(ClusterDistributionManager dm) {
+     InternalCache cache = dm.getCache();
+     // nothing to expire when this member has no cache
+     if (cache != null) {
+       TXManagerImpl mgr = cache.getTXMgr();
+       mgr.expireDisconnectedClientTransactions(this.txIds, false);
+     }
+   }
+ }
+
+ /**
+  * Starts expiration of the given client transactions on this member and, when
+  * {@code distribute} is true, on the other members as well.
+  * <p>
+  * When distributing, members older than Geode 1.7.0 are told to remove the transactions via
+  * {@link TXRemovalMessage}: immediately if transactionTimeToLive is not positive, otherwise
+  * from a timer task that fires after transactionTimeToLive. In every case the work is then
+  * handed to {@link #scheduleToExpireDisconnectedClientTransactions(Set, boolean)}.
+  *
+  * @param txIds ids of the client transactions to expire
+  * @param distribute whether other members must be notified as well
+  */
+ public void expireDisconnectedClientTransactions(Set<TXId> txIds, boolean distribute) {
+   final long timeToLiveMillis = TimeUnit.SECONDS.toMillis(getTransactionTimeToLive());
+   if (distribute && timeToLiveMillis <= 0) {
+     removeClientTransactionsOnRemoteServer(txIds);
+   } else if (distribute) {
+     if (logger.isDebugEnabled()) {
+       logger.debug("expiring the following transactions: {}", Arrays.toString(txIds.toArray()));
+     }
+     // servers older than geode 1.7.0 get the remove message only after the time to live
+     SystemTimerTask remoteRemoval = new SystemTimerTask() {
+       @Override
+       public void run2() {
+         removeClientTransactionsOnRemoteServer(txIds);
+       }
+     };
+     getCache().getCCPTimer().schedule(remoteRemoval, timeToLiveMillis);
+   }
+   // this member, and servers running geode 1.7.0 or later, schedule the expiration themselves
+   scheduleToExpireDisconnectedClientTransactions(txIds, distribute);
+ }
+
+ void removeClientTransactionsOnRemoteServer(Set<TXId> txIds) {
+ TXRemovalMessage.send(this.dm, this.dm.getOtherDistributionManagerIds(),
txIds);
+ }
+
+ /** timer task for expiring the given TXStates */
+ public void scheduleToExpireDisconnectedClientTransactions(Set<TXId> txIds,
boolean distribute) {
+ // increase the client transaction timeout setting to avoid a late
in-flight client operation
+ // preventing the expiration of the client transaction.
+ long timeout = (long)
(TimeUnit.SECONDS.toMillis(getTransactionTimeToLive()) * 1.1);
+ if (timeout <= 0) {
+ removeHostedTXState(txIds);
+ }
+ synchronized (this.hostedTXStates) {
+ Iterator<Map.Entry<TXId, TXStateProxy>> iterator =
this.hostedTXStates.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<TXId, TXStateProxy> entry = iterator.next();
+ if (txIds.contains(entry.getKey())) {
+ scheduleToRemoveClientTransaction(entry.getKey(), timeout);
+ }
+ }
+ }
+ if (distribute) {
+ expireClientTransactionsOnRemoteServer(txIds);
+ }
+ }
+
+ void expireClientTransactionsOnRemoteServer(Set<TXId> txIds) {
+ // tell other VMs to also add tasks to expire the transactions
+ ExpireDisconnectedClientTransactionsMessage.send(this.dm,
+ this.dm.getOtherDistributionManagerIds(), txIds);
+ }
+
+ /**
+  * Removes the given client transaction, either immediately or after a delay.
+  * <p>
+  * A non-positive timeout removes the hosted state right away. Otherwise a timer task is
+  * scheduled; when it fires, the transaction is removed only if it is over its timeout limit.
+  * While the task is pending the id is kept in the tracking set, if tracking is enabled.
+  *
+  * @param txId id of the client transaction
+  * @param timeout delay in milliseconds before the removal is attempted
+  */
+ void scheduleToRemoveClientTransaction(TXId txId, long timeout) {
+   if (timeout <= 0) {
+     removeHostedTXState(txId);
+     return;
+   }
+   final boolean tracking = scheduledToBeRemovedTx != null;
+   if (tracking) {
+     scheduledToBeRemovedTx.add(txId);
+   }
+   SystemTimerTask expiryTask = new SystemTimerTask() {
+     @Override
+     public void run2() {
+       scheduleToRemoveExpiredClientTransction(txId);
+       if (tracking) {
+         scheduledToBeRemovedTx.remove(txId);
+       }
+     }
+   };
+   getCache().getCCPTimer().schedule(expiryTask, timeout);
+ }
+
+ /**
+ * Closes and removes the hosted state of the given transaction, but only if it is still
+ * hosted here and reports that it is over the transaction timeout limit. Nothing is
+ * scheduled by this method itself; lookup, check and removal all happen under the
+ * hostedTXStates lock.
+ */
+ void scheduleToRemoveExpiredClientTransction(TXId txId) {
+ synchronized (this.hostedTXStates) {
+ TXStateProxy result = hostedTXStates.get(txId);
+ if (result != null) {
+ if (((TXStateProxyImpl) result).isOverTransactionTimeoutLimit()) {
+ result.close();
+ hostedTXStates.remove(txId);
+ }
}
}
}
@@ -1799,7 +1976,7 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
return;
}
if (logger.isDebugEnabled()) {
- logger.debug("expiring the following transactions: {}", txIds);
+ logger.debug("expiring the following transactions: {}",
Arrays.toString(txIds.toArray()));
}
synchronized (this.hostedTXStates) {
Iterator<Map.Entry<TXId, TXStateProxy>> iterator =
this.hostedTXStates.entrySet().iterator();
@@ -1879,4 +2056,8 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
return hostedTXStates.isEmpty();
}
+ /**
+ * Returns the set tracking transactions that have a removal task scheduled. This is the
+ * live set, not a copy, and it is null when tracking was not enabled at construction time.
+ */
+ public Set<TXId> getScheduledToBeRemovedTx() {
+ return scheduledToBeRemovedTx;
+ }
+
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
index 00f15c3..dc82fce 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
@@ -19,6 +19,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
@@ -75,6 +76,7 @@ public class TXStateProxyImpl implements TXStateProxy {
protected InternalDistributedMember onBehalfOfClientMember = null;
private final InternalCache cache;
+ private long lastOperationTimeFromClient;
public TXStateProxyImpl(InternalCache cache, TXManagerImpl managerImpl, TXId
id,
InternalDistributedMember clientMember) {
@@ -956,4 +958,25 @@ public class TXStateProxyImpl implements TXStateProxy {
((TXState) this.realDeal).setProxyServer(proxy);
}
}
+
+ /**
+  * Tells whether more time has passed since the last client operation than the transaction
+  * manager's transactionTimeToLive allows. Being idle for exactly the limit does not count as
+  * over the limit.
+  *
+  * @return true if the idle time is strictly greater than the time to live
+  */
+ public boolean isOverTransactionTimeoutLimit() {
+   final long idleMillis = getCurrentTime() - getLastOperationTimeFromClient();
+   final long limitMillis = TimeUnit.SECONDS.toMillis(txMgr.getTransactionTimeToLive());
+   return idleMillis > limitMillis;
+ }
+
+ /** Returns the current wall-clock time in milliseconds; a separate method so tests can stub it. */
+ long getCurrentTime() {
+ return System.currentTimeMillis();
+ }
+
+ /**
+ * Returns the time, in milliseconds, last recorded through setLastOperationTimeFromClient;
+ * 0 if it was never set.
+ */
+ synchronized long getLastOperationTimeFromClient() {
+ return lastOperationTimeFromClient;
+ }
+
+ /**
+ * Records the time, in milliseconds, of the most recent client operation on this
+ * transaction; isOverTransactionTimeoutLimit measures idle time from this value.
+ */
+ public synchronized void setLastOperationTimeFromClient(long
lastOperationTimeFromClient) {
+ this.lastOperationTimeFromClient = lastOperationTimeFromClient;
+ }
+
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index 72f4f6f..934d501 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -30,9 +30,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
-import org.apache.geode.distributed.internal.DistributionConfig;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.SystemTimer.SystemTimerTask;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.CacheClientStatus;
import org.apache.geode.internal.cache.IncomingGatewayStatus;
@@ -40,7 +38,6 @@ import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.TXId;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.tier.ServerSideHandshake;
-import org.apache.geode.internal.concurrent.ConcurrentHashSet;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
@@ -282,15 +279,12 @@ public class ClientHealthMonitor {
}
}
- private final Set<TXId> scheduledToBeRemovedTx =
- Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX +
"trackScheduledToBeRemovedTx")
- ? new ConcurrentHashSet<TXId>() : null;
-
/**
* provide a test hook to track client transactions to be removed
*/
public Set<TXId> getScheduledToBeRemovedTx() {
- return scheduledToBeRemovedTx;
+ // the tracking set is owned by the cache's transaction manager; delegate to it
+ final TXManagerImpl txMgr = (TXManagerImpl)
this._cache.getCacheTransactionManager();
+ return txMgr.getScheduledToBeRemovedTx();
}
/**
@@ -301,33 +295,13 @@ public class ClientHealthMonitor {
*/
private void expireTXStates(ClientProxyMembershipID proxyID) {
final TXManagerImpl txMgr = (TXManagerImpl)
this._cache.getCacheTransactionManager();
- final Set<TXId> txids =
+ final Set<TXId> txIds =
txMgr.getTransactionsForClient((InternalDistributedMember)
proxyID.getDistributedMember());
if (this._cache.isClosed()) {
return;
}
- long timeout = txMgr.getTransactionTimeToLive() * 1000;
- if (!txids.isEmpty()) {
- if (logger.isDebugEnabled()) {
- logger.debug("expiring {} transaction contexts for {} timeout={}",
txids.size(), proxyID,
- timeout / 1000);
- }
-
- if (timeout <= 0) {
- txMgr.removeTransactions(txids, true);
- } else {
- if (scheduledToBeRemovedTx != null)
- scheduledToBeRemovedTx.addAll(txids);
- SystemTimerTask task = new SystemTimerTask() {
- @Override
- public void run2() {
- txMgr.removeTransactions(txids, true);
- if (scheduledToBeRemovedTx != null)
- scheduledToBeRemovedTx.removeAll(txids);
- }
- };
- this._cache.getCCPTimer().schedule(task, timeout);
- }
+ if (!txIds.isEmpty()) {
+ txMgr.expireDisconnectedClientTransactions(txIds, true);
}
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
index 91ba412..cf42bab 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
@@ -21,39 +21,58 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.cache.partitioned.DestroyMessage;
import org.apache.geode.test.fake.Fakes;
public class TXManagerImplTest {
private TXManagerImpl txMgr;
- TXId txid;
- DestroyMessage msg;
- TXCommitMessage txCommitMsg;
- TXId completedTxid;
- TXId notCompletedTxid;
- InternalDistributedMember member;
- CountDownLatch latch;
- TXStateProxy tx1, tx2;
- ClusterDistributionManager dm;
- TXRemoteRollbackMessage rollbackMsg;
- TXRemoteCommitMessage commitMsg;
+ private TXId txid;
+ private DestroyMessage msg;
+ private TXCommitMessage txCommitMsg;
+ private TXId completedTxid;
+ private TXId notCompletedTxid;
+ private InternalDistributedMember member;
+ private CountDownLatch latch;
+ private TXStateProxy tx1, tx2;
+ private ClusterDistributionManager dm;
+ private TXRemoteRollbackMessage rollbackMsg;
+ private TXRemoteCommitMessage commitMsg;
+ private InternalCache cache;
+ private TXManagerImpl spyTxMgr;
+ private InternalCache spyCache;
+ private SystemTimer timer;
@Before
public void setUp() {
- InternalCache cache = Fakes.cache();
+ cache = Fakes.cache();
dm = mock(ClusterDistributionManager.class);
txMgr = new TXManagerImpl(mock(CachePerfStats.class), cache);
txid = new TXId(null, 0);
@@ -69,6 +88,13 @@ public class TXManagerImplTest {
when(this.msg.canStartRemoteTransaction()).thenReturn(true);
when(this.msg.canParticipateInTransaction()).thenReturn(true);
+ spyCache = spy(Fakes.cache());
+ InternalDistributedSystem distributedSystem =
mock(InternalDistributedSystem.class);
+ doReturn(distributedSystem).when(spyCache).getDistributedSystem();
+ when(distributedSystem.getDistributionManager()).thenReturn(dm);
+ spyTxMgr = spy(new TXManagerImpl(mock(CachePerfStats.class), spyCache));
+ timer = mock(SystemTimer.class);
+ doReturn(timer).when(spyCache).getCCPTimer();
}
@Test
@@ -349,7 +375,7 @@ public class TXManagerImplTest {
}
@Test
- public void txStateCleanedupIfRemovedFromHostedTxStatesMap() {
+ public void txStateCleanedUpIfRemovedFromHostedTxStatesMap() {
tx1 = txMgr.getOrSetHostedTXState(txid, msg);
TXStateProxyImpl txStateProxy = (TXStateProxyImpl) tx1;
assertNotNull(txStateProxy);
@@ -361,4 +387,140 @@ public class TXManagerImplTest {
txMgr.unmasquerade(tx1);
assertTrue(txStateProxy.getLocalRealDeal().isClosed());
}
+
+ /**
+ * A hosted client transaction is expected to be gone from the hosted states after
+ * scheduleToRemoveExpiredClientTransction is called for its id.
+ */
+ @Test
+ public void
clientTransactionWithIdleTimeLongerThanTransactionTimeoutIsRemoved()
+ throws Exception {
+
when(msg.getTXOriginatorClient()).thenReturn(mock(InternalDistributedMember.class));
+ // NOTE(review): Mockito.spy() returns a new object; txMgr still holds the unspied
+ // proxy, so the stub below is probably not what drives the removal - confirm.
+ TXStateProxyImpl tx = spy((TXStateProxyImpl)
txMgr.getOrSetHostedTXState(txid, msg));
+ doReturn(true).when(tx).isOverTransactionTimeoutLimit();
+
+ txMgr.scheduleToRemoveExpiredClientTransction(txid);
+
+ assertTrue(txMgr.isHostedTXStatesEmpty());
+ }
+
+ /**
+ * Processing the message must call expireDisconnectedClientTransactions exactly once on the
+ * transaction manager of the receiving cache, with distribute == false.
+ */
+ @Test
+ public void
processExpireDisconnectedClientTransactionsMessageWillExpireDisconnectedClientTransactions()
{
+ TXManagerImpl.ExpireDisconnectedClientTransactionsMessage message =
+ new TXManagerImpl.ExpireDisconnectedClientTransactionsMessage();
+
+ // local mocks: this 'cache' shadows the test fixture field of the same name
+ InternalCache cache = mock(InternalCache.class);
+ TXManagerImpl txManager = mock(TXManagerImpl.class);
+ when(dm.getCache()).thenReturn(cache);
+ when(cache.getTXMgr()).thenReturn(txManager);
+
+ message.process(dm);
+
+ verify(txManager, times(1)).expireDisconnectedClientTransactions(any(),
eq(false));
+ }
+
+ /**
+ * With a time to live of 0 and distribute == true, the remove request is sent to the remote
+ * servers right away and the local expiration is still scheduled.
+ */
+ @Test
+ public void
clientTransactionsToBeRemovedAndDistributedAreSentToRemoveServerIfWithNoTimeout()
{
+ Set<TXId> txIds = (Set<TXId>) mock(Set.class);
+ doReturn(0).when(spyTxMgr).getTransactionTimeToLive();
+ // thenAnswer builds a fresh iterator for every iterator() call
+ when(txIds.iterator()).thenAnswer(new Answer<Iterator<TXId>>() {
+ @Override
+ public Iterator<TXId> answer(InvocationOnMock invocation) throws
Throwable {
+ return Arrays.asList(txid, mock(TXId.class)).iterator();
+ }
+ });
+
+ spyTxMgr.expireDisconnectedClientTransactions(txIds, true);
+
+ verify(spyTxMgr,
times(1)).removeClientTransactionsOnRemoteServer(eq(txIds));
+ verify(spyTxMgr,
times(1)).scheduleToExpireDisconnectedClientTransactions(eq(txIds), eq(true));
+ }
+
+ /**
+ * With a time to live of 0 and distribute == false, the listed transactions are removed
+ * locally at once, nothing is sent to remote servers, and unlisted hosted transactions stay.
+ */
+ @Test
+ public void
clientTransactionsToBeExpiredAreRemovedAndNotDistributedIfWithNoTimeout() {
+ // NOTE(review): this stub is replaced by doReturn(0) further down before it is used
+ doReturn(1).when(spyTxMgr).getTransactionTimeToLive();
+ TXId txId1 = mock(TXId.class);
+ TXId txId2 = mock(TXId.class);
+ TXId txId3 = mock(TXId.class);
+ tx1 = spyTxMgr.getOrSetHostedTXState(txId1, msg);
+ tx2 = spyTxMgr.getOrSetHostedTXState(txId2, msg);
+ Set<TXId> txIds = spy(new HashSet<>());
+ txIds.add(txId1);
+ doReturn(0).when(spyTxMgr).getTransactionTimeToLive();
+ // the stubbed iterator yields txId1 and txId3, although only txId1 was added to the set;
+ // txId3 is not hosted, txId2 is hosted but never listed
+ when(txIds.iterator()).thenAnswer(new Answer<Iterator<TXId>>() {
+ @Override
+ public Iterator<TXId> answer(InvocationOnMock invocation) throws
Throwable {
+ return Arrays.asList(txId1, txId3).iterator();
+ }
+ });
+ assertEquals(2, spyTxMgr.getHostedTXStates().size());
+
+ spyTxMgr.expireDisconnectedClientTransactions(txIds, false);
+
+ verify(spyTxMgr,
never()).removeClientTransactionsOnRemoteServer(eq(txIds));
+ verify(spyTxMgr,
times(1)).scheduleToExpireDisconnectedClientTransactions(eq(txIds), eq(false));
+ verify(spyTxMgr, times(1)).removeHostedTXState(eq(txIds));
+ verify(spyTxMgr, times(1)).removeHostedTXState(eq(txId1));
+ verify(spyTxMgr, times(1)).removeHostedTXState(eq(txId3));
+ // only the unlisted transaction is left
+ assertEquals(tx2, spyTxMgr.getHostedTXStates().get(txId2));
+ assertEquals(1, spyTxMgr.getHostedTXStates().size());
+ }
+
+ /**
+ * With a time to live of 1 second and distribute == true, a timer task is scheduled with a
+ * delay of 1000 ms and the local expiration is scheduled as well.
+ */
+ @Test
+ public void
clientTransactionsToBeRemovedAndDistributedAreScheduledToSentToRemoveServerIfWithTimeout()
{
+ Set<TXId> txIds = mock(Set.class);
+ doReturn(1).when(spyTxMgr).getTransactionTimeToLive();
+
+ spyTxMgr.expireDisconnectedClientTransactions(txIds, true);
+
+ verify(timer, times(1)).schedule(any(), eq(1000L));
+ verify(spyTxMgr,
times(1)).scheduleToExpireDisconnectedClientTransactions(eq(txIds), eq(true));
+ }
+
+ /** With distribute == true, the expiration request is forwarded to the remote servers once. */
+ @Test
+ public void
clientTransactionsToBeExpiredAndDistributedAreSentToRemoveServer() {
+ Set<TXId> txIds = mock(Set.class);
+
+ spyTxMgr.scheduleToExpireDisconnectedClientTransactions(txIds, true);
+
+ verify(spyTxMgr,
times(1)).expireClientTransactionsOnRemoteServer(eq(txIds));
+ }
+
+ /** With distribute == false, the expiration request is never forwarded to remote servers. */
+ @Test
+ public void clientTransactionsNotToBeDistributedAreNotSentToRemoveServer() {
+ Set<TXId> txIds = mock(Set.class);
+
+ spyTxMgr.scheduleToExpireDisconnectedClientTransactions(txIds, false);
+
+ verify(spyTxMgr,
never()).expireClientTransactionsOnRemoteServer(eq(txIds));
+ }
+
+ /**
+ * With a time to live of 1 second, each listed hosted transaction gets a removal scheduled
+ * with a delay of 1100 ms (the time to live plus 10%); an unrelated id gets none.
+ */
+ @Test
+ public void clientTransactionsToBeExpiredIsScheduledToBeRemoved() {
+ doReturn(1).when(spyTxMgr).getTransactionTimeToLive();
+ TXId txId1 = mock(TXId.class);
+ TXId txId2 = mock(TXId.class);
+ // txId3 is neither hosted nor listed
+ TXId txId3 = mock(TXId.class);
+ tx1 = spyTxMgr.getOrSetHostedTXState(txId1, msg);
+ tx2 = spyTxMgr.getOrSetHostedTXState(txId2, msg);
+ Set<TXId> set = new HashSet<>();
+ set.add(txId1);
+ set.add(txId2);
+
+ spyTxMgr.scheduleToExpireDisconnectedClientTransactions(set, false);
+
+ verify(spyTxMgr, times(1)).scheduleToRemoveClientTransaction(eq(txId1),
eq(1100L));
+ verify(spyTxMgr, times(1)).scheduleToRemoveClientTransaction(eq(txId2),
eq(1100L));
+ verify(spyTxMgr, never()).scheduleToRemoveClientTransaction(eq(txId3),
eq(1100L));
+ }
+
+ /** A timeout of 0 removes the hosted transaction state immediately. */
+ @Test
+ public void clientTransactionIsRemovedIfWithNoTimeout() {
+ spyTxMgr.scheduleToRemoveClientTransaction(txid, 0);
+
+ verify(spyTxMgr, times(1)).removeHostedTXState(eq(txid));
+ }
+
+ /**
+ * A positive timeout (1000 ms) schedules a timer task with that delay.
+ * NOTE(review): the method name looks garbled ("ToBeRe", "WithNoTimeout"); what is checked
+ * here is the with-timeout case.
+ */
+ @Test
+ public void clientTransactionIsScheduledToBeReIfWithNoTimeout() {
+ spyTxMgr.scheduleToRemoveClientTransaction(txid, 1000);
+
+ verify(timer, times(1)).schedule(any(), eq(1000L));
+ }
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateProxyImplTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateProxyImplTest.java
index 1113246..547f5ce 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateProxyImplTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateProxyImplTest.java
@@ -15,7 +15,9 @@
package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import org.junit.Before;
@@ -77,4 +79,24 @@ public class TXStateProxyImplTest {
TXStateProxyImpl tx = new TXStateProxyImpl(cache, txManager, txId, false);
assertThat(tx.getCache()).isSameAs(cache);
}
+
+ /**
+ * Last client operation at 0 ms, current time 1001 ms, time to live 1 second: the idle time
+ * exceeds the limit, so the result is true.
+ * NOTE(review): the method name says "HavingRecentOperation", but this is the case where
+ * the last operation is older than the limit.
+ */
+ @Test
+ public void
isOverTransactionTimeoutLimitReturnsTrueIfHavingRecentOperation() {
+ TXStateProxyImpl tx = spy(new TXStateProxyImpl(cache, txManager, txId,
false));
+ doReturn(0L).when(tx).getLastOperationTimeFromClient();
+ doReturn(1001L).when(tx).getCurrentTime();
+ when(txManager.getTransactionTimeToLive()).thenReturn(1);
+
+ assertThat(tx.isOverTransactionTimeoutLimit()).isEqualTo(true);
+ }
+
+ /**
+ * Last client operation at 0 ms, current time 1000 ms, time to live 1 second: the idle time
+ * equals the limit but does not exceed it, so the result is false.
+ * NOTE(review): the method name says "NotHavingRecentOperation", but this is the case where
+ * the last operation is still within the limit.
+ */
+ @Test
+ public void
isOverTransactionTimeoutLimitReturnsFalseIfNotHavingRecentOperation() {
+ TXStateProxyImpl tx = spy(new TXStateProxyImpl(cache, txManager, txId,
false));
+ doReturn(0L).when(tx).getLastOperationTimeFromClient();
+ doReturn(1000L).when(tx).getCurrentTime();
+ when(txManager.getTransactionTimeToLive()).thenReturn(1);
+
+ assertThat(tx.isOverTransactionTimeoutLimit()).isEqualTo(false);
+ }
}