This is an automated email from the ASF dual-hosted git repository. ringles pushed a commit to branch GEODE-9375-Implement-ZRANGE-Radish-command in repository https://gitbox.apache.org/repos/asf/geode.git
commit 9aa758f0c24a7bcaa0c4b2b76c64b6efbb2ddffc Author: Alberto Gomez <alberto.go...@est.tech> AuthorDate: Wed Jun 23 08:44:39 2021 +0200 Revert "GEODE-8971: Add grace period when stopping gateway sender with group-… (#6052)" (#6634) This reverts commit 841fa06c7b34916c09df920b7029974a78255cd0. The reason is that this commit creates problems with ongoing put operations during the grace period (very long response times) and also, if operations rate is high, it could prevent that the gateway sender is stopped. Put operations can be very slow during the grace period because they need to traverse the gateway sender queue to find events with the same transactionId. If the queue size is big and events are evicted to disk, the time to process a request can be unacceptably long. At the same time, this can provoke that the gateway sender is never stopped because it is blocked trying to get a write lock and the lock is held by ongoing operations for a long time. --- .../sortedset/ZRangeNativeRedisAcceptanceTest.java | 38 ++-- .../sortedset/AbstractZRangeIntegrationTest.java | 70 +++++++ .../executor/sortedset/ZRangeIntegrationTest.java} | 22 +-- .../geode/redis/internal/RedisCommandType.java | 3 + .../internal/collections/IndexibleTreeSet.java | 11 ++ .../internal/collections/OrderStatisticsSet.java | 10 + .../internal/collections/OrderStatisticsTree.java | 69 ++++++- .../redis/internal/data/NullRedisSortedSet.java | 5 + .../geode/redis/internal/data/RedisSortedSet.java | 48 ++++- .../RedisSortedSetCommandsFunctionExecutor.java | 6 + .../executor/sortedset/RedisSortedSetCommands.java | 2 + .../executor/sortedset/ZRangeExecutor.java | 49 +++++ .../collections/OrderStatisticsTreeTest.java | 22 +++ .../redis/internal/data/RedisSortedSetTest.java | 77 ++++++++ .../org/apache/geode/cache/wan/GatewaySender.java | 24 +-- .../geode/internal/cache/BucketRegionQueue.java | 41 +--- .../internal/cache/wan/AbstractGatewaySender.java | 57 ++---- .../wan/AbstractGatewaySenderEventProcessor.java | 25 +-- .../internal/cache/wan/GatewaySenderEventImpl.java | 5 +- ...currentParallelGatewaySenderEventProcessor.java | 23 +-- .../ParallelGatewaySenderEventProcessor.java | 22 +-- .../wan/parallel/ParallelGatewaySenderQueue.java | 115 ++++------- ...oncurrentSerialGatewaySenderEventProcessor.java | 34 ++-- .../serial/SerialGatewaySenderEventProcessor.java | 18 +- .../cache/wan/serial/SerialGatewaySenderQueue.java | 44 +---- .../internal/cache/BucketRegionQueueJUnitTest.java | 5 +- .../geode/internal/cache/wan/WANTestBase.java | 17 -- .../wan/parallel/ParallelWANStatsDUnitTest.java | 199 +------------------ .../cache/wan/serial/SerialWANStatsDUnitTest.java | 215 +-------------------- .../parallel/ParallelGatewaySenderImpl.java | 2 - .../internal/serial/SerialGatewaySenderImpl.java | 2 - .../parallel/ParallelGatewaySenderImplTest.java | 43 ----- .../serial/SerialGatewaySenderImplTest.java | 42 ---- 33 files changed, 523 insertions(+), 842 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewayQueueEvent.java b/geode-apis-compatible-with-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeNativeRedisAcceptanceTest.java old mode 100644 new mode 100755 similarity index 52% rename from geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewayQueueEvent.java rename to geode-apis-compatible-with-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeNativeRedisAcceptanceTest.java index 79ad857..6f663eb --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewayQueueEvent.java +++ b/geode-apis-compatible-with-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeNativeRedisAcceptanceTest.java @@ -12,27 +12,25 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.apache.geode.internal.cache.wan; +package org.apache.geode.redis.internal.executor.sortedset; -import org.apache.geode.cache.TransactionId; -import org.apache.geode.cache.wan.GatewayQueueEvent; +import org.junit.ClassRule; -/** - * Represents <code>Cache</code> events going through <code>GatewaySender</code>s. - * - * - * @since Geode 1.15 - */ -public interface InternalGatewayQueueEvent extends GatewayQueueEvent { - /** - * @return the transactionId to which the event belongs or null if the event does not belong - * to a transaction. - */ - TransactionId getTransactionId(); +import org.apache.geode.redis.NativeRedisClusterTestRule; + +public class ZRangeNativeRedisAcceptanceTest extends AbstractZRangeIntegrationTest { + + @ClassRule + public static NativeRedisClusterTestRule server = new NativeRedisClusterTestRule(); + + @Override + public int getPort() { + return server.getExposedPorts().get(0); + } + + @Override + public void flushAll() { + server.flushAll(); + } - /** - * @return true if the event is the last one in the transaction it belongs to or if the - * event does not belong to a transaction. false, otherwise. - */ - boolean isLastEventInTransaction(); } diff --git a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRangeIntegrationTest.java b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRangeIntegrationTest.java new file mode 100755 index 0000000..62817a6 --- /dev/null +++ b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRangeIntegrationTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.sortedset; + +import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX; +import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Protocol; + +import org.apache.geode.redis.RedisIntegrationTest; +import org.apache.geode.redis.internal.RedisConstants; + +public abstract class AbstractZRangeIntegrationTest implements RedisIntegrationTest { + private JedisCluster jedis; + private final String member = "member"; + private final String incrOption = "INCR"; + private final double initial = 355.681000005; + private final double increment = 9554257.921450001; + private final double expected = initial + increment; + + private static final String SORTED_SET_KEY = "ss_key"; + private static final int INITIAL_MEMBER_COUNT = 5; + + @Before + public void setUp() { + jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), REDIS_CLIENT_TIMEOUT); + } + + @After + public void tearDown() { + flushAll(); + jedis.close(); + } + + @Test + public void shouldError_givenWrongKeyType() { + final String STRING_KEY = "stringKey"; + jedis.set(STRING_KEY, "value"); + assertThatThrownBy( + () -> jedis.sendCommand(STRING_KEY, Protocol.Command.ZRANGE, STRING_KEY, "1", "2")) + .hasMessage("WRONGTYPE " + RedisConstants.ERROR_WRONG_TYPE); + } + + @Test + public void shouldReturnSyntaxError_givenWrongWithscoresFlag() { + jedis.zadd(SORTED_SET_KEY, 1.0, member); + assertThatThrownBy( + () -> jedis.sendCommand(SORTED_SET_KEY, Protocol.Command.ZRANGE, SORTED_SET_KEY, "1", "2", + "WITHSCOREZ")) + .hasMessageContaining(ERROR_SYNTAX); + } +} diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeIntegrationTest.java old mode 100644 new mode 100755 similarity index 65% copy from geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java copy to geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeIntegrationTest.java index 4e99a4b..a5579d5 --- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java +++ b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeIntegrationTest.java @@ -12,24 +12,20 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.redis.internal.executor.sortedset; -import java.util.List; - -import org.apache.geode.redis.internal.data.RedisKey; - -public interface RedisSortedSetCommands { - - Object zadd(RedisKey key, List<byte[]> scoresAndMembersToAdd, ZAddOptions options); +import org.junit.ClassRule; - byte[] zscore(RedisKey key, byte[] member); +import org.apache.geode.redis.GeodeRedisServerRule; - int zrank(RedisKey key, byte[] member); +public class ZRangeIntegrationTest extends AbstractZRangeIntegrationTest { - long zrem(RedisKey key, List<byte[]> membersToRemove); + @ClassRule + public static GeodeRedisServerRule server = new GeodeRedisServerRule(); - long zcard(RedisKey key); + @Override + public int getPort() { + return server.getPort(); + } - byte[] zincrby(RedisKey key, byte[] increment, byte[] member); } diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java index 9269132..5101de6 100755 --- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java +++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java @@ -97,6 +97,7 @@ import org.apache.geode.redis.internal.executor.set.SUnionStoreExecutor; import org.apache.geode.redis.internal.executor.sortedset.ZAddExecutor; import org.apache.geode.redis.internal.executor.sortedset.ZCardExecutor; import org.apache.geode.redis.internal.executor.sortedset.ZIncrByExecutor; +import org.apache.geode.redis.internal.executor.sortedset.ZRangeExecutor; import org.apache.geode.redis.internal.executor.sortedset.ZRankExecutor; import org.apache.geode.redis.internal.executor.sortedset.ZRemExecutor; import org.apache.geode.redis.internal.executor.sortedset.ZScoreExecutor; @@ -202,6 +203,8 @@ public enum RedisCommandType { ZADD(new ZAddExecutor(), SUPPORTED, new MinimumParameterRequirements(4)), ZCARD(new ZCardExecutor(), SUPPORTED, new ExactParameterRequirements(2)), ZINCRBY(new ZIncrByExecutor(), SUPPORTED, new ExactParameterRequirements(4)), + ZRANGE(new ZRangeExecutor(), SUPPORTED, new MinimumParameterRequirements(4) + .and(new MaximumParameterRequirements(5, ERROR_SYNTAX))), ZRANK(new ZRankExecutor(), SUPPORTED, new ExactParameterRequirements(3)), ZREM(new ZRemExecutor(), SUPPORTED, new MinimumParameterRequirements(3)), ZSCORE(new ZScoreExecutor(), SUPPORTED, new ExactParameterRequirements(3)), diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/IndexibleTreeSet.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/IndexibleTreeSet.java index e8c9c42..d89e04a 100644 --- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/IndexibleTreeSet.java +++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/IndexibleTreeSet.java @@ -14,6 +14,7 @@ */ package org.apache.geode.redis.internal.collections; +import java.util.ArrayList; import java.util.Iterator; import java.util.TreeSet; @@ -23,6 +24,7 @@ import java.util.TreeSet; * Only for testing and performance comparisons. */ class IndexibleTreeSet<E> extends TreeSet<E> implements OrderStatisticsSet<E> { + private static final long serialVersionUID = 521865987126101683L; @Override @@ -47,4 +49,13 @@ class IndexibleTreeSet<E> extends TreeSet<E> implements OrderStatisticsSet<E> { public int indexOf(E element) { return headSet(element).size(); } + + @Override + public ArrayList<E> getIndexRange(int min, int max) { + return null; + } + + public int findMin(E element) { + return 0; + } } diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/OrderStatisticsSet.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/OrderStatisticsSet.java index 0e09f0f..8dcb4df 100644 --- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/OrderStatisticsSet.java +++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/OrderStatisticsSet.java @@ -26,6 +26,7 @@ package org.apache.geode.redis.internal.collections; +import java.util.ArrayList; import java.util.Set; /** @@ -59,4 +60,13 @@ public interface OrderStatisticsSet<T> extends Set<T> { * in this set. */ int indexOf(T element); + + /** + * Returns a range of <code>elements</code> between min and max. + * + * @param min the minimum element. + * @param max the maximum element. + * @return an ArrayList of <code>elements</code>. + */ + ArrayList<T> getIndexRange(int min, int max); } diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/OrderStatisticsTree.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/OrderStatisticsTree.java index 49ee9d7..b619136 100644 --- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/OrderStatisticsTree.java +++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/OrderStatisticsTree.java @@ -28,6 +28,7 @@ package org.apache.geode.redis.internal.collections; import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.ConcurrentModificationException; @@ -48,7 +49,6 @@ import org.apache.geode.annotations.VisibleForTesting; */ public class OrderStatisticsTree<E extends Comparable<? super E>> implements OrderStatisticsSet<E> { - private Node<E> root; private int size; private int modCount; @@ -205,6 +205,57 @@ public class OrderStatisticsTree<E extends Comparable<? super E>> return true; } + public ArrayList<E> getIndexRange(int min, int max) { + ArrayList<E> entryList = new ArrayList<>(); + Node<E> current = getNode(min); + for (int i = min; current != null && i <= max; i++) { + entryList.add(current.key); + current = successorOf(current); + } + return entryList; + } + + public Iterator<E> iterator(int index) { + Node<E> node = getNode(index); + return new TreeIterator(node); + } + + private Node<E> getNode(int index) { + checkIndex(index); + Node<E> node = root; + + while (true) { + if (index > node.count) { + index -= node.count + 1; + node = node.right; + } else if (index < node.count) { + node = node.left; + } else { + return node; + } + } + } + + private Node<E> findMinNode(E o) { + Node<E> x = root; + int cmp; + + while ((cmp = o.compareTo(x.key)) != 0) { + if (cmp < 0) { + if (x.left == null) { + break; + } + x = x.left; + } else { + if (x.right == null) { + break; + } + x = x.right; + } + } + return x; + } + @Override @SuppressWarnings("unchecked") public boolean contains(Object o) { @@ -466,6 +517,14 @@ public class OrderStatisticsTree<E extends Comparable<? super E>> return node; } + private Node<E> maximumNode(Node<E> node) { + while (node.right != null) { + node = node.right; + } + + return node; + } + private int height(Node<E> node) { return node == null ? -1 : node.height; } @@ -717,6 +776,14 @@ public class OrderStatisticsTree<E extends Comparable<? super E>> } } + public TreeIterator(Node<E> node) { + if (root == null) { + nextNode = null; + } else { + nextNode = node; + } + } + @Override public boolean hasNext() { return nextNode != null; diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSortedSet.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSortedSet.java index 8cbf0a1..b771be4 100644 --- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSortedSet.java +++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSortedSet.java @@ -86,4 +86,9 @@ class NullRedisSortedSet extends RedisSortedSet { return -1; } + @Override + List<byte[]> zrange(byte[] min, byte[] max, + boolean withScores) { + return null; + } } diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java index 8a9faf4..4a0f6a7 100644 --- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java +++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java @@ -19,6 +19,7 @@ package org.apache.geode.redis.internal.data; import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_A_VALID_FLOAT; import static org.apache.geode.redis.internal.data.RedisDataType.REDIS_SORTED_SET; import static org.apache.geode.redis.internal.netty.Coder.bytesToDouble; +import static org.apache.geode.redis.internal.netty.Coder.bytesToInt; import static org.apache.geode.redis.internal.netty.Coder.doubleToBytes; import java.io.DataInput; @@ -312,6 +313,41 @@ public class RedisSortedSet extends AbstractRedisData { return membersRemoved; } + List<byte[]> zrange(byte[] min, byte[] max, + boolean withScores) { + ArrayList<byte[]> result = new ArrayList<>(); + int start = getBoundedStartIndex(bytesToInt(min), getSortedSetSize()); + int end = getBoundedEndIndex(bytesToInt(max), getSortedSetSize()); + if (start > end || start == getSortedSetSize()) { + return result; + } + + ArrayList<OrderedSetEntry> entries = scoreSet.getIndexRange(start, end); + entries.forEach(entry -> { + result.add(entry.member); + if (withScores) { + result.add(entry.scoreBytes); + } + }); + return result; + } + + private int getBoundedStartIndex(int index, int size) { + if (index >= 0L) { + return Math.min(index, size); + } else { + return Math.max(index + size, 0); + } + } + + private int getBoundedEndIndex(long index, int size) { + if (index >= 0L) { + return (int) Math.min(index, size); + } else { + return (int) Math.max(index + size, -1); + } + } + synchronized byte[] memberRemove(byte[] member) { byte[] oldValue = null; OrderedSetEntry orderedSetEntry = members.remove(member); @@ -389,6 +425,8 @@ public class RedisSortedSet extends AbstractRedisData { } static class OrderedSetEntry implements Comparable<OrderedSetEntry> { + public static final byte[] MAX_BYTES = "RaAdIsH_SeCrEt_MaX_ByTeS_VaL".getBytes(); + private final byte[] member; private final byte[] scoreBytes; private final Double score; @@ -397,11 +435,19 @@ public class RedisSortedSet extends AbstractRedisData { return scoreBytes; } + public byte[] getMember() { + return member; + } + @Override public int compareTo(OrderedSetEntry o) { int comparison = score.compareTo(o.score); if (comparison == 0) { - // Scores equal, try lexical ordering + // MAX_BYTES always bigger than actual entry + if (member.equals(MAX_BYTES)) { + return -1; + } + // Scores equal, not MAX, try lexical ordering return javaImplementationOfAnsiCMemCmp(member, o.member); } return comparison; diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java index 3db7971..ae106d4 100644 --- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java +++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java @@ -41,6 +41,12 @@ public class RedisSortedSetCommandsFunctionExecutor extends RedisDataCommandsFun } @Override + public List<byte[]> zrange(RedisKey key, byte[] min, byte[] max, boolean withScores) { + return stripedExecute(key, + () -> getRedisSortedSet(key, false).zrange(min, max, withScores)); + } + + @Override public byte[] zscore(RedisKey key, byte[] member) { return stripedExecute(key, () -> getRedisSortedSet(key, true).zscore(member)); } diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java index 4e99a4b..268d760 100644 --- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java +++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java @@ -23,6 +23,8 @@ public interface RedisSortedSetCommands { Object zadd(RedisKey key, List<byte[]> scoresAndMembersToAdd, ZAddOptions options); + List<byte[]> zrange(RedisKey key, byte[] min, byte[] max, boolean withScores); + byte[] zscore(RedisKey key, byte[] member); int zrank(RedisKey key, byte[] member); diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeExecutor.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeExecutor.java new file mode 100644 index 0000000..d583c92 --- /dev/null +++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZRangeExecutor.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.sortedset; + +import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX; +import static org.apache.geode.redis.internal.netty.Coder.equalsIgnoreCaseBytes; + +import java.util.List; + +import org.apache.geode.redis.internal.executor.AbstractExecutor; +import org.apache.geode.redis.internal.executor.RedisResponse; +import org.apache.geode.redis.internal.netty.Command; +import org.apache.geode.redis.internal.netty.ExecutionHandlerContext; + +public class ZRangeExecutor extends AbstractExecutor { + + @Override + public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) { + boolean withScores = false; + RedisSortedSetCommands redisSortedSetCommands = context.getSortedSetCommands(); + List<byte[]> commandElements = command.getProcessedCommand(); + + byte[] min = commandElements.get(1); + byte[] max = commandElements.get(2); + if (commandElements.size() == 5) { + if (equalsIgnoreCaseBytes(commandElements.get(3), "WITHSCORES".getBytes())) { + withScores = true; + } else { + return RedisResponse.error(ERROR_SYNTAX); + } + } + + List<byte[]> retVal = redisSortedSetCommands.zrange(command.getKey(), min, max, withScores); + + return RedisResponse.array(retVal); + } +} diff --git a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/collections/OrderStatisticsTreeTest.java b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/collections/OrderStatisticsTreeTest.java index 7e93a3b..7cfec39 100644 --- a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/collections/OrderStatisticsTreeTest.java +++ b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/collections/OrderStatisticsTreeTest.java @@ -28,6 +28,7 @@ package org.apache.geode.redis.internal.collections; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.ConcurrentModificationException; @@ -569,4 +570,25 @@ public class OrderStatisticsTreeTest { assertThat(array2after).isSameAs(array2before); assertThat(array2after).isEqualTo(array1after); } + + @Test + public void testGetIndexRange() { + for (int i = 0; i < 100; ++i) { + tree.add(i); + } + ArrayList<Integer> subSet = tree.getIndexRange(0, 9); + assertThat(subSet.size()).isEqualTo(10); + assertThat(subSet).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + subSet = tree.getIndexRange(95, 110); + assertThat(subSet.size()).isEqualTo(5); + assertThat(subSet).containsExactly(95, 96, 97, 98, 99); + + subSet = tree.getIndexRange(45, 55); + assertThat(subSet.size()).isEqualTo(11); + assertThat(subSet).containsExactly(45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55); + + subSet = tree.getIndexRange(10, 0); + assertThat(subSet.size()).isEqualTo(0); + } } diff --git a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSortedSetTest.java b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSortedSetTest.java index ba859de..9ab3b0b 100644 --- a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSortedSetTest.java +++ b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSortedSetTest.java @@ -18,9 +18,11 @@ package org.apache.geode.redis.internal.data; import static java.lang.Math.round; import static org.apache.geode.redis.internal.data.RedisSortedSet.BASE_REDIS_SORTED_SET_OVERHEAD; +import static org.apache.geode.redis.internal.netty.Coder.intToBytes; import static org.apache.geode.redis.internal.netty.Coder.stringToBytes; import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.in; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -32,6 +34,7 @@ import java.io.IOException; import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Random; @@ -302,6 +305,80 @@ public class RedisSortedSetTest { assertThat(actual).isCloseTo(expected, offset); } + RedisSortedSet rangeSortedSet = + createRedisSortedSet( + "1.0", member1, "1.1", member2, "1.2", "member3", "1.3", "member4", + "1.4", "member5", "1.5", "member6", "1.6", "member7", "1.7", "member8", + "1.8", "member9", "1.9", "member10", "2.0", "member11", "2.1", "member12"); + + @Test + public void zrange_ShouldReturnEmptyList_GivenInvalidRanges() { + Collection<byte[]> rangeList = rangeSortedSet.zrange(intToBytes(5), intToBytes(0), false); + assertThat(rangeList).isEmpty(); + rangeList = rangeSortedSet.zrange(intToBytes(13), intToBytes(15), false); + assertThat(rangeList).isEmpty(); + rangeList = rangeSortedSet.zrange(intToBytes(17), intToBytes(-2), false); + assertThat(rangeList).isEmpty(); + rangeList = rangeSortedSet.zrange(intToBytes(12), intToBytes(12), false); + assertThat(rangeList).isEmpty(); + } + + + @Test + public void zrange_ShouldReturnSimpleRanges() { + Collection<byte[]> rangeList = rangeSortedSet.zrange(intToBytes(0), intToBytes(5), false); + assertThat(rangeList).hasSize(6); + assertThat(rangeList) + .containsExactly("member1".getBytes(), "member2".getBytes(), "member3".getBytes(), + "member4".getBytes(), "member5".getBytes(), "member6".getBytes()); + + rangeList = rangeSortedSet.zrange(intToBytes(5), intToBytes(10), false); + assertThat(rangeList).hasSize(6); + assertThat(rangeList) + .containsExactly("member6".getBytes(), "member7".getBytes(), "member8".getBytes(), + "member9".getBytes(), "member10".getBytes(), "member11".getBytes()); + + rangeList = rangeSortedSet.zrange(intToBytes(10), intToBytes(13), false); + assertThat(rangeList).hasSize(2); + assertThat(rangeList).containsExactly("member11".getBytes(), "member12".getBytes()); + } + + @Test + public void zrange_ShouldReturnRanges_SpecifiedWithNegativeOffsets() { + Collection<byte[]> rangeList = rangeSortedSet.zrange(intToBytes(-2), intToBytes(12), false); + assertThat(rangeList).hasSize(2); + assertThat(rangeList).containsExactly("member11".getBytes(), "member12".getBytes()); + + rangeList = rangeSortedSet.zrange(intToBytes(-6), intToBytes(-1), false); + assertThat(rangeList).hasSize(6); + assertThat(rangeList) + .containsExactly("member7".getBytes(), "member8".getBytes(), + "member9".getBytes(), "member10".getBytes(), "member11".getBytes(), + "member12".getBytes()); + + rangeList = rangeSortedSet.zrange(intToBytes(-11), intToBytes(-5), false); + assertThat(rangeList).hasSize(7); + assertThat(rangeList) + .containsExactly("member2".getBytes(), "member3".getBytes(), + "member4".getBytes(), "member5".getBytes(), "member6".getBytes(), + "member7".getBytes(), "member8".getBytes()); + + rangeList = rangeSortedSet.zrange(intToBytes(-12), intToBytes(-11), false); + assertThat(rangeList).hasSize(2); + assertThat(rangeList) + .containsExactly("member1".getBytes(), "member2".getBytes()); + } + + @Test + public void zrange_shouldAlsoReturnScores_whenWithScoresSpecified() { + Collection<byte[]> rangeList = rangeSortedSet.zrange(intToBytes(0), intToBytes(5), true); + assertThat(rangeList).hasSize(12); + assertThat(rangeList).containsExactly("member1".getBytes(), "1.0".getBytes(), + "member2".getBytes(), "1.1".getBytes(), "member3".getBytes(), "1.2".getBytes(), + "member4".getBytes(), "1.3".getBytes(), "member5".getBytes(), "1.4".getBytes(), + "member6".getBytes(), "1.5".getBytes()); + } + private RedisSortedSet createRedisSortedSet(String... membersAndScores) { final List<byte[]> membersAndScoresList = Arrays .stream(membersAndScores) diff --git a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java index 8e498c3..1f696f6 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java @@ -174,29 +174,7 @@ public interface GatewaySender { */ int GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES = Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "get-transaction-events-from-queue-retries", - 2); - /** - * Milliseconds to wait before retrying to get events for a transaction from the - * gateway sender queue when group-transaction-events is true. - */ - int GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS = - Integer.getInteger( - GeodeGlossary.GEMFIRE_PREFIX + "get-transaction-events-from-queue-wait-time-ms", - 1); - - /** - * When group-transaction-events is true and the gateway sender is stopped, - * addition to the queue of a group of transaction events might be interrupted. - * To ensure that the queue does not contain incomplete transactions, this parameter - * allows for a grace period, specified in milliseconds, before the gateway sender is actually - * stopped, allowing complete transaction event groups to be queued. Any event received - * during the grace period that is not part of a transaction event group in the queue - * is dropped. - */ - int TIME_TO_COMPLETE_TRANSACTIONS_BEFORE_STOP_MS = - Integer.getInteger( - GeodeGlossary.GEMFIRE_PREFIX + "time-to-complete-transactions-before-stop-ms", - 1000); + 10); /** * The order policy. This enum is applicable only when concurrency-level is > 1. diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java index fd5b724..9069830 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java @@ -39,13 +39,13 @@ import org.apache.geode.cache.EntryNotFoundException; import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.TimeoutException; +import org.apache.geode.cache.wan.GatewayQueueEvent; import org.apache.geode.internal.cache.execute.BucketMovedException; import org.apache.geode.internal.cache.persistence.query.mock.ByteComparator; import org.apache.geode.internal.cache.versions.RegionVersionVector; import org.apache.geode.internal.cache.versions.VersionSource; import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; -import org.apache.geode.internal.cache.wan.InternalGatewayQueueEvent; import org.apache.geode.internal.cache.wan.parallel.BucketRegionQueueUnavailableException; import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; import org.apache.geode.internal.concurrent.Atomics; @@ -471,25 +471,21 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { * If a matching object also fulfills the endPredicate then the method * stops looking for more matching objects. */ - public List<Object> getElementsMatching(Predicate<InternalGatewayQueueEvent> matchingPredicate, - Predicate<InternalGatewayQueueEvent> endPredicate) { + public List<Object> getElementsMatching(Predicate matchingPredicate, Predicate endPredicate) { getInitializationLock().readLock().lock(); try { if (this.getPartitionedRegion().isDestroyed()) { throw new BucketRegionQueueUnavailableException(); } - List<Object> elementsMatching = new ArrayList<>(); + List<Object> elementsMatching = new ArrayList<Object>(); Iterator<Object> it = this.eventSeqNumDeque.iterator(); while (it.hasNext()) { Object key = it.next(); - Object event = optimalGet(key); - if (!(event instanceof InternalGatewayQueueEvent)) { - continue; - } - if (matchingPredicate.test((InternalGatewayQueueEvent) event)) { - elementsMatching.add(event); + Object object = optimalGet(key); + if (matchingPredicate.test(object)) { + elementsMatching.add(object); this.eventSeqNumDeque.remove(key); - if (endPredicate.test((InternalGatewayQueueEvent) event)) { + if (endPredicate.test(object)) { break; } } @@ -500,27 +496,6 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { } } - public boolean hasEventsMatching(Predicate<InternalGatewayQueueEvent> matchingPredicate) { - getInitializationLock().readLock().lock(); - try { - if (this.getPartitionedRegion().isDestroyed()) { - throw new BucketRegionQueueUnavailableException(); - } - for (Object o : eventSeqNumDeque) { - Object event = optimalGet(o); - if (!(event instanceof InternalGatewayQueueEvent)) { - continue; - } - if (matchingPredicate.test((InternalGatewayQueueEvent) event)) { - return true; - } - } - return false; - } finally { - getInitializationLock().readLock().unlock(); - } - } - @Override protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event) { if (didPut) { @@ -694,7 +669,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { } return eventSeqNumDeque.stream() .map(this::optimalGet) - .filter(o -> o instanceof InternalGatewayQueueEvent) + .filter(o -> o instanceof GatewayQueueEvent) .collect(Collectors.toList()); } finally { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 6760727..4fad440 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -26,7 +26,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Predicate; import org.apache.logging.log4j.Logger; @@ -128,8 +127,6 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di protected boolean groupTransactionEvents; - protected volatile boolean isStopping = false; - protected boolean isForInternalUse; protected boolean isDiskSynchronous; @@ -726,21 +723,6 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di return enqueue; } - protected void preStop() { - isStopping = true; - if (mustGroupTransactionEvents()) { - try { - Thread.sleep(TIME_TO_COMPLETE_TRANSACTIONS_BEFORE_STOP_MS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - - protected void postStop() { - isStopping = false; - } - protected void stopProcessing() { // Stop the dispatcher AbstractGatewaySenderEventProcessor ev = this.eventProcessor; @@ -1096,8 +1078,14 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di clonedEvent.setCallbackArgument(geCallbackArg); } + // If this gateway is not running, return if (!isRunning()) { - recordDroppedEvent(clonedEvent); + if (this.isPrimary()) { + recordDroppedEvent(clonedEvent); + } + if (isDebugEnabled) { + logger.debug("Returning back without putting into the gateway sender queue:" + event); + } return; } @@ -1124,7 +1112,12 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di // If this gateway is not running, return // The sender may have stopped, after we have checked the status in the beginning. if (!isRunning()) { - recordDroppedEvent(clonedEvent); + if (isDebugEnabled) { + logger.debug("Returning back without putting into the gateway sender queue:" + event); + } + if (this.isPrimary()) { + recordDroppedEvent(clonedEvent); + } return; } @@ -1144,21 +1137,7 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di // Get substitution value to enqueue if necessary Object substituteValue = getSubstituteValue(clonedEvent, operation); - Predicate<InternalGatewayQueueEvent> hasSameTransactionIdPredicate = null; - // In case the sender is about to be stopped, the event will only - // be queued if there is any event in the queue with the same - // transactionId as the one of this event - if (isStopping && mustGroupTransactionEvents() - && clonedEvent.getTransactionId() != null) { - hasSameTransactionIdPredicate = - x -> x != null && clonedEvent.getTransactionId() != null - && clonedEvent.getTransactionId() - .equals((x).getTransactionId()); - } - if (!ev.enqueueEvent(operation, clonedEvent, substituteValue, isLastEventInTransaction, - hasSameTransactionIdPredicate)) { - recordDroppedEvent(event); - } + ev.enqueueEvent(operation, clonedEvent, substituteValue, isLastEventInTransaction); } catch (CancelException e) { logger.debug("caught cancel exception", e); throw e; @@ -1184,9 +1163,6 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di } private void recordDroppedEvent(EntryEventImpl event) { - if (!this.isPrimary()) { - return; - } if (this.eventProcessor != null) { this.eventProcessor.registerEventDroppedInPrimaryQueue(event); } else { @@ -1195,9 +1171,6 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di logger.debug("added to tmpDroppedEvents event: {}", event); } } - if (logger.isDebugEnabled()) { - logger.debug("Returning without putting into the gateway sender queue:" + event); - } } @VisibleForTesting @@ -1594,7 +1567,7 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di try { logger.info("{}: Enqueueing synchronization event: {}", this, event); - this.eventProcessor.enqueueEvent(event, null); + this.eventProcessor.enqueueEvent(event); this.statistics.incSynchronizationEventsEnqueued(); } catch (Throwable t) { logger.warn(String.format( diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index ca22c1e..a02f929 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -26,7 +26,6 @@ import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Predicate; import org.apache.logging.log4j.Logger; @@ -176,25 +175,14 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread protected abstract void initializeMessageQueue(String id, boolean cleanQueues); - public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, + public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue) throws IOException, CacheException { - return enqueueEvent(operation, event, substituteValue, false, null); + enqueueEvent(operation, event, substituteValue, false); } - /** - * - * @param operation The operation - * @param event The event to be put in the queue - * @param substituteValue The substitute value - * @param isLastEventInTransaction True if this event is the last one in the - * transaction it belongs to - * @param condition If not null, the event will be enqueued only if at least - * one element in the queue matches the predicate - * @return False only if the condition is not null and no element in the queue matches it - */ - public abstract boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, - Object substituteValue, boolean isLastEventInTransaction, - Predicate<InternalGatewayQueueEvent> condition) throws IOException, CacheException; + public abstract void enqueueEvent(EnumListenerEvent operation, EntryEvent event, + Object substituteValue, boolean isLastEventInTransaction) throws IOException, CacheException; + protected abstract void rebalance(); @@ -1401,8 +1389,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread } } - protected abstract boolean enqueueEvent(GatewayQueueEvent event, - Predicate<InternalGatewayQueueEvent> condition); + protected abstract void enqueueEvent(GatewayQueueEvent event); protected class SenderStopperCallable implements Callable<Boolean> { private final AbstractGatewaySenderEventProcessor p; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java index 1626a5e..0a76c06 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java @@ -67,8 +67,7 @@ import org.apache.geode.internal.size.Sizeable; * */ public class GatewaySenderEventImpl - implements InternalGatewayQueueEvent, AsyncEvent, DataSerializableFixedID, Conflatable, - Sizeable, Releasable { + implements AsyncEvent, DataSerializableFixedID, Conflatable, Sizeable, Releasable { private static final long serialVersionUID = -5690172020872255422L; protected static final Object TOKEN_NULL = new Object(); @@ -1260,12 +1259,10 @@ public class GatewaySenderEventImpl return this.shadowKey; } - @Override public boolean isLastEventInTransaction() { return isLastEventInTransaction; } - @Override public TransactionId getTransactionId() { return transactionId; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java index 4b8d641..0ef1fd3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java @@ -24,7 +24,6 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.function.Predicate; import org.apache.logging.log4j.Logger; @@ -48,7 +47,6 @@ import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor; import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; import org.apache.geode.internal.cache.wan.GatewaySenderException; -import org.apache.geode.internal.cache.wan.InternalGatewayQueueEvent; import org.apache.geode.internal.monitoring.ThreadsMonitoring; import org.apache.geode.logging.internal.executors.LoggingExecutors; import org.apache.geode.logging.internal.log4j.api.LogService; @@ -134,22 +132,22 @@ public class ConcurrentParallelGatewaySenderEventProcessor } @Override - public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue) + public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue) throws IOException, CacheException { - return enqueueEvent(operation, event, substituteValue, false, null); + enqueueEvent(operation, event, substituteValue, false); } @Override - public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue, - boolean isLastEventInTransaction, Predicate<InternalGatewayQueueEvent> condition) - throws IOException, CacheException { + public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue, + boolean isLastEventInTransaction) throws IOException, CacheException { + Region region = event.getRegion(); + // int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation)event); int bucketId = ((EntryEventImpl) event).getEventId().getBucketID(); if (bucketId < 0) { - return true; + return; } int pId = bucketId % this.nDispatcher; - return this.processors[pId].enqueueEvent(operation, event, substituteValue, - isLastEventInTransaction, condition); + this.processors[pId].enqueueEvent(operation, event, substituteValue, isLastEventInTransaction); } @Override @@ -351,10 +349,9 @@ public class ConcurrentParallelGatewaySenderEventProcessor } @Override - protected boolean enqueueEvent(GatewayQueueEvent event, - Predicate<InternalGatewayQueueEvent> condition) { + protected void enqueueEvent(GatewayQueueEvent event) { int pId = ((GatewaySenderEventImpl) event).getBucketId() % this.nDispatcher; - return this.processors[pId].enqueueEvent(event, condition); + this.processors[pId].enqueueEvent(event); } private ThreadsMonitoring getThreadMonitorObj() { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java index ab3a458..65567f0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java @@ -18,7 +18,6 @@ import java.io.IOException; import java.util.HashSet; import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.function.Predicate; import org.apache.logging.log4j.Logger; @@ -37,7 +36,6 @@ import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor; import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; -import org.apache.geode.internal.cache.wan.InternalGatewayQueueEvent; import org.apache.geode.internal.monitoring.ThreadsMonitoring; import org.apache.geode.logging.internal.log4j.api.LogService; @@ -99,10 +97,10 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv } @Override - public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue, - boolean isLastEventInTransaction, Predicate<InternalGatewayQueueEvent> condition) + public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue, + boolean isLastEventInTransaction) throws IOException, CacheException { - GatewaySenderEventImpl gatewayQueueEvent; + GatewaySenderEventImpl gatewayQueueEvent = null; Region region = event.getRegion(); if (!(region instanceof DistributedRegion) && ((EntryEventImpl) event).getTailKey() == -1) { @@ -115,7 +113,7 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv "ParallelGatewaySenderEventProcessor not enqueing the following event since tailKey is not set. {}", event); } - return true; + return; } // TODO: Looks like for PDX region bucket id is set to -1. @@ -127,23 +125,16 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv new GatewaySenderEventImpl(operation, event, substituteValue, true, eventID.getBucketID(), isLastEventInTransaction); - return enqueueEvent(gatewayQueueEvent, condition); + enqueueEvent(gatewayQueueEvent); } @Override - protected boolean enqueueEvent(GatewayQueueEvent gatewayQueueEvent, - Predicate<InternalGatewayQueueEvent> condition) { + protected void enqueueEvent(GatewayQueueEvent gatewayQueueEvent) { boolean queuedEvent = false; try { if (getSender().beforeEnqueue(gatewayQueueEvent)) { long start = getSender().getStatistics().startTime(); try { - if (condition != null && - !((ParallelGatewaySenderQueue) this.queue).hasEventsMatching( - (GatewaySenderEventImpl) gatewayQueueEvent, - condition)) { - return false; - } queuedEvent = this.queue.put(gatewayQueueEvent); } catch (InterruptedException e) { e.printStackTrace(); @@ -161,7 +152,6 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv ((GatewaySenderEventImpl) gatewayQueueEvent).release(); } } - return true; } @Override diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 848ed80..d3f8868 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -17,7 +17,6 @@ package org.apache.geode.internal.cache.wan.parallel; import static org.apache.geode.cache.Region.SEPARATOR; import static org.apache.geode.cache.wan.GatewaySender.DEFAULT_BATCH_SIZE; import static org.apache.geode.cache.wan.GatewaySender.GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES; -import static org.apache.geode.cache.wan.GatewaySender.GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS; import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE; import java.util.ArrayList; @@ -89,7 +88,6 @@ import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; import org.apache.geode.internal.cache.wan.GatewaySenderException; import org.apache.geode.internal.cache.wan.GatewaySenderStats; -import org.apache.geode.internal.cache.wan.InternalGatewayQueueEvent; import org.apache.geode.internal.size.SingleObjectSizer; import org.apache.geode.internal.statistics.StatisticsClock; import org.apache.geode.internal.util.concurrent.StoppableCondition; @@ -712,16 +710,44 @@ public class ParallelGatewaySenderQueue implements RegionQueue { boolean isDREvent = isDREvent(sender.getCache(), value); - String regionPath = getRegionPathForEventAndType(value, isDREvent); - if (regionPath == null) { + String regionPath = value.getRegionPath(); + if (!isDREvent) { + Region region = sender.getCache().getRegion(regionPath, true); + regionPath = ColocationHelper.getLeaderRegion((PartitionedRegion) region).getFullPath(); + } + if (isDebugEnabled) { + logger.debug("Put is for the region {}", regionPath); + } + if (!this.userRegionNameToShadowPRMap.containsKey(regionPath)) { + if (isDebugEnabled) { + logger.debug("The userRegionNameToshadowPRMap is {}", userRegionNameToShadowPRMap); + } + logger.warn( + "GatewaySender: Not queuing the event {}, as the region for which this event originated is not yet configured in the GatewaySender", + value); + // does not put into queue return false; } PartitionedRegion prQ = this.userRegionNameToShadowPRMap.get(regionPath); int bucketId = value.getBucketId(); - Object key = getKeyForEventAndType(value, isDREvent); - if (key == null) { - return false; + Object key = null; + if (!isDREvent) { + key = value.getShadowKey(); + + if ((Long) key == -1) { + // In case of parallel we don't expect + // the key to be not set. If it is the case then the event must be coming + // through listener, so return. + if (isDebugEnabled) { + logger.debug("ParallelGatewaySenderOrderedQueue not putting key {} : Value : {}", key, + value); + } + // does not put into queue + return false; + } + } else { + key = value.getEventId(); } if (isDebugEnabled) { @@ -853,49 +879,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { return putDone; } - private String getRegionPathForEventAndType(GatewaySenderEventImpl event, boolean isDREvent) { - boolean isDebugEnabled = logger.isDebugEnabled(); - String regionPath = event.getRegionPath(); - if (!isDREvent) { - PartitionedRegion region = (PartitionedRegion) sender.getCache().getRegion(regionPath, true); - regionPath = ColocationHelper.getLeaderRegion(region).getFullPath(); - } - if (isDebugEnabled) { - logger.debug("Put is for region {}", regionPath); - } - if (!this.userRegionNameToShadowPRMap.containsKey(regionPath)) { - if (isDebugEnabled) { - logger.debug("The userRegionNameToShadowPRMap is {}", userRegionNameToShadowPRMap); - } - logger.warn( - "GatewaySender: Not queuing the event {}, as the region for which this event originated is not yet configured in the GatewaySender", - event); - return null; - } - return regionPath; - } - - private Object getKeyForEventAndType(GatewaySenderEventImpl event, boolean isDREvent) { - Object key; - if (!isDREvent) { - key = event.getShadowKey(); - - if ((Long) key == -1) { - // In the case of parallel we expect the key to be set. - // If the key is not set, then the event must be coming - // through listener, so return. - if (logger.isDebugEnabled()) { - logger.debug("ParallelGatewaySenderOrderedQueue not putting key {} : Value : {}", key, - event); - } - return null; - } - } else { - key = event.getEventId(); - } - return key; - } - public void notifyEventProcessorIfRequired() { // putter thread should not take lock every time if (isQueueEmpty) { @@ -1424,15 +1407,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue { retries++ == GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES) { break; } - try { - Thread.sleep(GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } } if (incompleteTransactionIdsInBatch.size() > 0) { - logger.warn("Not able to retrieve all events for transactions: {} after {} retries of {}ms", - incompleteTransactionIdsInBatch, retries, GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS); + logger.warn("Not able to retrieve all events for transactions: {} after {} retries", + incompleteTransactionIdsInBatch, retries); stats.incBatchesWithIncompleteTransactions(); } } @@ -1604,9 +1582,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId); try { - Predicate<InternalGatewayQueueEvent> hasTransactionIdPredicate = + Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate = getHasTransactionIdPredicate(transactionId); - Predicate<InternalGatewayQueueEvent> isLastEventInTransactionPredicate = + Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate = getIsLastEventInTransactionPredicate(); objects = brq.getElementsMatching(hasTransactionIdPredicate, isLastEventInTransactionPredicate); @@ -1620,12 +1598,12 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } @VisibleForTesting - public static Predicate<InternalGatewayQueueEvent> getIsLastEventInTransactionPredicate() { + public static Predicate<GatewaySenderEventImpl> getIsLastEventInTransactionPredicate() { return x -> x.isLastEventInTransaction(); } @VisibleForTesting - public static Predicate<InternalGatewayQueueEvent> getHasTransactionIdPredicate( + public static Predicate<GatewaySenderEventImpl> getHasTransactionIdPredicate( TransactionId transactionId) { return x -> transactionId.equals(x.getTransactionId()); } @@ -1832,21 +1810,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { return regionName.substring(1, queueStringStart); } - public boolean hasEventsMatching(GatewaySenderEventImpl event, - Predicate<InternalGatewayQueueEvent> condition) { - boolean isDREvent = isDREvent(sender.getCache(), event); - - String regionPath = getRegionPathForEventAndType(event, isDREvent); - if (regionPath == null) { - return true; - } - PartitionedRegion prQ = this.userRegionNameToShadowPRMap.get(regionPath); - int bucketId = event.getBucketId(); - BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId); - - return brq.hasEventsMatching(condition); - } - // TODO:REF: Name for this class should be appropriate? private class BatchRemovalThread extends Thread { /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java index aefd8cf..7adf996 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java @@ -24,7 +24,6 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.logging.log4j.Logger; @@ -47,7 +46,6 @@ import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor; import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher; import org.apache.geode.internal.cache.wan.GatewaySenderException; -import org.apache.geode.internal.cache.wan.InternalGatewayQueueEvent; import org.apache.geode.internal.monitoring.ThreadsMonitoring; import org.apache.geode.internal.offheap.annotations.Released; import org.apache.geode.logging.internal.executors.LoggingExecutors; @@ -111,22 +109,19 @@ public class ConcurrentSerialGatewaySenderEventProcessor // based on the fix for old wan Bug#46992 .revision is 39437 @Override - public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue) + public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue) throws IOException, CacheException { - enqueueEvent(operation, event, substituteValue, false, null); - return true; + enqueueEvent(operation, event, substituteValue, false); } @Override - public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue, - boolean isLastEventInTransaction, Predicate<InternalGatewayQueueEvent> condition) - throws IOException, CacheException { + public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue, + boolean isLastEventInTransaction) throws IOException, CacheException { // Get the appropriate index into the gateways int index = Math.abs(getHashCode(((EntryEventImpl) event)) % this.processors.size()); // Distribute the event to the gateway - return enqueueEvent(operation, event, substituteValue, index, isLastEventInTransaction, - condition); + enqueueEvent(operation, event, substituteValue, index, isLastEventInTransaction); } public void setModifiedEventId(EntryEventImpl clonedEvent, int index) { @@ -158,9 +153,8 @@ public class ConcurrentSerialGatewaySenderEventProcessor clonedEvent.setEventId(newEventId); } - public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue, - int index, boolean isLastEventInTransaction, Predicate<InternalGatewayQueueEvent> condition) - throws CacheException, IOException { + public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue, + int index, boolean isLastEventInTransaction) throws CacheException, IOException { // Get the appropriate gateway SerialGatewaySenderEventProcessor serialProcessor = this.processors.get(index); @@ -172,15 +166,15 @@ public class ConcurrentSerialGatewaySenderEventProcessor EntryEventImpl clonedEvent = new EntryEventImpl((EntryEventImpl) event); try { setModifiedEventId(clonedEvent, index); - return serialProcessor.enqueueEvent(operation, clonedEvent, substituteValue, - isLastEventInTransaction, condition); + serialProcessor.enqueueEvent(operation, clonedEvent, substituteValue, + isLastEventInTransaction); } finally { clonedEvent.release(); } } else { - return serialProcessor.enqueueEvent(operation, event, substituteValue, - isLastEventInTransaction, condition); + serialProcessor.enqueueEvent(operation, event, substituteValue); } + } @Override @@ -424,12 +418,10 @@ public class ConcurrentSerialGatewaySenderEventProcessor } @Override - protected boolean enqueueEvent(GatewayQueueEvent event, - Predicate<InternalGatewayQueueEvent> condition) { + protected void enqueueEvent(GatewayQueueEvent event) { for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) { - serialProcessor.enqueueEvent(event, condition); + serialProcessor.enqueueEvent(event); } - return true; } protected ThreadsMonitoring getThreadMonitorObj() { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java index 6a0d589..69142ce 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java @@ -25,7 +25,6 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.logging.log4j.Logger; @@ -54,7 +53,6 @@ import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackArgument; import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; import org.apache.geode.internal.cache.wan.GatewaySenderStats; -import org.apache.geode.internal.cache.wan.InternalGatewayQueueEvent; import org.apache.geode.internal.monitoring.ThreadsMonitoring; import org.apache.geode.logging.internal.executors.LoggingExecutors; import org.apache.geode.logging.internal.log4j.api.LogService; @@ -397,9 +395,8 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven * Add the input object to the event queue */ @Override - public boolean enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue, - boolean isLastEventInTransaction, Predicate<InternalGatewayQueueEvent> condition) - throws IOException, CacheException { + public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue, + boolean isLastEventInTransaction) throws IOException, CacheException { // There is a case where the event is serialized for processing. The // region is not // serialized along with the event since it is a transient field. I @@ -408,12 +405,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven // name is // used in the sendBatch method, and it can't be null. See EntryEventImpl // for details. - GatewaySenderEventImpl senderEvent; - - if (condition != null && - !((SerialGatewaySenderQueue) queue).hasEventsMatching(condition)) { - return false; - } + GatewaySenderEventImpl senderEvent = null; boolean isPrimary = sender.isPrimary(); if (!isPrimary) { @@ -468,7 +460,6 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven } } } - return true; } private boolean queuePrimaryEvent(GatewaySenderEventImpl gatewayEvent) @@ -881,8 +872,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven } @Override - protected boolean enqueueEvent(GatewayQueueEvent event, - Predicate<InternalGatewayQueueEvent> condition) { + protected void enqueueEvent(GatewayQueueEvent event) { // @TODO This API hasn't been implemented yet throw new UnsupportedOperationException(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java index 0d322d0..6fb0331 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java @@ -16,7 +16,6 @@ package org.apache.geode.internal.cache.wan.serial; import static org.apache.geode.cache.wan.GatewaySender.DEFAULT_BATCH_SIZE; import static org.apache.geode.cache.wan.GatewaySender.GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES; -import static org.apache.geode.cache.wan.GatewaySender.GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS; import java.util.ArrayList; import java.util.Deque; @@ -75,7 +74,6 @@ import org.apache.geode.internal.cache.versions.VersionSource; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; import org.apache.geode.internal.cache.wan.GatewaySenderStats; -import org.apache.geode.internal.cache.wan.InternalGatewayQueueEvent; import org.apache.geode.internal.offheap.OffHeapClearRequired; import org.apache.geode.internal.statistics.StatisticsClock; import org.apache.geode.logging.internal.log4j.api.LogService; @@ -501,15 +499,10 @@ public class SerialGatewaySenderQueue implements RegionQueue { retries++ == GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES) { break; } - try { - Thread.sleep(GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } } if (incompleteTransactionIdsInBatch.size() > 0) { - logger.warn("Not able to retrieve all events for transactions: {} after {} retries of {}ms", - incompleteTransactionIdsInBatch, retries, GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS); + logger.warn("Not able to retrieve all events for transactions: {} after {} retries", + incompleteTransactionIdsInBatch, retries); stats.incBatchesWithIncompleteTransactions(); } } @@ -840,9 +833,9 @@ public class SerialGatewaySenderQueue implements RegionQueue { private List<KeyAndEventPair> peekEventsWithTransactionId(TransactionId transactionId, long lastKey) { - Predicate<InternalGatewayQueueEvent> hasTransactionIdPredicate = + Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate = x -> transactionId.equals(x.getTransactionId()); - Predicate<InternalGatewayQueueEvent> isLastEventInTransactionPredicate = + Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate = x -> x.isLastEventInTransaction(); return getElementsMatching(hasTransactionIdPredicate, isLastEventInTransactionPredicate, @@ -854,10 +847,10 @@ public class SerialGatewaySenderQueue implements RegionQueue { * If a matching object also fulfills the endPredicate then the method * stops looking for more matching objects. */ - List<KeyAndEventPair> getElementsMatching(Predicate<InternalGatewayQueueEvent> condition, - Predicate<InternalGatewayQueueEvent> stopCondition, + List<KeyAndEventPair> getElementsMatching(Predicate<GatewaySenderEventImpl> condition, + Predicate<GatewaySenderEventImpl> stopCondition, long lastKey) { - InternalGatewayQueueEvent event; + GatewaySenderEventImpl event; List<KeyAndEventPair> elementsMatching = new ArrayList<>(); long currentKey = lastKey; @@ -866,13 +859,13 @@ public class SerialGatewaySenderQueue implements RegionQueue { if (extraPeekedIds.contains(currentKey)) { continue; } - event = (InternalGatewayQueueEvent) optimalGet(currentKey); + event = (GatewaySenderEventImpl) optimalGet(currentKey); if (event == null) { continue; } if (condition.test(event)) { - elementsMatching.add(new KeyAndEventPair(currentKey, (GatewaySenderEventImpl) event)); + elementsMatching.add(new KeyAndEventPair(currentKey, event)); if (stopCondition.test(event)) { break; @@ -883,25 +876,6 @@ public class SerialGatewaySenderQueue implements RegionQueue { return elementsMatching; } - public boolean hasEventsMatching(Predicate<InternalGatewayQueueEvent> condition) { - InternalGatewayQueueEvent event; - long currentKey = getHeadKey(); - if (currentKey == getTailKey()) { - return false; - } - while ((currentKey = inc(currentKey)) != getTailKey()) { - event = (InternalGatewayQueueEvent) optimalGet(currentKey); - - if (event == null) { - continue; - } - if (condition.test(event)) { - return true; - } - } - return false; - } - /** * Returns the value of the tail key. The tail key points to an empty where the next queue entry * will be stored. diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java index 8bcfa1d..530dec5 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java @@ -38,7 +38,6 @@ import org.apache.geode.distributed.DistributedMember; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; import org.apache.geode.internal.cache.wan.GatewaySenderStats; -import org.apache.geode.internal.cache.wan.InternalGatewayQueueEvent; import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderHelper; import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue; import org.apache.geode.internal.statistics.DummyStatisticsFactory; @@ -169,9 +168,9 @@ public class BucketRegionQueueJUnitTest { this.bucketRegionQueue.addToQueue(7L, event6); this.bucketRegionQueue.addToQueue(8L, event7); - Predicate<InternalGatewayQueueEvent> hasTransactionIdPredicate = + Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate = ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx1); - Predicate<InternalGatewayQueueEvent> isLastEventInTransactionPredicate = + Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate = ParallelGatewaySenderQueue.getIsLastEventInTransactionPredicate(); List<Object> objects = this.bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate, isLastEventInTransactionPredicate); diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java index ad49e9c..f31b46b 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -1320,23 +1320,6 @@ public class WANTestBase extends DistributedTestCase { assertEquals(creates, gatewayReceiverStats.getCreateRequest()); } - public static List<Integer> getReceiverStats() { - Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers(); - GatewayReceiver receiver = gatewayReceivers.iterator().next(); - CacheServerStats stats = ((CacheServerImpl) receiver.getServer()).getAcceptor().getStats(); - assertTrue(stats instanceof GatewayReceiverStats); - GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats) stats; - ArrayList<Integer> statsList = new ArrayList<>(); - statsList.add(gatewayReceiverStats.getEventsReceived()); - statsList.add(gatewayReceiverStats.getEventsRetried()); - statsList.add(gatewayReceiverStats.getProcessBatchRequests()); - statsList.add(gatewayReceiverStats.getDuplicateBatchesReceived()); - statsList.add(gatewayReceiverStats.getOutoforderBatchesReceived()); - statsList.add(gatewayReceiverStats.getEarlyAcks()); - statsList.add(gatewayReceiverStats.getExceptionsOccurred()); - return statsList; - } - public static void checkMinimumGatewayReceiverStats(int processBatches, int eventsReceived) { Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers(); GatewayReceiver receiver = gatewayReceivers.iterator().next(); diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java index acebff8..2138342 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -474,200 +473,6 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { } @Test - public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted() - throws InterruptedException { - Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); - - createCacheInVMs(nyPort, vm2); - createReceiverCustomerOrderShipmentPR(vm2); - createReceiverInVMs(vm2); - - createCacheInVMs(lnPort, vm4, vm5); - createSenderCustomerOrderShipmentPRs(vm4); - createSenderCustomerOrderShipmentPRs(vm5); - - int batchSize = 10; - vm4.invoke( - () -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, true, null, false, - true)); - vm5.invoke( - () -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, true, null, false, - true)); - - int customers = 4; - int transactionsPerCustomer = 100; - // Each transaction will contain one order plus the following shipments - int shipmentsPerTransaction = batchSize; - AsyncInvocation<Void> inv1 = asyncExecuteCustomerTransactions(vm4, customers, - transactionsPerCustomer, shipmentsPerTransaction); - - // wait for some batches to be distributed and then stop the sender - vm4.invoke(() -> await() - .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0)); - - stopSenderInVMsAsync("ln", vm4, vm5); - - // Wait for customer transactions to finish - inv1.await(); - int orderEntries = transactionsPerCustomer * customers; - int shipmentEntries = orderEntries * shipmentsPerTransaction; - vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, orderEntries)); - vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, orderEntries)); - vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, shipmentEntries)); - vm5.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, shipmentEntries)); - - checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped( - shipmentsPerTransaction); - - // Start sender to validate that queued events do not contain incomplete transactions after - // restart - startSenderInVMsAsync("ln", vm4, vm5); - - checkOnlyCompleteTransactionsAreReplicatedWithSenderRestarted(shipmentsPerTransaction); - } - - @Test - public void testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStartedReceiverStopped() - throws InterruptedException { - Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); - - createCacheInVMs(nyPort, vm2); - createReceiverCustomerOrderShipmentPR(vm2); - createReceiverInVMs(vm2); - vm2.invoke(WANTestBase::stopReceivers); - - createCacheInVMs(lnPort, vm4, vm5); - createSenderCustomerOrderShipmentPRs(vm4); - createSenderCustomerOrderShipmentPRs(vm5); - - int batchSize = 10; - vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, false, - true)); - vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, false, - true)); - - int customers = 4; - int transactionsPerCustomer = 100; - // Each transaction will contain one order plus the following shipments - int shipmentsPerTransaction = batchSize; - - AsyncInvocation<Void> inv1 = asyncExecuteCustomerTransactions(vm4, customers, - transactionsPerCustomer, shipmentsPerTransaction); - - // wait for some batches to be redistributed and then stop the sender - vm4.invoke(() -> await() - .until(() -> WANTestBase.getSenderStats("ln", -1).get(5) > 0)); - - stopSenderInVMsAsync("ln", vm4, vm5); - - // Wait for the customer transactions to finish - inv1.await(); - int orderEntries = transactionsPerCustomer * customers; - int shipmentEntries = orderEntries * shipmentsPerTransaction; - vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, orderEntries)); - vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, orderEntries)); - vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, shipmentEntries)); - vm5.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, shipmentEntries)); - - // Start receiver and sender - vm2.invoke(WANTestBase::startReceivers); - startSenderInVMsAsync("ln", vm4, vm5); - - checkOnlyCompleteTransactionsAreReplicatedWithSenderRestarted(shipmentsPerTransaction); - } - - private AsyncInvocation<Void> asyncExecuteCustomerTransactions(VM vm, int customers, - int transactionsPerCustomer, int shipmentsPerTransaction) { - final Map<Object, Object> keyValuesInTransactions = new LinkedHashMap<>(); - for (int custId = 0; custId < customers; custId++) { - for (int i = 0; i < transactionsPerCustomer; i++) { - CustId custIdObject = new CustId(custId); - OrderId orderId = new OrderId(i, custIdObject); - keyValuesInTransactions.put(orderId, new Order()); - for (int j = 0; j < shipmentsPerTransaction; j++) { - ShipmentId shipmentId = new ShipmentId(i + j, orderId); - keyValuesInTransactions.put(shipmentId, new Shipment()); - } - } - } - int eventsPerTransaction = 1 + shipmentsPerTransaction; - return vm.invokeAsync( - () -> WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions, - eventsPerTransaction)); - } - - private void checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped( - int shipmentsPerTransaction) { - waitForBatchesToBeAppliedInTheReceiver(shipmentsPerTransaction); - - List<Integer> v4List = - vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1)); - List<Integer> v5List = - vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)); - - // batches with incomplete transactions must be 0 - assertEquals(0, (int) v4List.get(13)); - assertEquals(0, (int) v5List.get(13)); - - // Check the entries replicated against the number of batches distributed - int batchesDistributed = v4List.get(4) + v5List.get(4); - checkOnlyCompleteTransactionsAreReplicated(shipmentsPerTransaction, batchesDistributed); - } - - private void checkOnlyCompleteTransactionsAreReplicatedWithSenderRestarted( - int shipmentsPerTransaction) { - // Wait for sender queues to be drained - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - List<Integer> v4List = - vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - List<Integer> v5List = - vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - assertEquals(0, v4List.get(0) + v5List.get(0)); - - // batches with incomplete transactions must be 0 - assertEquals(0, (int) v4List.get(13)); - assertEquals(0, (int) v5List.get(13)); - - waitForBatchesToBeAppliedInTheReceiver(shipmentsPerTransaction); - - // Check the entries replicated against the number of batches distributed - int batchesDistributed = v4List.get(4) + v5List.get(4); - checkOnlyCompleteTransactionsAreReplicated(shipmentsPerTransaction, batchesDistributed); - } - - private void checkOnlyCompleteTransactionsAreReplicated(int shipmentsPerTransaction, - int batchesDistributed) { - // Only complete transactions (1 order + 10 shipments) must be replicated - int orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName)); - int shipmentRegionSize = vm2.invoke(() -> getRegionSize(shipmentRegionName)); - assertEquals(shipmentRegionSize, shipmentsPerTransaction * orderRegionSize); - - vm2.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, batchesDistributed)); - vm2.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, - batchesDistributed * shipmentsPerTransaction)); - } - - private void waitForBatchesToBeAppliedInTheReceiver(int shipmentsPerTransaction) { - int batchesSentTotal = vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1)).get(4) + - vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)).get(4); - - // Wait for all batches to be received by the receiver - vm2.invoke(() -> await() - .until(() -> WANTestBase.getReceiverStats().get(2) == batchesSentTotal)); - - // Wait for all orderEntries to be written by the receiver - vm2.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, batchesSentTotal)); - - // Wait for all shipmentEntries to be written by the receiver - vm2.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, - batchesSentTotal * shipmentsPerTransaction)); - } - - @Test public void testPRParallelPropagationWithGroupTransactionEventsWithIncompleteTransactions() { Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -703,8 +508,8 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.validateRegionSize(testName, entries)); - List<Integer> v4List = - vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); + ArrayList<Integer> v4List = + (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); // The number of batches will be 4 because each // dispatcher thread (there are 2) will send half the number of entries, diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java index fc2a304..2cbdc35 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java @@ -21,10 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.RejectedExecutionException; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -54,7 +51,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase { } @Test - public void testReplicatedSerialPropagation() { + public void testReplicatedSerialPropagation() throws Exception { Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -214,7 +211,8 @@ public class SerialWANStatsDUnitTest extends WANTestBase { } @Test - public void testReplicatedSerialPropagationWithBatchRedistWithoutGroupTransactionEventsSendsBatchesWithIncompleteTransactions() { + public void testReplicatedSerialPropagationWithBatchRedistWithoutGroupTransactionEventsSendsBatchesWithIncompleteTransactions() + throws Exception { Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -276,7 +274,8 @@ public class SerialWANStatsDUnitTest extends WANTestBase { } @Test - public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEventsSendsBatchesWithCompleteTransactions() { + public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEventsSendsBatchesWithCompleteTransactions() + throws Exception { Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -349,201 +348,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase { } @Test - public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted() - throws InterruptedException { - Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); - - String regionName = testName + "_RR"; - - createCacheInVMs(nyPort, vm2); - createReceiverInVMs(vm2); - vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap())); - - createCacheInVMs(lnPort, vm4, vm5); - vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap())); - vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap())); - - boolean groupTransactionEvents = true; - int batchSize = 10; - vm4.invoke( - () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, false, - groupTransactionEvents)); - vm5.invoke( - () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, false, - groupTransactionEvents)); - - int eventsPerTransaction = batchSize + 1; - // The number of entries must be big enough so that not all entries - // are replicated before the sender is stopped and also divisible by eventsPerTransaction - int entries = eventsPerTransaction * 200; - // Execute some transactions - AsyncInvocation<Void> inv1 = - asyncExecuteTransactions(regionName, eventsPerTransaction, entries); - - // wait for batches to be distributed and then stop the sender - vm4.invoke(() -> await() - .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0)); - - // These exceptions are ignored here because it could happen that when an event - // is to be handled, the sender is stopped. The sender, when stopped, shuts down - // the thread pool that would handle the event and this could provoke the exception. - addIgnoredException("Exception occurred in CacheListener"); - addIgnoredException(RejectedExecutionException.class); - - // Stop the sender - stopSenderInVMsAsync("ln", vm4, vm5); - - // Wait for transactions to finish - inv1.await(); - vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries)); - vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries)); - - // Check - checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(regionName, - eventsPerTransaction); - - // Start the sender - startSenderInVMsAsync("ln", vm4, vm5); - - // Check - checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(regionName, - eventsPerTransaction); - } - - @Test - public void testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStartedReceiverStopped() - throws InterruptedException { - Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); - - String regionName = testName + "_RR"; - - createCacheInVMs(nyPort, vm2); - createReceiverInVMs(vm2); - vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap())); - vm2.invoke(WANTestBase::stopReceivers); - - createCacheInVMs(lnPort, vm4, vm5); - vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap())); - vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap())); - - boolean groupTransactionEvents = true; - int batchSize = 10; - vm4.invoke( - () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, false, - groupTransactionEvents)); - vm5.invoke( - () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, true, null, false, - groupTransactionEvents)); - - int eventsPerTransaction = batchSize + 1; - // The number of entries must be big enough so that not all entries - // are replicated before the sender is stopped and also divisible by eventsPerTransaction - int entries = eventsPerTransaction * 200; - // Execute some transactions - AsyncInvocation<Void> inv1 = - asyncExecuteTransactions(regionName, eventsPerTransaction, entries); - - // wait for batches to be redistributed and then stop the sender - vm4.invoke(() -> await() - .until(() -> WANTestBase.getSenderStats("ln", -1).get(5) > 0)); - - // Stop the sender - stopSenderInVMsAsync("ln", vm4, vm5); - - // Wait for transactions to finish - inv1.await(); - vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries)); - vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries)); - - // Start the receiver and the sender - vm2.invoke(WANTestBase::startReceivers); - startSenderInVMsAsync("ln", vm4, vm5); - - // Check - checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(regionName, - eventsPerTransaction); - } - - private void checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(String regionName, - int eventsPerTransaction) { - waitForBatchesToBeAppliedInTheReceiver(regionName, eventsPerTransaction); - - List<Integer> v4List = - vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1)); - List<Integer> v5List = - vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)); - - // batches with incomplete transactions must be 0 - assertEquals(0, (int) v4List.get(13)); - assertEquals(0, (int) v5List.get(13)); - - int batchesDistributed = v4List.get(4) + v5List.get(4); - checkOnlyCompleteTransactionsAreReplicated(regionName, eventsPerTransaction, - batchesDistributed); - } - - private void checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(String regionName, - int eventsPerTransaction) { - // Wait for sender queues to be empty - List<Integer> v4List = - vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - List<Integer> v5List = - vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - - assertEquals(0, v4List.get(0) + v5List.get(0)); - - // batches with incomplete transactions must be 0 - assertEquals(0, (int) v4List.get(13)); - assertEquals(0, (int) v5List.get(13)); - - waitForBatchesToBeAppliedInTheReceiver(regionName, eventsPerTransaction); - - int batchesDistributed = v4List.get(4) + v5List.get(4); - checkOnlyCompleteTransactionsAreReplicated(regionName, eventsPerTransaction, - batchesDistributed); - } - - private void checkOnlyCompleteTransactionsAreReplicated(String regionName, - int eventsPerTransaction, int batchesDistributed) { - int regionSize = vm2.invoke(() -> getRegionSize(regionName)); - - // The number of entries must be divisible by the number of events per transaction - assertEquals(0, regionSize % eventsPerTransaction); - - // Check the entries replicated against the number of batches distributed - vm2.invoke(() -> WANTestBase.validateRegionSize(regionName, - batchesDistributed * eventsPerTransaction)); - } - - private AsyncInvocation<Void> asyncExecuteTransactions(String regionName, - int eventsPerTransaction, int entries) { - final Map<Object, Object> keyValues = new LinkedHashMap<>(); - for (int i = 0; i < entries; i++) { - keyValues.put(i, i + "_Value"); - } - - return vm4.invokeAsync( - () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues, - eventsPerTransaction)); - } - - private void waitForBatchesToBeAppliedInTheReceiver(String regionName, int eventsPerTransaction) { - int batchesSentTotal = vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1)).get(4) + - vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)).get(4); - - // Wait for all batches to be received by the sender - vm2.invoke(() -> await() - .until(() -> WANTestBase.getReceiverStats().get(2) == batchesSentTotal)); - - // Wait for all entries to be written by the receiver - vm2.invoke( - () -> WANTestBase.validateRegionSize(regionName, batchesSentTotal * eventsPerTransaction)); - } - - @Test - public void testReplicatedSerialPropagationWithMultipleDispatchers() { + public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception { Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -582,7 +387,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase { } @Test - public void testWANStatsTwoWanSites() { + public void testWANStatsTwoWanSites() throws Exception { Integer lnPort = createFirstLocatorWithDSId(1); Integer nyPort = vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -845,7 +650,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase { * */ @Test - public void testReplicatedSerialPropagationWithRemoteRegionDestroy() { + public void testReplicatedSerialPropagationWithRemoteRegionDestroy() throws Exception { int numEntries = 2000; Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -917,7 +722,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase { } @Test - public void testSerialPropagationWithFilter() { + public void testSerialPropagationWithFilter() throws Exception { Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -959,7 +764,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase { } @Test - public void testSerialPropagationConflation() { + public void testSerialPropagationConflation() throws Exception { Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java index 78e36c1..5d9241a 100644 --- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java @@ -110,7 +110,6 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender { @Override public void stop() { - preStop(); this.getLifeCycleLock().writeLock().lock(); try { if (!this.isRunning()) { @@ -140,7 +139,6 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender { // Keep the eventProcessor around so we can ask it for the regionQueues later. // Tests expect to be able to do this. } finally { - postStop(); this.getLifeCycleLock().writeLock().unlock(); } } diff --git a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java index 2dfef04..13f6e96 100644 --- a/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImpl.java @@ -134,7 +134,6 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender { if (logger.isDebugEnabled()) { logger.debug("Stopping Gateway Sender : {}", this); } - preStop(); this.getLifeCycleLock().writeLock().lock(); try { // Stop the dispatcher @@ -150,7 +149,6 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender { clearTempEventsAfterSenderStopped(); } finally { - postStop(); this.getLifeCycleLock().writeLock().unlock(); } diff --git a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImplTest.java b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImplTest.java index 84f2261..56c5b72 100644 --- a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImplTest.java +++ b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImplTest.java @@ -14,7 +14,6 @@ */ package org.apache.geode.cache.wan.internal.parallel; -import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -26,8 +25,6 @@ import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.apache.geode.CancelCriterion; -import org.apache.geode.Statistics; import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.internal.ClusterDistributionManager; import org.apache.geode.distributed.internal.InternalDistributedSystem; @@ -56,15 +53,10 @@ public class ParallelGatewaySenderImplTest { attrs.setParallel(true); attrs.setId("sender"); InternalDistributedSystem system = mock(InternalDistributedSystem.class); - CancelCriterion cancelCriterion = mock(CancelCriterion.class); - Statistics stats = mock(Statistics.class); when(cache.getInternalDistributedSystem()).thenReturn(system); when(cache.getDistributedSystem()).thenReturn(system); - when(cache.getCancelCriterion()).thenReturn(cancelCriterion); - when(cancelCriterion.isCancelInProgress()).thenReturn(false); ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class); when(system.getDistributionManager()).thenReturn(distributionManager); - when(system.createAtomicStatistics(any(), any())).thenReturn(stats); when(distributionManager.getDistributedSystemId()).thenReturn(-1); DistributedLockService distributedLockService = mock(DistributedLockService.class); @@ -96,39 +88,4 @@ public class ParallelGatewaySenderImplTest { RegionQueue queue = gatewaysender.getQueue(); assertTrue(((ConcurrentParallelGatewaySenderQueue) queue).getCleanQueues()); } - - @Test - public void whenStoppedTwiceCloseInTimeWithGroupTransactionEventsPreStopWaitsTwice() { - attrs.setGroupTransactionEvents(true); - gatewaysender = new ParallelGatewaySenderImpl(cache, statisticsClock, attrs); - gatewaysender.start(); - - long start = System.currentTimeMillis(); - - Thread t1 = new Thread(this::stopGatewaySenderAndCheckTime); - Thread t2 = new Thread(this::stopGatewaySenderAndCheckTime); - t1.start(); - t2.start(); - try { - t1.join(); - t2.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - long finish = System.currentTimeMillis(); - long timeElapsed = finish - start; - // Each call to preStop waits for 1 second but these waits execute in parallel - assertThat(timeElapsed).isGreaterThan(1000); - - assertThat(gatewaysender.isRunning()).isEqualTo(false); - } - - private void stopGatewaySenderAndCheckTime() { - long start = System.currentTimeMillis(); - gatewaysender.stop(); - long finish = System.currentTimeMillis(); - long timeElapsed = finish - start; - assertThat(timeElapsed).isGreaterThan(1000); - } } diff --git a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImplTest.java b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImplTest.java index cc6e2a1..d0e85a8 100644 --- a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImplTest.java +++ b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/serial/SerialGatewaySenderImplTest.java @@ -81,11 +81,6 @@ public class SerialGatewaySenderImplTest { } private SerialGatewaySenderImpl createSerialGatewaySenderImplSpy() { - return createSerialGatewaySenderImplSpy(false); - } - - private SerialGatewaySenderImpl createSerialGatewaySenderImplSpy( - boolean mustGroupTransactionEvents) { GatewaySenderAdvisor gatewaySenderAdvisor = mock(GatewaySenderAdvisor.class); when(gatewaySenderAdvisor.isPrimary()).thenReturn(true); @@ -107,10 +102,6 @@ public class SerialGatewaySenderImplTest { doReturn(null).when(spySerialGatewaySender).getQueues(); - if (mustGroupTransactionEvents) { - doReturn(true).when(spySerialGatewaySender).mustGroupTransactionEvents(); - } - return spySerialGatewaySender; } @@ -141,37 +132,4 @@ public class SerialGatewaySenderImplTest { assertThat(serialGatewaySender.getEventProcessor()).isNull(); } - @Test - public void whenStoppedTwiceCloseInTimeWithGroupTransactionEventsPreStopWaitsTwice() { - serialGatewaySender = createSerialGatewaySenderImplSpy(true); - - long start = System.currentTimeMillis(); - - Thread t1 = new Thread(this::stopGatewaySenderAndCheckTime); - Thread t2 = new Thread(this::stopGatewaySenderAndCheckTime); - t1.start(); - t2.start(); - try { - t1.join(); - t2.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - long finish = System.currentTimeMillis(); - long timeElapsed = finish - start; - - // Each call to preStop waits for 1 second but these waits execute in parallel - assertThat(timeElapsed).isGreaterThan(1000); - - assertThat(serialGatewaySender.getEventProcessor()).isNull(); - } - - private void stopGatewaySenderAndCheckTime() { - long start = System.currentTimeMillis(); - serialGatewaySender.stop(); - long finish = System.currentTimeMillis(); - long timeElapsed = finish - start; - assertThat(timeElapsed).isGreaterThan(1000); - } }