This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-5401
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-5401 by this 
push:
     new 40bcdbb  Fix review comments. Use 
ExpireDisconnectedClientTransactionsMessage instead of 
TXManagerImpl.TXRemovalMessage when expiring client transactions. Only send the new 
message to Geode 1.7.0 and later servers - assuming old version servers will be 
rolled soon. Handle differently when server receives this new message based on 
sender version.
40bcdbb is described below

commit 40bcdbb35f50b4ffc5b8d1e51b97418ca40f6ce8
Author: eshu <e...@pivotal.io>
AuthorDate: Thu Jul 26 16:34:09 2018 -0700

    Fix review comments.
    Use ExpireDisconnectedClientTransactionsMessage instead of 
TXManagerImpl.TXRemovalMessage when expiring client transactions.
    Only send new message to Geode 1.7.0 and later servers - assuming old 
version servers will be rolled soon.
    Handle differently when server receives this new message based on sender 
version.
---
 ...ntServerTransactionFailoverDistributedTest.java |  18 +--
 ...overWithMixedVersionServersDistributedTest.java |  33 +----
 .../org/apache/geode/internal/DSFIDFactory.java    |   6 +-
 .../geode/internal/DataSerializableFixedID.java    |   4 +-
 ...xpireDisconnectedClientTransactionsMessage.java |  85 +++++++++++++
 .../apache/geode/internal/cache/TXManagerImpl.java | 140 +--------------------
 ...eDisconnectedClientTransactionsMessageTest.java |  67 ++++++++++
 .../geode/internal/cache/TXManagerImplTest.java    |  42 ++-----
 8 files changed, 185 insertions(+), 210 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
index 1d6f413..b8908ea 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
@@ -98,7 +98,7 @@ public class ClientServerTransactionFailoverDistributedTest 
implements Serializa
 
     int numOfOpertions = 5;
     createClientRegion(true, port2, port1);
-    TransactionId txId = suspendTransaction(numOfOpertions);
+    TransactionId txId = beginAndSuspendTransaction(numOfOpertions);
     server2.invoke(() -> {
       cacheRule.getCache().close();
     });
@@ -162,7 +162,7 @@ public class ClientServerTransactionFailoverDistributedTest 
implements Serializa
     return (PoolImpl) factory.create(uniqueName);
   }
 
-  private TransactionId suspendTransaction(int numOfOperations) {
+  private TransactionId beginAndSuspendTransaction(int numOfOperations) {
     Region region = clientCacheRule.getClientCache().getRegion(regionName);
     TXManagerImpl txManager =
         (TXManagerImpl) 
clientCacheRule.getClientCache().getCacheTransactionManager();
@@ -237,8 +237,9 @@ public class ClientServerTransactionFailoverDistributedTest 
implements Serializa
     FutureTask<TransactionId>[] futureTasks = new 
FutureTask[numOfTransactions];
     TransactionId[] txIds = new TransactionId[numOfTransactions];
 
-    // suspend transactions
-    suspendTransactions(numOfTransactions, numOfOperationsInTransaction, 
threads, futureTasks,
+    // begin and suspend transactions
+    beginAndSuspendTransactions(numOfTransactions, 
numOfOperationsInTransaction, threads,
+        futureTasks,
         txIds);
 
     // unregister client on 2 of the servers
@@ -264,8 +265,8 @@ public class ClientServerTransactionFailoverDistributedTest 
implements Serializa
     FutureTask<TransactionId>[] futureTasks = new 
FutureTask[numOfTransactions];
     TransactionId[] txIds = new TransactionId[numOfTransactions];
 
-    // suspend transactions
-    suspendTransactions(numOfTransactions, numOfOperations, threads, 
futureTasks, txIds);
+    // begin and suspend transactions
+    beginAndSuspendTransactions(numOfTransactions, numOfOperations, threads, 
futureTasks, txIds);
 
     // unregister client multiple times
     ClientProxyMembershipID clientProxyMembershipID = getClientId();
@@ -284,12 +285,13 @@ public class 
ClientServerTransactionFailoverDistributedTest implements Serializa
     }
   }
 
-  private void suspendTransactions(int numOfTransactions, int numOfOperations, 
Thread[] threads,
+  private void beginAndSuspendTransactions(int numOfTransactions, int 
numOfOperations,
+      Thread[] threads,
       FutureTask<TransactionId>[] futureTasks, TransactionId[] txIds)
       throws InterruptedException, java.util.concurrent.ExecutionException {
     for (int i = 0; i < numOfTransactions; i++) {
       FutureTask<TransactionId> futureTask =
-          new FutureTask<>(() -> suspendTransaction(numOfOperations));
+          new FutureTask<>(() -> beginAndSuspendTransaction(numOfOperations));
       futureTasks[i] = futureTask;
       Thread thread = new Thread(futureTask);
       threads[i] = thread;
diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java
index 90e378c..6a96176 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java
@@ -137,8 +137,8 @@ public class 
ClientServerTransactionFailoverWithMixedVersionServersDistributedTe
     Thread[] threads = new Thread[numOfTransactions];
     FutureTask<TransactionId>[] futureTasks = new 
FutureTask[numOfTransactions];
     TransactionId[] txIds = new TransactionId[numOfTransactions];
-    // suspend transactions
-    suspendTransactions(numOfTransactions, numOfOperations, threads, 
futureTasks, txIds);
+    // begin and suspend transactions
+    beginAndSuspendTransactions(numOfTransactions, numOfOperations, threads, 
futureTasks, txIds);
     // resume transactions
     resumeTransactions(numOfTransactions, numOfOperations, threads, txIds);
     waitForResumeTransactionsToComplete(numOfTransactions, threads);
@@ -235,12 +235,13 @@ public class 
ClientServerTransactionFailoverWithMixedVersionServersDistributedTe
     crf.create(regionName);
   }
 
-  private void suspendTransactions(int numOfTransactions, int numOfOperations, 
Thread[] threads,
+  private void beginAndSuspendTransactions(int numOfTransactions, int 
numOfOperations,
+      Thread[] threads,
       FutureTask<TransactionId>[] futureTasks, TransactionId[] txIds)
       throws InterruptedException, java.util.concurrent.ExecutionException {
     for (int i = 0; i < numOfTransactions; i++) {
       FutureTask<TransactionId> futureTask =
-          new FutureTask<>(() -> suspendTransaction(numOfOperations));
+          new FutureTask<>(() -> beginAndSuspendTransaction(numOfOperations));
       futureTasks[i] = futureTask;
       Thread thread = new Thread(futureTask);
       threads[i] = thread;
@@ -252,7 +253,7 @@ public class 
ClientServerTransactionFailoverWithMixedVersionServersDistributedTe
     }
   }
 
-  private TransactionId suspendTransaction(int numOfOperations) {
+  private TransactionId beginAndSuspendTransaction(int numOfOperations) {
     Region region = clientCacheRule.getClientCache().getRegion(regionName);
     TXManagerImpl txManager =
         (TXManagerImpl) 
clientCacheRule.getClientCache().getCacheTransactionManager();
@@ -418,26 +419,4 @@ public class 
ClientServerTransactionFailoverWithMixedVersionServersDistributedTe
         .until(() -> 
assertThat(txManager.hostedTransactionsInProgressForTest()).isEqualTo(0));
   }
 
-  @Test
-  public void clientTransactionExpiredAreRemovedOnNotYetRolledServer() throws 
Exception {
-    setupPartiallyRolledVersion();
-
-    server1.invoke(() -> createServerRegion(1, true));
-    server2.invoke(() -> createServerRegion(1, true));
-    server3.invoke(() -> createServerRegion(1, true));
-    server4.invoke(() -> createServerRegion(1, false));
-    client.invoke(() -> createClientRegion());
-
-    ClientProxyMembershipID clientProxyMembershipID = client.invoke(() -> 
getClientId());
-
-    int numOfTransactions = 12;
-    int numOfOperations = 1;
-    client.invokeAsync(() -> doUnfinishedTransactions(numOfTransactions, 
numOfOperations));
-
-    server4.invoke(() -> verifyTransactionAreStarted(numOfTransactions));
-
-    unregisterClientMultipleTimes(clientProxyMembershipID);
-
-    server4.invoke(() -> verifyTransactionAreExpired(numOfTransactions));
-  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java 
b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index a6ee477..96313b8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -215,6 +215,7 @@ import 
org.apache.geode.internal.cache.DistributedRemoveAllOperation.RemoveAllMe
 import 
org.apache.geode.internal.cache.DistributedTombstoneOperation.TombstoneMessage;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EventID;
+import 
org.apache.geode.internal.cache.ExpireDisconnectedClientTransactionsMessage;
 import org.apache.geode.internal.cache.FilterProfile;
 import 
org.apache.geode.internal.cache.FindDurableQueueProcessor.FindDurableQueueMessage;
 import 
org.apache.geode.internal.cache.FindDurableQueueProcessor.FindDurableQueueReply;
@@ -256,7 +257,6 @@ import 
org.apache.geode.internal.cache.TXCommitMessage.CommitProcessQueryMessage
 import 
org.apache.geode.internal.cache.TXCommitMessage.CommitProcessQueryReplyMessage;
 import org.apache.geode.internal.cache.TXEntryState;
 import org.apache.geode.internal.cache.TXId;
-import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.TXRemoteCommitMessage;
 import 
org.apache.geode.internal.cache.TXRemoteCommitMessage.TXRemoteCommitReplyMessage;
 import org.apache.geode.internal.cache.TXRemoteRollbackMessage;
@@ -699,7 +699,7 @@ public class DSFIDFactory implements 
DataSerializableFixedID {
     registerDSFID(PR_BECOME_PRIMARY_BUCKET_MESSAGE, 
BecomePrimaryBucketMessage.class);
     registerDSFID(PR_BECOME_PRIMARY_BUCKET_REPLY, 
BecomePrimaryBucketReplyMessage.class);
     registerDSFID(PR_REMOVE_BUCKET_MESSAGE, RemoveBucketMessage.class);
-    registerDSFID(TX_MANAGER_REMOVE_TRANSACTIONS, 
TXManagerImpl.TXRemovalMessage.class);
+    registerDSFID(EXPIRE_CLIENT_TRANSACTIONS, 
ExpireDisconnectedClientTransactionsMessage.class);
     registerDSFID(PR_REMOVE_BUCKET_REPLY, RemoveBucketReplyMessage.class);
     registerDSFID(PR_MOVE_BUCKET_MESSAGE, MoveBucketMessage.class);
     registerDSFID(PR_MOVE_BUCKET_REPLY, MoveBucketReplyMessage.class);
@@ -937,8 +937,6 @@ public class DSFIDFactory implements 
DataSerializableFixedID {
     registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY,
         
GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class);
     registerDSFID(ABORT_BACKUP_REQUEST, AbortBackupRequest.class);
-    registerDSFID(EXPIRE_CLIENT_TRANSACTIONS,
-        TXManagerImpl.ExpireDisconnectedClientTransactionsMessage.class);
   }
 
   /**
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
 
b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
index c2a40d3..f1f6be1 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
@@ -569,7 +569,8 @@ public interface DataSerializableFixedID extends 
SerializationVersions {
   short PR_REMOVE_BUCKET_REPLY = 135;
   short PR_MOVE_BUCKET_MESSAGE = 136;
   short PR_MOVE_BUCKET_REPLY = 137;
-  short TX_MANAGER_REMOVE_TRANSACTIONS = 138;
+  // Geode-5401, message changed from remove transaction to expire 
transactions.
+  short EXPIRE_CLIENT_TRANSACTIONS = 138;
 
   short REGION_VERSION_VECTOR = 139;
 
@@ -823,7 +824,6 @@ public interface DataSerializableFixedID extends 
SerializationVersions {
   short GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_MESSAGE = 2181;
   short GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY = 2182;
   short ABORT_BACKUP_REQUEST = 2183;
-  short EXPIRE_CLIENT_TRANSACTIONS = 2184;
 
   // NOTE, codes > 65535 will take 4 bytes to serialize
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessage.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessage.java
new file mode 100644
index 0000000..ece29e1
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessage.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.HighPriorityDistributionMessage;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Version;
+
+public class ExpireDisconnectedClientTransactionsMessage
+    extends HighPriorityDistributionMessage {
+  Set<TXId> txIds;
+
+  /** for deserialization */
+  public ExpireDisconnectedClientTransactionsMessage() {}
+
+  // only send to geode 1.7.0 and later servers
+  // for prior geode 1.7.0 servers, message won't be sent
+  // assuming these servers will be rolled to new version soon.
+  static void send(DistributionManager dm, Set<InternalDistributedMember> 
recipients,
+      Set<TXId> txIds) {
+    ExpireDisconnectedClientTransactionsMessage msg =
+        new ExpireDisconnectedClientTransactionsMessage();
+    msg.txIds = txIds;
+    Set newVersionRecipients = new HashSet();
+    for (InternalDistributedMember recipient : recipients) {
+      // to geode 1.7.0 and later version servers
+      if (recipient.getVersionObject().compareTo(Version.GEODE_170) >= 0) {
+        newVersionRecipients.add(recipient);
+      }
+    }
+    msg.setRecipients(newVersionRecipients);
+    dm.putOutgoing(msg);
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeHashSet((HashSet<TXId>) this.txIds, out);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+    this.txIds = DataSerializer.readHashSet(in);
+  }
+
+  public int getDSFID() {
+    return EXPIRE_CLIENT_TRANSACTIONS;
+  }
+
+  @Override
+  protected void process(ClusterDistributionManager dm) {
+    InternalCache cache = dm.getCache();
+    InternalDistributedMember sender = getSender();
+    if (cache != null) {
+      TXManagerImpl mgr = cache.getTXMgr();
+      if (sender.getVersionObject().compareTo(Version.GEODE_170) >= 0) {
+        // schedule to expire disconnected client transaction.
+        mgr.expireDisconnectedClientTransactions(this.txIds, false);
+      } else {
+        // check if transaction has been updated before remove it
+        mgr.removeExpiredClientTransactions(this.txIds);
+      }
+    }
+  }
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index 18cf5ef..9f6b87b 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -14,9 +14,6 @@
  */
 package org.apache.geode.internal.cache;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -39,10 +36,10 @@ import java.util.concurrent.locks.LockSupport;
 
 import org.apache.logging.log4j.Logger;
 
-import org.apache.geode.DataSerializer;
 import org.apache.geode.GemFireException;
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.SystemFailure;
+import org.apache.geode.annotations.TestingOnly;
 import org.apache.geode.cache.CacheTransactionManager;
 import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.cache.TransactionDataRebalancedException;
@@ -52,15 +49,12 @@ import org.apache.geode.cache.TransactionListener;
 import org.apache.geode.cache.TransactionWriter;
 import org.apache.geode.cache.UnsupportedOperationInTransactionException;
 import org.apache.geode.distributed.TXManagerCancelledException;
-import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.HighPriorityDistributionMessage;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.MembershipListener;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.SystemTimer.SystemTimerTask;
-import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.entries.AbstractRegionEntry;
 import org.apache.geode.internal.cache.tier.sockets.Message;
 import org.apache.geode.internal.concurrent.ConcurrentHashSet;
@@ -120,6 +114,7 @@ public class TXManagerImpl implements 
CacheTransactionManager, MembershipListene
 
   private final Map<TXId, TXStateProxy> hostedTXStates;
 
+  // Used for testing only.
   private final Set<TXId> scheduledToBeRemovedTx =
       Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + 
"trackScheduledToBeRemovedTx")
           ? new ConcurrentHashSet<TXId>() : null;
@@ -1196,11 +1191,12 @@ public class TXManagerImpl implements 
CacheTransactionManager, MembershipListene
     synchronized (this.hostedTXStates) {
       for (TXId txId : txIds) {
         // only expire client transaction if no activity for the given 
transactionTimeToLive
-        scheduleToRemoveExpiredClientTransction(txId);
+        scheduleToRemoveExpiredClientTransaction(txId);
       }
     }
   }
 
+  @TestingOnly
   /** remove the given TXStates for test */
   public void removeTransactions(Set<TXId> txIds, boolean distribute) {
     synchronized (this.hostedTXStates) {
@@ -1334,132 +1330,8 @@ public class TXManagerImpl implements 
CacheTransactionManager, MembershipListene
     throw new InternalGemFireError("the parameter TXCommitMessage is not an 
exception token");
   }
 
-  public static class TXRemovalMessage extends HighPriorityDistributionMessage 
{
-
-    Set<TXId> txIds;
-
-    /** for deserialization */
-    public TXRemovalMessage() {}
-
-    static void send(DistributionManager dm, Set<InternalDistributedMember> 
recipients,
-        Set<TXId> txIds) {
-      TXRemovalMessage msg = new TXRemovalMessage();
-      msg.txIds = txIds;
-      // only send to servers with version earlier than geode 1.7.0
-      // newer version use ExpireDisconnectedClientTransactionsMessage
-      Set oldVersionRecipients = new HashSet();
-      for (InternalDistributedMember recipient : recipients) {
-        if (recipient.getVersionObject().compareTo(Version.GEODE_170) < 0) {
-          oldVersionRecipients.add(recipient);
-        }
-      }
-      msg.setRecipients(oldVersionRecipients);
-      dm.putOutgoing(msg);
-    }
-
-    @Override
-    public void toData(DataOutput out) throws IOException {
-      DataSerializer.writeHashSet((HashSet<TXId>) this.txIds, out);
-    }
-
-    @Override
-    public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
-      this.txIds = DataSerializer.readHashSet(in);
-    }
-
-    public int getDSFID() {
-      return TX_MANAGER_REMOVE_TRANSACTIONS;
-    }
-
-    @Override
-    protected void process(ClusterDistributionManager dm) {
-      InternalCache cache = dm.getCache();
-      if (cache != null) {
-        TXManagerImpl mgr = cache.getTXMgr();
-        // check if transaction has been updated before remove it
-        mgr.removeExpiredClientTransactions(this.txIds);
-      }
-    }
-  }
-
-  public static class ExpireDisconnectedClientTransactionsMessage
-      extends HighPriorityDistributionMessage {
-    Set<TXId> txIds;
-
-    /** for deserialization */
-    public ExpireDisconnectedClientTransactionsMessage() {}
-
-    // only send to geode 1.7.0 and later servers
-    static void send(DistributionManager dm, Set<InternalDistributedMember> 
recipients,
-        Set<TXId> txIds) {
-      ExpireDisconnectedClientTransactionsMessage msg =
-          new ExpireDisconnectedClientTransactionsMessage();
-      msg.txIds = txIds;
-      Set newVersionRecipients = new HashSet();
-      for (InternalDistributedMember recipient : recipients) {
-        // to geode 1.7.0 and later version servers
-        if (recipient.getVersionObject().compareTo(Version.GEODE_170) >= 0) {
-          newVersionRecipients.add(recipient);
-        }
-      }
-      msg.setRecipients(newVersionRecipients);
-      dm.putOutgoing(msg);
-    }
-
-    @Override
-    public void toData(DataOutput out) throws IOException {
-      DataSerializer.writeHashSet((HashSet<TXId>) this.txIds, out);
-    }
-
-    @Override
-    public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
-      this.txIds = DataSerializer.readHashSet(in);
-    }
-
-    public int getDSFID() {
-      return EXPIRE_CLIENT_TRANSACTIONS;
-    }
-
-    @Override
-    protected void process(ClusterDistributionManager dm) {
-      InternalCache cache = dm.getCache();
-      if (cache != null) {
-        TXManagerImpl mgr = cache.getTXMgr();
-        mgr.expireDisconnectedClientTransactions(this.txIds, false);
-      }
-    }
-  }
-
   /** timer task for expiring the given TXStates */
   public void expireDisconnectedClientTransactions(Set<TXId> txIds, boolean 
distribute) {
-    long timeout = TimeUnit.SECONDS.toMillis(getTransactionTimeToLive());
-    if (distribute) {
-      if (timeout <= 0) {
-        removeClientTransactionsOnRemoteServer(txIds);
-      } else {
-        if (logger.isDebugEnabled()) {
-          logger.debug("expiring the following transactions: {}", 
Arrays.toString(txIds.toArray()));
-        }
-        // schedule to send remove message to server with version earlier than 
geode 1.7.0
-        SystemTimerTask task = new SystemTimerTask() {
-          @Override
-          public void run2() {
-            removeClientTransactionsOnRemoteServer(txIds);
-          }
-        };
-        getCache().getCCPTimer().schedule(task, timeout);
-      }
-    }
-    // schedule to expire client transactions on server with version geode 
1.7.0 and after.
-    scheduleToExpireDisconnectedClientTransactions(txIds, distribute);
-  }
-
-  void removeClientTransactionsOnRemoteServer(Set<TXId> txIds) {
-    TXRemovalMessage.send(this.dm, this.dm.getOtherDistributionManagerIds(), 
txIds);
-  }
-
-  /** timer task for expiring the given TXStates */
-  public void scheduleToExpireDisconnectedClientTransactions(Set<TXId> txIds, 
boolean distribute) {
     // increase the client transaction timeout setting to avoid a late 
in-flight client operation
     // preventing the expiration of the client transaction.
     long timeout = (long) 
(TimeUnit.SECONDS.toMillis(getTransactionTimeToLive()) * 1.1);
@@ -1500,7 +1372,7 @@ public class TXManagerImpl implements 
CacheTransactionManager, MembershipListene
       SystemTimerTask task = new SystemTimerTask() {
         @Override
         public void run2() {
-          scheduleToRemoveExpiredClientTransction(txId);
+          scheduleToRemoveExpiredClientTransaction(txId);
           if (scheduledToBeRemovedTx != null) {
             scheduledToBeRemovedTx.remove(txId);
           }
@@ -1510,7 +1382,7 @@ public class TXManagerImpl implements 
CacheTransactionManager, MembershipListene
     }
   }
 
-  void scheduleToRemoveExpiredClientTransction(TXId txId) {
+  void scheduleToRemoveExpiredClientTransaction(TXId txId) {
     synchronized (this.hostedTXStates) {
       TXStateProxy result = hostedTXStates.get(txId);
       if (result != null) {
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessageTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessageTest.java
new file mode 100644
index 0000000..3c32620
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ExpireDisconnectedClientTransactionsMessageTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Version;
+
+public class ExpireDisconnectedClientTransactionsMessageTest {
+  private final ClusterDistributionManager dm = 
mock(ClusterDistributionManager.class);
+  private final InternalCache cache = mock(InternalCache.class);
+  private final TXManagerImpl txManager = mock(TXManagerImpl.class);
+  private final InternalDistributedMember sender = 
mock(InternalDistributedMember.class);
+  private final ExpireDisconnectedClientTransactionsMessage message =
+      spy(new ExpireDisconnectedClientTransactionsMessage());
+  private Version version = mock(Version.class);
+
+  @Before
+  public void setup() {
+    when(dm.getCache()).thenReturn(cache);
+    when(cache.getTXMgr()).thenReturn(txManager);
+    doReturn(sender).when(message).getSender();
+    when(sender.getVersionObject()).thenReturn(version);
+  }
+
+  @Test
+  public void 
processMessageFromServerOfGeode170AndLaterVersionWillExpireDisconnectedClientTransactions()
 {
+    when(version.compareTo(Version.GEODE_170)).thenReturn(1);
+
+    message.process(dm);
+
+    verify(txManager, times(1)).expireDisconnectedClientTransactions(any(), 
eq(false));
+  }
+
+  @Test
+  public void 
processMessageFromServerOfPriorGeode170VersionWillRemoveExpiredClientTransactions()
 {
+    when(version.compareTo(Version.GEODE_170)).thenReturn(-1);
+
+    message.process(dm);
+
+    verify(txManager, times(1)).removeExpiredClientTransactions(any());
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
index cf42bab..77c67e5 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
@@ -395,27 +395,12 @@ public class TXManagerImplTest {
     TXStateProxyImpl tx = spy((TXStateProxyImpl) 
txMgr.getOrSetHostedTXState(txid, msg));
     doReturn(true).when(tx).isOverTransactionTimeoutLimit();
 
-    txMgr.scheduleToRemoveExpiredClientTransction(txid);
+    txMgr.scheduleToRemoveExpiredClientTransaction(txid);
 
     assertTrue(txMgr.isHostedTXStatesEmpty());
   }
 
   @Test
-  public void 
processExpireDisconnectedClientTransactionsMessageWillExpireDisconnectedClientTransactions()
 {
-    TXManagerImpl.ExpireDisconnectedClientTransactionsMessage message =
-        new TXManagerImpl.ExpireDisconnectedClientTransactionsMessage();
-
-    InternalCache cache = mock(InternalCache.class);
-    TXManagerImpl txManager = mock(TXManagerImpl.class);
-    when(dm.getCache()).thenReturn(cache);
-    when(cache.getTXMgr()).thenReturn(txManager);
-
-    message.process(dm);
-
-    verify(txManager, times(1)).expireDisconnectedClientTransactions(any(), 
eq(false));
-  }
-
-  @Test
   public void 
clientTransactionsToBeRemovedAndDistributedAreSentToRemoveServerIfWithNoTimeout()
 {
     Set<TXId> txIds = (Set<TXId>) mock(Set.class);
     doReturn(0).when(spyTxMgr).getTransactionTimeToLive();
@@ -428,8 +413,7 @@ public class TXManagerImplTest {
 
     spyTxMgr.expireDisconnectedClientTransactions(txIds, true);
 
-    verify(spyTxMgr, 
times(1)).removeClientTransactionsOnRemoteServer(eq(txIds));
-    verify(spyTxMgr, 
times(1)).scheduleToExpireDisconnectedClientTransactions(eq(txIds), eq(true));
+    verify(spyTxMgr, 
times(1)).expireClientTransactionsOnRemoteServer(eq(txIds));
   }
 
   @Test
@@ -453,8 +437,7 @@ public class TXManagerImplTest {
 
     spyTxMgr.expireDisconnectedClientTransactions(txIds, false);
 
-    verify(spyTxMgr, 
never()).removeClientTransactionsOnRemoteServer(eq(txIds));
-    verify(spyTxMgr, 
times(1)).scheduleToExpireDisconnectedClientTransactions(eq(txIds), eq(false));
+    verify(spyTxMgr, 
never()).expireClientTransactionsOnRemoteServer(eq(txIds));
     verify(spyTxMgr, times(1)).removeHostedTXState(eq(txIds));
     verify(spyTxMgr, times(1)).removeHostedTXState(eq(txId1));
     verify(spyTxMgr, times(1)).removeHostedTXState(eq(txId3));
@@ -463,21 +446,10 @@ public class TXManagerImplTest {
   }
 
   @Test
-  public void 
clientTransactionsToBeRemovedAndDistributedAreScheduledToSentToRemoveServerIfWithTimeout()
 {
-    Set<TXId> txIds = mock(Set.class);
-    doReturn(1).when(spyTxMgr).getTransactionTimeToLive();
-
-    spyTxMgr.expireDisconnectedClientTransactions(txIds, true);
-
-    verify(timer, times(1)).schedule(any(), eq(1000L));
-    verify(spyTxMgr, 
times(1)).scheduleToExpireDisconnectedClientTransactions(eq(txIds), eq(true));
-  }
-
-  @Test
   public void 
clientTransactionsToBeExpiredAndDistributedAreSentToRemoveServer() {
     Set<TXId> txIds = mock(Set.class);
 
-    spyTxMgr.scheduleToExpireDisconnectedClientTransactions(txIds, true);
+    spyTxMgr.expireDisconnectedClientTransactions(txIds, true);
 
     verify(spyTxMgr, 
times(1)).expireClientTransactionsOnRemoteServer(eq(txIds));
   }
@@ -486,7 +458,7 @@ public class TXManagerImplTest {
   public void clientTransactionsNotToBeDistributedAreNotSentToRemoveServer() {
     Set<TXId> txIds = mock(Set.class);
 
-    spyTxMgr.scheduleToExpireDisconnectedClientTransactions(txIds, false);
+    spyTxMgr.expireDisconnectedClientTransactions(txIds, false);
 
     verify(spyTxMgr, 
never()).expireClientTransactionsOnRemoteServer(eq(txIds));
   }
@@ -503,7 +475,7 @@ public class TXManagerImplTest {
     set.add(txId1);
     set.add(txId2);
 
-    spyTxMgr.scheduleToExpireDisconnectedClientTransactions(set, false);
+    spyTxMgr.expireDisconnectedClientTransactions(set, false);
 
     verify(spyTxMgr, times(1)).scheduleToRemoveClientTransaction(eq(txId1), 
eq(1100L));
     verify(spyTxMgr, times(1)).scheduleToRemoveClientTransaction(eq(txId2), 
eq(1100L));
@@ -518,7 +490,7 @@ public class TXManagerImplTest {
   }
 
   @Test
-  public void clientTransactionIsScheduledToBeReIfWithNoTimeout() {
+  public void clientTransactionIsScheduledToBeRemovedIfWithTimeout() {
     spyTxMgr.scheduleToRemoveClientTransaction(txid, 1000);
 
     verify(timer, times(1)).schedule(any(), eq(1000L));

Reply via email to