This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a commit to branch feature/redis-performance-testing
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c48fdc50e64bbfdbcb44d8e69b7ccafc6bd03be7
Author: Jens Deppe <[email protected]>
AuthorDate: Thu Feb 25 15:37:48 2021 -0800

    Add secondaries to CLUSTER SLOTS output
---
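(For reference, a minimal sketch of the reply shape this change produces, rendered
redis-cli style. Hosts, ports, and the slot boundaries shown are hypothetical; the
actual ranges depend on REDIS_SLOTS_PER_BUCKET. Element 3 of each entry is the
primary's host/port; the entries from element 4 onwards, added by this commit, are
the secondaries for the same slot range.)

    1) 1) (integer) 0
       2) (integer) 127
       3) 1) "10.0.0.1"
          2) (integer) 6379
       4) 1) "10.0.0.2"
          2) (integer) 6379
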
 geode-redis/build.gradle                           |  1 +
 .../internal/cluster/BucketRetrievalFunction.java  | 42 +++++++++++++++++-----
 .../internal/executor/cluster/ClusterExecutor.java | 39 +++++++++++++-------
 3 files changed, 62 insertions(+), 20 deletions(-)

diff --git a/geode-redis/build.gradle b/geode-redis/build.gradle
index 11b61f9..8d0eb70 100644
--- a/geode-redis/build.gradle
+++ b/geode-redis/build.gradle
@@ -31,6 +31,7 @@ facets {
 dependencies {
   compile(platform(project(':boms:geode-all-bom')))
   implementation(project(':geode-serialization'))
+  implementation(project(':geode-membership'))
   implementation(project(':geode-logging'))
   implementation(project(':geode-core'))
   implementation(project(':geode-gfsh'))
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java
index 5faf700..4605d33 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java
@@ -17,14 +17,21 @@ package org.apache.geode.redis.internal.cluster;
 
 import java.io.Serializable;
 import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.LocalDataSet;
+import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.execute.InternalFunction;
+import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
 import org.apache.geode.internal.inet.LocalHostUtil;
 import org.apache.geode.redis.internal.RegionProvider;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
@@ -60,10 +67,22 @@ public class BucketRetrievalFunction implements InternalFunction<Void> {
     Region<RedisKey, ByteArrayWrapper> region =
         context.getCache().getRegion(RegionProvider.REDIS_DATA_REGION);
 
-    LocalDataSet local = (LocalDataSet) PartitionRegionHelper.getLocalPrimaryData(region);
+    String memberId =
+        context.getCache().getDistributedSystem().getDistributedMember().getUniqueId();
+    LocalDataSet localPrimary = (LocalDataSet) PartitionRegionHelper.getLocalPrimaryData(region);
+    RegionAdvisor advisor = ((PartitionedRegion) region).getRegionAdvisor();
+    Map<Integer, List<String>> bucketToSecondaries = new HashMap<>();
+
+    for (Integer bucketId : localPrimary.getBucketSet()) {
+      List<String> a = advisor.getBucketOwners(bucketId).stream()
+          .map(InternalDistributedMember::getId)
+          .filter(x -> !x.equals(memberId))
+          .collect(Collectors.toList());
+      bucketToSecondaries.put(bucketId, a);
+    }
 
-    MemberBuckets mb =
-        new MemberBuckets(context.getMemberName(), hostAddress, redisPort, local.getBucketSet());
+    MemberBuckets mb = new MemberBuckets(memberId, hostAddress, redisPort,
+        localPrimary.getBucketSet(), bucketToSecondaries);
     context.getResultSender().lastResult(mb);
   }
 
@@ -76,13 +95,16 @@ public class BucketRetrievalFunction implements InternalFunction<Void> {
     private final String memberId;
     private final String hostAddress;
     private final int port;
-    private final Set<Integer> bucketIds;
+    private final Set<Integer> primaryBucketIds;
+    private final Map<Integer, List<String>> secondaryBucketMembers;
 
-    public MemberBuckets(String memberId, String hostAddress, int port, Set<Integer> bucketIds) {
+    public MemberBuckets(String memberId, String hostAddress, int port,
+        Set<Integer> primaryBucketIds, Map<Integer, List<String>> secondaryBucketIds) {
       this.memberId = memberId;
       this.hostAddress = hostAddress;
       this.port = port;
-      this.bucketIds = bucketIds;
+      this.primaryBucketIds = primaryBucketIds;
+      this.secondaryBucketMembers = secondaryBucketIds;
     }
 
     public String getMemberId() {
@@ -97,8 +119,12 @@ public class BucketRetrievalFunction implements InternalFunction<Void> {
       return port;
     }
 
-    public Set<Integer> getBucketIds() {
-      return bucketIds;
+    public Set<Integer> getPrimaryBucketIds() {
+      return primaryBucketIds;
+    }
+
+    public Map<Integer, List<String>> getSecondaryBucketMembers() {
+      return secondaryBucketMembers;
     }
   }
 }
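
(As an aside, a hedged Java sketch of how the new MemberBuckets shape might be
populated; the member names, host, port, and bucket ids below are invented for
illustration and are not part of the commit. Uses java.util Arrays, Collections,
HashMap, HashSet, List, and Map.)

    // Hypothetical values: this member is primary for buckets 0 and 1, and each
    // bucket has a single secondary copy hosted on another member.
    Map<Integer, List<String>> secondaries = new HashMap<>();
    secondaries.put(0, Collections.singletonList("server-2"));
    secondaries.put(1, Collections.singletonList("server-3"));

    MemberBuckets mb = new MemberBuckets("server-1", "10.0.0.1", 6379,
        new HashSet<>(Arrays.asList(0, 1)), secondaries);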
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java
index be2f708..3dde5aa 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java
@@ -17,16 +17,16 @@ package org.apache.geode.redis.internal.executor.cluster;
 
 import static org.apache.geode.redis.internal.RedisConstants.ERROR_UNKNOWN_COMMAND;
 import static org.apache.geode.redis.internal.RegionProvider.REDIS_REGION_BUCKETS;
-import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS;
+import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS_PER_BUCKET;
 import static org.apache.geode.redis.internal.cluster.BucketRetrievalFunction.MemberBuckets;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
 import java.util.TreeMap;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -94,15 +94,21 @@ public class ClusterExecutor extends AbstractExecutor {
     ResultCollector<MemberBuckets, List<MemberBuckets>> resultCollector =
         FunctionService.onMembers(membersWithDataRegion).execute(BucketRetrievalFunction.ID);
 
-    SortedMap<Integer, String> bucketToMemberMap = new TreeMap<>();
+    Map<Integer, String> primaryBucketToMemberMap = new HashMap<>();
+    Map<Integer, List<String>> secondaryBucketToMemberMap = new HashMap<>();
     Map<String, Pair<String, Integer>> memberToHostPortMap = new TreeMap<>();
     int retrievedBucketCount = 0;
+
     for (MemberBuckets m : resultCollector.getResult()) {
       memberToHostPortMap.put(m.getMemberId(), Pair.of(m.getHostAddress(), m.getPort()));
-      for (Integer id : m.getBucketIds()) {
-        bucketToMemberMap.put(id, m.getMemberId());
+      for (Integer id : m.getPrimaryBucketIds()) {
+        primaryBucketToMemberMap.put(id, m.getMemberId());
         retrievedBucketCount++;
       }
+
+      for (Map.Entry<Integer, List<String>> entry : m.getSecondaryBucketMembers().entrySet()) {
+        secondaryBucketToMemberMap.put(entry.getKey(), entry.getValue());
+      }
     }
 
     if (retrievedBucketCount != REDIS_REGION_BUCKETS) {
@@ -110,16 +116,25 @@ public class ClusterExecutor extends AbstractExecutor {
           + " != " + REDIS_REGION_BUCKETS);
     }
 
-    int slotsPerBucket = REDIS_SLOTS / REDIS_REGION_BUCKETS;
     int index = 0;
     List<Object> slots = new ArrayList<>();
 
-    for (String member : bucketToMemberMap.values()) {
-      Pair<String, Integer> hostAndPort = memberToHostPortMap.get(member);
-      List<?> entry = Arrays.asList(
-          index * slotsPerBucket,
-          ((index + 1) * slotsPerBucket) - 1,
-          Arrays.asList(hostAndPort.getLeft(), hostAndPort.getRight()));
+    for (int i = 0; i < REDIS_REGION_BUCKETS; i++) {
+      Pair<String, Integer> primaryHostAndPort =
+          memberToHostPortMap.get(primaryBucketToMemberMap.get(i));
+
+      List<Object> entry = new ArrayList<>();
+      entry.add(index * REDIS_SLOTS_PER_BUCKET);
+      entry.add(((index + 1) * REDIS_SLOTS_PER_BUCKET) - 1);
+      entry.add(Arrays.asList(primaryHostAndPort.getLeft(), primaryHostAndPort.getRight()));
+
+      List<String> secondaryMembers = secondaryBucketToMemberMap.get(i);
+      if (secondaryMembers != null) {
+        for (String m : secondaryMembers) {
+          Pair<String, Integer> hostPort = memberToHostPortMap.get(m);
+          entry.add(Arrays.asList(hostPort.getLeft(), hostPort.getRight()));
+        }
+      }
 
       slots.add(entry);
       index++;
