This is an automated email from the ASF dual-hosted git repository. upthewaterspout pushed a commit to branch feature/redis-performance-testing in repository https://gitbox.apache.org/repos/asf/geode.git
commit c48fdc50e64bbfdbcb44d8e69b7ccafc6bd03be7 Author: Jens Deppe <[email protected]> AuthorDate: Thu Feb 25 15:37:48 2021 -0800 Add secondaries to CLUSTER SLOTS output --- geode-redis/build.gradle | 1 + .../internal/cluster/BucketRetrievalFunction.java | 42 +++++++++++++++++----- .../internal/executor/cluster/ClusterExecutor.java | 39 +++++++++++++------- 3 files changed, 62 insertions(+), 20 deletions(-) diff --git a/geode-redis/build.gradle b/geode-redis/build.gradle index 11b61f9..8d0eb70 100644 --- a/geode-redis/build.gradle +++ b/geode-redis/build.gradle @@ -31,6 +31,7 @@ facets { dependencies { compile(platform(project(':boms:geode-all-bom'))) implementation(project(':geode-serialization')) + implementation(project(':geode-membership')) implementation(project(':geode-logging')) implementation(project(':geode-core')) implementation(project(':geode-gfsh')) diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java index 5faf700..4605d33 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java @@ -17,14 +17,21 @@ package org.apache.geode.redis.internal.cluster; import java.io.Serializable; import java.net.InetAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.geode.cache.Region; import org.apache.geode.cache.execute.FunctionContext; import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.LocalDataSet; +import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.execute.InternalFunction; +import org.apache.geode.internal.cache.partitioned.RegionAdvisor; import org.apache.geode.internal.inet.LocalHostUtil; import org.apache.geode.redis.internal.RegionProvider; import org.apache.geode.redis.internal.data.ByteArrayWrapper; @@ -60,10 +67,22 @@ public class BucketRetrievalFunction implements InternalFunction<Void> { Region<RedisKey, ByteArrayWrapper> region = context.getCache().getRegion(RegionProvider.REDIS_DATA_REGION); - LocalDataSet local = (LocalDataSet) PartitionRegionHelper.getLocalPrimaryData(region); + String memberId = + context.getCache().getDistributedSystem().getDistributedMember().getUniqueId(); + LocalDataSet localPrimary = (LocalDataSet) PartitionRegionHelper.getLocalPrimaryData(region); + RegionAdvisor advisor = ((PartitionedRegion) region).getRegionAdvisor(); + Map<Integer, List<String>> bucketToSecondaries = new HashMap<>(); + + for (Integer bucketId : localPrimary.getBucketSet()) { + List<String> a = advisor.getBucketOwners(bucketId).stream() + .map(InternalDistributedMember::getId) + .filter(x -> !x.equals(memberId)) + .collect(Collectors.toList()); + bucketToSecondaries.put(bucketId, a); + } - MemberBuckets mb = - new MemberBuckets(context.getMemberName(), hostAddress, redisPort, local.getBucketSet()); + MemberBuckets mb = new MemberBuckets(memberId, hostAddress, redisPort, + localPrimary.getBucketSet(), bucketToSecondaries); context.getResultSender().lastResult(mb); } @@ -76,13 +95,16 @@ public class BucketRetrievalFunction implements InternalFunction<Void> { private final String memberId; private final String hostAddress; private final int port; - private final Set<Integer> bucketIds; + private final Set<Integer> primaryBucketIds; + private final Map<Integer, List<String>> secondaryBucketMembers; - public MemberBuckets(String memberId, String hostAddress, int port, Set<Integer> bucketIds) { + public MemberBuckets(String memberId, String hostAddress, int port, + Set<Integer> primaryBucketIds, Map<Integer, List<String>> secondaryBucketIds) { this.memberId = memberId; this.hostAddress = hostAddress; this.port = port; - this.bucketIds = bucketIds; + this.primaryBucketIds = primaryBucketIds; + this.secondaryBucketMembers = secondaryBucketIds; } public String getMemberId() { @@ -97,8 +119,12 @@ public class BucketRetrievalFunction implements InternalFunction<Void> { return port; } - public Set<Integer> getBucketIds() { - return bucketIds; + public Set<Integer> getPrimaryBucketIds() { + return primaryBucketIds; + } + + public Map<Integer, List<String>> getSecondaryBucketMembers() { + return secondaryBucketMembers; } } } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java index be2f708..3dde5aa 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java @@ -17,16 +17,16 @@ package org.apache.geode.redis.internal.executor.cluster; import static org.apache.geode.redis.internal.RedisConstants.ERROR_UNKNOWN_COMMAND; import static org.apache.geode.redis.internal.RegionProvider.REDIS_REGION_BUCKETS; -import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS; +import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS_PER_BUCKET; import static org.apache.geode.redis.internal.cluster.BucketRetrievalFunction.MemberBuckets; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.SortedMap; import java.util.TreeMap; import org.apache.commons.lang3.tuple.Pair; @@ -94,15 +94,21 @@ public class ClusterExecutor extends AbstractExecutor { ResultCollector<MemberBuckets, List<MemberBuckets>> resultCollector = FunctionService.onMembers(membersWithDataRegion).execute(BucketRetrievalFunction.ID); - SortedMap<Integer, String> bucketToMemberMap = new TreeMap<>(); + Map<Integer, String> primaryBucketToMemberMap = new HashMap<>(); + Map<Integer, List<String>> secondaryBucketToMemberMap = new HashMap<>(); Map<String, Pair<String, Integer>> memberToHostPortMap = new TreeMap<>(); int retrievedBucketCount = 0; + for (MemberBuckets m : resultCollector.getResult()) { memberToHostPortMap.put(m.getMemberId(), Pair.of(m.getHostAddress(), m.getPort())); - for (Integer id : m.getBucketIds()) { - bucketToMemberMap.put(id, m.getMemberId()); + for (Integer id : m.getPrimaryBucketIds()) { + primaryBucketToMemberMap.put(id, m.getMemberId()); retrievedBucketCount++; } + + for (Map.Entry<Integer, List<String>> entry : m.getSecondaryBucketMembers().entrySet()) { + secondaryBucketToMemberMap.put(entry.getKey(), entry.getValue()); + } } if (retrievedBucketCount != REDIS_REGION_BUCKETS) { @@ -110,16 +116,25 @@ public class ClusterExecutor extends AbstractExecutor { + " != " + REDIS_REGION_BUCKETS); } - int slotsPerBucket = REDIS_SLOTS / REDIS_REGION_BUCKETS; int index = 0; List<Object> slots = new ArrayList<>(); - for (String member : bucketToMemberMap.values()) { - Pair<String, Integer> hostAndPort = memberToHostPortMap.get(member); - List<?> entry = Arrays.asList( - index * slotsPerBucket, - ((index + 1) * slotsPerBucket) - 1, - Arrays.asList(hostAndPort.getLeft(), hostAndPort.getRight())); + for (int i = 0; i < REDIS_REGION_BUCKETS; i++) { + Pair<String, Integer> primaryHostAndPort = + memberToHostPortMap.get(primaryBucketToMemberMap.get(i)); + + List<Object> entry = new ArrayList<>(); + entry.add(index * REDIS_SLOTS_PER_BUCKET); + entry.add(((index + 1) * REDIS_SLOTS_PER_BUCKET) - 1); + entry.add(Arrays.asList(primaryHostAndPort.getLeft(), primaryHostAndPort.getRight())); + + List<String> secondaryMembers = secondaryBucketToMemberMap.get(i); + if (secondaryMembers != null) { + for (String m : secondaryMembers) { + Pair<String, Integer> hostPort = memberToHostPortMap.get(m); + entry.add(Arrays.asList(hostPort.getLeft(), hostPort.getRight())); + } + } slots.add(entry); index++;
