This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a commit to branch feature/redis-performance-testing
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 3df53f9b56898bb0d5f538425f0de725fea67806 Author: Jens Deppe <[email protected]> AuthorDate: Wed Feb 17 11:57:30 2021 -0800 Fix correct reporting of hosts and ports for SLOT response --- .../cluster/RedisPartitionResolverDUnitTest.java | 23 ++++++++++- .../geode/redis/internal/GeodeRedisServer.java | 3 ++ .../geode/redis/internal/RedisCommandType.java | 2 +- .../internal/cluster/BucketRetrievalFunction.java | 47 ++++++++++++++++++---- .../internal/executor/cluster/ClusterExecutor.java | 31 ++++++++++---- 5 files changed, 89 insertions(+), 17 deletions(-) diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java index 3304932..e4df831 100644 --- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java +++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java @@ -17,7 +17,6 @@ package org.apache.geode.redis.internal.cluster; import static org.assertj.core.api.Assertions.assertThat; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -58,6 +57,7 @@ public class RedisPartitionResolverDUnitTest { private static int redisServerPort1; private static int redisServerPort2; + private static int redisServerPort3; @BeforeClass public static void classSetup() { @@ -68,6 +68,7 @@ public class RedisPartitionResolverDUnitTest { redisServerPort1 = cluster.getRedisPort(1); redisServerPort2 = cluster.getRedisPort(2); + redisServerPort3 = cluster.getRedisPort(3); jedis1 = new Jedis(LOCAL_HOST, redisServerPort1, JEDIS_TIMEOUT); } @@ -99,8 +100,28 @@ public class RedisPartitionResolverDUnitTest { assertThat(buckets1.size() + buckets2.size() + buckets3.size()) .isEqualTo(RegionProvider.REDIS_REGION_BUCKETS); + } + @Test + 
public void testClusterSlotsReferencesAllServers() { + int numKeys = 1000; + for (int i = 0; i < numKeys; i++) { + String key = "key-" + i; + jedis1.set(key, "value-" + i); + } + + List<Object> clusterSlots = jedis1.clusterSlots(); + + assertThat(clusterSlots).hasSize(RegionProvider.REDIS_REGION_BUCKETS); + + // Gather all unique ports + Set<Long> ports = new HashSet<>(); + for (Object slotObj : clusterSlots) { + ports.add((Long) (((List<Object>) ((List<Object>) slotObj).get(2))).get(1)); + } + assertThat(ports).containsExactlyInAnyOrder((long) redisServerPort1, (long) redisServerPort2, + (long) redisServerPort3); } private Map<ByteArrayWrapper, Integer> getKeyToBucketMap(MemberVM vm) { diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java index 06fa6d6..880fee2 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java @@ -27,6 +27,7 @@ import org.apache.geode.internal.statistics.StatisticsClock; import org.apache.geode.internal.statistics.StatisticsClockFactory; import org.apache.geode.logging.internal.executors.LoggingExecutors; import org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.redis.internal.cluster.BucketRetrievalFunction; import org.apache.geode.redis.internal.executor.CommandFunction; import org.apache.geode.redis.internal.executor.StripedExecutor; import org.apache.geode.redis.internal.executor.SynchronizedStripedExecutor; @@ -102,6 +103,8 @@ public class GeodeRedisServer { regionProvider, pubSub, this::allowUnsupportedCommands, this::shutdown, port, bindAddress, redisStats, redisCommandExecutor); + + BucketRetrievalFunction.register(nettyRedisServer.getPort()); } @VisibleForTesting diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java index 33ca297..74ac471 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java @@ -277,7 +277,7 @@ public enum RedisCommandType { SLOWLOG(new SlowlogExecutor(), UNSUPPORTED, new SlowlogParameterRequirements()), TIME(new TimeExecutor(), UNSUPPORTED, new ExactParameterRequirements(1)), - /*********** CLUSTER **********/ + /*********** CLUSTER **********/ CLUSTER(new ClusterExecutor(), UNSUPPORTED, new MinimumParameterRequirements(1)), /////////// UNIMPLEMENTED ///////////////////// diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java index ed9144c..48c4fb6 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java @@ -19,16 +19,21 @@ import java.io.Serializable; import java.net.InetAddress; import java.util.Set; -import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.Region; import org.apache.geode.cache.execute.FunctionContext; -import org.apache.geode.cache.execute.RegionFunctionContext; +import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.cache.partition.PartitionRegionHelper; import org.apache.geode.internal.cache.LocalDataSet; +import org.apache.geode.internal.cache.execute.InternalFunction; import org.apache.geode.internal.inet.LocalHostUtil; +import org.apache.geode.redis.internal.RegionProvider; +import org.apache.geode.redis.internal.data.ByteArrayWrapper; -public class BucketRetrievalFunction implements Function<Void> { +public class BucketRetrievalFunction implements InternalFunction<Void> { + public static 
final String ID = "REDIS_BUCKET_SLOT_FUNCTION"; private static final String hostAddress; + private final int redisPort; static { InetAddress localhost = null; @@ -40,28 +45,56 @@ public class BucketRetrievalFunction implements Function<Void> { hostAddress = localhost == null ? "localhost" : localhost.getHostAddress(); } + public BucketRetrievalFunction(int redisPort) { + this.redisPort = redisPort; + } + + public static void register(int redisPort) { + FunctionService.registerFunction(new BucketRetrievalFunction(redisPort)); + } + @Override public void execute(FunctionContext<Void> context) { - LocalDataSet local = (LocalDataSet) PartitionRegionHelper - .getLocalDataForContext((RegionFunctionContext) context); + Region<ByteArrayWrapper, ByteArrayWrapper> region = + context.getCache().getRegion(RegionProvider.REDIS_DATA_REGION); + + LocalDataSet local = (LocalDataSet) PartitionRegionHelper.getLocalPrimaryData(region); - MemberBuckets mb = new MemberBuckets(hostAddress, local.getBucketSet()); + MemberBuckets mb = + new MemberBuckets(context.getMemberName(), hostAddress, redisPort, local.getBucketSet()); context.getResultSender().lastResult(mb); } + @Override + public String getId() { + return ID; + } + public static class MemberBuckets implements Serializable { + private final String memberId; private final String hostAddress; + private final int port; private final Set<Integer> bucketIds; - public MemberBuckets(String hostAddress, Set<Integer> bucketIds) { + public MemberBuckets(String memberId, String hostAddress, int port, Set<Integer> bucketIds) { + this.memberId = memberId; this.hostAddress = hostAddress; + this.port = port; this.bucketIds = bucketIds; } + public String getMemberId() { + return memberId; + } + public String getHostAddress() { return hostAddress; } + public int getPort() { + return port; + } + public Set<Integer> getBucketIds() { return bucketIds; } diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java index 7fac868..f7e8315 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java @@ -22,18 +22,27 @@ import static org.apache.geode.redis.internal.cluster.BucketRetrievalFunction.Me import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import org.apache.commons.lang3.tuple.Pair; import org.apache.logging.log4j.Logger; import org.apache.geode.cache.Region; -import org.apache.geode.cache.execute.Execution; import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.cache.execute.ResultCollector; +import org.apache.geode.cache.partition.PartitionMemberInfo; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.cache.partition.PartitionRegionInfo; +import org.apache.geode.distributed.DistributedMember; import org.apache.geode.logging.internal.log4j.api.LogService; import org.apache.geode.redis.internal.cluster.BucketRetrievalFunction; +import org.apache.geode.redis.internal.data.ByteArrayWrapper; +import org.apache.geode.redis.internal.data.RedisData; import org.apache.geode.redis.internal.executor.AbstractExecutor; import org.apache.geode.redis.internal.executor.RedisResponse; import org.apache.geode.redis.internal.netty.Command; @@ -68,20 +77,25 @@ public class ClusterExecutor extends AbstractExecutor { return response; } - // @SuppressWarnings("unchecked") + @SuppressWarnings("unchecked") private RedisResponse getSlots(ExecutionHandlerContext ctx) { - Region<?, ?> region = ctx.getRegionProvider().getDataRegion(); + 
Region<ByteArrayWrapper, RedisData> dataRegion = ctx.getRegionProvider().getDataRegion(); + PartitionRegionInfo info = PartitionRegionHelper.getPartitionRegionInfo(dataRegion); + Set<DistributedMember> membersWithDataRegion = new HashSet<>(); + for (PartitionMemberInfo memberInfo : info.getPartitionMemberInfo()) { + membersWithDataRegion.add(memberInfo.getDistributedMember()); + } - Execution<Void, MemberBuckets, List<MemberBuckets>> execution = - FunctionService.onRegion(region); ResultCollector<MemberBuckets, List<MemberBuckets>> resultCollector = - execution.execute(new BucketRetrievalFunction()); + FunctionService.onMembers(membersWithDataRegion).execute(BucketRetrievalFunction.ID); SortedMap<Integer, String> bucketToMemberMap = new TreeMap<>(); + Map<String, Pair<String, Integer>> memberToHostPortMap = new TreeMap<>(); int retrievedBucketCount = 0; for (MemberBuckets m : resultCollector.getResult()) { + memberToHostPortMap.put(m.getMemberId(), Pair.of(m.getHostAddress(), m.getPort())); for (Integer id : m.getBucketIds()) { - bucketToMemberMap.put(id, m.getHostAddress()); + bucketToMemberMap.put(id, m.getMemberId()); retrievedBucketCount++; } } @@ -96,10 +110,11 @@ public class ClusterExecutor extends AbstractExecutor { List<Object> slots = new ArrayList<>(); for (String member : bucketToMemberMap.values()) { + Pair<String, Integer> hostAndPort = memberToHostPortMap.get(member); List<?> entry = Arrays.asList( index * slotsPerBucket, ((index + 1) * slotsPerBucket) - 1, - Arrays.asList(member, ctx.getServerPort())); + Arrays.asList(hostAndPort.getLeft(), hostAndPort.getRight())); slots.add(entry); index++;
