This is an automated email from the ASF dual-hosted git repository.

ringles pushed a commit to branch GEODE-9375-Implement-ZRANGE-Radish-command
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 9aa758f0c24a7bcaa0c4b2b76c64b6efbb2ddffc
Author: Alberto Gomez <alberto.go...@est.tech>
AuthorDate: Wed Jun 23 08:44:39 2021 +0200

    Revert "GEODE-8971: Add grace period when stopping gateway sender with 
group-… (#6052)" (#6634)
    
    This reverts commit 841fa06c7b34916c09df920b7029974a78255cd0.
    
    The reason is that this commit causes problems for ongoing
    put operations during the grace period (very long response
    times) and, if the operation rate is high, it could also prevent
    the gateway sender from being stopped.
    
    Put operations can be very slow during the grace period
    because they need to traverse the gateway sender queue
    to find events with the same transactionId. If the queue
    size is big and events are evicted to disk, the time to
    process a request can be unacceptably long.
    At the same time, this can cause the gateway sender to
    never be stopped, because it is blocked trying to acquire
    a write lock that is held by ongoing operations
    for a long time.
---
 .../sortedset/ZRangeNativeRedisAcceptanceTest.java |  38 ++--
 .../sortedset/AbstractZRangeIntegrationTest.java   |  70 +++++++
 .../executor/sortedset/ZRangeIntegrationTest.java} |  22 +--
 .../geode/redis/internal/RedisCommandType.java     |   3 +
 .../internal/collections/IndexibleTreeSet.java     |  11 ++
 .../internal/collections/OrderStatisticsSet.java   |  10 +
 .../internal/collections/OrderStatisticsTree.java  |  69 ++++++-
 .../redis/internal/data/NullRedisSortedSet.java    |   5 +
 .../geode/redis/internal/data/RedisSortedSet.java  |  48 ++++-
 .../RedisSortedSetCommandsFunctionExecutor.java    |   6 +
 .../executor/sortedset/RedisSortedSetCommands.java |   2 +
 .../executor/sortedset/ZRangeExecutor.java         |  49 +++++
 .../collections/OrderStatisticsTreeTest.java       |  22 +++
 .../redis/internal/data/RedisSortedSetTest.java    |  77 ++++++++
 .../org/apache/geode/cache/wan/GatewaySender.java  |  24 +--
 .../geode/internal/cache/BucketRegionQueue.java    |  41 +---
 .../internal/cache/wan/AbstractGatewaySender.java  |  57 ++----
 .../wan/AbstractGatewaySenderEventProcessor.java   |  25 +--
 .../internal/cache/wan/GatewaySenderEventImpl.java |   5 +-
 ...currentParallelGatewaySenderEventProcessor.java |  23 +--
 .../ParallelGatewaySenderEventProcessor.java       |  22 +--
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 115 ++++-------
 ...oncurrentSerialGatewaySenderEventProcessor.java |  34 ++--
 .../serial/SerialGatewaySenderEventProcessor.java  |  18 +-
 .../cache/wan/serial/SerialGatewaySenderQueue.java |  44 +----
 .../internal/cache/BucketRegionQueueJUnitTest.java |   5 +-
 .../geode/internal/cache/wan/WANTestBase.java      |  17 --
 .../wan/parallel/ParallelWANStatsDUnitTest.java    | 199 +------------------
 .../cache/wan/serial/SerialWANStatsDUnitTest.java  | 215 +--------------------
 .../parallel/ParallelGatewaySenderImpl.java        |   2 -
 .../internal/serial/SerialGatewaySenderImpl.java   |   2 -
 .../parallel/ParallelGatewaySenderImplTest.java    |  43 -----
 .../serial/SerialGatewaySenderImplTest.java        |  42 ----
 33 files changed, 523 insertions(+), 842 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewayQueueEvent.java
 
b/geode-apis-compatible-with-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeNativeRedisAcceptanceTest.java
old mode 100644
new mode 100755
similarity index 52%
rename from 
geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewayQueueEvent.java
rename to 
geode-apis-compatible-with-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeNativeRedisAcceptanceTest.java
index 79ad857..6f663eb
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewayQueueEvent.java
+++ 
b/geode-apis-compatible-with-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeNativeRedisAcceptanceTest.java
@@ -12,27 +12,25 @@
  * or implied. See the License for the specific language governing permissions 
and limitations under
  * the License.
  */
-package org.apache.geode.internal.cache.wan;
+package org.apache.geode.redis.internal.executor.sortedset;
 
-import org.apache.geode.cache.TransactionId;
-import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.junit.ClassRule;
 
-/**
- * Represents <code>Cache</code> events going through 
<code>GatewaySender</code>s.
- *
- *
- * @since Geode 1.15
- */
-public interface InternalGatewayQueueEvent extends GatewayQueueEvent {
-  /**
-   * @return the transactionId to which the event belongs or null if the event 
does not belong
-   *         to a transaction.
-   */
-  TransactionId getTransactionId();
+import org.apache.geode.redis.NativeRedisClusterTestRule;
+
+public class ZRangeNativeRedisAcceptanceTest extends 
AbstractZRangeIntegrationTest {
+
+  @ClassRule
+  public static NativeRedisClusterTestRule server = new 
NativeRedisClusterTestRule();
+
+  @Override
+  public int getPort() {
+    return server.getExposedPorts().get(0);
+  }
+
+  @Override
+  public void flushAll() {
+    server.flushAll();
+  }
 
-  /**
-   * @return true if the event is the last one in the transaction it belongs 
to or if the
-   *         event does not belong to a transaction. false, otherwise.
-   */
-  boolean isLastEventInTransaction();
 }
diff --git 
a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRangeIntegrationTest.java
 
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRangeIntegrationTest.java
new file mode 100755
index 0000000..62817a6
--- /dev/null
+++ 
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRangeIntegrationTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.internal.RedisConstants;
+
+public abstract class AbstractZRangeIntegrationTest implements 
RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String member = "member";
+  private final String incrOption = "INCR";
+  private final double initial = 355.681000005;
+  private final double increment = 9554257.921450001;
+  private final double expected = initial + increment;
+
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void shouldError_givenWrongKeyType() {
+    final String STRING_KEY = "stringKey";
+    jedis.set(STRING_KEY, "value");
+    assertThatThrownBy(
+        () -> jedis.sendCommand(STRING_KEY, Protocol.Command.ZRANGE, 
STRING_KEY, "1", "2"))
+            .hasMessage("WRONGTYPE " + RedisConstants.ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void shouldReturnSyntaxError_givenWrongWithscoresFlag() {
+    jedis.zadd(SORTED_SET_KEY, 1.0, member);
+    assertThatThrownBy(
+        () -> jedis.sendCommand(SORTED_SET_KEY, Protocol.Command.ZRANGE, 
SORTED_SET_KEY, "1", "2",
+            "WITHSCOREZ"))
+                .hasMessageContaining(ERROR_SYNTAX);
+  }
+}
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java
 
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeIntegrationTest.java
old mode 100644
new mode 100755
similarity index 65%
copy from 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java
copy to 
geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeIntegrationTest.java
index 4e99a4b..a5579d5
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java
+++ 
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeIntegrationTest.java
@@ -12,24 +12,20 @@
  * or implied. See the License for the specific language governing permissions 
and limitations under
  * the License.
  */
-
 package org.apache.geode.redis.internal.executor.sortedset;
 
-import java.util.List;
-
-import org.apache.geode.redis.internal.data.RedisKey;
-
-public interface RedisSortedSetCommands {
-
-  Object zadd(RedisKey key, List<byte[]> scoresAndMembersToAdd, ZAddOptions 
options);
+import org.junit.ClassRule;
 
-  byte[] zscore(RedisKey key, byte[] member);
+import org.apache.geode.redis.GeodeRedisServerRule;
 
-  int zrank(RedisKey key, byte[] member);
+public class ZRangeIntegrationTest extends AbstractZRangeIntegrationTest {
 
-  long zrem(RedisKey key, List<byte[]> membersToRemove);
+  @ClassRule
+  public static GeodeRedisServerRule server = new GeodeRedisServerRule();
 
-  long zcard(RedisKey key);
+  @Override
+  public int getPort() {
+    return server.getPort();
+  }
 
-  byte[] zincrby(RedisKey key, byte[] increment, byte[] member);
 }
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
index 9269132..5101de6 100755
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
@@ -97,6 +97,7 @@ import 
org.apache.geode.redis.internal.executor.set.SUnionStoreExecutor;
 import org.apache.geode.redis.internal.executor.sortedset.ZAddExecutor;
 import org.apache.geode.redis.internal.executor.sortedset.ZCardExecutor;
 import org.apache.geode.redis.internal.executor.sortedset.ZIncrByExecutor;
+import org.apache.geode.redis.internal.executor.sortedset.ZRangeExecutor;
 import org.apache.geode.redis.internal.executor.sortedset.ZRankExecutor;
 import org.apache.geode.redis.internal.executor.sortedset.ZRemExecutor;
 import org.apache.geode.redis.internal.executor.sortedset.ZScoreExecutor;
@@ -202,6 +203,8 @@ public enum RedisCommandType {
   ZADD(new ZAddExecutor(), SUPPORTED, new MinimumParameterRequirements(4)),
   ZCARD(new ZCardExecutor(), SUPPORTED, new ExactParameterRequirements(2)),
   ZINCRBY(new ZIncrByExecutor(), SUPPORTED, new ExactParameterRequirements(4)),
+  ZRANGE(new ZRangeExecutor(), SUPPORTED, new MinimumParameterRequirements(4)
+      .and(new MaximumParameterRequirements(5, ERROR_SYNTAX))),
   ZRANK(new ZRankExecutor(), SUPPORTED, new ExactParameterRequirements(3)),
   ZREM(new ZRemExecutor(), SUPPORTED, new MinimumParameterRequirements(3)),
   ZSCORE(new ZScoreExecutor(), SUPPORTED, new ExactParameterRequirements(3)),
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/IndexibleTreeSet.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/IndexibleTreeSet.java
index e8c9c42..d89e04a 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/IndexibleTreeSet.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/IndexibleTreeSet.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.redis.internal.collections;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.TreeSet;
 
@@ -23,6 +24,7 @@ import java.util.TreeSet;
  * Only for testing and performance comparisons.
  */
 class IndexibleTreeSet<E> extends TreeSet<E> implements OrderStatisticsSet<E> {
+
   private static final long serialVersionUID = 521865987126101683L;
 
   @Override
@@ -47,4 +49,13 @@ class IndexibleTreeSet<E> extends TreeSet<E> implements 
OrderStatisticsSet<E> {
   public int indexOf(E element) {
     return headSet(element).size();
   }
+
+  @Override
+  public ArrayList<E> getIndexRange(int min, int max) {
+    return null;
+  }
+
+  public int findMin(E element) {
+    return 0;
+  }
 }
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/OrderStatisticsSet.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/OrderStatisticsSet.java
index 0e09f0f..8dcb4df 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/OrderStatisticsSet.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/OrderStatisticsSet.java
@@ -26,6 +26,7 @@
 
 package org.apache.geode.redis.internal.collections;
 
+import java.util.ArrayList;
 import java.util.Set;
 
 /**
@@ -59,4 +60,13 @@ public interface OrderStatisticsSet<T> extends Set<T> {
    *         in this set.
    */
   int indexOf(T element);
+
+  /**
+   * Returns a range of <code>elements</code> between min and max.
+   *
+   * @param min the minimum element.
+   * @param max the maximum element.
+   * @return an ArrayList of <code>elements</code>.
+   */
+  ArrayList<T> getIndexRange(int min, int max);
 }
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/OrderStatisticsTree.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/OrderStatisticsTree.java
index 49ee9d7..b619136 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/OrderStatisticsTree.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/OrderStatisticsTree.java
@@ -28,6 +28,7 @@ package org.apache.geode.redis.internal.collections;
 
 import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.ConcurrentModificationException;
@@ -48,7 +49,6 @@ import org.apache.geode.annotations.VisibleForTesting;
  */
 public class OrderStatisticsTree<E extends Comparable<? super E>>
     implements OrderStatisticsSet<E> {
-
   private Node<E> root;
   private int size;
   private int modCount;
@@ -205,6 +205,57 @@ public class OrderStatisticsTree<E extends Comparable<? 
super E>>
     return true;
   }
 
+  public ArrayList<E> getIndexRange(int min, int max) {
+    ArrayList<E> entryList = new ArrayList<>();
+    Node<E> current = getNode(min);
+    for (int i = min; current != null && i <= max; i++) {
+      entryList.add(current.key);
+      current = successorOf(current);
+    }
+    return entryList;
+  }
+
+  public Iterator<E> iterator(int index) {
+    Node<E> node = getNode(index);
+    return new TreeIterator(node);
+  }
+
+  private Node<E> getNode(int index) {
+    checkIndex(index);
+    Node<E> node = root;
+
+    while (true) {
+      if (index > node.count) {
+        index -= node.count + 1;
+        node = node.right;
+      } else if (index < node.count) {
+        node = node.left;
+      } else {
+        return node;
+      }
+    }
+  }
+
+  private Node<E> findMinNode(E o) {
+    Node<E> x = root;
+    int cmp;
+
+    while ((cmp = o.compareTo(x.key)) != 0) {
+      if (cmp < 0) {
+        if (x.left == null) {
+          break;
+        }
+        x = x.left;
+      } else {
+        if (x.right == null) {
+          break;
+        }
+        x = x.right;
+      }
+    }
+    return x;
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public boolean contains(Object o) {
@@ -466,6 +517,14 @@ public class OrderStatisticsTree<E extends Comparable<? 
super E>>
     return node;
   }
 
+  private Node<E> maximumNode(Node<E> node) {
+    while (node.right != null) {
+      node = node.right;
+    }
+
+    return node;
+  }
+
   private int height(Node<E> node) {
     return node == null ? -1 : node.height;
   }
@@ -717,6 +776,14 @@ public class OrderStatisticsTree<E extends Comparable<? 
super E>>
       }
     }
 
+    public TreeIterator(Node<E> node) {
+      if (root == null) {
+        nextNode = null;
+      } else {
+        nextNode = node;
+      }
+    }
+
     @Override
     public boolean hasNext() {
       return nextNode != null;
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSortedSet.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSortedSet.java
index 8cbf0a1..b771be4 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSortedSet.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSortedSet.java
@@ -86,4 +86,9 @@ class NullRedisSortedSet extends RedisSortedSet {
     return -1;
   }
 
+  @Override
+  List<byte[]> zrange(byte[] min, byte[] max,
+      boolean withScores) {
+    return null;
+  }
 }
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
index 8a9faf4..4a0f6a7 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
@@ -19,6 +19,7 @@ package org.apache.geode.redis.internal.data;
 import static 
org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_A_VALID_FLOAT;
 import static 
org.apache.geode.redis.internal.data.RedisDataType.REDIS_SORTED_SET;
 import static org.apache.geode.redis.internal.netty.Coder.bytesToDouble;
+import static org.apache.geode.redis.internal.netty.Coder.bytesToInt;
 import static org.apache.geode.redis.internal.netty.Coder.doubleToBytes;
 
 import java.io.DataInput;
@@ -312,6 +313,41 @@ public class RedisSortedSet extends AbstractRedisData {
     return membersRemoved;
   }
 
+  List<byte[]> zrange(byte[] min, byte[] max,
+      boolean withScores) {
+    ArrayList<byte[]> result = new ArrayList<>();
+    int start = getBoundedStartIndex(bytesToInt(min), getSortedSetSize());
+    int end = getBoundedEndIndex(bytesToInt(max), getSortedSetSize());
+    if (start > end || start == getSortedSetSize()) {
+      return result;
+    }
+
+    ArrayList<OrderedSetEntry> entries = scoreSet.getIndexRange(start, end);
+    entries.forEach(entry -> {
+      result.add(entry.member);
+      if (withScores) {
+        result.add(entry.scoreBytes);
+      }
+    });
+    return result;
+  }
+
+  private int getBoundedStartIndex(int index, int size) {
+    if (index >= 0L) {
+      return Math.min(index, size);
+    } else {
+      return Math.max(index + size, 0);
+    }
+  }
+
+  private int getBoundedEndIndex(long index, int size) {
+    if (index >= 0L) {
+      return (int) Math.min(index, size);
+    } else {
+      return (int) Math.max(index + size, -1);
+    }
+  }
+
   synchronized byte[] memberRemove(byte[] member) {
     byte[] oldValue = null;
     OrderedSetEntry orderedSetEntry = members.remove(member);
@@ -389,6 +425,8 @@ public class RedisSortedSet extends AbstractRedisData {
   }
 
   static class OrderedSetEntry implements Comparable<OrderedSetEntry> {
+    public static final byte[] MAX_BYTES = 
"RaAdIsH_SeCrEt_MaX_ByTeS_VaL".getBytes();
+
     private final byte[] member;
     private final byte[] scoreBytes;
     private final Double score;
@@ -397,11 +435,19 @@ public class RedisSortedSet extends AbstractRedisData {
       return scoreBytes;
     }
 
+    public byte[] getMember() {
+      return member;
+    }
+
     @Override
     public int compareTo(OrderedSetEntry o) {
       int comparison = score.compareTo(o.score);
       if (comparison == 0) {
-        // Scores equal, try lexical ordering
+        // MAX_BYTES always bigger than actual entry
+        if (member.equals(MAX_BYTES)) {
+          return -1;
+        }
+        // Scores equal, not MAX, try lexical ordering
         return javaImplementationOfAnsiCMemCmp(member, o.member);
       }
       return comparison;
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
index 3db7971..ae106d4 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
@@ -41,6 +41,12 @@ public class RedisSortedSetCommandsFunctionExecutor extends 
RedisDataCommandsFun
   }
 
   @Override
+  public List<byte[]> zrange(RedisKey key, byte[] min, byte[] max, boolean 
withScores) {
+    return stripedExecute(key,
+        () -> getRedisSortedSet(key, false).zrange(min, max, withScores));
+  }
+
+  @Override
   public byte[] zscore(RedisKey key, byte[] member) {
     return stripedExecute(key, () -> getRedisSortedSet(key, 
true).zscore(member));
   }
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java
index 4e99a4b..268d760 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java
@@ -23,6 +23,8 @@ public interface RedisSortedSetCommands {
 
   Object zadd(RedisKey key, List<byte[]> scoresAndMembersToAdd, ZAddOptions 
options);
 
+  List<byte[]> zrange(RedisKey key, byte[] min, byte[] max, boolean 
withScores);
+
   byte[] zscore(RedisKey key, byte[] member);
 
   int zrank(RedisKey key, byte[] member);
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeExecutor.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeExecutor.java
new file mode 100644
index 0000000..d583c92
--- /dev/null
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeExecutor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static 
org.apache.geode.redis.internal.netty.Coder.equalsIgnoreCaseBytes;
+
+import java.util.List;
+
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZRangeExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext 
context) {
+    boolean withScores = false;
+    RedisSortedSetCommands redisSortedSetCommands = 
context.getSortedSetCommands();
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    byte[] min = commandElements.get(1);
+    byte[] max = commandElements.get(2);
+    if (commandElements.size() == 5) {
+      if (equalsIgnoreCaseBytes(commandElements.get(3), 
"WITHSCORES".getBytes())) {
+        withScores = true;
+      } else {
+        return RedisResponse.error(ERROR_SYNTAX);
+      }
+    }
+
+    List<byte[]> retVal = redisSortedSetCommands.zrange(command.getKey(), min, 
max, withScores);
+
+    return RedisResponse.array(retVal);
+  }
+}
diff --git 
a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/collections/OrderStatisticsTreeTest.java
 
b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/collections/OrderStatisticsTreeTest.java
index 7e93a3b..7cfec39 100644
--- 
a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/collections/OrderStatisticsTreeTest.java
+++ 
b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/collections/OrderStatisticsTreeTest.java
@@ -28,6 +28,7 @@ package org.apache.geode.redis.internal.collections;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.ConcurrentModificationException;
@@ -569,4 +570,25 @@ public class OrderStatisticsTreeTest {
     assertThat(array2after).isSameAs(array2before);
     assertThat(array2after).isEqualTo(array1after);
   }
+
+  @Test
+  public void testGetIndexRange() {
+    for (int i = 0; i < 100; ++i) {
+      tree.add(i);
+    }
+    ArrayList<Integer> subSet = tree.getIndexRange(0, 9);
+    assertThat(subSet.size()).isEqualTo(10);
+    assertThat(subSet).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+    subSet = tree.getIndexRange(95, 110);
+    assertThat(subSet.size()).isEqualTo(5);
+    assertThat(subSet).containsExactly(95, 96, 97, 98, 99);
+
+    subSet = tree.getIndexRange(45, 55);
+    assertThat(subSet.size()).isEqualTo(11);
+    assertThat(subSet).containsExactly(45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 
55);
+
+    subSet = tree.getIndexRange(10, 0);
+    assertThat(subSet.size()).isEqualTo(0);
+  }
 }
diff --git 
a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSortedSetTest.java
 
b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSortedSetTest.java
index ba859de..9ab3b0b 100644
--- 
a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSortedSetTest.java
+++ 
b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSortedSetTest.java
@@ -18,9 +18,11 @@ package org.apache.geode.redis.internal.data;
 
 import static java.lang.Math.round;
 import static 
org.apache.geode.redis.internal.data.RedisSortedSet.BASE_REDIS_SORTED_SET_OVERHEAD;
+import static org.apache.geode.redis.internal.netty.Coder.intToBytes;
 import static org.apache.geode.redis.internal.netty.Coder.stringToBytes;
 import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.in;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
@@ -32,6 +34,7 @@ import java.io.IOException;
 import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
@@ -302,6 +305,80 @@ public class RedisSortedSetTest {
     assertThat(actual).isCloseTo(expected, offset);
   }
 
+  RedisSortedSet rangeSortedSet =
+      createRedisSortedSet(
+          "1.0", member1, "1.1", member2, "1.2", "member3", "1.3", "member4",
+          "1.4", "member5", "1.5", "member6", "1.6", "member7", "1.7", 
"member8",
+          "1.8", "member9", "1.9", "member10", "2.0", "member11", "2.1", 
"member12");
+
+  @Test
+  public void zrange_ShouldReturnEmptyList_GivenInvalidRanges() {
+    Collection<byte[]> rangeList = rangeSortedSet.zrange(intToBytes(5), 
intToBytes(0), false);
+    assertThat(rangeList).isEmpty();
+    rangeList = rangeSortedSet.zrange(intToBytes(13), intToBytes(15), false);
+    assertThat(rangeList).isEmpty();
+    rangeList = rangeSortedSet.zrange(intToBytes(17), intToBytes(-2), false);
+    assertThat(rangeList).isEmpty();
+    rangeList = rangeSortedSet.zrange(intToBytes(12), intToBytes(12), false);
+    assertThat(rangeList).isEmpty();
+  }
+
+
+  @Test
+  public void zrange_ShouldReturnSimpleRanges() {
+    Collection<byte[]> rangeList = rangeSortedSet.zrange(intToBytes(0), 
intToBytes(5), false);
+    assertThat(rangeList).hasSize(6);
+    assertThat(rangeList)
+        .containsExactly("member1".getBytes(), "member2".getBytes(), 
"member3".getBytes(),
+            "member4".getBytes(), "member5".getBytes(), "member6".getBytes());
+
+    rangeList = rangeSortedSet.zrange(intToBytes(5), intToBytes(10), false);
+    assertThat(rangeList).hasSize(6);
+    assertThat(rangeList)
+        .containsExactly("member6".getBytes(), "member7".getBytes(), 
"member8".getBytes(),
+            "member9".getBytes(), "member10".getBytes(), 
"member11".getBytes());
+
+    rangeList = rangeSortedSet.zrange(intToBytes(10), intToBytes(13), false);
+    assertThat(rangeList).hasSize(2);
+    assertThat(rangeList).containsExactly("member11".getBytes(), 
"member12".getBytes());
+  }
+
+  @Test
+  public void zrange_ShouldReturnRanges_SpecifiedWithNegativeOffsets() {
+    Collection<byte[]> rangeList = rangeSortedSet.zrange(intToBytes(-2), 
intToBytes(12), false);
+    assertThat(rangeList).hasSize(2);
+    assertThat(rangeList).containsExactly("member11".getBytes(), 
"member12".getBytes());
+
+    rangeList = rangeSortedSet.zrange(intToBytes(-6), intToBytes(-1), false);
+    assertThat(rangeList).hasSize(6);
+    assertThat(rangeList)
+        .containsExactly("member7".getBytes(), "member8".getBytes(),
+            "member9".getBytes(), "member10".getBytes(), "member11".getBytes(),
+            "member12".getBytes());
+
+    rangeList = rangeSortedSet.zrange(intToBytes(-11), intToBytes(-5), false);
+    assertThat(rangeList).hasSize(7);
+    assertThat(rangeList)
+        .containsExactly("member2".getBytes(), "member3".getBytes(),
+            "member4".getBytes(), "member5".getBytes(), "member6".getBytes(),
+            "member7".getBytes(), "member8".getBytes());
+
+    rangeList = rangeSortedSet.zrange(intToBytes(-12), intToBytes(-11), false);
+    assertThat(rangeList).hasSize(2);
+    assertThat(rangeList)
+        .containsExactly("member1".getBytes(), "member2".getBytes());
+  }
+
+  @Test
+  public void zrange_shouldAlsoReturnScores_whenWithScoresSpecified() {
+    Collection<byte[]> rangeList = rangeSortedSet.zrange(intToBytes(0), 
intToBytes(5), true);
+    assertThat(rangeList).hasSize(12);
+    assertThat(rangeList).containsExactly("member1".getBytes(), 
"1.0".getBytes(),
+        "member2".getBytes(), "1.1".getBytes(), "member3".getBytes(), 
"1.2".getBytes(),
+        "member4".getBytes(), "1.3".getBytes(), "member5".getBytes(), 
"1.4".getBytes(),
+        "member6".getBytes(), "1.5".getBytes());
+  }
+
   private RedisSortedSet createRedisSortedSet(String... membersAndScores) {
     final List<byte[]> membersAndScoresList = Arrays
         .stream(membersAndScores)
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java 
b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
index 8e498c3..1f696f6 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
@@ -174,29 +174,7 @@ public interface GatewaySender {
    */
   int GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES =
       Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "get-transaction-events-from-queue-retries",
-          2);
-  /**
-   * Milliseconds to wait before retrying to get events for a transaction from the
-   * gateway sender queue when group-transaction-events is true.
-   */
-  int GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS =
-      Integer.getInteger(
-          GeodeGlossary.GEMFIRE_PREFIX + 
"get-transaction-events-from-queue-wait-time-ms",
-          1);
-
-  /**
-   * When group-transaction-events is true and the gateway sender is stopped,
-   * addition to the queue of a group of transaction events might be interrupted.
-   * To ensure that the queue does not contain incomplete transactions, this parameter
-   * allows for a grace period, specified in milliseconds, before the gateway sender is actually
-   * stopped, allowing complete transaction event groups to be queued. Any event received
-   * during the grace period that is not part of a transaction event group in the queue
-   * is dropped.
-   */
-  int TIME_TO_COMPLETE_TRANSACTIONS_BEFORE_STOP_MS =
-      Integer.getInteger(
-          GeodeGlossary.GEMFIRE_PREFIX + 
"time-to-complete-transactions-before-stop-ms",
-          1000);
+          10);
 
   /**
    * The order policy. This enum is applicable only when concurrency-level is > 1.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index fd5b724..9069830 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -39,13 +39,13 @@ import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.TimeoutException;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.internal.cache.execute.BucketMovedException;
 import org.apache.geode.internal.cache.persistence.query.mock.ByteComparator;
 import org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.cache.versions.VersionSource;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
-import org.apache.geode.internal.cache.wan.InternalGatewayQueueEvent;
 import org.apache.geode.internal.cache.wan.parallel.BucketRegionQueueUnavailableException;
 import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
 import org.apache.geode.internal.concurrent.Atomics;
@@ -471,25 +471,21 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
    * If a matching object also fulfills the endPredicate then the method
    * stops looking for more matching objects.
    */
-  public List<Object> getElementsMatching(Predicate<InternalGatewayQueueEvent> matchingPredicate,
-      Predicate<InternalGatewayQueueEvent> endPredicate) {
+  public List<Object> getElementsMatching(Predicate matchingPredicate, Predicate endPredicate) {
     getInitializationLock().readLock().lock();
     try {
       if (this.getPartitionedRegion().isDestroyed()) {
         throw new BucketRegionQueueUnavailableException();
       }
-      List<Object> elementsMatching = new ArrayList<>();
+      List<Object> elementsMatching = new ArrayList<Object>();
       Iterator<Object> it = this.eventSeqNumDeque.iterator();
       while (it.hasNext()) {
         Object key = it.next();
-        Object event = optimalGet(key);
-        if (!(event instanceof InternalGatewayQueueEvent)) {
-          continue;
-        }
-        if (matchingPredicate.test((InternalGatewayQueueEvent) event)) {
-          elementsMatching.add(event);
+        Object object = optimalGet(key);
+        if (matchingPredicate.test(object)) {
+          elementsMatching.add(object);
           this.eventSeqNumDeque.remove(key);
-          if (endPredicate.test((InternalGatewayQueueEvent) event)) {
+          if (endPredicate.test(object)) {
             break;
           }
         }
@@ -500,27 +496,6 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
     }
   }
 
-  public boolean hasEventsMatching(Predicate<InternalGatewayQueueEvent> matchingPredicate) {
-    getInitializationLock().readLock().lock();
-    try {
-      if (this.getPartitionedRegion().isDestroyed()) {
-        throw new BucketRegionQueueUnavailableException();
-      }
-      for (Object o : eventSeqNumDeque) {
-        Object event = optimalGet(o);
-        if (!(event instanceof InternalGatewayQueueEvent)) {
-          continue;
-        }
-        if (matchingPredicate.test((InternalGatewayQueueEvent) event)) {
-          return true;
-        }
-      }
-      return false;
-    } finally {
-      getInitializationLock().readLock().unlock();
-    }
-  }
-
   @Override
   protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl 
event) {
     if (didPut) {
@@ -694,7 +669,7 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
       }
       return eventSeqNumDeque.stream()
           .map(this::optimalGet)
-          .filter(o -> o instanceof InternalGatewayQueueEvent)
+          .filter(o -> o instanceof GatewayQueueEvent)
           .collect(Collectors.toList());
 
     } finally {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 6760727..4fad440 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -26,7 +26,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Predicate;
 
 import org.apache.logging.log4j.Logger;
 
@@ -128,8 +127,6 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
 
   protected boolean groupTransactionEvents;
 
-  protected volatile boolean isStopping = false;
-
   protected boolean isForInternalUse;
 
   protected boolean isDiskSynchronous;
@@ -726,21 +723,6 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
     return enqueue;
   }
 
-  protected void preStop() {
-    isStopping = true;
-    if (mustGroupTransactionEvents()) {
-      try {
-        Thread.sleep(TIME_TO_COMPLETE_TRANSACTIONS_BEFORE_STOP_MS);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
-  protected void postStop() {
-    isStopping = false;
-  }
-
   protected void stopProcessing() {
     // Stop the dispatcher
     AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
@@ -1096,8 +1078,14 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
         clonedEvent.setCallbackArgument(geCallbackArg);
       }
 
+      // If this gateway is not running, return
       if (!isRunning()) {
-        recordDroppedEvent(clonedEvent);
+        if (this.isPrimary()) {
+          recordDroppedEvent(clonedEvent);
+        }
+        if (isDebugEnabled) {
+          logger.debug("Returning back without putting into the gateway sender queue:" + event);
+        }
         return;
       }
 
@@ -1124,7 +1112,12 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
         // If this gateway is not running, return
         // The sender may have stopped, after we have checked the status in the beginning.
         if (!isRunning()) {
-          recordDroppedEvent(clonedEvent);
+          if (isDebugEnabled) {
+            logger.debug("Returning back without putting into the gateway sender queue:" + event);
+          }
+          if (this.isPrimary()) {
+            recordDroppedEvent(clonedEvent);
+          }
           return;
         }
 
@@ -1144,21 +1137,7 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
           // Get substitution value to enqueue if necessary
           Object substituteValue = getSubstituteValue(clonedEvent, operation);
 
-          Predicate<InternalGatewayQueueEvent> hasSameTransactionIdPredicate = 
null;
-          // In case the sender is about to be stopped, the event will only
-          // be queued if there is any event in the queue with the same
-          // transactionId as the one of this event
-          if (isStopping && mustGroupTransactionEvents()
-              && clonedEvent.getTransactionId() != null) {
-            hasSameTransactionIdPredicate =
-                x -> x != null && clonedEvent.getTransactionId() != null
-                    && clonedEvent.getTransactionId()
-                        .equals((x).getTransactionId());
-          }
-          if (!ev.enqueueEvent(operation, clonedEvent, substituteValue, 
isLastEventInTransaction,
-              hasSameTransactionIdPredicate)) {
-            recordDroppedEvent(event);
-          }
+          ev.enqueueEvent(operation, clonedEvent, substituteValue, 
isLastEventInTransaction);
         } catch (CancelException e) {
           logger.debug("caught cancel exception", e);
           throw e;
@@ -1184,9 +1163,6 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
   }
 
   private void recordDroppedEvent(EntryEventImpl event) {
-    if (!this.isPrimary()) {
-      return;
-    }
     if (this.eventProcessor != null) {
       this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
     } else {
@@ -1195,9 +1171,6 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
         logger.debug("added to tmpDroppedEvents event: {}", event);
       }
     }
-    if (logger.isDebugEnabled()) {
-      logger.debug("Returning without putting into the gateway sender queue:" + event);
-    }
   }
 
   @VisibleForTesting
@@ -1594,7 +1567,7 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
       try {
         logger.info("{}: Enqueueing synchronization event: {}",
             this, event);
-        this.eventProcessor.enqueueEvent(event, null);
+        this.eventProcessor.enqueueEvent(event);
         this.statistics.incSynchronizationEventsEnqueued();
       } catch (Throwable t) {
         logger.warn(String.format(
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index ca22c1e..a02f929 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -26,7 +26,6 @@ import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Predicate;
 
 import org.apache.logging.log4j.Logger;
 
@@ -176,25 +175,14 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends LoggingThread
 
   protected abstract void initializeMessageQueue(String id, boolean 
cleanQueues);
 
-  public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event,
+  public void enqueueEvent(EnumListenerEvent operation, EntryEvent event,
       Object substituteValue) throws IOException, CacheException {
-    return enqueueEvent(operation, event, substituteValue, false, null);
+    enqueueEvent(operation, event, substituteValue, false);
   }
 
-  /**
-   *
-   * @param operation The operation
-   * @param event The event to be put in the queue
-   * @param substituteValue The substitute value
-   * @param isLastEventInTransaction True if this event is the last one in the
-   *        transaction it belongs to
-   * @param condition If not null, the event will be enqueued only if at least
-   *        one element in the queue matches the predicate
-   * @return False only if the condition is not null and no element in the queue matches it
-   */
-  public abstract boolean enqueueEvent(EnumListenerEvent operation, EntryEvent 
event,
-      Object substituteValue, boolean isLastEventInTransaction,
-      Predicate<InternalGatewayQueueEvent> condition) throws IOException, 
CacheException;
+  public abstract void enqueueEvent(EnumListenerEvent operation, EntryEvent 
event,
+      Object substituteValue, boolean isLastEventInTransaction) throws 
IOException, CacheException;
+
 
   protected abstract void rebalance();
 
@@ -1401,8 +1389,7 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends LoggingThread
     }
   }
 
-  protected abstract boolean enqueueEvent(GatewayQueueEvent event,
-      Predicate<InternalGatewayQueueEvent> condition);
+  protected abstract void enqueueEvent(GatewayQueueEvent event);
 
   protected class SenderStopperCallable implements Callable<Boolean> {
     private final AbstractGatewaySenderEventProcessor p;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 1626a5e..0a76c06 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -67,8 +67,7 @@ import org.apache.geode.internal.size.Sizeable;
  *
  */
 public class GatewaySenderEventImpl
-    implements InternalGatewayQueueEvent, AsyncEvent, DataSerializableFixedID, 
Conflatable,
-    Sizeable, Releasable {
+    implements AsyncEvent, DataSerializableFixedID, Conflatable, Sizeable, 
Releasable {
   private static final long serialVersionUID = -5690172020872255422L;
   protected static final Object TOKEN_NULL = new Object();
 
@@ -1260,12 +1259,10 @@ public class GatewaySenderEventImpl
     return this.shadowKey;
   }
 
-  @Override
   public boolean isLastEventInTransaction() {
     return isLastEventInTransaction;
   }
 
-  @Override
   public TransactionId getTransactionId() {
     return transactionId;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index 4b8d641..0ef1fd3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -24,7 +24,6 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.function.Predicate;
 
 import org.apache.logging.log4j.Logger;
 
@@ -48,7 +47,6 @@ import 
org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.GatewaySenderException;
-import org.apache.geode.internal.cache.wan.InternalGatewayQueueEvent;
 import org.apache.geode.internal.monitoring.ThreadsMonitoring;
 import org.apache.geode.logging.internal.executors.LoggingExecutors;
 import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -134,22 +132,22 @@ public class ConcurrentParallelGatewaySenderEventProcessor
   }
 
   @Override
-  public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, 
Object substituteValue)
+  public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, 
Object substituteValue)
       throws IOException, CacheException {
-    return enqueueEvent(operation, event, substituteValue, false, null);
+    enqueueEvent(operation, event, substituteValue, false);
   }
 
   @Override
-  public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, 
Object substituteValue,
-      boolean isLastEventInTransaction, Predicate<InternalGatewayQueueEvent> 
condition)
-      throws IOException, CacheException {
+  public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, 
Object substituteValue,
+      boolean isLastEventInTransaction) throws IOException, CacheException {
+    Region region = event.getRegion();
+    // int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation)event);
     int bucketId = ((EntryEventImpl) event).getEventId().getBucketID();
     if (bucketId < 0) {
-      return true;
+      return;
     }
     int pId = bucketId % this.nDispatcher;
-    return this.processors[pId].enqueueEvent(operation, event, substituteValue,
-        isLastEventInTransaction, condition);
+    this.processors[pId].enqueueEvent(operation, event, substituteValue, 
isLastEventInTransaction);
   }
 
   @Override
@@ -351,10 +349,9 @@ public class ConcurrentParallelGatewaySenderEventProcessor
   }
 
   @Override
-  protected boolean enqueueEvent(GatewayQueueEvent event,
-      Predicate<InternalGatewayQueueEvent> condition) {
+  protected void enqueueEvent(GatewayQueueEvent event) {
     int pId = ((GatewaySenderEventImpl) event).getBucketId() % 
this.nDispatcher;
-    return this.processors[pId].enqueueEvent(event, condition);
+    this.processors[pId].enqueueEvent(event);
   }
 
   private ThreadsMonitoring getThreadMonitorObj() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index ab3a458..65567f0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -18,7 +18,6 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.function.Predicate;
 
 import org.apache.logging.log4j.Logger;
 
@@ -37,7 +36,6 @@ import 
org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
 import 
org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
-import org.apache.geode.internal.cache.wan.InternalGatewayQueueEvent;
 import org.apache.geode.internal.monitoring.ThreadsMonitoring;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
@@ -99,10 +97,10 @@ public class ParallelGatewaySenderEventProcessor extends 
AbstractGatewaySenderEv
   }
 
   @Override
-  public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, 
Object substituteValue,
-      boolean isLastEventInTransaction, Predicate<InternalGatewayQueueEvent> 
condition)
+  public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, 
Object substituteValue,
+      boolean isLastEventInTransaction)
       throws IOException, CacheException {
-    GatewaySenderEventImpl gatewayQueueEvent;
+    GatewaySenderEventImpl gatewayQueueEvent = null;
     Region region = event.getRegion();
 
     if (!(region instanceof DistributedRegion) && ((EntryEventImpl) 
event).getTailKey() == -1) {
@@ -115,7 +113,7 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv
             "ParallelGatewaySenderEventProcessor not enqueing the following event since tailKey is not set. {}",
             event);
       }
-      return true;
+      return;
     }
 
     // TODO: Looks like for PDX region bucket id is set to -1.
@@ -127,23 +125,16 @@ public class ParallelGatewaySenderEventProcessor extends 
AbstractGatewaySenderEv
         new GatewaySenderEventImpl(operation, event, substituteValue, true, 
eventID.getBucketID(),
             isLastEventInTransaction);
 
-    return enqueueEvent(gatewayQueueEvent, condition);
+    enqueueEvent(gatewayQueueEvent);
   }
 
   @Override
-  protected boolean enqueueEvent(GatewayQueueEvent gatewayQueueEvent,
-      Predicate<InternalGatewayQueueEvent> condition) {
+  protected void enqueueEvent(GatewayQueueEvent gatewayQueueEvent) {
     boolean queuedEvent = false;
     try {
       if (getSender().beforeEnqueue(gatewayQueueEvent)) {
         long start = getSender().getStatistics().startTime();
         try {
-          if (condition != null &&
-              !((ParallelGatewaySenderQueue) this.queue).hasEventsMatching(
-                  (GatewaySenderEventImpl) gatewayQueueEvent,
-                  condition)) {
-            return false;
-          }
           queuedEvent = this.queue.put(gatewayQueueEvent);
         } catch (InterruptedException e) {
           e.printStackTrace();
@@ -161,7 +152,6 @@ public class ParallelGatewaySenderEventProcessor extends 
AbstractGatewaySenderEv
         ((GatewaySenderEventImpl) gatewayQueueEvent).release();
       }
     }
-    return true;
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 848ed80..d3f8868 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -17,7 +17,6 @@ package org.apache.geode.internal.cache.wan.parallel;
 import static org.apache.geode.cache.Region.SEPARATOR;
 import static org.apache.geode.cache.wan.GatewaySender.DEFAULT_BATCH_SIZE;
 import static 
org.apache.geode.cache.wan.GatewaySender.GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES;
-import static 
org.apache.geode.cache.wan.GatewaySender.GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS;
 import static 
org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE;
 
 import java.util.ArrayList;
@@ -89,7 +88,6 @@ import 
org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.GatewaySenderException;
 import org.apache.geode.internal.cache.wan.GatewaySenderStats;
-import org.apache.geode.internal.cache.wan.InternalGatewayQueueEvent;
 import org.apache.geode.internal.size.SingleObjectSizer;
 import org.apache.geode.internal.statistics.StatisticsClock;
 import org.apache.geode.internal.util.concurrent.StoppableCondition;
@@ -712,16 +710,44 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
 
     boolean isDREvent = isDREvent(sender.getCache(), value);
 
-    String regionPath = getRegionPathForEventAndType(value, isDREvent);
-    if (regionPath == null) {
+    String regionPath = value.getRegionPath();
+    if (!isDREvent) {
+      Region region = sender.getCache().getRegion(regionPath, true);
+      regionPath = ColocationHelper.getLeaderRegion((PartitionedRegion) 
region).getFullPath();
+    }
+    if (isDebugEnabled) {
+      logger.debug("Put is for the region {}", regionPath);
+    }
+    if (!this.userRegionNameToShadowPRMap.containsKey(regionPath)) {
+      if (isDebugEnabled) {
+        logger.debug("The userRegionNameToshadowPRMap is {}", 
userRegionNameToShadowPRMap);
+      }
+      logger.warn(
+          "GatewaySender: Not queuing the event {}, as the region for which this event originated is not yet configured in the GatewaySender",
+          value);
+      // does not put into queue
       return false;
     }
 
     PartitionedRegion prQ = this.userRegionNameToShadowPRMap.get(regionPath);
     int bucketId = value.getBucketId();
-    Object key = getKeyForEventAndType(value, isDREvent);
-    if (key == null) {
-      return false;
+    Object key = null;
+    if (!isDREvent) {
+      key = value.getShadowKey();
+
+      if ((Long) key == -1) {
+        // In case of parallel we don't expect
+        // the key to be not set. If it is the case then the event must be coming
+        // through listener, so return.
+        if (isDebugEnabled) {
+          logger.debug("ParallelGatewaySenderOrderedQueue not putting key {} : Value : {}", key,
+              value);
+        }
+        // does not put into queue
+        return false;
+      }
+    } else {
+      key = value.getEventId();
     }
 
     if (isDebugEnabled) {
@@ -853,49 +879,6 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     return putDone;
   }
 
-  private String getRegionPathForEventAndType(GatewaySenderEventImpl event, 
boolean isDREvent) {
-    boolean isDebugEnabled = logger.isDebugEnabled();
-    String regionPath = event.getRegionPath();
-    if (!isDREvent) {
-      PartitionedRegion region = (PartitionedRegion) 
sender.getCache().getRegion(regionPath, true);
-      regionPath = ColocationHelper.getLeaderRegion(region).getFullPath();
-    }
-    if (isDebugEnabled) {
-      logger.debug("Put is for region {}", regionPath);
-    }
-    if (!this.userRegionNameToShadowPRMap.containsKey(regionPath)) {
-      if (isDebugEnabled) {
-        logger.debug("The userRegionNameToShadowPRMap is {}", 
userRegionNameToShadowPRMap);
-      }
-      logger.warn(
-          "GatewaySender: Not queuing the event {}, as the region for which this event originated is not yet configured in the GatewaySender",
-          event);
-      return null;
-    }
-    return regionPath;
-  }
-
-  private Object getKeyForEventAndType(GatewaySenderEventImpl event, boolean 
isDREvent) {
-    Object key;
-    if (!isDREvent) {
-      key = event.getShadowKey();
-
-      if ((Long) key == -1) {
-        // In the case of parallel we expect the key to be set.
-        // If the key is not set, then the event must be coming
-        // through listener, so return.
-        if (logger.isDebugEnabled()) {
-          logger.debug("ParallelGatewaySenderOrderedQueue not putting key {} : 
Value : {}", key,
-              event);
-        }
-        return null;
-      }
-    } else {
-      key = event.getEventId();
-    }
-    return key;
-  }
-
   public void notifyEventProcessorIfRequired() {
     // putter thread should not take lock every time
     if (isQueueEmpty) {
@@ -1424,15 +1407,10 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
           retries++ == GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES) {
         break;
       }
-      try {
-        Thread.sleep(GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
     }
     if (incompleteTransactionIdsInBatch.size() > 0) {
-      logger.warn("Not able to retrieve all events for transactions: {} after {} retries of {}ms",
-          incompleteTransactionIdsInBatch, retries, GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS);
+      logger.warn("Not able to retrieve all events for transactions: {} after {} retries",
+          incompleteTransactionIdsInBatch, retries);
       stats.incBatchesWithIncompleteTransactions();
     }
   }
@@ -1604,9 +1582,9 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId);
 
     try {
-      Predicate<InternalGatewayQueueEvent> hasTransactionIdPredicate =
+      Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate =
           getHasTransactionIdPredicate(transactionId);
-      Predicate<InternalGatewayQueueEvent> isLastEventInTransactionPredicate =
+      Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate =
           getIsLastEventInTransactionPredicate();
       objects =
           brq.getElementsMatching(hasTransactionIdPredicate, 
isLastEventInTransactionPredicate);
@@ -1620,12 +1598,12 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
   }
 
   @VisibleForTesting
-  public static Predicate<InternalGatewayQueueEvent> 
getIsLastEventInTransactionPredicate() {
+  public static Predicate<GatewaySenderEventImpl> 
getIsLastEventInTransactionPredicate() {
     return x -> x.isLastEventInTransaction();
   }
 
   @VisibleForTesting
-  public static Predicate<InternalGatewayQueueEvent> 
getHasTransactionIdPredicate(
+  public static Predicate<GatewaySenderEventImpl> getHasTransactionIdPredicate(
       TransactionId transactionId) {
     return x -> transactionId.equals(x.getTransactionId());
   }
@@ -1832,21 +1810,6 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     return regionName.substring(1, queueStringStart);
   }
 
-  public boolean hasEventsMatching(GatewaySenderEventImpl event,
-      Predicate<InternalGatewayQueueEvent> condition) {
-    boolean isDREvent = isDREvent(sender.getCache(), event);
-
-    String regionPath = getRegionPathForEventAndType(event, isDREvent);
-    if (regionPath == null) {
-      return true;
-    }
-    PartitionedRegion prQ = this.userRegionNameToShadowPRMap.get(regionPath);
-    int bucketId = event.getBucketId();
-    BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId);
-
-    return brq.hasEventsMatching(condition);
-  }
-
   // TODO:REF: Name for this class should be appropriate?
   private class BatchRemovalThread extends Thread {
     /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index aefd8cf..7adf996 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -24,7 +24,6 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import org.apache.logging.log4j.Logger;
@@ -47,7 +46,6 @@ import 
org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
 import org.apache.geode.internal.cache.wan.GatewaySenderException;
-import org.apache.geode.internal.cache.wan.InternalGatewayQueueEvent;
 import org.apache.geode.internal.monitoring.ThreadsMonitoring;
 import org.apache.geode.internal.offheap.annotations.Released;
 import org.apache.geode.logging.internal.executors.LoggingExecutors;
@@ -111,22 +109,19 @@ public class ConcurrentSerialGatewaySenderEventProcessor
 
   // based on the fix for old wan Bug#46992 .revision is 39437
   @Override
-  public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, 
Object substituteValue)
+  public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, 
Object substituteValue)
       throws IOException, CacheException {
-    enqueueEvent(operation, event, substituteValue, false, null);
-    return true;
+    enqueueEvent(operation, event, substituteValue, false);
 
   }
 
   @Override
-  public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, 
Object substituteValue,
-      boolean isLastEventInTransaction, Predicate<InternalGatewayQueueEvent> 
condition)
-      throws IOException, CacheException {
+  public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, 
Object substituteValue,
+      boolean isLastEventInTransaction) throws IOException, CacheException {
     // Get the appropriate index into the gateways
     int index = Math.abs(getHashCode(((EntryEventImpl) event)) % 
this.processors.size());
     // Distribute the event to the gateway
-    return enqueueEvent(operation, event, substituteValue, index, 
isLastEventInTransaction,
-        condition);
+    enqueueEvent(operation, event, substituteValue, index, 
isLastEventInTransaction);
   }
 
   public void setModifiedEventId(EntryEventImpl clonedEvent, int index) {
@@ -158,9 +153,8 @@ public class ConcurrentSerialGatewaySenderEventProcessor
     clonedEvent.setEventId(newEventId);
   }
 
-  public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, 
Object substituteValue,
-      int index, boolean isLastEventInTransaction, 
Predicate<InternalGatewayQueueEvent> condition)
-      throws CacheException, IOException {
+  public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, 
Object substituteValue,
+      int index, boolean isLastEventInTransaction) throws CacheException, 
IOException {
     // Get the appropriate gateway
     SerialGatewaySenderEventProcessor serialProcessor = 
this.processors.get(index);
 
@@ -172,15 +166,15 @@ public class ConcurrentSerialGatewaySenderEventProcessor
       EntryEventImpl clonedEvent = new EntryEventImpl((EntryEventImpl) event);
       try {
         setModifiedEventId(clonedEvent, index);
-        return serialProcessor.enqueueEvent(operation, clonedEvent, 
substituteValue,
-            isLastEventInTransaction, condition);
+        serialProcessor.enqueueEvent(operation, clonedEvent, substituteValue,
+            isLastEventInTransaction);
       } finally {
         clonedEvent.release();
       }
     } else {
-      return serialProcessor.enqueueEvent(operation, event, substituteValue,
-          isLastEventInTransaction, condition);
+      serialProcessor.enqueueEvent(operation, event, substituteValue);
     }
+
   }
 
   @Override
@@ -424,12 +418,10 @@ public class ConcurrentSerialGatewaySenderEventProcessor
   }
 
   @Override
-  protected boolean enqueueEvent(GatewayQueueEvent event,
-      Predicate<InternalGatewayQueueEvent> condition) {
+  protected void enqueueEvent(GatewayQueueEvent event) {
     for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
-      serialProcessor.enqueueEvent(event, condition);
+      serialProcessor.enqueueEvent(event);
     }
-    return true;
   }
 
   protected ThreadsMonitoring getThreadMonitorObj() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index 6a0d589..69142ce 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -25,7 +25,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import org.apache.logging.log4j.Logger;
@@ -54,7 +53,6 @@ import 
org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackArgument;
 import 
org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.GatewaySenderStats;
-import org.apache.geode.internal.cache.wan.InternalGatewayQueueEvent;
 import org.apache.geode.internal.monitoring.ThreadsMonitoring;
 import org.apache.geode.logging.internal.executors.LoggingExecutors;
 import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -397,9 +395,8 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
    * Add the input object to the event queue
    */
   @Override
-  public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, 
Object substituteValue,
-      boolean isLastEventInTransaction, Predicate<InternalGatewayQueueEvent> 
condition)
-      throws IOException, CacheException {
+  public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, 
Object substituteValue,
+      boolean isLastEventInTransaction) throws IOException, CacheException {
     // There is a case where the event is serialized for processing. The
     // region is not
     // serialized along with the event since it is a transient field. I
@@ -408,12 +405,7 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
     // name is
     // used in the sendBatch method, and it can't be null. See EntryEventImpl
     // for details.
-    GatewaySenderEventImpl senderEvent;
-
-    if (condition != null &&
-        !((SerialGatewaySenderQueue) queue).hasEventsMatching(condition)) {
-      return false;
-    }
+    GatewaySenderEventImpl senderEvent = null;
 
     boolean isPrimary = sender.isPrimary();
     if (!isPrimary) {
@@ -468,7 +460,6 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
         }
       }
     }
-    return true;
   }
 
   private boolean queuePrimaryEvent(GatewaySenderEventImpl gatewayEvent)
@@ -881,8 +872,7 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
   }
 
   @Override
-  protected boolean enqueueEvent(GatewayQueueEvent event,
-      Predicate<InternalGatewayQueueEvent> condition) {
+  protected void enqueueEvent(GatewayQueueEvent event) {
     // @TODO This API hasn't been implemented yet
     throw new UnsupportedOperationException();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 0d322d0..6fb0331 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -16,7 +16,6 @@ package org.apache.geode.internal.cache.wan.serial;
 
 import static org.apache.geode.cache.wan.GatewaySender.DEFAULT_BATCH_SIZE;
 import static 
org.apache.geode.cache.wan.GatewaySender.GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES;
-import static 
org.apache.geode.cache.wan.GatewaySender.GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS;
 
 import java.util.ArrayList;
 import java.util.Deque;
@@ -75,7 +74,6 @@ import org.apache.geode.internal.cache.versions.VersionSource;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.GatewaySenderStats;
-import org.apache.geode.internal.cache.wan.InternalGatewayQueueEvent;
 import org.apache.geode.internal.offheap.OffHeapClearRequired;
 import org.apache.geode.internal.statistics.StatisticsClock;
 import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -501,15 +499,10 @@ public class SerialGatewaySenderQueue implements 
RegionQueue {
           retries++ == GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES) {
         break;
       }
-      try {
-        Thread.sleep(GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
     }
     if (incompleteTransactionIdsInBatch.size() > 0) {
-      logger.warn("Not able to retrieve all events for transactions: {} after 
{} retries of {}ms",
-          incompleteTransactionIdsInBatch, retries, 
GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS);
+      logger.warn("Not able to retrieve all events for transactions: {} after 
{} retries",
+          incompleteTransactionIdsInBatch, retries);
       stats.incBatchesWithIncompleteTransactions();
     }
   }
@@ -840,9 +833,9 @@ public class SerialGatewaySenderQueue implements 
RegionQueue {
 
   private List<KeyAndEventPair> peekEventsWithTransactionId(TransactionId 
transactionId,
       long lastKey) {
-    Predicate<InternalGatewayQueueEvent> hasTransactionIdPredicate =
+    Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate =
         x -> transactionId.equals(x.getTransactionId());
-    Predicate<InternalGatewayQueueEvent> isLastEventInTransactionPredicate =
+    Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate =
         x -> x.isLastEventInTransaction();
 
     return getElementsMatching(hasTransactionIdPredicate, 
isLastEventInTransactionPredicate,
@@ -854,10 +847,10 @@ public class SerialGatewaySenderQueue implements 
RegionQueue {
    * If a matching object also fulfills the endPredicate then the method
    * stops looking for more matching objects.
    */
-  List<KeyAndEventPair> 
getElementsMatching(Predicate<InternalGatewayQueueEvent> condition,
-      Predicate<InternalGatewayQueueEvent> stopCondition,
+  List<KeyAndEventPair> getElementsMatching(Predicate<GatewaySenderEventImpl> 
condition,
+      Predicate<GatewaySenderEventImpl> stopCondition,
       long lastKey) {
-    InternalGatewayQueueEvent event;
+    GatewaySenderEventImpl event;
     List<KeyAndEventPair> elementsMatching = new ArrayList<>();
 
     long currentKey = lastKey;
@@ -866,13 +859,13 @@ public class SerialGatewaySenderQueue implements 
RegionQueue {
       if (extraPeekedIds.contains(currentKey)) {
         continue;
       }
-      event = (InternalGatewayQueueEvent) optimalGet(currentKey);
+      event = (GatewaySenderEventImpl) optimalGet(currentKey);
       if (event == null) {
         continue;
       }
 
       if (condition.test(event)) {
-        elementsMatching.add(new KeyAndEventPair(currentKey, 
(GatewaySenderEventImpl) event));
+        elementsMatching.add(new KeyAndEventPair(currentKey, event));
 
         if (stopCondition.test(event)) {
           break;
@@ -883,25 +876,6 @@ public class SerialGatewaySenderQueue implements 
RegionQueue {
     return elementsMatching;
   }
 
-  public boolean hasEventsMatching(Predicate<InternalGatewayQueueEvent> 
condition) {
-    InternalGatewayQueueEvent event;
-    long currentKey = getHeadKey();
-    if (currentKey == getTailKey()) {
-      return false;
-    }
-    while ((currentKey = inc(currentKey)) != getTailKey()) {
-      event = (InternalGatewayQueueEvent) optimalGet(currentKey);
-
-      if (event == null) {
-        continue;
-      }
-      if (condition.test(event)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   /**
    * Returns the value of the tail key. The tail key points to an empty where 
the next queue entry
    * will be stored.
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
index 8bcfa1d..530dec5 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
@@ -38,7 +38,6 @@ import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.GatewaySenderStats;
-import org.apache.geode.internal.cache.wan.InternalGatewayQueueEvent;
 import 
org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderHelper;
 import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
 import org.apache.geode.internal.statistics.DummyStatisticsFactory;
@@ -169,9 +168,9 @@ public class BucketRegionQueueJUnitTest {
     this.bucketRegionQueue.addToQueue(7L, event6);
     this.bucketRegionQueue.addToQueue(8L, event7);
 
-    Predicate<InternalGatewayQueueEvent> hasTransactionIdPredicate =
+    Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate =
         ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx1);
-    Predicate<InternalGatewayQueueEvent> isLastEventInTransactionPredicate =
+    Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate =
         ParallelGatewaySenderQueue.getIsLastEventInTransactionPredicate();
     List<Object> objects = 
this.bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
         isLastEventInTransactionPredicate);
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index ad49e9c..f31b46b 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -1320,23 +1320,6 @@ public class WANTestBase extends DistributedTestCase {
     assertEquals(creates, gatewayReceiverStats.getCreateRequest());
   }
 
-  public static List<Integer> getReceiverStats() {
-    Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
-    GatewayReceiver receiver = gatewayReceivers.iterator().next();
-    CacheServerStats stats = ((CacheServerImpl) 
receiver.getServer()).getAcceptor().getStats();
-    assertTrue(stats instanceof GatewayReceiverStats);
-    GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats) stats;
-    ArrayList<Integer> statsList = new ArrayList<>();
-    statsList.add(gatewayReceiverStats.getEventsReceived());
-    statsList.add(gatewayReceiverStats.getEventsRetried());
-    statsList.add(gatewayReceiverStats.getProcessBatchRequests());
-    statsList.add(gatewayReceiverStats.getDuplicateBatchesReceived());
-    statsList.add(gatewayReceiverStats.getOutoforderBatchesReceived());
-    statsList.add(gatewayReceiverStats.getEarlyAcks());
-    statsList.add(gatewayReceiverStats.getExceptionsOccurred());
-    return statsList;
-  }
-
   public static void checkMinimumGatewayReceiverStats(int processBatches, int 
eventsReceived) {
     Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
     GatewayReceiver receiver = gatewayReceivers.iterator().next();
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index acebff8..2138342 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -474,200 +473,6 @@ public class ParallelWANStatsDUnitTest extends 
WANTestBase {
   }
 
   @Test
-  public void 
testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
-      throws InterruptedException {
-    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
-
-    createCacheInVMs(nyPort, vm2);
-    createReceiverCustomerOrderShipmentPR(vm2);
-    createReceiverInVMs(vm2);
-
-    createCacheInVMs(lnPort, vm4, vm5);
-    createSenderCustomerOrderShipmentPRs(vm4);
-    createSenderCustomerOrderShipmentPRs(vm5);
-
-    int batchSize = 10;
-    vm4.invoke(
-        () -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, 
true, null, false,
-            true));
-    vm5.invoke(
-        () -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, 
true, null, false,
-            true));
-
-    int customers = 4;
-    int transactionsPerCustomer = 100;
-    // Each transaction will contain one order plus the following shipments
-    int shipmentsPerTransaction = batchSize;
-    AsyncInvocation<Void> inv1 = asyncExecuteCustomerTransactions(vm4, 
customers,
-        transactionsPerCustomer, shipmentsPerTransaction);
-
-    // wait for some batches to be distributed and then stop the sender
-    vm4.invoke(() -> await()
-        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
-
-    stopSenderInVMsAsync("ln", vm4, vm5);
-
-    // Wait for customer transactions to finish
-    inv1.await();
-    int orderEntries = transactionsPerCustomer * customers;
-    int shipmentEntries = orderEntries * shipmentsPerTransaction;
-    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, 
orderEntries));
-    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, 
orderEntries));
-    vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, 
shipmentEntries));
-    vm5.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, 
shipmentEntries));
-
-    checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(
-        shipmentsPerTransaction);
-
-    // Start sender to validate that queued events do not contain incomplete 
transactions after
-    // restart
-    startSenderInVMsAsync("ln", vm4, vm5);
-
-    
checkOnlyCompleteTransactionsAreReplicatedWithSenderRestarted(shipmentsPerTransaction);
-  }
-
-  @Test
-  public void 
testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStartedReceiverStopped()
-      throws InterruptedException {
-    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
-
-    createCacheInVMs(nyPort, vm2);
-    createReceiverCustomerOrderShipmentPR(vm2);
-    createReceiverInVMs(vm2);
-    vm2.invoke(WANTestBase::stopReceivers);
-
-    createCacheInVMs(lnPort, vm4, vm5);
-    createSenderCustomerOrderShipmentPRs(vm4);
-    createSenderCustomerOrderShipmentPRs(vm5);
-
-    int batchSize = 10;
-    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
true, null, false,
-        true));
-    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
true, null, false,
-        true));
-
-    int customers = 4;
-    int transactionsPerCustomer = 100;
-    // Each transaction will contain one order plus the following shipments
-    int shipmentsPerTransaction = batchSize;
-
-    AsyncInvocation<Void> inv1 = asyncExecuteCustomerTransactions(vm4, 
customers,
-        transactionsPerCustomer, shipmentsPerTransaction);
-
-    // wait for some batches to be redistributed and then stop the sender
-    vm4.invoke(() -> await()
-        .until(() -> WANTestBase.getSenderStats("ln", -1).get(5) > 0));
-
-    stopSenderInVMsAsync("ln", vm4, vm5);
-
-    // Wait for the customer transactions to finish
-    inv1.await();
-    int orderEntries = transactionsPerCustomer * customers;
-    int shipmentEntries = orderEntries * shipmentsPerTransaction;
-    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, 
orderEntries));
-    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, 
orderEntries));
-    vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, 
shipmentEntries));
-    vm5.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, 
shipmentEntries));
-
-    // Start receiver and sender
-    vm2.invoke(WANTestBase::startReceivers);
-    startSenderInVMsAsync("ln", vm4, vm5);
-
-    
checkOnlyCompleteTransactionsAreReplicatedWithSenderRestarted(shipmentsPerTransaction);
-  }
-
-  private AsyncInvocation<Void> asyncExecuteCustomerTransactions(VM vm, int 
customers,
-      int transactionsPerCustomer, int shipmentsPerTransaction) {
-    final Map<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
-    for (int custId = 0; custId < customers; custId++) {
-      for (int i = 0; i < transactionsPerCustomer; i++) {
-        CustId custIdObject = new CustId(custId);
-        OrderId orderId = new OrderId(i, custIdObject);
-        keyValuesInTransactions.put(orderId, new Order());
-        for (int j = 0; j < shipmentsPerTransaction; j++) {
-          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
-          keyValuesInTransactions.put(shipmentId, new Shipment());
-        }
-      }
-    }
-    int eventsPerTransaction = 1 + shipmentsPerTransaction;
-    return vm.invokeAsync(
-        () -> 
WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
-            eventsPerTransaction));
-  }
-
-  private void checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(
-      int shipmentsPerTransaction) {
-    waitForBatchesToBeAppliedInTheReceiver(shipmentsPerTransaction);
-
-    List<Integer> v4List =
-        vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
-    List<Integer> v5List =
-        vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
-
-    // batches with incomplete transactions must be 0
-    assertEquals(0, (int) v4List.get(13));
-    assertEquals(0, (int) v5List.get(13));
-
-    // Check the entries replicated against the number of batches distributed
-    int batchesDistributed = v4List.get(4) + v5List.get(4);
-    checkOnlyCompleteTransactionsAreReplicated(shipmentsPerTransaction, 
batchesDistributed);
-  }
-
-  private void checkOnlyCompleteTransactionsAreReplicatedWithSenderRestarted(
-      int shipmentsPerTransaction) {
-    // Wait for sender queues to be drained
-    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-
-    List<Integer> v4List =
-        vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    List<Integer> v5List =
-        vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    assertEquals(0, v4List.get(0) + v5List.get(0));
-
-    // batches with incomplete transactions must be 0
-    assertEquals(0, (int) v4List.get(13));
-    assertEquals(0, (int) v5List.get(13));
-
-    waitForBatchesToBeAppliedInTheReceiver(shipmentsPerTransaction);
-
-    // Check the entries replicated against the number of batches distributed
-    int batchesDistributed = v4List.get(4) + v5List.get(4);
-    checkOnlyCompleteTransactionsAreReplicated(shipmentsPerTransaction, 
batchesDistributed);
-  }
-
-  private void checkOnlyCompleteTransactionsAreReplicated(int 
shipmentsPerTransaction,
-      int batchesDistributed) {
-    // Only complete transactions (1 order + 10 shipments) must be replicated
-    int orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
-    int shipmentRegionSize = vm2.invoke(() -> 
getRegionSize(shipmentRegionName));
-    assertEquals(shipmentRegionSize, shipmentsPerTransaction * 
orderRegionSize);
-
-    vm2.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, 
batchesDistributed));
-    vm2.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName,
-        batchesDistributed * shipmentsPerTransaction));
-  }
-
-  private void waitForBatchesToBeAppliedInTheReceiver(int 
shipmentsPerTransaction) {
-    int batchesSentTotal = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
-1)).get(4) +
-        vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)).get(4);
-
-    // Wait for all batches to be received by the receiver
-    vm2.invoke(() -> await()
-        .until(() -> WANTestBase.getReceiverStats().get(2) == 
batchesSentTotal));
-
-    // Wait for all orderEntries to be written by the receiver
-    vm2.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, 
batchesSentTotal));
-
-    // Wait for all shipmentEntries to be written by the receiver
-    vm2.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName,
-        batchesSentTotal * shipmentsPerTransaction));
-  }
-
-  @Test
   public void 
testPRParallelPropagationWithGroupTransactionEventsWithIncompleteTransactions() 
{
     Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
     Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
@@ -703,8 +508,8 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     vm4.invoke(() -> WANTestBase.validateRegionSize(testName, entries));
 
-    List<Integer> v4List =
-        vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
 
     // The number of batches will be 4 because each
     // dispatcher thread (there are 2) will send half the number of entries,
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
index fc2a304..2cbdc35 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
@@ -21,10 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.RejectedExecutionException;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -54,7 +51,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
   }
 
   @Test
-  public void testReplicatedSerialPropagation() {
+  public void testReplicatedSerialPropagation() throws Exception {
     Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
     Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
@@ -214,7 +211,8 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
   }
 
   @Test
-  public void 
testReplicatedSerialPropagationWithBatchRedistWithoutGroupTransactionEventsSendsBatchesWithIncompleteTransactions()
 {
+  public void 
testReplicatedSerialPropagationWithBatchRedistWithoutGroupTransactionEventsSendsBatchesWithIncompleteTransactions()
+      throws Exception {
     Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
     Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
@@ -276,7 +274,8 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
   }
 
   @Test
-  public void 
testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEventsSendsBatchesWithCompleteTransactions()
 {
+  public void 
testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEventsSendsBatchesWithCompleteTransactions()
+      throws Exception {
     Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
     Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
@@ -349,201 +348,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
   }
 
   @Test
-  public void 
testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
-      throws InterruptedException {
-    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
-
-    String regionName = testName + "_RR";
-
-    createCacheInVMs(nyPort, vm2);
-    createReceiverInVMs(vm2);
-    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, 
isOffHeap()));
-
-    createCacheInVMs(lnPort, vm4, vm5);
-    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", 
isOffHeap()));
-    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", 
isOffHeap()));
-
-    boolean groupTransactionEvents = true;
-    int batchSize = 10;
-    vm4.invoke(
-        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, 
true, null, false,
-            groupTransactionEvents));
-    vm5.invoke(
-        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, 
true, null, false,
-            groupTransactionEvents));
-
-    int eventsPerTransaction = batchSize + 1;
-    // The number of entries must be big enough so that not all entries
-    // are replicated before the sender is stopped and also divisible by 
eventsPerTransaction
-    int entries = eventsPerTransaction * 200;
-    // Execute some transactions
-    AsyncInvocation<Void> inv1 =
-        asyncExecuteTransactions(regionName, eventsPerTransaction, entries);
-
-    // wait for batches to be distributed and then stop the sender
-    vm4.invoke(() -> await()
-        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
-
-    // These exceptions are ignored here because it could happen that when an 
event
-    // is to be handled, the sender is stopped. The sender, when stopped, 
shuts down
-    // the thread pool that would handle the event and this could provoke the 
exception.
-    addIgnoredException("Exception occurred in CacheListener");
-    addIgnoredException(RejectedExecutionException.class);
-
-    // Stop the sender
-    stopSenderInVMsAsync("ln", vm4, vm5);
-
-    // Wait for transactions to finish
-    inv1.await();
-    vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
-    vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
-
-    // Check
-    checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(regionName,
-        eventsPerTransaction);
-
-    // Start the sender
-    startSenderInVMsAsync("ln", vm4, vm5);
-
-    // Check
-    checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(regionName,
-        eventsPerTransaction);
-  }
-
-  @Test
-  public void 
testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStartedReceiverStopped()
-      throws InterruptedException {
-    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
-
-    String regionName = testName + "_RR";
-
-    createCacheInVMs(nyPort, vm2);
-    createReceiverInVMs(vm2);
-    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, 
isOffHeap()));
-    vm2.invoke(WANTestBase::stopReceivers);
-
-    createCacheInVMs(lnPort, vm4, vm5);
-    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", 
isOffHeap()));
-    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", 
isOffHeap()));
-
-    boolean groupTransactionEvents = true;
-    int batchSize = 10;
-    vm4.invoke(
-        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, 
true, null, false,
-            groupTransactionEvents));
-    vm5.invoke(
-        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, 
true, null, false,
-            groupTransactionEvents));
-
-    int eventsPerTransaction = batchSize + 1;
-    // The number of entries must be big enough so that not all entries
-    // are replicated before the sender is stopped and also divisible by 
eventsPerTransaction
-    int entries = eventsPerTransaction * 200;
-    // Execute some transactions
-    AsyncInvocation<Void> inv1 =
-        asyncExecuteTransactions(regionName, eventsPerTransaction, entries);
-
-    // wait for batches to be redistributed and then stop the sender
-    vm4.invoke(() -> await()
-        .until(() -> WANTestBase.getSenderStats("ln", -1).get(5) > 0));
-
-    // Stop the sender
-    stopSenderInVMsAsync("ln", vm4, vm5);
-
-    // Wait for transactions to finish
-    inv1.await();
-    vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
-    vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
-
-    // Start the receiver and the sender
-    vm2.invoke(WANTestBase::startReceivers);
-    startSenderInVMsAsync("ln", vm4, vm5);
-
-    // Check
-    checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(regionName,
-        eventsPerTransaction);
-  }
-
-  private void 
checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(String regionName,
-      int eventsPerTransaction) {
-    waitForBatchesToBeAppliedInTheReceiver(regionName, eventsPerTransaction);
-
-    List<Integer> v4List =
-        vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
-    List<Integer> v5List =
-        vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
-
-    // batches with incomplete transactions must be 0
-    assertEquals(0, (int) v4List.get(13));
-    assertEquals(0, (int) v5List.get(13));
-
-    int batchesDistributed = v4List.get(4) + v5List.get(4);
-    checkOnlyCompleteTransactionsAreReplicated(regionName, 
eventsPerTransaction,
-        batchesDistributed);
-  }
-
-  private void 
checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(String 
regionName,
-      int eventsPerTransaction) {
-    // Wait for sender queues to be empty
-    List<Integer> v4List =
-        vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-    List<Integer> v5List =
-        vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
-
-    assertEquals(0, v4List.get(0) + v5List.get(0));
-
-    // batches with incomplete transactions must be 0
-    assertEquals(0, (int) v4List.get(13));
-    assertEquals(0, (int) v5List.get(13));
-
-    waitForBatchesToBeAppliedInTheReceiver(regionName, eventsPerTransaction);
-
-    int batchesDistributed = v4List.get(4) + v5List.get(4);
-    checkOnlyCompleteTransactionsAreReplicated(regionName, 
eventsPerTransaction,
-        batchesDistributed);
-  }
-
-  private void checkOnlyCompleteTransactionsAreReplicated(String regionName,
-      int eventsPerTransaction, int batchesDistributed) {
-    int regionSize = vm2.invoke(() -> getRegionSize(regionName));
-
-    // The number of entries must be divisible by the number of events per 
transaction
-    assertEquals(0, regionSize % eventsPerTransaction);
-
-    // Check the entries replicated against the number of batches distributed
-    vm2.invoke(() -> WANTestBase.validateRegionSize(regionName,
-        batchesDistributed * eventsPerTransaction));
-  }
-
-  private AsyncInvocation<Void> asyncExecuteTransactions(String regionName,
-      int eventsPerTransaction, int entries) {
-    final Map<Object, Object> keyValues = new LinkedHashMap<>();
-    for (int i = 0; i < entries; i++) {
-      keyValues.put(i, i + "_Value");
-    }
-
-    return vm4.invokeAsync(
-        () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
-            eventsPerTransaction));
-  }
-
-  private void waitForBatchesToBeAppliedInTheReceiver(String regionName, int 
eventsPerTransaction) {
-    int batchesSentTotal = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
-1)).get(4) +
-        vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)).get(4);
-
-    // Wait for all batches to be received by the sender
-    vm2.invoke(() -> await()
-        .until(() -> WANTestBase.getReceiverStats().get(2) == 
batchesSentTotal));
-
-    // Wait for all entries to be written by the receiver
-    vm2.invoke(
-        () -> WANTestBase.validateRegionSize(regionName, batchesSentTotal * 
eventsPerTransaction));
-  }
-
-  @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() {
+  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws 
Exception {
     Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
     Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
@@ -582,7 +387,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
   }
 
   @Test
-  public void testWANStatsTwoWanSites() {
+  public void testWANStatsTwoWanSites() throws Exception {
 
     Integer lnPort = createFirstLocatorWithDSId(1);
     Integer nyPort = vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
@@ -845,7 +650,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
    *
    */
   @Test
-  public void testReplicatedSerialPropagationWithRemoteRegionDestroy() {
+  public void testReplicatedSerialPropagationWithRemoteRegionDestroy() throws 
Exception {
     int numEntries = 2000;
     Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
     Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
@@ -917,7 +722,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
   }
 
   @Test
-  public void testSerialPropagationWithFilter() {
+  public void testSerialPropagationWithFilter() throws Exception {
 
     Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
     Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
@@ -959,7 +764,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
   }
 
   @Test
-  public void testSerialPropagationConflation() {
+  public void testSerialPropagationConflation() throws Exception {
     Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
     Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
diff --git 
a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java
 
b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java
index 78e36c1..5d9241a 100644
--- 
a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java
+++ 
b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java
@@ -110,7 +110,6 @@ public class ParallelGatewaySenderImpl extends 
AbstractRemoteGatewaySender {
 
   @Override
   public void stop() {
-    preStop();
     this.getLifeCycleLock().writeLock().lock();
     try {
       if (!this.isRunning()) {
@@ -140,7 +139,6 @@ public class ParallelGatewaySenderImpl extends 
AbstractRemoteGatewaySender {
       // Keep the eventProcessor around so we can ask it for the regionQueues 
later.
       // Tests expect to be able to do this.
     } finally {
-      postStop();
       this.getLifeCycleLock().writeLock().unlock();
     }
   }
diff --git 
a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java
 
b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java
index 2dfef04..13f6e96 100644
--- 
a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java
+++ 
b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java
@@ -134,7 +134,6 @@ public class SerialGatewaySenderImpl extends 
AbstractRemoteGatewaySender {
     if (logger.isDebugEnabled()) {
       logger.debug("Stopping Gateway Sender : {}", this);
     }
-    preStop();
     this.getLifeCycleLock().writeLock().lock();
     try {
       // Stop the dispatcher
@@ -150,7 +149,6 @@ public class SerialGatewaySenderImpl extends 
AbstractRemoteGatewaySender {
 
       clearTempEventsAfterSenderStopped();
     } finally {
-      postStop();
       this.getLifeCycleLock().writeLock().unlock();
     }
 
diff --git 
a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImplTest.java
 
b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImplTest.java
index 84f2261..56c5b72 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImplTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImplTest.java
@@ -14,7 +14,6 @@
  */
 package org.apache.geode.cache.wan.internal.parallel;
 
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -26,8 +25,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.Statistics;
 import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -56,15 +53,10 @@ public class ParallelGatewaySenderImplTest {
     attrs.setParallel(true);
     attrs.setId("sender");
     InternalDistributedSystem system = mock(InternalDistributedSystem.class);
-    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
-    Statistics stats = mock(Statistics.class);
     when(cache.getInternalDistributedSystem()).thenReturn(system);
     when(cache.getDistributedSystem()).thenReturn(system);
-    when(cache.getCancelCriterion()).thenReturn(cancelCriterion);
-    when(cancelCriterion.isCancelInProgress()).thenReturn(false);
     ClusterDistributionManager distributionManager = 
mock(ClusterDistributionManager.class);
     when(system.getDistributionManager()).thenReturn(distributionManager);
-    when(system.createAtomicStatistics(any(), any())).thenReturn(stats);
     when(distributionManager.getDistributedSystemId()).thenReturn(-1);
 
     DistributedLockService distributedLockService = 
mock(DistributedLockService.class);
@@ -96,39 +88,4 @@ public class ParallelGatewaySenderImplTest {
     RegionQueue queue = gatewaysender.getQueue();
     assertTrue(((ConcurrentParallelGatewaySenderQueue) 
queue).getCleanQueues());
   }
-
-  @Test
-  public void 
whenStoppedTwiceCloseInTimeWithGroupTransactionEventsPreStopWaitsTwice() {
-    attrs.setGroupTransactionEvents(true);
-    gatewaysender = new ParallelGatewaySenderImpl(cache, statisticsClock, 
attrs);
-    gatewaysender.start();
-
-    long start = System.currentTimeMillis();
-
-    Thread t1 = new Thread(this::stopGatewaySenderAndCheckTime);
-    Thread t2 = new Thread(this::stopGatewaySenderAndCheckTime);
-    t1.start();
-    t2.start();
-    try {
-      t1.join();
-      t2.join();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-
-    long finish = System.currentTimeMillis();
-    long timeElapsed = finish - start;
-    // Each call to preStop waits for 1 second but these waits execute in 
parallel
-    assertThat(timeElapsed).isGreaterThan(1000);
-
-    assertThat(gatewaysender.isRunning()).isEqualTo(false);
-  }
-
-  private void stopGatewaySenderAndCheckTime() {
-    long start = System.currentTimeMillis();
-    gatewaysender.stop();
-    long finish = System.currentTimeMillis();
-    long timeElapsed = finish - start;
-    assertThat(timeElapsed).isGreaterThan(1000);
-  }
 }
diff --git 
a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImplTest.java
 
b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImplTest.java
index cc6e2a1..d0e85a8 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImplTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImplTest.java
@@ -81,11 +81,6 @@ public class SerialGatewaySenderImplTest {
   }
 
   private SerialGatewaySenderImpl createSerialGatewaySenderImplSpy() {
-    return createSerialGatewaySenderImplSpy(false);
-  }
-
-  private SerialGatewaySenderImpl createSerialGatewaySenderImplSpy(
-      boolean mustGroupTransactionEvents) {
     GatewaySenderAdvisor gatewaySenderAdvisor = 
mock(GatewaySenderAdvisor.class);
     when(gatewaySenderAdvisor.isPrimary()).thenReturn(true);
 
@@ -107,10 +102,6 @@ public class SerialGatewaySenderImplTest {
 
     doReturn(null).when(spySerialGatewaySender).getQueues();
 
-    if (mustGroupTransactionEvents) {
-      doReturn(true).when(spySerialGatewaySender).mustGroupTransactionEvents();
-    }
-
     return spySerialGatewaySender;
   }
 
@@ -141,37 +132,4 @@ public class SerialGatewaySenderImplTest {
     assertThat(serialGatewaySender.getEventProcessor()).isNull();
   }
 
-  @Test
-  public void 
whenStoppedTwiceCloseInTimeWithGroupTransactionEventsPreStopWaitsTwice() {
-    serialGatewaySender = createSerialGatewaySenderImplSpy(true);
-
-    long start = System.currentTimeMillis();
-
-    Thread t1 = new Thread(this::stopGatewaySenderAndCheckTime);
-    Thread t2 = new Thread(this::stopGatewaySenderAndCheckTime);
-    t1.start();
-    t2.start();
-    try {
-      t1.join();
-      t2.join();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-
-    long finish = System.currentTimeMillis();
-    long timeElapsed = finish - start;
-
-    // Each call to preStop waits for 1 second but these waits execute in 
parallel
-    assertThat(timeElapsed).isGreaterThan(1000);
-
-    assertThat(serialGatewaySender.getEventProcessor()).isNull();
-  }
-
-  private void stopGatewaySenderAndCheckTime() {
-    long start = System.currentTimeMillis();
-    serialGatewaySender.stop();
-    long finish = System.currentTimeMillis();
-    long timeElapsed = finish - start;
-    assertThat(timeElapsed).isGreaterThan(1000);
-  }
 }

Reply via email to