This is an automated email from the ASF dual-hosted git repository. upthewaterspout pushed a commit to branch feature/redis-performance-testing in repository https://gitbox.apache.org/repos/asf/geode.git
commit cbe19e467f3d854a471e5b9857db6c5c9f1bebe0 Author: Jens Deppe <[email protected]> AuthorDate: Tue Feb 16 11:01:54 2021 -0800 WIP add clustering support --- .../org/apache/geode/cache/PartitionResolver.java | 4 +- .../geode/internal/cache/PartitionedRegion.java | 8 +- .../internal/cache/PartitionedRegionDataStore.java | 28 ++--- .../cluster/RedisPartitionResolverDUnitTest.java | 123 +++++++++++++++++++++ .../executor/cluster/ClusterIntegrationTest.java | 48 ++++++++ .../geode/redis/internal/RedisCommandType.java | 5 +- .../geode/redis/internal/RegionProvider.java | 15 ++- .../internal/cluster/BucketRetrievalFunction.java | 69 ++++++++++++ .../redis/internal/data/ByteArrayWrapper.java | 18 +++ .../redis/internal/executor/cluster/CRC16.java | 59 ++++++++++ .../internal/executor/cluster/ClusterExecutor.java | 110 ++++++++++++++++++ .../executor/cluster/RedisPartitionResolver.java | 30 +++++ .../sanctioned-geode-redis-serializables.txt | 2 + 13 files changed, 493 insertions(+), 26 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/PartitionResolver.java b/geode-core/src/main/java/org/apache/geode/cache/PartitionResolver.java index 3e9981c..7ad3cb9 100755 --- a/geode-core/src/main/java/org/apache/geode/cache/PartitionResolver.java +++ b/geode-core/src/main/java/org/apache/geode/cache/PartitionResolver.java @@ -76,5 +76,7 @@ public interface PartitionResolver<K, V> extends CacheCallback { * * @return String name */ - String getName(); + default String getName() { + return getClass().getName(); + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index cd89363..68a039c 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -8149,8 +8149,8 @@ public class PartitionedRegion extends LocalRegion * A test method to get the list of all the bucket ids for the partitioned region in the data * Store. */ - public List getLocalBucketsListTestOnly() { - List localBucketList = null; + public List<Integer> getLocalBucketsListTestOnly() { + List<Integer> localBucketList = null; if (this.dataStore != null) { localBucketList = this.dataStore.getLocalBucketsListTestOnly(); } @@ -8161,8 +8161,8 @@ public class PartitionedRegion extends LocalRegion * A test method to get the list of all the primary bucket ids for the partitioned region in the * data Store. */ - public List getLocalPrimaryBucketsListTestOnly() { - List localPrimaryList = null; + public List<Integer> getLocalPrimaryBucketsListTestOnly() { + List<Integer> localPrimaryList = null; if (this.dataStore != null) { localPrimaryList = this.dataStore.getLocalPrimaryBucketsListTestOnly(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java index d1d3dba..d1cb4e6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java @@ -2604,14 +2604,9 @@ public class PartitionedRegionDataStore implements HasCachePerfStats { * <i>Test Method</i> Return the list of all the bucket names in this data store. * */ - public List getLocalBucketsListTestOnly() { - final List bucketList = new ArrayList(); - visitBuckets(new BucketVisitor() { - @Override - public void visit(Integer bucketId, Region r) { - bucketList.add(bucketId); - } - }); + public List<Integer> getLocalBucketsListTestOnly() { + final List<Integer> bucketList = new ArrayList<>(); + visitBuckets((bucketId, r) -> bucketList.add(bucketId)); return bucketList; } @@ -2635,16 +2630,13 @@ public class PartitionedRegionDataStore implements HasCachePerfStats { * <i>Test Method</i> Return the list of all the non primary bucket ids in this data store. * */ - public List getLocalNonPrimaryBucketsListTestOnly() { - final List nonPrimaryBucketList = new ArrayList(); - visitBuckets(new BucketVisitor() { - @Override - public void visit(Integer bucketId, Region r) { - BucketRegion br = (BucketRegion) r; - BucketAdvisor ba = (BucketAdvisor) br.getDistributionAdvisor(); - if (!ba.isPrimary()) { - nonPrimaryBucketList.add(bucketId); - } + public List<Integer> getLocalNonPrimaryBucketsListTestOnly() { + final List<Integer> nonPrimaryBucketList = new ArrayList<>(); + visitBuckets((bucketId, r) -> { + BucketRegion br = (BucketRegion) r; + BucketAdvisor ba = (BucketAdvisor) br.getDistributionAdvisor(); + if (!ba.isPrimary()) { + nonPrimaryBucketList.add(bucketId); } }); return nonPrimaryBucketList; diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java new file mode 100644 index 0000000..3304932 --- /dev/null +++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.cluster; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import redis.clients.jedis.Jedis; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.internal.cache.LocalDataSet; +import org.apache.geode.redis.internal.RegionProvider; +import org.apache.geode.redis.internal.data.ByteArrayWrapper; +import org.apache.geode.redis.internal.data.RedisData; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.SerializableCallableIF; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.dunit.rules.RedisClusterStartupRule; + +public class RedisPartitionResolverDUnitTest { + + @ClassRule + public static RedisClusterStartupRule cluster = new RedisClusterStartupRule(4); + + private static final String LOCAL_HOST = "127.0.0.1"; + private static final int JEDIS_TIMEOUT = + Math.toIntExact(GeodeAwaitility.getTimeout().toMillis()); + private static Jedis jedis1; + + private static MemberVM locator; + private static MemberVM server1; + private static MemberVM server2; + private static MemberVM server3; + + private static int redisServerPort1; + private static int redisServerPort2; + + @BeforeClass + public static void classSetup() { + locator = cluster.startLocatorVM(0); + server1 = cluster.startRedisVM(1, locator.getPort()); + server2 = cluster.startRedisVM(2, locator.getPort()); + server3 = cluster.startRedisVM(3, locator.getPort()); + + redisServerPort1 = cluster.getRedisPort(1); + redisServerPort2 = cluster.getRedisPort(2); + + jedis1 = new Jedis(LOCAL_HOST, redisServerPort1, JEDIS_TIMEOUT); + } + + @Before + public void testSetup() { + jedis1.flushAll(); + } + + @Test + public void testRedisHashesMapToCorrectBuckets() { + int numKeys = 1000; + for (int i = 0; i < numKeys; i++) { + String key = "key-" + i; + jedis1.set(key, "value-" + i); + } + + Map<ByteArrayWrapper, Integer> keyToBucketMap1 = getKeyToBucketMap(server1); + Map<ByteArrayWrapper, Integer> keyToBucketMap2 = getKeyToBucketMap(server2); + Map<ByteArrayWrapper, Integer> keyToBucketMap3 = getKeyToBucketMap(server3); + + Set<Integer> buckets1 = new HashSet<>(keyToBucketMap1.values()); + Set<Integer> buckets2 = new HashSet<>(keyToBucketMap2.values()); + Set<Integer> buckets3 = new HashSet<>(keyToBucketMap3.values()); + + assertThat(buckets1).doesNotContainAnyElementsOf(buckets2); + assertThat(buckets1).doesNotContainAnyElementsOf(buckets3); + assertThat(buckets2).doesNotContainAnyElementsOf(buckets3); + + assertThat(buckets1.size() + buckets2.size() + buckets3.size()) + .isEqualTo(RegionProvider.REDIS_REGION_BUCKETS); + + + } + + private Map<ByteArrayWrapper, Integer> getKeyToBucketMap(MemberVM vm) { + return vm.invoke( + (SerializableCallableIF<Map<ByteArrayWrapper, Integer>>) () -> { + Region<ByteArrayWrapper, RedisData> region = + RedisClusterStartupRule.getCache().getRegion(RegionProvider.REDIS_DATA_REGION); + + LocalDataSet local = (LocalDataSet) PartitionRegionHelper.getLocalPrimaryData(region); + Map<ByteArrayWrapper, Integer> keyMap = new HashMap<>(); + + for (Object key : local.localKeys()) { + int id = local.getProxy().getKeyInfo(key).getBucketId(); + keyMap.put((ByteArrayWrapper) key, id); + } + + return keyMap; + }); + } +} diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/cluster/ClusterIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/cluster/ClusterIntegrationTest.java new file mode 100644 index 0000000..6bbfa7e --- /dev/null +++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/cluster/ClusterIntegrationTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.executor.cluster; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import redis.clients.jedis.Jedis; + +import org.apache.geode.redis.GeodeRedisServerRule; + +public class ClusterIntegrationTest { + + @ClassRule + public static GeodeRedisServerRule server = new GeodeRedisServerRule(); + + private Jedis jedis; + + @Before + public void setUp() { + jedis = new Jedis("localhost", server.getPort(), 10000000); + } + + @After + public void tearDown() { + jedis.close(); + } + + @Test + public void testClusterSlots() { + System.err.println(server.getPort()); + System.err.println(jedis.clusterSlots()); + } +} diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java index 914e4f6..33ca297 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java @@ -33,6 +33,7 @@ import org.apache.geode.redis.internal.ParameterRequirements.UnspecifiedParamete import org.apache.geode.redis.internal.executor.Executor; import org.apache.geode.redis.internal.executor.RedisResponse; import org.apache.geode.redis.internal.executor.UnknownExecutor; +import org.apache.geode.redis.internal.executor.cluster.ClusterExecutor; import org.apache.geode.redis.internal.executor.connection.AuthExecutor; import org.apache.geode.redis.internal.executor.connection.EchoExecutor; import org.apache.geode.redis.internal.executor.connection.PingExecutor; @@ -276,6 +277,9 @@ public enum RedisCommandType { SLOWLOG(new SlowlogExecutor(), UNSUPPORTED, new SlowlogParameterRequirements()), TIME(new TimeExecutor(), UNSUPPORTED, new ExactParameterRequirements(1)), + /*********** CLUSTER **********/ + CLUSTER(new ClusterExecutor(), UNSUPPORTED, new MinimumParameterRequirements(1)), + /////////// UNIMPLEMENTED ///////////////////// ACL(null, UNIMPLEMENTED), @@ -288,7 +292,6 @@ public enum RedisCommandType { BZPOPMIN(null, UNIMPLEMENTED), BZPOPMAX(null, UNIMPLEMENTED), CLIENT(null, UNIMPLEMENTED), - CLUSTER(null, UNIMPLEMENTED), COMMAND(null, UNIMPLEMENTED), CONFIG(null, UNIMPLEMENTED), DEBUG(null, UNIMPLEMENTED), diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java index c7be2af..a8cd6c8 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java @@ -14,19 +14,23 @@ */ package org.apache.geode.redis.internal; +import org.apache.geode.cache.PartitionAttributesFactory; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionShortcut; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.InternalRegionFactory; import org.apache.geode.redis.internal.data.ByteArrayWrapper; import org.apache.geode.redis.internal.data.RedisData; +import org.apache.geode.redis.internal.executor.cluster.RedisPartitionResolver; public class RegionProvider { /** * The name of the region that holds data stored in redis. */ - private static final String REDIS_DATA_REGION = "__REDIS_DATA"; - private static final String REDIS_CONFIG_REGION = "__REDIS_CONFIG"; + public static final String REDIS_DATA_REGION = "__REDIS_DATA"; + public static final String REDIS_CONFIG_REGION = "__REDIS_CONFIG"; + public static final int REDIS_REGION_BUCKETS = Integer.getInteger("redis.region.buckets", 128); + public static final int REDIS_SLOTS = Integer.getInteger("redis.slots", 16384); private final Region<ByteArrayWrapper, RedisData> dataRegion; private final Region<String, Object> configRegion; @@ -36,6 +40,13 @@ public class RegionProvider { InternalRegionFactory<ByteArrayWrapper, RedisData> redisDataRegionFactory = cache.createInternalRegionFactory(RegionShortcut.PARTITION_REDUNDANT); redisDataRegionFactory.setInternalRegion(true).setIsUsedForMetaRegion(true); + + PartitionAttributesFactory<ByteArrayWrapper, RedisData> attributesFactory = + new PartitionAttributesFactory<>(); + attributesFactory.setPartitionResolver(new RedisPartitionResolver()); + attributesFactory.setTotalNumBuckets(REDIS_REGION_BUCKETS); + redisDataRegionFactory.setPartitionAttributes(attributesFactory.create()); + dataRegion = redisDataRegionFactory.create(REDIS_DATA_REGION); InternalRegionFactory<String, Object> redisConfigRegionFactory = diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java new file mode 100644 index 0000000..ed9144c --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.cluster; + +import java.io.Serializable; +import java.net.InetAddress; +import java.util.Set; + +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionContext; +import org.apache.geode.cache.execute.RegionFunctionContext; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.internal.cache.LocalDataSet; +import org.apache.geode.internal.inet.LocalHostUtil; + +public class BucketRetrievalFunction implements Function<Void> { + + private static final String hostAddress; + + static { + InetAddress localhost = null; + try { + localhost = LocalHostUtil.getLocalHost(); + } catch (Exception ex) { + } + + hostAddress = localhost == null ? "localhost" : localhost.getHostAddress(); + } + + @Override + public void execute(FunctionContext<Void> context) { + LocalDataSet local = (LocalDataSet) PartitionRegionHelper + .getLocalDataForContext((RegionFunctionContext) context); + + MemberBuckets mb = new MemberBuckets(hostAddress, local.getBucketSet()); + context.getResultSender().lastResult(mb); + } + + public static class MemberBuckets implements Serializable { + private final String hostAddress; + private final Set<Integer> bucketIds; + + public MemberBuckets(String hostAddress, Set<Integer> bucketIds) { + this.hostAddress = hostAddress; + this.bucketIds = bucketIds; + } + + public String getHostAddress() { + return hostAddress; + } + + public Set<Integer> getBucketIds() { + return bucketIds; + } + } +} diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java index ae292ca..b24d5a6 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java @@ -15,6 +15,9 @@ */ package org.apache.geode.redis.internal.data; +import static org.apache.geode.redis.internal.RegionProvider.REDIS_REGION_BUCKETS; +import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -26,6 +29,8 @@ import org.apache.geode.internal.serialization.DataSerializableFixedID; import org.apache.geode.internal.serialization.DeserializationContext; import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.internal.serialization.SerializationContext; +import org.apache.geode.redis.internal.executor.cluster.CRC16; +import org.apache.geode.redis.internal.executor.cluster.RedisPartitionResolver; import org.apache.geode.redis.internal.netty.Coder; /** @@ -40,6 +45,8 @@ public class ByteArrayWrapper */ protected byte[] value; + private transient Object routingId; + /** * Empty constructor for serialization */ @@ -108,6 +115,17 @@ public class ByteArrayWrapper } /** + * Used by the {@link RedisPartitionResolver} to map slots to buckets + */ + public synchronized Object getRoutingId() { + if (routingId == null) { + routingId = (CRC16.calculate(value) % REDIS_SLOTS) % REDIS_REGION_BUCKETS; + } + + return routingId; + } + + /** * Private helper method to compare two byte arrays, A.compareTo(B). The comparison is basically * numerical, for each byte index, the byte representing the greater value will be the greater * diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/CRC16.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/CRC16.java new file mode 100644 index 0000000..7d1d049 --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/CRC16.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.executor.cluster; + +public class CRC16 { + + // CCITT/SDLC/HDLC x^16 + x^12 + x^5 + 1 (CRC-16-CCITT) + private static final int CCITT_POLY = 0x8408; + private static final short[] crcTable = new short[256]; + + // Create the table up front + static { + int poly = reverseInt16(CCITT_POLY); + + for (int x = 0; x < 256; x++) { + int w = x << 8; + for (int i = 0; i < 8; i++) { + if ((w & 0x8000) != 0) { + w = (w << 1) ^ poly; + } else { + w = w << 1; + } + } + crcTable[x] = (short) w; + } + } + + // Calculate CRC with MSB first + public static int calculate(byte[] data) { + int crc = 0; + for (byte datum : data) { + crc = ((crc << 8) & 0xFF00) ^ (crcTable[(crc >> 8) ^ (datum & 0xFF)] & 0xFFFF); + } + return crc; + } + + // Reverses the bits of a 16 bit integer. + private static int reverseInt16(int i) { + i = (i & 0x5555) << 1 | (i >>> 1) & 0x5555; + i = (i & 0x3333) << 2 | (i >>> 2) & 0x3333; + i = (i & 0x0F0F) << 4 | (i >>> 4) & 0x0F0F; + i = (i & 0x00FF) << 8 | (i >>> 8); + return i; + } + +} diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java new file mode 100644 index 0000000..7fac868 --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.executor.cluster; + +import static org.apache.geode.redis.internal.RedisConstants.ERROR_UNKNOWN_COMMAND; +import static org.apache.geode.redis.internal.RegionProvider.REDIS_REGION_BUCKETS; +import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS; +import static org.apache.geode.redis.internal.cluster.BucketRetrievalFunction.MemberBuckets; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.execute.Execution; +import org.apache.geode.cache.execute.FunctionService; +import org.apache.geode.cache.execute.ResultCollector; +import org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.redis.internal.cluster.BucketRetrievalFunction; +import org.apache.geode.redis.internal.executor.AbstractExecutor; +import org.apache.geode.redis.internal.executor.RedisResponse; +import org.apache.geode.redis.internal.netty.Command; +import org.apache.geode.redis.internal.netty.ExecutionHandlerContext; + +public class ClusterExecutor extends AbstractExecutor { + + private static final Logger logger = LogService.getLogger(); + + @Override + public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) { + + List<byte[]> args = command.getProcessedCommand(); + String subCommand = new String(args.get(1)); + + StringBuilder strArgs = new StringBuilder(); + args.forEach(x -> strArgs.append(new String(x)).append(" ")); + + logger.info("CLUSTER args: {}", strArgs); + + RedisResponse response; + switch (subCommand.toLowerCase()) { + case "slots": { + response = getSlots(context); + break; + } + default: { + response = RedisResponse.error(ERROR_UNKNOWN_COMMAND); + } + } + + return response; + } + + // @SuppressWarnings("unchecked") + private RedisResponse getSlots(ExecutionHandlerContext ctx) { + Region<?, ?> region = ctx.getRegionProvider().getDataRegion(); + + Execution<Void, MemberBuckets, List<MemberBuckets>> execution = + FunctionService.onRegion(region); + ResultCollector<MemberBuckets, List<MemberBuckets>> resultCollector = + execution.execute(new BucketRetrievalFunction()); + + SortedMap<Integer, String> bucketToMemberMap = new TreeMap<>(); + int retrievedBucketCount = 0; + for (MemberBuckets m : resultCollector.getResult()) { + for (Integer id : m.getBucketIds()) { + bucketToMemberMap.put(id, m.getHostAddress()); + retrievedBucketCount++; + } + } + + if (retrievedBucketCount != REDIS_REGION_BUCKETS) { + return RedisResponse.error("Internal error: bucket count mismatch " + retrievedBucketCount + + " != " + REDIS_REGION_BUCKETS); + } + + int slotsPerBucket = REDIS_SLOTS / REDIS_REGION_BUCKETS; + int index = 0; + List<Object> slots = new ArrayList<>(); + + for (String member : bucketToMemberMap.values()) { + List<?> entry = Arrays.asList( + index * slotsPerBucket, + ((index + 1) * slotsPerBucket) - 1, + Arrays.asList(member, ctx.getServerPort())); + + slots.add(entry); + index++; + } + + return RedisResponse.array(slots); + } +} diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/RedisPartitionResolver.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/RedisPartitionResolver.java new file mode 100644 index 0000000..5ca304e --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/RedisPartitionResolver.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.executor.cluster; + +import org.apache.geode.cache.EntryOperation; +import org.apache.geode.cache.PartitionResolver; +import org.apache.geode.redis.internal.data.ByteArrayWrapper; +import org.apache.geode.redis.internal.data.RedisData; + +public class RedisPartitionResolver implements PartitionResolver<ByteArrayWrapper, RedisData> { + + @Override + public Object getRoutingObject(EntryOperation<ByteArrayWrapper, RedisData> opDetails) { + return opDetails.getKey().getRoutingId(); + } + +} diff --git a/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt b/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt index 3d91f76..d299a57 100755 --- a/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt +++ b/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt @@ -1,6 +1,8 @@ org/apache/geode/redis/internal/ParameterRequirements/RedisParametersMismatchException,true,-643700717871858072 org/apache/geode/redis/internal/RedisCommandSupportLevel,false org/apache/geode/redis/internal/RedisCommandType,false,deferredParameterRequirements:org/apache/geode/redis/internal/ParameterRequirements/ParameterRequirements,executor:org/apache/geode/redis/internal/executor/Executor,parameterRequirements:org/apache/geode/redis/internal/ParameterRequirements/ParameterRequirements,supportLevel:org/apache/geode/redis/internal/RedisCommandSupportLevel +org/apache/geode/redis/internal/cluster/BucketRetrievalFunction,false +org/apache/geode/redis/internal/cluster/BucketRetrievalFunction$MemberBuckets,false org/apache/geode/redis/internal/data/NullRedisSet$SetOp,false org/apache/geode/redis/internal/data/NullRedisString$BitOp,false org/apache/geode/redis/internal/data/RedisDataType,false,toStringValue:java/lang/String
