This is an automated email from the ASF dual-hosted git repository. dschneider pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new e084aa4 GEODE-8130: use a single region for redis Sets and Hashes (#5120) e084aa4 is described below commit e084aa4bdaed4d626e559fec4e7e64e1fbf93755 Author: Darrel Schneider <dschnei...@pivotal.io> AuthorDate: Fri May 15 09:37:54 2020 -0700 GEODE-8130: use a single region for redis Sets and Hashes (#5120) * Replaced replicated metaRegion with a partitioned dataRegion. Currently the dataRegion is used the same way as the metaRegion except for sets and hashes which store their actual data in it. * Exception handling now correctly deals with FunctionException * Disabled a test until GEODE-8127 is fixed. * Now uses the ByteArrayWrapper as the key on the meta region and the locks map instead of using a String. Since a ByteArrayWrapper is used as the key in the data region this will end up saving memory. * Found a problem with redis dynamic region management. Some of the code was executing when we added a new set or hash to the metaDataRegion. It was only ignoring STRING and HLL. This caused some extra memory to be used for every redis set/hash. Now the dynamic region code is only used for lists and sortedSet. * This commit has some TODO comments about what looks like a bug in the dynamic region code when a new server is started. It looks like the new server will not create already existing dynamic regions. We could test this by starting one server, creating a LIST, then starting another server, and then shutting down the first server. Does the LIST still exist? If we change them not to use dynamic regions then this issue will go away. 
--- .../apache/geode/redis/GeodeRedisServerRule.java | 4 +- .../org/apache/geode/redis/RedisDistDUnitTest.java | 5 +- .../apache/geode/redis/HashesIntegrationTest.java | 62 ------- .../geode/redis/RedisServerIntegrationTest.java | 8 +- .../apache/geode/redis/StringsIntegrationTest.java | 4 +- .../geode/redis/sets/SRemIntegrationTest.java | 2 +- .../geode/redis/sets/SetsIntegrationTest.java | 4 +- .../codeAnalysis/sanctionedDataSerializables.txt | 4 + .../redis/internal/ExecutionHandlerContext.java | 19 +- .../geode/redis/internal/GeodeRedisServer.java | 154 +++++++--------- .../apache/geode/redis/internal/KeyRegistrar.java | 147 ++++++++++++--- .../geode/redis/internal/RedisConstants.java | 2 - .../org/apache/geode/redis/internal/RedisData.java | 24 +++ .../geode/redis/internal/RegionProvider.java | 198 +++++++++++---------- .../redis/internal/executor/CommandFunction.java | 12 +- .../redis/internal/executor/EmptyRedisHash.java | 26 ++- .../redis/internal/executor/FlushAllExecutor.java | 10 +- .../redis/internal/executor/KeysExecutor.java | 15 +- .../redis/internal/executor/RedisHashInRegion.java | 34 +++- .../redis/internal/executor/RenameExecutor.java | 39 +++- .../redis/internal/executor/ScanExecutor.java | 6 +- .../redis/internal/executor/hash/HDelExecutor.java | 9 +- .../internal/executor/hash/HExistsExecutor.java | 18 +- .../internal/executor/hash/HGetAllExecutor.java | 2 +- .../redis/internal/executor/hash/HGetExecutor.java | 4 +- .../internal/executor/hash/HIncrByExecutor.java | 16 +- .../executor/hash/HIncrByFloatExecutor.java | 13 +- .../internal/executor/hash/HKeysExecutor.java | 23 +-- .../redis/internal/executor/hash/HLenExecutor.java | 21 +-- .../internal/executor/hash/HMGetExecutor.java | 5 +- .../internal/executor/hash/HMSetExecutor.java | 4 +- .../internal/executor/hash/HScanExecutor.java | 7 +- .../redis/internal/executor/hash/HSetExecutor.java | 4 +- .../internal/executor/hash/HValsExecutor.java | 16 +- 
.../redis/internal/executor/hash/HashExecutor.java | 42 +++-- .../redis/internal/executor/hash/RedisHash.java | 17 +- .../hash/RedisHashCommandsFunctionExecutor.java | 5 +- .../redis/internal/executor/set/EmptyRedisSet.java | 9 +- .../redis/internal/executor/set/RedisSet.java | 31 ++-- .../internal/executor/set/RedisSetCommands.java | 4 +- .../set/RedisSetCommandsFunctionExecutor.java | 17 +- .../internal/executor/set/RedisSetInRegion.java | 29 ++- .../redis/internal/executor/set/SAddExecutor.java | 6 +- .../redis/internal/executor/set/SCardExecutor.java | 2 +- .../internal/executor/set/SIsMemberExecutor.java | 19 +- .../internal/executor/set/SMembersExecutor.java | 5 +- .../redis/internal/executor/set/SMoveExecutor.java | 11 +- .../redis/internal/executor/set/SPopExecutor.java | 2 +- .../internal/executor/set/SRandMemberExecutor.java | 2 +- .../redis/internal/executor/set/SRemExecutor.java | 18 +- .../redis/internal/executor/set/SScanExecutor.java | 2 +- .../redis/internal/executor/set/SetExecutor.java | 5 +- .../redis/internal/executor/set/SetOpExecutor.java | 5 +- .../executor/set/SynchronizedStripedExecutor.java | 2 + .../redis/internal/RegionProviderJUnitTest.java | 11 +- .../executor/AbstractExecutorJUnitTest.java | 8 - 56 files changed, 600 insertions(+), 573 deletions(-) diff --git a/geode-redis/src/commonTest/java/org/apache/geode/redis/GeodeRedisServerRule.java b/geode-redis/src/commonTest/java/org/apache/geode/redis/GeodeRedisServerRule.java index f46d4e9..5ca4e1b 100644 --- a/geode-redis/src/commonTest/java/org/apache/geode/redis/GeodeRedisServerRule.java +++ b/geode-redis/src/commonTest/java/org/apache/geode/redis/GeodeRedisServerRule.java @@ -70,8 +70,8 @@ public class GeodeRedisServerRule extends SerializableExternalResource { return server.getKeyRegistrar(); } - public RegionProvider getRegionCache() { - return server.getRegionCache(); + public RegionProvider getRegionProvider() { + return server.getRegionProvider(); } public RedisLockService 
getLockService() { diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/RedisDistDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/RedisDistDUnitTest.java index 36b46be..3b7a3a4 100644 --- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/RedisDistDUnitTest.java +++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/RedisDistDUnitTest.java @@ -110,6 +110,7 @@ public class RedisDistDUnitTest implements Serializable { } @Test + @Ignore("GEODE-8127") public void testConcurrentSaddOperations_runWithoutException_orDataLoss() throws InterruptedException { List<String> set1 = new ArrayList<>(); @@ -118,6 +119,8 @@ public class RedisDistDUnitTest implements Serializable { final String setName = "keyset"; + Jedis jedis = new Jedis(LOCALHOST, server1Port, JEDIS_TIMEOUT); + AsyncInvocation<Void> remoteSaddInvocation = client1.invokeAsync(new ConcurrentSADDOperation(server1Port, setName, set1)); @@ -125,8 +128,6 @@ public class RedisDistDUnitTest implements Serializable { remoteSaddInvocation.await(); - Jedis jedis = new Jedis(LOCALHOST, server1Port, JEDIS_TIMEOUT); - Set<String> smembers = jedis.smembers(setName); assertThat(smembers).hasSize(setSize * 2); diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/HashesIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/HashesIntegrationTest.java index 73a2a53..71f407f 100755 --- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/HashesIntegrationTest.java +++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/HashesIntegrationTest.java @@ -35,12 +35,7 @@ import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; 
-import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.RandomStringUtils; @@ -50,14 +45,12 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import redis.clients.jedis.Jedis; import redis.clients.jedis.ScanResult; import redis.clients.jedis.exceptions.JedisDataException; -import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.test.junit.categories.RedisTest; @Category({RedisTest.class}) @@ -163,61 +156,6 @@ public class HashesIntegrationTest { assertThat(jedis.exists("farm")).isFalse(); } - @Ignore("GEODE-7905") - @Test - public void testConcurrentHDelConsistentlyUpdatesMetaInformation() - throws ExecutionException, InterruptedException { - ByteArrayWrapper keyAsByteArray = new ByteArrayWrapper("hash".getBytes()); - AtomicLong errorCount = new AtomicLong(); - CyclicBarrier startCyclicBarrier = new CyclicBarrier(2, () -> { - boolean keyIsRegistered = server.getKeyRegistrar().isRegistered(keyAsByteArray); - boolean containsKey = server.getRegionCache().getHashRegion().containsKey(keyAsByteArray); - - if (keyIsRegistered != containsKey) { - errorCount.getAndIncrement(); - jedis.hset("hash", "field", "value"); - jedis.del("hash"); - } - }); - - ExecutorService pool = Executors.newFixedThreadPool(2); - - Callable<Long> callable1 = () -> { - Long removedCount = 0L; - for (int i = 0; i < 1000; i++) { - try { - Long result = jedis.hdel("hash", "field"); - startCyclicBarrier.await(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - return removedCount; - }; - - Callable<Long> callable2 = () -> { - Long addedCount = 0L; - for (int i = 0; i < 1000; i++) { - try { - addedCount += jedis2.hset("hash", "field", "value"); - startCyclicBarrier.await(); - } catch (Exception e) { - throw new RuntimeException(e); - } 
- } - return addedCount; - }; - - Future<Long> future1 = pool.submit(callable1); - Future<Long> future2 = pool.submit(callable2); - - future1.get(); - future2.get(); - - assertThat(errorCount.get()) - .as("Inconsistency between keyRegistrar and backing store detected.").isEqualTo(0L); - } - @Test public void testHkeys() { String key = randString(); diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/RedisServerIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/RedisServerIntegrationTest.java index 29392da..c2e8909 100644 --- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/RedisServerIntegrationTest.java +++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/RedisServerIntegrationTest.java @@ -59,11 +59,11 @@ public class RedisServerIntegrationTest { } @Test - public void initializeRedisCreatesFourRegions() { - redisServer = new GeodeRedisServer(); + public void initializeRedisCreatesTwoRegions() { + redisServer = new GeodeRedisServer(redisPort); redisServer.start(); - assertThat(cache.rootRegions()).hasSize(4); - assertThat(cache.getRegion(GeodeRedisServer.REDIS_META_DATA_REGION)).isNotNull(); + assertThat(cache.rootRegions()).hasSize(2); + assertThat(cache.getRegion(GeodeRedisServer.REDIS_DATA_REGION)).isNotNull(); } @Test diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/StringsIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/StringsIntegrationTest.java index ffa2093..8c4919d 100755 --- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/StringsIntegrationTest.java +++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/StringsIntegrationTest.java @@ -15,7 +15,7 @@ package org.apache.geode.redis; import static java.lang.Integer.parseInt; -import static org.apache.geode.redis.internal.GeodeRedisServer.REDIS_META_DATA_REGION; +import static org.apache.geode.redis.internal.GeodeRedisServer.REDIS_DATA_REGION; 
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -583,7 +583,7 @@ public class StringsIntegrationTest { @Test public void testSet_protectedRedisDataType_throwsRedisDataTypeMismatchException() { assertThatThrownBy( - () -> jedis.set(REDIS_META_DATA_REGION, "something else")) + () -> jedis.set(REDIS_DATA_REGION, "something else")) .isInstanceOf(JedisDataException.class) .hasMessageContaining("protected"); } diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/sets/SRemIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/sets/SRemIntegrationTest.java index 7e92818..ddf0649 100755 --- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/sets/SRemIntegrationTest.java +++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/sets/SRemIntegrationTest.java @@ -187,7 +187,7 @@ public class SRemIntegrationTest { AtomicLong errorCount = new AtomicLong(); CyclicBarrier startCyclicBarrier = new CyclicBarrier(2, () -> { boolean keyIsRegistered = server.getKeyRegistrar().isRegistered(keyAsByteArray); - boolean containsKey = server.getRegionCache().getSetRegion().containsKey(keyAsByteArray); + boolean containsKey = server.getRegionProvider().getDataRegion().containsKey(keyAsByteArray); if (keyIsRegistered != containsKey) { errorCount.getAndIncrement(); diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/sets/SetsIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/sets/SetsIntegrationTest.java index e9108db..0758e87 100755 --- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/sets/SetsIntegrationTest.java +++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/sets/SetsIntegrationTest.java @@ -37,7 +37,6 @@ import redis.clients.jedis.exceptions.JedisDataException; import org.apache.geode.management.internal.cli.util.ThreePhraseGenerator; import 
org.apache.geode.redis.GeodeRedisServerRule; -import org.apache.geode.redis.internal.RedisConstants; import org.apache.geode.test.junit.categories.RedisTest; @Category({RedisTest.class}) @@ -92,7 +91,8 @@ public class SetsIntegrationTest { setValue[0] = "set value that should never get added"; exceptionRule.expect(JedisDataException.class); - exceptionRule.expectMessage(RedisConstants.ERROR_WRONG_TYPE); + exceptionRule + .expectMessage("WRONGTYPE Operation against a key holding the wrong kind of value"); jedis.set(key, stringValue); jedis.sadd(key, setValue); diff --git a/geode-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt index 25b02e4..24cbbd4 100644 --- a/geode-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt +++ b/geode-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt @@ -6,6 +6,10 @@ org/apache/geode/redis/internal/DoubleWrapper,2 fromData,9 toData,9 +org/apache/geode/redis/internal/KeyRegistrar$RedisDataTransformer,2 +fromData,14 +toData,9 + org/apache/geode/redis/internal/executor/hash/RedisHash,2 fromData,9 toData,9 diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java index 93e8978..1998371 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java @@ -35,6 +35,7 @@ import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.TransactionException; import org.apache.geode.cache.TransactionId; import org.apache.geode.cache.UnsupportedOperationInTransactionException; +import 
org.apache.geode.cache.execute.FunctionException; import org.apache.geode.cache.query.QueryInvocationTargetException; import org.apache.geode.cache.query.RegionNotFoundException; import org.apache.geode.logging.internal.log4j.api.LogService; @@ -55,7 +56,7 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { private static final Logger logger = LogService.getLogger(); private static final int WAIT_REGION_DSTRYD_MILLIS = 100; private static final int MAXIMUM_NUM_RETRIES = (1000 * 60) / WAIT_REGION_DSTRYD_MILLIS; // 60 - // seconds + // seconds private final RedisLockService lockService; private final Cache cache; @@ -99,7 +100,8 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { * @param password Authentication password for each context, can be null */ public ExecutionHandlerContext(Channel channel, Cache cache, RegionProvider regionProvider, - GeodeRedisServer server, byte[] password, KeyRegistrar keyRegistrar, PubSub pubSub, + GeodeRedisServer server, byte[] password, + KeyRegistrar keyRegistrar, PubSub pubSub, RedisLockService lockService) { this.keyRegistrar = keyRegistrar; this.lockService = lockService; @@ -168,6 +170,19 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { private ByteBuf getExceptionResponse(ChannelHandlerContext ctx, Throwable cause) { ByteBuf response; + if (cause instanceof FunctionException) { + Throwable th = cause.getCause(); + if (th == null) { + FunctionException functionException = (FunctionException) cause; + if (functionException.getExceptions() != null) { + th = functionException.getExceptions().get(0); + } + } + if (th != null) { + cause = th; + } + } + if (cause instanceof RedisDataTypeMismatchException) { response = Coder.getWrongTypeResponse(this.byteBufAllocator, cause.getMessage()); } else if (cause instanceof DecoderException diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java index ad1d385..05afba1 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java @@ -65,7 +65,6 @@ import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.annotations.internal.MakeNotStatic; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.EntryEvent; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionDestroyedException; @@ -84,8 +83,6 @@ import org.apache.geode.internal.security.SecurableCommunicationChannel; import org.apache.geode.management.ManagementService; import org.apache.geode.management.internal.SystemManagementService; import org.apache.geode.redis.internal.executor.CommandFunction; -import org.apache.geode.redis.internal.executor.hash.RedisHash; -import org.apache.geode.redis.internal.executor.set.RedisSet; import org.apache.geode.redis.internal.serverinitializer.NamedThreadFactory; /** @@ -97,7 +94,7 @@ import org.apache.geode.redis.internal.serverinitializer.NamedThreadFactory; * Each Redis data type instance is stored in a separate {@link Region} except for the Strings and * HyperLogLogs which are collectively stored in one Region respectively. That Region along with a * meta data region used internally are protected so the client may not store keys with the name - * {@link GeodeRedisServer#REDIS_META_DATA_REGION} or {@link GeodeRedisServer#STRING_REGION}. The + * {@link GeodeRedisServer#REDIS_DATA_REGION} or {@link GeodeRedisServer#STRING_REGION}. The * default Region type is {@link RegionShortcut#PARTITION_REDUNDANT}. If the * {@link GeodeRedisServer#NUM_THREADS_SYS_PROP_NAME} system property is set * to 0, one thread per client will be created. 
Otherwise a worker thread pool of specified size is @@ -118,7 +115,7 @@ import org.apache.geode.redis.internal.serverinitializer.NamedThreadFactory; * HMSET, HSETNX, HLEN, HSCAN, HSET, HVALS * <p> * Supported Set commands - SADD, SCARD, SDIFF, SDIFFSTORE, SINTER, SINTERSTORE, SISMEMBER, - * SMEMBERS, SMOVE, SREM, SPOP, SRANDMEMBER, SCAN, SUNION, SUNIONSTORE + * SMEMBERS, SMOVE, SREM, SPOP, SRANDMEMBER, SSCAN, SUNION, SUNIONSTORE * <p> * Supported SortedSet commands - ZADD, ZCARD, ZCOUNT, ZINCRBY, ZLEXCOUNT, ZRANGE, ZRANGEBYLEX, * ZRANGEBYSCORE, ZRANK, ZREM, ZREMRANGEBYLEX, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREVRANGE, @@ -220,7 +217,7 @@ public class GeodeRedisServer { @SuppressWarnings("deprecation") private org.apache.geode.LogWriter logger; - private RegionProvider regionCache; + private RegionProvider regionProvider; private final MetaCacheListener metaListener; @@ -242,29 +239,17 @@ public class GeodeRedisServer { public static final String STRING_REGION = "ReDiS_StRiNgS"; /** - * TThe field that defines the name of the {@link Region} which holds non-named hash. The current - * value of this field is {@value #HASH_REGION}. - */ - public static final String HASH_REGION = "ReDiS_HASH"; - - /** - * TThe field that defines the name of the {@link Region} which holds sets. The current value of - * this field is {@value #SET_REGION}. - */ - public static final String SET_REGION = "ReDiS_SET"; - - - /** * The field that defines the name of the {@link Region} which holds all of the HyperLogLogs. The * current value of this field is {@code HLL_REGION}. */ public static final String HLL_REGION = "ReDiS_HlL"; /** - * The field that defines the name of the {@link Region} which holds all of the Redis meta data. - * The current value of this field is {@code REDIS_META_DATA_REGION}. + * The name of the region that holds data stored in redis. 
+ * Currently this is the meta data but at some point the value for a particular + * type will be changed to an instance of {@link RedisData} */ - public static final String REDIS_META_DATA_REGION = "ReDiS_MeTa_DaTa"; + public static final String REDIS_DATA_REGION = "ReDiS_DaTa"; /** * System property name that can be used to set the number of threads to be used by the @@ -420,8 +405,8 @@ public class GeodeRedisServer { } @VisibleForTesting - public RegionProvider getRegionCache() { - return regionCache; + public RegionProvider getRegionProvider() { + return regionProvider; } private void initializeRedis() { @@ -429,9 +414,7 @@ public class GeodeRedisServer { Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion; Region<ByteArrayWrapper, HyperLogLogPlus> hLLRegion; - Region<ByteArrayWrapper, RedisHash> redisHash; - Region<String, RedisDataType> redisMetaData; - Region<ByteArrayWrapper, RedisSet> redisSet; + Region<ByteArrayWrapper, RedisData> redisData; InternalCache gemFireCache = (InternalCache) cache; if ((stringsRegion = cache.getRegion(STRING_REGION)) == null) { @@ -445,37 +428,25 @@ public class GeodeRedisServer { hLLRegion = regionFactory.create(HLL_REGION); } - if ((redisHash = cache.getRegion(HASH_REGION)) == null) { - RegionFactory<ByteArrayWrapper, RedisHash> regionFactory = - gemFireCache.createRegionFactory(DEFAULT_REGION_TYPE); - redisHash = regionFactory.create(HASH_REGION); - } - - if ((redisSet = cache.getRegion(SET_REGION)) == null) { - RegionFactory<ByteArrayWrapper, RedisSet> regionFactory = - gemFireCache.createRegionFactory(DEFAULT_REGION_TYPE); - redisSet = regionFactory.create(SET_REGION); - } - - if ((redisMetaData = cache.getRegion(REDIS_META_DATA_REGION)) == null) { - InternalRegionFactory<String, RedisDataType> redisMetaDataFactory = - gemFireCache.createInternalRegionFactory(); + if ((redisData = cache.getRegion(REDIS_DATA_REGION)) == null) { + InternalRegionFactory<ByteArrayWrapper, RedisData> redisMetaDataFactory = + 
gemFireCache.createInternalRegionFactory(DEFAULT_REGION_TYPE); redisMetaDataFactory.addCacheListener(metaListener); - redisMetaDataFactory.setDataPolicy(DataPolicy.REPLICATE); redisMetaDataFactory.setInternalRegion(true).setIsUsedForMetaRegion(true); - redisMetaData = redisMetaDataFactory.create(REDIS_META_DATA_REGION); + redisData = redisMetaDataFactory.create(REDIS_DATA_REGION); } - keyRegistrar = new KeyRegistrar(redisMetaData); + keyRegistrar = new KeyRegistrar(redisData); hashLockService = new RedisLockService(); pubSub = new PubSubImpl(new Subscriptions()); - regionCache = new RegionProvider(stringsRegion, hLLRegion, keyRegistrar, - expirationFutures, expirationExecutor, DEFAULT_REGION_TYPE, redisHash, redisSet); - redisMetaData.put(REDIS_META_DATA_REGION, RedisDataType.REDIS_PROTECTED); - redisMetaData.put(HLL_REGION, RedisDataType.REDIS_PROTECTED); - redisMetaData.put(STRING_REGION, RedisDataType.REDIS_PROTECTED); - redisMetaData.put(SET_REGION, RedisDataType.REDIS_PROTECTED); - redisMetaData.put(HASH_REGION, RedisDataType.REDIS_PROTECTED); + regionProvider = new RegionProvider(stringsRegion, hLLRegion, keyRegistrar, + expirationFutures, expirationExecutor, DEFAULT_REGION_TYPE, redisData); + keyRegistrar.register(Coder.stringToByteArrayWrapper(REDIS_DATA_REGION), + RedisDataType.REDIS_PROTECTED); + keyRegistrar.register(Coder.stringToByteArrayWrapper(HLL_REGION), + RedisDataType.REDIS_PROTECTED); + keyRegistrar.register(Coder.stringToByteArrayWrapper(STRING_REGION), + RedisDataType.REDIS_PROTECTED); CommandFunction.register(); } @@ -484,6 +455,8 @@ public class GeodeRedisServer { registerLockServiceMBean(); } + public static final int PROTECTED_KEY_COUNT = 3; + @VisibleForTesting public RedisLockService getLockService() { return hashLockService; @@ -509,20 +482,28 @@ public class GeodeRedisServer { } private void checkForRegions() { - Collection<Entry<String, RedisDataType>> entrySet = keyRegistrar.keyInfos(); - for (Entry<String, RedisDataType> entry : 
entrySet) { - String regionName = entry.getKey(); - RedisDataType type = entry.getValue(); - Region<?, ?> newRegion = cache.getRegion(regionName); - if (newRegion == null && type != RedisDataType.REDIS_STRING && type != RedisDataType.REDIS_HLL - && type != RedisDataType.REDIS_PROTECTED) { - try { - regionCache - .createRemoteRegionReferenceLocally(Coder.stringToByteArrayWrapper(regionName), type); - } catch (Exception e) { - if (logger.errorEnabled()) { - logger.error(e); - } + Collection<Entry<ByteArrayWrapper, RedisData>> entrySet = keyRegistrar.keyInfos(); + for (Entry<ByteArrayWrapper, RedisData> entry : entrySet) { + ByteArrayWrapper key = entry.getKey(); + RedisDataType type = entry.getValue().getType(); + if (!regionProvider.typeUsesDynamicRegions(type)) { + continue; + } + if (cache.getRegion(key.toString()) != null) { + // TODO: this seems to be correct (i.e. no need to call createRemoteRegionReferenceLocally + // if region already exists). + // HOWEVER: createRemoteRegionReferenceLocally ends up doing nothing if the region does not + // exist. So this caller of createRemoteRegionReferenceLocally basically does nothing. + // createRemoteRegionReferenceLocally might be needed even if the region exists because + // local state needs to be initialized (like indexes and queries). + continue; + } + try { + regionProvider.createRemoteRegionReferenceLocally(key, type); + } catch (Exception e) { + // TODO: this eats the exception so if something really is wrong we don't fail but just log. 
+ if (logger.errorEnabled()) { + logger.error(e); } } } @@ -614,7 +595,7 @@ public class GeodeRedisServer { pipeline.addLast(ByteToCommandDecoder.class.getSimpleName(), new ByteToCommandDecoder()); pipeline.addLast(new WriteTimeoutHandler(10)); pipeline.addLast(ExecutionHandlerContext.class.getSimpleName(), - new ExecutionHandlerContext(socketChannel, cache, regionCache, GeodeRedisServer.this, + new ExecutionHandlerContext(socketChannel, cache, regionProvider, GeodeRedisServer.this, redisPasswordBytes, keyRegistrar, pubSub, hashLockService)); } @@ -659,17 +640,13 @@ public class GeodeRedisServer { * * @param event EntryEvent from meta data region */ - private void afterKeyCreate(EntryEvent<String, RedisDataType> event) { + private void afterKeyCreate(EntryEvent<ByteArrayWrapper, RedisData> event) { if (event.isOriginRemote()) { - final String key = event.getKey(); - final RedisDataType value = event.getNewValue(); - if (value != RedisDataType.REDIS_STRING && value != RedisDataType.REDIS_HLL - && value != RedisDataType.REDIS_PROTECTED) { - try { - regionCache.createRemoteRegionReferenceLocally(Coder.stringToByteArrayWrapper(key), - value); - } catch (RegionDestroyedException ignore) { // Region already destroyed, ignore - } + final ByteArrayWrapper key = event.getKey(); + final RedisData value = event.getNewValue(); + try { + regionProvider.createRemoteRegionReferenceLocally(key, value.getType()); + } catch (RegionDestroyedException ignore) { // Region already destroyed, ignore } } } @@ -678,29 +655,22 @@ public class GeodeRedisServer { * When a key is removed then this function will make sure the associated queries with the key are * also removed from each vm to avoid unnecessary data retention */ - private void afterKeyDestroy(EntryEvent<String, RedisDataType> event) { + private void afterKeyDestroy(EntryEvent<ByteArrayWrapper, RedisData> event) { if (event.isOriginRemote()) { - final String key = event.getKey(); - final RedisDataType value = event.getOldValue(); 
- if (value != null && value != RedisDataType.REDIS_STRING && value != RedisDataType.REDIS_HLL - && value != RedisDataType.REDIS_PROTECTED) { - ByteArrayWrapper kW = Coder.stringToByteArrayWrapper(key); - Region<?, ?> r = regionCache.getRegion(kW); - if (r != null) { - regionCache.removeRegionReferenceLocally(kW, value); - } - } + final ByteArrayWrapper key = event.getKey(); + final RedisData value = event.getOldValue(); + regionProvider.removeRegionReferenceLocally(key, value.getType()); } } - private class MetaCacheListener extends CacheListenerAdapter<String, RedisDataType> { + private class MetaCacheListener extends CacheListenerAdapter<ByteArrayWrapper, RedisData> { @Override - public void afterCreate(EntryEvent<String, RedisDataType> event) { + public void afterCreate(EntryEvent<ByteArrayWrapper, RedisData> event) { afterKeyCreate(event); } @Override - public void afterDestroy(EntryEvent<String, RedisDataType> event) { + public void afterDestroy(EntryEvent<ByteArrayWrapper, RedisData> event) { afterKeyDestroy(event); } } @@ -730,7 +700,7 @@ public class GeodeRedisServer { serverChannel.close(); c.syncUninterruptibly(); c2.syncUninterruptibly(); - regionCache.close(); + regionProvider.close(); if (mainThread != null) { mainThread.interrupt(); } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/KeyRegistrar.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/KeyRegistrar.java index 291193d..a73d56e 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/KeyRegistrar.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/KeyRegistrar.java @@ -14,16 +14,21 @@ */ package org.apache.geode.redis.internal; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.Map; import java.util.Set; +import org.apache.geode.DataSerializer; +import org.apache.geode.InvalidDeltaException; import org.apache.geode.cache.Region; public class KeyRegistrar { - private 
Region<String, RedisDataType> redisMetaRegion; + private Region<ByteArrayWrapper, RedisData> redisDataRegion; - public KeyRegistrar(Region<String, RedisDataType> redisMetaRegion) { - this.redisMetaRegion = redisMetaRegion; + public KeyRegistrar(Region<ByteArrayWrapper, RedisData> redisDataRegion) { + this.redisDataRegion = redisDataRegion; } /** @@ -31,39 +36,119 @@ public class KeyRegistrar { * store the key:datatype association in the metadataRegion */ public void register(ByteArrayWrapper key, RedisDataType type) { - RedisDataType existingType = this.redisMetaRegion.putIfAbsent(key.toString(), type); - if (!isValidDataType(existingType, type)) { - throwDataTypeException(key, existingType); + RedisData existingValue = this.redisDataRegion.putIfAbsent(key, transformType(type)); + if (!isValidDataType(existingValue, type)) { + throwDataTypeException(key, existingValue); } } - public boolean unregister(ByteArrayWrapper key) { - return this.redisMetaRegion.remove(key.toString()) != null; + /** + * TODO: This class should go away once all data types implement RedisData. 
+ */ + private static class RedisDataTransformer implements RedisData { + private RedisDataType type; + + public RedisDataTransformer(RedisDataType type) { + this.type = type; + } + + public RedisDataTransformer() { + // needed for serialization + } + + @Override + public RedisDataType getType() { + return type; + } + + @Override + public void toData(DataOutput out) throws IOException { + DataSerializer.writeEnum(type, out); + } + + @Override + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + type = DataSerializer.readEnum(RedisDataType.class, in); + } + + @Override + public boolean hasDelta() { + return false; + } + + @Override + public void toDelta(DataOutput out) throws IOException {} + + @Override + public void fromDelta(DataInput in) throws IOException, InvalidDeltaException {} + } + + private static final RedisDataTransformer REDIS_SORTEDSET_DATA = + new RedisDataTransformer(RedisDataType.REDIS_SORTEDSET); + private static final RedisDataTransformer REDIS_LIST_DATA = + new RedisDataTransformer(RedisDataType.REDIS_LIST); + private static final RedisDataTransformer REDIS_STRING_DATA = + new RedisDataTransformer(RedisDataType.REDIS_STRING); + private static final RedisDataTransformer REDIS_PROTECTED_DATA = + new RedisDataTransformer(RedisDataType.REDIS_PROTECTED); + private static final RedisDataTransformer REDIS_HLL_DATA = + new RedisDataTransformer(RedisDataType.REDIS_HLL); + private static final RedisDataTransformer REDIS_PUBSUB_DATA = + new RedisDataTransformer(RedisDataType.REDIS_PUBSUB); + + /** + * TODO: This method should only exist while we still have data types that have not implemented + * this RedisData interface. Once they all implement it then we can get rid of this. 
+ */ + private RedisData transformType(RedisDataType type) { + switch (type) { + case REDIS_SORTEDSET: + return REDIS_SORTEDSET_DATA; + case REDIS_LIST: + return REDIS_LIST_DATA; + case REDIS_STRING: + return REDIS_STRING_DATA; + case REDIS_PROTECTED: + return REDIS_PROTECTED_DATA; + case REDIS_HLL: + return REDIS_HLL_DATA; + case REDIS_PUBSUB: + return REDIS_PUBSUB_DATA; + case REDIS_HASH: + case REDIS_SET: + throw new IllegalStateException( + type + " should never be added as a type to the data region"); + default: + throw new IllegalStateException("unexpected RedisDataType: " + type); + } } - public boolean unregisterIfType(ByteArrayWrapper key, RedisDataType expectedType) { - return redisMetaRegion.remove(key.toString(), expectedType); + public boolean unregister(ByteArrayWrapper key) { + return this.redisDataRegion.remove(key) != null; } public boolean isRegistered(ByteArrayWrapper key) { - return this.redisMetaRegion.containsKey(key.toString()); + return this.redisDataRegion.containsKey(key); } - public Set<String> keys() { - Set<String> keysWithProtected = this.redisMetaRegion.keySet(); - return keysWithProtected; + public Set<ByteArrayWrapper> keys() { + return this.redisDataRegion.keySet(); } - public Set<Map.Entry<String, RedisDataType>> keyInfos() { - return this.redisMetaRegion.entrySet(); + public Set<Map.Entry<ByteArrayWrapper, RedisData>> keyInfos() { + return this.redisDataRegion.entrySet(); } public int numKeys() { - return this.redisMetaRegion.size() - RedisConstants.NUM_DEFAULT_KEYS; + return this.redisDataRegion.size() - GeodeRedisServer.PROTECTED_KEY_COUNT; } public RedisDataType getType(ByteArrayWrapper key) { - return this.redisMetaRegion.get(key.toString()); + RedisData currentValue = redisDataRegion.get(key); + if (currentValue == null) { + return null; + } + return currentValue.getType(); } /** @@ -74,9 +159,12 @@ public class KeyRegistrar { * @param type Type to check to */ public void validate(ByteArrayWrapper key, RedisDataType type) { 
- RedisDataType currentType = redisMetaRegion.get(key.toString()); - if (!isValidDataType(currentType, type)) { - throwDataTypeException(key, currentType); + RedisData currentValue = redisDataRegion.get(key); + if (currentValue != null) { + RedisDataType currentType = currentValue.getType(); + if (!isValidDataType(currentType, type)) { + throwDataTypeException(key, currentType); + } } } @@ -86,7 +174,18 @@ public class KeyRegistrar { * @param key Key to check */ public boolean isProtected(ByteArrayWrapper key) { - return RedisDataType.REDIS_PROTECTED.equals(redisMetaRegion.get(key.toString())); + RedisData redisData = redisDataRegion.get(key); + if (redisData == null) { + return false; + } + return RedisDataType.REDIS_PROTECTED.equals(redisData.getType()); + } + + private boolean isValidDataType(RedisData actualData, RedisDataType expectedDataType) { + if (actualData == null) { + return true; + } + return isValidDataType(actualData.getType(), expectedDataType); } private boolean isValidDataType(RedisDataType actualDataType, RedisDataType expectedDataType) { @@ -97,6 +196,10 @@ public class KeyRegistrar { return dataType == null; } + private void throwDataTypeException(ByteArrayWrapper key, RedisData data) { + throwDataTypeException(key, data.getType()); + } + private void throwDataTypeException(ByteArrayWrapper key, RedisDataType dataType) { if (RedisDataType.REDIS_PROTECTED.equals(dataType)) { throw new RedisDataTypeMismatchException("The key name \"" + key + "\" is protected"); diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java index 0abd406..9f0365c 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java @@ -17,8 +17,6 @@ package org.apache.geode.redis.internal; public class RedisConstants { - public static final int 
NUM_DEFAULT_KEYS = 3; - /* * Responses */ diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisData.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisData.java new file mode 100644 index 0000000..a8634c2 --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisData.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ * + */ + +package org.apache.geode.redis.internal; + +import org.apache.geode.DataSerializable; +import org.apache.geode.Delta; + +public interface RedisData extends Delta, DataSerializable { + RedisDataType getType(); +} diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java index 59716ed..98ff1ea 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java @@ -47,10 +47,8 @@ import org.apache.geode.management.internal.cli.result.model.ResultModel; import org.apache.geode.redis.internal.executor.ExpirationExecutor; import org.apache.geode.redis.internal.executor.ListQuery; import org.apache.geode.redis.internal.executor.SortedSetQuery; -import org.apache.geode.redis.internal.executor.hash.RedisHash; import org.apache.geode.redis.internal.executor.hash.RedisHashCommands; import org.apache.geode.redis.internal.executor.hash.RedisHashCommandsFunctionExecutor; -import org.apache.geode.redis.internal.executor.set.RedisSet; import org.apache.geode.redis.internal.executor.set.RedisSetCommands; import org.apache.geode.redis.internal.executor.set.RedisSetCommandsFunctionExecutor; @@ -61,7 +59,7 @@ import org.apache.geode.redis.internal.executor.set.RedisSetCommandsFunctionExec * synchronized, which is done away with and abstracted by this class. 
*/ public class RegionProvider implements Closeable { - private final ConcurrentHashMap<ByteArrayWrapper, Region<Object, Object>> regions; + private final ConcurrentHashMap<ByteArrayWrapper, Region<Object, Object>> dynamicRegions; /** * This is the Redis meta data {@link Region} that holds the {@link RedisDataType} information for @@ -82,8 +80,7 @@ public class RegionProvider implements Closeable { */ private final Region<ByteArrayWrapper, HyperLogLogPlus> hLLRegion; - private final Region<ByteArrayWrapper, RedisHash> hashRegion; - private final Region<ByteArrayWrapper, RedisSet> setRegion; + private final Region<ByteArrayWrapper, RedisData> dataRegion; private final Cache cache; private final QueryService queryService; @@ -94,7 +91,7 @@ public class RegionProvider implements Closeable { private final RegionShortcut defaultRegionType; @Immutable private static final CreateRegionCommand createRegionCmd = new CreateRegionCommand(); - private final ConcurrentHashMap<String, Lock> locks; + private final ConcurrentHashMap<ByteArrayWrapper, Lock> dynamicRegionLocks; @SuppressWarnings("deprecation") public RegionProvider(Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion, @@ -102,11 +99,10 @@ public class RegionProvider implements Closeable { KeyRegistrar redisMetaRegion, ConcurrentMap<ByteArrayWrapper, ScheduledFuture<?>> expirationsMap, ScheduledExecutorService expirationExecutor, RegionShortcut defaultShortcut, - Region<ByteArrayWrapper, RedisHash> hashRegion, - Region<ByteArrayWrapper, RedisSet> setRegion) { + Region<ByteArrayWrapper, RedisData> dataRegion) { this(stringsRegion, hLLRegion, redisMetaRegion, expirationsMap, expirationExecutor, - defaultShortcut, hashRegion, setRegion, GemFireCacheImpl.getInstance()); + defaultShortcut, dataRegion, GemFireCacheImpl.getInstance()); } public RegionProvider(Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion, @@ -114,14 +110,12 @@ public class RegionProvider implements Closeable { KeyRegistrar redisMetaRegion, 
ConcurrentMap<ByteArrayWrapper, ScheduledFuture<?>> expirationsMap, ScheduledExecutorService expirationExecutor, RegionShortcut defaultShortcut, - Region<ByteArrayWrapper, RedisHash> hashRegion, - Region<ByteArrayWrapper, RedisSet> setRegion, Cache cache) { + Region<ByteArrayWrapper, RedisData> dataRegion, Cache cache) { if (stringsRegion == null || hLLRegion == null || redisMetaRegion == null) { throw new NullPointerException(); } - this.hashRegion = hashRegion; - this.setRegion = setRegion; - regions = new ConcurrentHashMap<>(); + this.dataRegion = dataRegion; + dynamicRegions = new ConcurrentHashMap<>(); this.stringsRegion = stringsRegion; this.hLLRegion = hLLRegion; keyRegistrar = redisMetaRegion; @@ -130,7 +124,7 @@ public class RegionProvider implements Closeable { this.expirationsMap = expirationsMap; this.expirationExecutor = expirationExecutor; defaultRegionType = defaultShortcut; - locks = new ConcurrentHashMap<>(); + dynamicRegionLocks = new ConcurrentHashMap<>(); } public Region<?, ?> getRegion(ByteArrayWrapper key) { @@ -138,7 +132,7 @@ public class RegionProvider implements Closeable { return null; } - return regions.get(key); + return dynamicRegions.get(key); } @@ -152,11 +146,8 @@ public class RegionProvider implements Closeable { return stringsRegion; case REDIS_HASH: - return hashRegion; - case REDIS_SET: - case REDIS_SORTEDSET: - return setRegion; + return dataRegion; case REDIS_HLL: return hLLRegion; @@ -165,28 +156,34 @@ public class RegionProvider implements Closeable { case REDIS_PUBSUB: case NONE: case REDIS_LIST: + case REDIS_SORTEDSET: default: return null; } } public void removeRegionReferenceLocally(ByteArrayWrapper key, RedisDataType type) { - Lock lock = locks.get(key.toString()); - boolean locked = false; - try { - if (lock != null) { - locked = lock.tryLock(); - } + if (!typeUsesDynamicRegions(type)) { + return; + } + if (getRegion(key) == null) { + return; + } + Lock lock = dynamicRegionLocks.get(key); + if (lock == null) { + 
return; + } + boolean locked = lock.tryLock(); + if (!locked) { // If we cannot get the lock we ignore this remote event, this key has local event // that started independently, ignore this event to prevent deadlock - if (locked) { - cancelKeyExpiration(key); - removeRegionState(key, type); - } + return; + } + try { + cancelKeyExpiration(key); + removeRegionState(key, type); } finally { - if (locked) { - lock.unlock(); - } + lock.unlock(); } } @@ -199,16 +196,28 @@ public class RegionProvider implements Closeable { return removeKey(key, type, true); } + private boolean typeStoresDataInKeyRegistrar(RedisDataType type) { + if (type == RedisDataType.REDIS_SET) { + return true; + } + if (type == RedisDataType.REDIS_HASH) { + return true; + } + return false; + } + public boolean removeKey(ByteArrayWrapper key, RedisDataType type, boolean cancelExpiration) { if (type == RedisDataType.REDIS_PROTECTED) { return false; } - Lock lock = locks.get(key.toString()); + Lock lock = dynamicRegionLocks.get(key); try { - if (lock != null) {// Strings/hlls will not have locks + if (lock != null) { // only typeUsesDynamicRegions will have a lock lock.lock(); } - keyRegistrar.unregister(key); + if (!typeStoresDataInKeyRegistrar(type)) { + keyRegistrar.unregister(key); + } try { if (type == RedisDataType.REDIS_STRING) { return stringsRegion.remove(key) != null; @@ -218,11 +227,11 @@ public class RegionProvider implements Closeable { return destroyRegion(key, type); } else if (type == RedisDataType.REDIS_SET) { RedisSetCommands redisSetCommands = - new RedisSetCommandsFunctionExecutor(setRegion); + new RedisSetCommandsFunctionExecutor(dataRegion); return redisSetCommands.del(key); } else if (type == RedisDataType.REDIS_HASH) { RedisHashCommands redisHashCommands = - new RedisHashCommandsFunctionExecutor(hashRegion); + new RedisHashCommandsFunctionExecutor(dataRegion); return redisHashCommands.del(key); } else { return false; @@ -236,7 +245,7 @@ public class RegionProvider implements 
Closeable { removeKeyExpiration(key); } if (lock != null) { - locks.remove(key.toString()); + dynamicRegionLocks.remove(key); } } } finally { @@ -251,50 +260,54 @@ public class RegionProvider implements Closeable { return getOrCreateRegion0(key, type, context, true); } + public boolean typeUsesDynamicRegions(RedisDataType type) { + return type == RedisDataType.REDIS_LIST || type == RedisDataType.REDIS_SORTEDSET; + } + public void createRemoteRegionReferenceLocally(ByteArrayWrapper key, RedisDataType type) { - if (type == null || type == RedisDataType.REDIS_STRING || type == RedisDataType.REDIS_HLL) { + if (!typeUsesDynamicRegions(type)) { return; } - Region<Object, Object> r = regions.get(key); + Region<Object, Object> r = dynamicRegions.get(key); if (r != null) { return; } - if (!regions.containsKey(key)) { - String stringKey = key.toString(); - Lock lock = locks.get(stringKey); + Lock lock = dynamicRegionLocks.get(key); + if (lock == null) { + Lock newLock = new ReentrantLock(); + lock = dynamicRegionLocks.putIfAbsent(key, newLock); if (lock == null) { - locks.putIfAbsent(stringKey, new ReentrantLock()); - lock = locks.get(stringKey); + lock = newLock; } - boolean locked = false; + } + boolean locked = lock.tryLock(); + // If we cannot get the lock then this remote event may have been initialized + // independently on this machine, so if we wait on the lock it is more than + // likely we will deadlock just to do the same task. This event can be ignored + if (locked) { try { - locked = lock.tryLock(); - // If we cannot get the lock then this remote event may have been initialized - // independently on this machine, so if we wait on the lock it is more than - // likely we will deadlock just to do the same task. This event can be ignored - if (locked) { - r = cache.getRegion(key.toString()); - // If r is null, this implies that we are after a create/destroy - // simply ignore. 
Calls to getRegion or getOrCreate will work correctly - if (r == null) { - return; - } + r = cache.getRegion(key.toString()); + // If r is null, this implies that we are after a create/destroy + // simply ignore. Calls to getRegion or getOrCreate will work correctly + if (r == null) { + // TODO: one caller of this method only calls it if getRegion returned null. It was + // expecting us to create it locally. If someone else will create it locally then this + // method does not need to be called. + return; + } - if (type == RedisDataType.REDIS_LIST) { - doInitializeList(key, r); - } else if (type == RedisDataType.REDIS_SORTEDSET) { - try { - doInitializeSortedSet(key, r); - } catch (RegionNotFoundException | IndexInvalidException e) { - // ignore - } + if (type == RedisDataType.REDIS_LIST) { + doInitializeList(key, r); + } else if (type == RedisDataType.REDIS_SORTEDSET) { + try { + doInitializeSortedSet(key, r); + } catch (RegionNotFoundException | IndexInvalidException e) { + // ignore } - regions.put(key, r); } + dynamicRegions.put(key, r); } finally { - if (locked) { - lock.unlock(); - } + lock.unlock(); } } } @@ -302,24 +315,25 @@ public class RegionProvider implements Closeable { private Region<?, ?> getOrCreateRegion0(ByteArrayWrapper key, RedisDataType type, ExecutionHandlerContext context, boolean addToMeta) { - String regionName = key.toString(); keyRegistrar.validate(key, type); - Region<Object, Object> r = regions.get(key); + Region<Object, Object> r = dynamicRegions.get(key); if (r != null && r.isDestroyed()) { removeKey(key, type); r = null; } if (r == null) { - String stringKey = key.toString(); - Lock lock = locks.get(stringKey); + Lock lock = dynamicRegionLocks.get(key); if (lock == null) { - locks.putIfAbsent(stringKey, new ReentrantLock()); - lock = locks.get(stringKey); + Lock newLock = new ReentrantLock(); + lock = dynamicRegionLocks.putIfAbsent(key, newLock); + if (lock == null) { + lock = newLock; + } } + lock.lock(); try { - lock.lock(); - r = 
regions.get(key); + r = dynamicRegions.get(key); if (r == null) { boolean hasTransaction = context != null && context.hasTransaction(); // Can create // without context @@ -334,7 +348,7 @@ public class RegionProvider implements Closeable { do { concurrentCreateDestroyException = null; - r = createRegionGlobally(regionName); + r = createRegionGlobally(key.toString()); try { if (type == RedisDataType.REDIS_LIST) { @@ -352,7 +366,7 @@ public class RegionProvider implements Closeable { } } } while (concurrentCreateDestroyException != null); - regions.put(key, r); + dynamicRegions.put(key, r); if (addToMeta) { keyRegistrar.register(key, type); } @@ -370,14 +384,14 @@ public class RegionProvider implements Closeable { } /** - * SYNCHRONIZE EXTERNALLY OF this.locks.get(key.toString())!!!!! + * SYNCHRONIZE EXTERNALLY OF this.locks.get(key)!!!!! * * @param key Key of region to destroy * @param type Type of region to destroyu * @return Flag if destroyed */ private boolean destroyRegion(ByteArrayWrapper key, RedisDataType type) { - Region<?, ?> r = regions.get(key); + Region<?, ?> r = dynamicRegions.get(key); if (r != null) { try { r.destroyRegion(); @@ -398,7 +412,7 @@ public class RegionProvider implements Closeable { */ private void removeRegionState(ByteArrayWrapper key, RedisDataType type) { preparedQueries.remove(key); - regions.remove(key); + dynamicRegions.remove(key); } private void doInitializeSortedSet(ByteArrayWrapper key, Region<?, ?> r) @@ -472,25 +486,15 @@ public class RegionProvider implements Closeable { } public boolean regionExists(ByteArrayWrapper key) { - return regions.containsKey(key); + return dynamicRegions.containsKey(key); } public Region<ByteArrayWrapper, ByteArrayWrapper> getStringsRegion() { return stringsRegion; } - /** - * @return the hashRegion - */ - public Region<ByteArrayWrapper, RedisHash> getHashRegion() { - return hashRegion; - } - - /** - * @return the setRegion - */ - public Region<ByteArrayWrapper, RedisSet> getSetRegion() { - return 
setRegion; + public Region<ByteArrayWrapper, RedisData> getDataRegion() { + return dataRegion; } public Region<ByteArrayWrapper, HyperLogLogPlus> gethLLRegion() { @@ -592,7 +596,7 @@ public class RegionProvider implements Closeable { public String dumpRegionsCache() { StringBuilder builder = new StringBuilder(); - for (Entry<ByteArrayWrapper, Region<Object, Object>> e : regions.entrySet()) { + for (Entry<ByteArrayWrapper, Region<Object, Object>> e : dynamicRegions.entrySet()) { builder.append(e.getKey()).append(" --> {").append(e.getValue()).append("}\n"); } return builder.toString(); diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java index 90ff601..6d37582 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java @@ -20,13 +20,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import org.apache.geode.cache.Region; import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.RedisCommandType; +import org.apache.geode.redis.internal.RedisData; import org.apache.geode.redis.internal.RedisDataType; import org.apache.geode.redis.internal.executor.set.RedisSetInRegion; import org.apache.geode.redis.internal.executor.set.SingleResultCollector; @@ -45,10 +45,9 @@ public class CommandFunction extends SingleResultRedisFunction { FunctionService.registerFunction(new CommandFunction(stripedExecutor)); } - @SuppressWarnings("unchecked") public static <T> T execute(RedisCommandType command, ByteArrayWrapper key, - Object commandArguments, Region region) { + Object 
commandArguments, Region<ByteArrayWrapper, RedisData> region) { SingleResultCollector<T> rc = new SingleResultCollector<>(); FunctionService .onRegion(region) @@ -82,11 +81,7 @@ public class CommandFunction extends SingleResultRedisFunction { } case SREM: { ArrayList<ByteArrayWrapper> membersToRemove = (ArrayList<ByteArrayWrapper>) args[1]; - callable = () -> { - AtomicBoolean setWasDeleted = new AtomicBoolean(); - long srem = new RedisSetInRegion(localRegion).srem(key, membersToRemove, setWasDeleted); - return new Object[] {srem, setWasDeleted.get()}; - }; + callable = () -> new RedisSetInRegion(localRegion).srem(key, membersToRemove); break; } case DEL: @@ -145,7 +140,6 @@ public class CommandFunction extends SingleResultRedisFunction { } - @SuppressWarnings("unchecked") private Callable<Object> executeDel(ByteArrayWrapper key, Region localRegion, RedisDataType delType) { switch (delType) { diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/EmptyRedisHash.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/EmptyRedisHash.java index acf4e5f..79b9205 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/EmptyRedisHash.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/EmptyRedisHash.java @@ -22,17 +22,18 @@ import java.util.List; import org.apache.geode.cache.Region; import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.RedisData; import org.apache.geode.redis.internal.executor.hash.RedisHash; public class EmptyRedisHash extends RedisHash { @Override - public synchronized int hset(Region<ByteArrayWrapper, RedisHash> region, ByteArrayWrapper key, + public synchronized int hset(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key, List<ByteArrayWrapper> fieldsToSet, boolean nx) { throw new UnsupportedOperationException(); } @Override - public synchronized int hdel(Region<ByteArrayWrapper, RedisHash> region, 
ByteArrayWrapper key, + public synchronized int hdel(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key, List<ByteArrayWrapper> fieldsToRemove) { return 0; } @@ -41,4 +42,25 @@ public class EmptyRedisHash extends RedisHash { public synchronized Collection<ByteArrayWrapper> hgetall() { return emptyList(); } + + @Override + public synchronized boolean isEmpty() { + return true; + } + + @Override + public synchronized boolean containsKey(ByteArrayWrapper field) { + return false; + } + + @Override + public synchronized ByteArrayWrapper get(ByteArrayWrapper field) { + return null; + } + + @Override + public synchronized int size() { + return 0; + } + } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/FlushAllExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/FlushAllExecutor.java index 71712ec..2ca7fbd 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/FlushAllExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/FlushAllExecutor.java @@ -18,9 +18,11 @@ import java.util.Map.Entry; import org.apache.geode.cache.EntryDestroyedException; import org.apache.geode.cache.UnsupportedOperationInTransactionException; +import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.Coder; import org.apache.geode.redis.internal.Command; import org.apache.geode.redis.internal.ExecutionHandlerContext; +import org.apache.geode.redis.internal.RedisData; import org.apache.geode.redis.internal.RedisDataType; public class FlushAllExecutor extends AbstractExecutor { @@ -31,11 +33,11 @@ public class FlushAllExecutor extends AbstractExecutor { throw new UnsupportedOperationInTransactionException(); } - for (Entry<String, RedisDataType> e : context.getKeyRegistrar().keyInfos()) { + for (Entry<ByteArrayWrapper, RedisData> e : context.getKeyRegistrar().keyInfos()) { try { - String skey = e.getKey(); - RedisDataType type = 
e.getValue(); - removeEntry(Coder.stringToByteWrapper(skey), type, context); + ByteArrayWrapper skey = e.getKey(); + RedisDataType type = e.getValue().getType(); + removeEntry(skey, type, context); } catch (EntryDestroyedException e1) { continue; } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/KeysExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/KeysExecutor.java index f4547ea..bf72668 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/KeysExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/KeysExecutor.java @@ -20,6 +20,7 @@ import java.util.Set; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; +import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.Coder; import org.apache.geode.redis.internal.Command; import org.apache.geode.redis.internal.ExecutionHandlerContext; @@ -39,7 +40,7 @@ public class KeysExecutor extends AbstractExecutor { } String glob = Coder.bytesToString(commandElems.get(1)); - Set<String> allKeys = context.getKeyRegistrar().keys(); + Set<ByteArrayWrapper> allKeys = context.getKeyRegistrar().keys(); List<String> matchingKeys = new ArrayList<String>(); Pattern pattern; @@ -51,10 +52,14 @@ public class KeysExecutor extends AbstractExecutor { return; } - for (String key : allKeys) { - if (!(key.equals(GeodeRedisServer.REDIS_META_DATA_REGION) - || key.equals(GeodeRedisServer.STRING_REGION) || key.equals(GeodeRedisServer.HLL_REGION)) - && pattern.matcher(key).matches()) { + for (ByteArrayWrapper bytesKey : allKeys) { + String key = bytesKey.toString(); + if (key.equals(GeodeRedisServer.REDIS_DATA_REGION) + || key.equals(GeodeRedisServer.STRING_REGION) + || key.equals(GeodeRedisServer.HLL_REGION)) { + continue; + } + if (pattern.matcher(key).matches()) { matchingKeys.add(key); } } diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisHashInRegion.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisHashInRegion.java index 359a3c5..0e3f8d4 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisHashInRegion.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisHashInRegion.java @@ -16,35 +16,40 @@ package org.apache.geode.redis.internal.executor; +import static org.apache.geode.redis.internal.RedisDataType.REDIS_HASH; + import java.util.Collection; import java.util.List; import org.apache.geode.cache.Region; import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.RedisConstants; +import org.apache.geode.redis.internal.RedisData; +import org.apache.geode.redis.internal.RedisDataTypeMismatchException; import org.apache.geode.redis.internal.executor.hash.RedisHash; import org.apache.geode.redis.internal.executor.hash.RedisHashCommands; public class RedisHashInRegion implements RedisHashCommands { - private final Region<ByteArrayWrapper, RedisHash> localRegion; + private final Region<ByteArrayWrapper, RedisData> region; - public RedisHashInRegion(Region<ByteArrayWrapper, RedisHash> localRegion) { - this.localRegion = localRegion; + public RedisHashInRegion(Region<ByteArrayWrapper, RedisData> region) { + this.region = region; } @Override public int hset(ByteArrayWrapper key, List<ByteArrayWrapper> fieldsToSet, boolean NX) { - RedisHash hash = localRegion.get(key); + RedisHash hash = checkType(region.get(key)); if (hash != null) { - return hash.hset(localRegion, key, fieldsToSet, NX); + return hash.hset(region, key, fieldsToSet, NX); } else { - localRegion.put(key, new RedisHash(fieldsToSet)); + region.put(key, new RedisHash(fieldsToSet)); return fieldsToSet.size() / 2; } } @Override public int hdel(ByteArrayWrapper key, List<ByteArrayWrapper> fieldsToRemove) { - return getRedisHash(key).hdel(localRegion, 
key, fieldsToRemove); + return getRedisHash(key).hdel(region, key, fieldsToRemove); } @Override @@ -54,10 +59,21 @@ public class RedisHashInRegion implements RedisHashCommands { @Override public boolean del(ByteArrayWrapper key) { - return localRegion.remove(key) != null; + return region.remove(key) != null; } private RedisHash getRedisHash(ByteArrayWrapper key) { - return localRegion.getOrDefault(key, RedisHash.EMPTY); + return checkType(region.getOrDefault(key, RedisHash.EMPTY)); + } + + public static RedisHash checkType(RedisData redisData) { + if (redisData == null) { + return null; + } + if (redisData.getType() != REDIS_HASH) { + throw new RedisDataTypeMismatchException(RedisConstants.ERROR_WRONG_TYPE); + } + return (RedisHash) redisData; } + } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RenameExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RenameExecutor.java index d3c2a20..80a6b2e 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RenameExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RenameExecutor.java @@ -15,7 +15,10 @@ package org.apache.geode.redis.internal.executor; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Set; import org.apache.geode.cache.Region; import org.apache.geode.redis.internal.AutoCloseableLock; @@ -25,6 +28,10 @@ import org.apache.geode.redis.internal.Command; import org.apache.geode.redis.internal.ExecutionHandlerContext; import org.apache.geode.redis.internal.RedisConstants; import org.apache.geode.redis.internal.RedisDataType; +import org.apache.geode.redis.internal.executor.hash.RedisHashCommands; +import org.apache.geode.redis.internal.executor.hash.RedisHashCommandsFunctionExecutor; +import org.apache.geode.redis.internal.executor.set.RedisSetCommands; +import org.apache.geode.redis.internal.executor.set.RedisSetCommandsFunctionExecutor; 
import org.apache.geode.redis.internal.executor.string.StringExecutor; public class RenameExecutor extends StringExecutor { @@ -40,21 +47,19 @@ public class RenameExecutor extends StringExecutor { ByteArrayWrapper key = command.getKey(); ByteArrayWrapper newKey = new ByteArrayWrapper(commandElems.get(2)); - if (!context.getKeyRegistrar().isRegistered(key)) { - command.setResponse( - Coder.getErrorResponse(context.getByteBufAllocator(), RedisConstants.ERROR_NO_SUCH_KEY)); - return; - } - try (@SuppressWarnings("unused") AutoCloseableLock lockForOldKey = context.getLockService().lock(key)) { try (@SuppressWarnings("unused") AutoCloseableLock lockForNewKey = context.getLockService().lock(newKey)) { RedisDataType redisDataType = context.getKeyRegistrar().getType(key); + if (redisDataType == null) { + command.setResponse( + Coder.getErrorResponse(context.getByteBufAllocator(), + RedisConstants.ERROR_NO_SUCH_KEY)); + return; + } switch (redisDataType) { case REDIS_STRING: - case REDIS_HASH: - case REDIS_SET: @SuppressWarnings("unchecked") Region<ByteArrayWrapper, Object> region = (Region<ByteArrayWrapper, Object>) context.getRegionProvider() @@ -64,6 +69,24 @@ public class RenameExecutor extends StringExecutor { region.put(newKey, value); removeEntry(key, redisDataType, context); break; + case REDIS_HASH: + // TODO this all needs to be done atomically. Add RENAME support to RedisHashCommands + RedisHashCommands redisHashCommands = + new RedisHashCommandsFunctionExecutor(context.getRegionProvider().getDataRegion()); + Collection<ByteArrayWrapper> fieldsAndValues = redisHashCommands.hgetall(key); + redisHashCommands.del(key); + redisHashCommands.del(newKey); + redisHashCommands.hset(newKey, new ArrayList<>(fieldsAndValues), false); + break; + case REDIS_SET: + // TODO this all needs to be done atomically. 
Add RENAME support to RedisSetCommands + RedisSetCommands redisSetCommands = + new RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion()); + Set<ByteArrayWrapper> members = redisSetCommands.smembers(key); + redisSetCommands.del(key); + redisSetCommands.del(newKey); + redisSetCommands.sadd(newKey, new ArrayList<>(members)); + break; case REDIS_LIST: throw new RuntimeException("Renaming List isn't supported"); case REDIS_SORTEDSET: diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/ScanExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/ScanExecutor.java index a092c28..1193e20 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/ScanExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/ScanExecutor.java @@ -113,9 +113,9 @@ public class ScanExecutor extends AbstractScanExecutor { int numElements = 0; int i = -1; for (String key : (Collection<String>) list) { - if (key.equals(GeodeRedisServer.REDIS_META_DATA_REGION) - || key.equals(GeodeRedisServer.STRING_REGION) || key - .equals(GeodeRedisServer.HLL_REGION)) { + if (key.equals(GeodeRedisServer.REDIS_DATA_REGION) + || key.equals(GeodeRedisServer.STRING_REGION) + || key.equals(GeodeRedisServer.HLL_REGION)) { continue; } i++; diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HDelExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HDelExecutor.java index 401e47e..4f41e10 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HDelExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HDelExecutor.java @@ -21,7 +21,6 @@ import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.Coder; import org.apache.geode.redis.internal.Command; import org.apache.geode.redis.internal.ExecutionHandlerContext; -import 
org.apache.geode.redis.internal.RedisDataType; /** * <pre> @@ -47,17 +46,11 @@ public class HDelExecutor extends HashExecutor { List<ByteArrayWrapper> commandElems = command.getProcessedCommandWrappers(); ByteArrayWrapper key = command.getKey(); - checkDataType(key, RedisDataType.REDIS_HASH, context); RedisHashCommands redisHashCommands = - new RedisHashCommandsFunctionExecutor(context.getRegionProvider().getHashRegion()); + new RedisHashCommandsFunctionExecutor(context.getRegionProvider().getDataRegion()); ArrayList<ByteArrayWrapper> fieldsToDelete = new ArrayList<>(commandElems.subList(2, commandElems.size())); int numDeleted = redisHashCommands.hdel(key, fieldsToDelete); - if (numDeleted != 0) { - if (!context.getRegionProvider().getHashRegion().containsKey(key)) { - context.getKeyRegistrar().unregisterIfType(key, RedisDataType.REDIS_HASH); - } - } command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), numDeleted)); } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HExistsExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HExistsExecutor.java index 1c378ac..b00ac51 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HExistsExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HExistsExecutor.java @@ -16,8 +16,6 @@ package org.apache.geode.redis.internal.executor.hash; import java.util.List; -import org.apache.geode.cache.TimeoutException; -import org.apache.geode.redis.internal.AutoCloseableLock; import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.Coder; import org.apache.geode.redis.internal.Command; @@ -49,23 +47,11 @@ public class HExistsExecutor extends HashExecutor { public void executeCommand(Command command, ExecutionHandlerContext context) { List<byte[]> commandElems = command.getProcessedCommand(); - boolean hasField; byte[] byteField = 
commandElems.get(FIELD_INDEX); ByteArrayWrapper field = new ByteArrayWrapper(byteField); ByteArrayWrapper key = command.getKey(); - try (AutoCloseableLock regionLock = withRegionLock(context, key)) { - RedisHash map = getMap(context, key); - hasField = map.containsKey(field); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - command.setResponse( - Coder.getErrorResponse(context.getByteBufAllocator(), "Thread interrupted.")); - return; - } catch (TimeoutException e) { - command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), - "Timeout acquiring lock. Please try again.")); - return; - } + RedisHash map = getRedisHash(context, key); + boolean hasField = map.containsKey(field); if (hasField) { command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), EXISTS)); diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HGetAllExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HGetAllExecutor.java index 6240528..c4406ec 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HGetAllExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HGetAllExecutor.java @@ -48,7 +48,7 @@ public class HGetAllExecutor extends HashExecutor { public void executeCommand(Command command, ExecutionHandlerContext context) { ByteArrayWrapper key = command.getKey(); RedisHashCommands redisHashCommands = - new RedisHashCommandsFunctionExecutor(context.getRegionProvider().getHashRegion()); + new RedisHashCommandsFunctionExecutor(context.getRegionProvider().getDataRegion()); Collection<ByteArrayWrapper> fieldsAndValues = redisHashCommands.hgetall(key); try { command.setResponse(Coder.getArrayResponse(context.getByteBufAllocator(), fieldsAndValues)); diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HGetExecutor.java 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HGetExecutor.java index 7825d02..688265f 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HGetExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HGetExecutor.java @@ -47,8 +47,8 @@ public class HGetExecutor extends HashExecutor { ByteArrayWrapper key = command.getKey(); - RedisHash entry = getMap(context, key); - ByteArrayWrapper valueWrapper = entry.get(field); + RedisHash redisHash = getRedisHash(context, key); + ByteArrayWrapper valueWrapper = redisHash.get(field); try { if (valueWrapper != null) { command.setResponse( diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HIncrByExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HIncrByExecutor.java index 3c2bb95..5acc6c9 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HIncrByExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HIncrByExecutor.java @@ -77,7 +77,7 @@ public class HIncrByExecutor extends HashExecutor { long value; try (AutoCloseableLock regionLock = withRegionLock(context, key)) { - RedisHash map = getMap(context, key); + RedisHash redisHash = getModifiableRedisHash(context, key); byte[] byteField = commandElems.get(FIELD_INDEX); ByteArrayWrapper field = new ByteArrayWrapper(byteField); @@ -86,12 +86,13 @@ public class HIncrByExecutor extends HashExecutor { * Put increment as value if field doesn't exist */ - ByteArrayWrapper oldValue = map.get(field); + ByteArrayWrapper oldValue = redisHash.get(field); if (oldValue == null) { - map.put(field, new ByteArrayWrapper(incrArray)); + ByteArrayWrapper newValue = new ByteArrayWrapper(incrArray); + redisHash.put(field, newValue); - saveMap(map, context, key); + saveRedishHash(redisHash, context, key); 
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), increment)); @@ -114,15 +115,16 @@ public class HIncrByExecutor extends HashExecutor { */ if ((value >= 0 && increment > (Long.MAX_VALUE - value)) || (value <= 0 && increment < (Long.MIN_VALUE - value))) { - command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ERROR_OVERFLOW)); + command + .setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ERROR_OVERFLOW)); return; } value += increment; - map.put(field, new ByteArrayWrapper(Coder.longToBytes(value))); + redisHash.put(field, new ByteArrayWrapper(Coder.longToBytes(value))); - saveMap(map, context, key); + saveRedishHash(redisHash, context, key); } catch (InterruptedException e) { Thread.currentThread().interrupt(); command.setResponse( diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HIncrByFloatExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HIncrByFloatExecutor.java index e416f03..c6ac788 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HIncrByFloatExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HIncrByFloatExecutor.java @@ -76,7 +76,7 @@ public class HIncrByFloatExecutor extends HashExecutor { double value; try (AutoCloseableLock regionLock = withRegionLock(context, key)) { - RedisHash map = getMap(context, key); + RedisHash redisHash = getModifiableRedisHash(context, key); byte[] byteField = commandElems.get(FIELD_INDEX); ByteArrayWrapper field = new ByteArrayWrapper(byteField); @@ -85,12 +85,13 @@ public class HIncrByFloatExecutor extends HashExecutor { * Put increment as value if field doesn't exist */ - ByteArrayWrapper oldValue = map.get(field); + ByteArrayWrapper oldValue = redisHash.get(field); if (oldValue == null) { - map.put(field, new ByteArrayWrapper(incrArray)); + ByteArrayWrapper newValue = new ByteArrayWrapper(incrArray); + 
redisHash.put(field, newValue); - this.saveMap(map, context, key); + saveRedishHash(redisHash, context, key); respondBulkStrings(command, context, increment); return; @@ -115,9 +116,9 @@ public class HIncrByFloatExecutor extends HashExecutor { } value += increment; - map.put(field, new ByteArrayWrapper(Coder.doubleToBytes(value))); + redisHash.put(field, new ByteArrayWrapper(Coder.doubleToBytes(value))); - this.saveMap(map, context, key); + saveRedishHash(redisHash, context, key); } catch (InterruptedException e) { Thread.currentThread().interrupt(); command.setResponse( diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HKeysExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HKeysExecutor.java index 306a417..732332c 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HKeysExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HKeysExecutor.java @@ -14,10 +14,7 @@ */ package org.apache.geode.redis.internal.executor.hash; -import java.util.List; -import org.apache.geode.cache.TimeoutException; -import org.apache.geode.redis.internal.AutoCloseableLock; import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.Coder; import org.apache.geode.redis.internal.Command; @@ -46,26 +43,12 @@ public class HKeysExecutor extends HashExecutor { @Override public void executeCommand(Command command, ExecutionHandlerContext context) { ByteArrayWrapper key = command.getKey(); - List<ByteArrayWrapper> keys; - try (AutoCloseableLock regionLock = withRegionLock(context, key)) { - RedisHash keyMap = getMap(context, key); - keys = keyMap.keys(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - command.setResponse( - Coder.getErrorResponse(context.getByteBufAllocator(), "Thread interrupted.")); - return; - } catch (TimeoutException e) { - 
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), - "Timeout acquiring lock. Please try again.")); - return; - } - - if (keys.isEmpty()) { + RedisHash keyMap = getRedisHash(context, key); + if (keyMap.isEmpty()) { command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator())); return; } - respondBulkStrings(command, context, keys); + respondBulkStrings(command, context, keyMap.keys()); } } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HLenExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HLenExecutor.java index 9a89311..85fb952 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HLenExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HLenExecutor.java @@ -15,8 +15,6 @@ package org.apache.geode.redis.internal.executor.hash; -import org.apache.geode.cache.TimeoutException; -import org.apache.geode.redis.internal.AutoCloseableLock; import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.Coder; import org.apache.geode.redis.internal.Command; @@ -40,23 +38,8 @@ public class HLenExecutor extends HashExecutor { @Override public void executeCommand(Command command, ExecutionHandlerContext context) { ByteArrayWrapper key = command.getKey(); - final int size; - - try (AutoCloseableLock regionLock = withRegionLock(context, key)) { - RedisHash map = getMap(context, key); - size = map.size(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - command.setResponse( - Coder.getErrorResponse(context.getByteBufAllocator(), "Thread interrupted.")); - return; - } catch (TimeoutException e) { - command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), - "Timeout acquiring lock. 
Please try again.")); - return; - } - - command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), size)); + RedisHash map = getRedisHash(context, key); + command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), map.size())); } } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HMGetExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HMGetExecutor.java index cabf567..6c77b28 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HMGetExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HMGetExecutor.java @@ -20,7 +20,6 @@ import java.util.List; import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.Command; import org.apache.geode.redis.internal.ExecutionHandlerContext; -import org.apache.geode.redis.internal.RedisDataType; /** * <pre> @@ -48,9 +47,7 @@ public class HMGetExecutor extends HashExecutor { ByteArrayWrapper key = command.getKey(); - RedisHash map = getMap(context, key); - - checkDataType(key, RedisDataType.REDIS_HASH, context); + RedisHash map = getRedisHash(context, key); ArrayList<ByteArrayWrapper> fields = new ArrayList<ByteArrayWrapper>(); for (int i = 2; i < commandElems.size(); i++) { diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HMSetExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HMSetExecutor.java index f752a4a..ac0d990 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HMSetExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HMSetExecutor.java @@ -21,7 +21,6 @@ import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.Coder; import org.apache.geode.redis.internal.Command; import org.apache.geode.redis.internal.ExecutionHandlerContext; -import 
org.apache.geode.redis.internal.RedisDataType; /** * <pre> @@ -52,9 +51,8 @@ public class HMSetExecutor extends HashExecutor { List<ByteArrayWrapper> commandElems = command.getProcessedCommandWrappers(); ByteArrayWrapper key = command.getKey(); - context.getKeyRegistrar().register(key, RedisDataType.REDIS_HASH); RedisHashCommands redisHashCommands = - new RedisHashCommandsFunctionExecutor(context.getRegionProvider().getHashRegion()); + new RedisHashCommandsFunctionExecutor(context.getRegionProvider().getDataRegion()); ArrayList<ByteArrayWrapper> fieldsToSet = new ArrayList<>(commandElems.subList(2, commandElems.size())); redisHashCommands.hset(key, fieldsToSet, false); diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java index 0c738cf..4c47e29 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java @@ -14,6 +14,8 @@ */ package org.apache.geode.redis.internal.executor.hash; +import static org.apache.geode.redis.internal.executor.RedisHashInRegion.checkType; + import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -40,10 +42,9 @@ public class HScanExecutor extends AbstractScanExecutor { ByteArrayWrapper key = command.getKey(); - RedisHash hash = - context.getRegionProvider().getHashRegion().get(key); + RedisHash hash = checkType(context.getRegionProvider().getDataRegion().get(key)); - if (hash == null || hash.isEmpty()) { + if (hash.isEmpty()) { command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ERROR_CURSOR)); return; } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HSetExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HSetExecutor.java index 
3749b4b..565a876 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HSetExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HSetExecutor.java @@ -21,7 +21,6 @@ import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.Coder; import org.apache.geode.redis.internal.Command; import org.apache.geode.redis.internal.ExecutionHandlerContext; -import org.apache.geode.redis.internal.RedisDataType; /** * <pre> @@ -45,10 +44,9 @@ public class HSetExecutor extends HashExecutor { List<ByteArrayWrapper> commandElems = command.getProcessedCommandWrappers(); ByteArrayWrapper key = command.getKey(); - context.getKeyRegistrar().register(key, RedisDataType.REDIS_HASH); RedisHashCommands redisHashCommands = - new RedisHashCommandsFunctionExecutor(context.getRegionProvider().getHashRegion()); + new RedisHashCommandsFunctionExecutor(context.getRegionProvider().getDataRegion()); ArrayList<ByteArrayWrapper> fieldsToSet = new ArrayList<>(commandElems.subList(2, commandElems.size())); diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HValsExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HValsExecutor.java index 6d26658..5993220 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HValsExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HValsExecutor.java @@ -14,13 +14,11 @@ */ package org.apache.geode.redis.internal.executor.hash; -import java.util.Collection; import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.Coder; import org.apache.geode.redis.internal.Command; import org.apache.geode.redis.internal.ExecutionHandlerContext; -import org.apache.geode.redis.internal.RedisDataType; /** * <pre> @@ -51,23 +49,15 @@ public class HValsExecutor extends HashExecutor { @Override public void 
executeCommand(Command command, ExecutionHandlerContext context) { ByteArrayWrapper key = command.getKey(); - checkDataType(key, RedisDataType.REDIS_HASH, context); - RedisHash map = - context.getRegionProvider().getHashRegion().get(key); + RedisHash map = getRedisHash(context, key); - if (map == null) { + if (map.isEmpty()) { command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator())); return; } - Collection<ByteArrayWrapper> vals = map.values(); - if (vals.isEmpty()) { - command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator())); - return; - } - - respondBulkStrings(command, context, vals); + respondBulkStrings(command, context, map.values()); } } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HashExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HashExecutor.java index ae40357..7deaf4f 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HashExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HashExecutor.java @@ -21,10 +21,11 @@ import org.apache.geode.cache.TimeoutException; import org.apache.geode.redis.internal.AutoCloseableLock; import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.ExecutionHandlerContext; -import org.apache.geode.redis.internal.RedisDataType; +import org.apache.geode.redis.internal.RedisData; import org.apache.geode.redis.internal.RedisLockService; import org.apache.geode.redis.internal.RegionProvider; import org.apache.geode.redis.internal.executor.AbstractExecutor; +import org.apache.geode.redis.internal.executor.RedisHashInRegion; /** * Executor for handling HASH datatypes @@ -41,17 +42,28 @@ public abstract class HashExecutor extends AbstractExecutor { * @param key the region hash key region:<key> * @return the map data */ - protected RedisHash getMap(ExecutionHandlerContext context, + protected RedisHash 
getRedisHash(ExecutionHandlerContext context, ByteArrayWrapper key) { - Region<ByteArrayWrapper, RedisHash> region = - context.getRegionProvider().getHashRegion(); + Region<ByteArrayWrapper, RedisData> region = + context.getRegionProvider().getDataRegion(); - RedisHash map = region.get(key); - if (map == null) { - map = new RedisHash(emptyList()); + RedisData data = region.get(key); + if (data == null) { + return RedisHash.EMPTY; } + return RedisHashInRegion.checkType(data); + } + + protected RedisHash getModifiableRedisHash(ExecutionHandlerContext context, + ByteArrayWrapper key) { + Region<ByteArrayWrapper, RedisData> region = + context.getRegionProvider().getDataRegion(); - return map; + RedisData data = region.get(key); + if (data == null) { + return new RedisHash(emptyList()); + } + return RedisHashInRegion.checkType(data); } protected AutoCloseableLock withRegionLock(ExecutionHandlerContext context, ByteArrayWrapper key) @@ -63,22 +75,22 @@ public abstract class HashExecutor extends AbstractExecutor { /** - * Save the map information to a region + * Save the redisHash information to a region * - * @param map the map to save + * @param redisHash the redisHash to save * @param context the execution handler context * @param key the raw HASH key */ - protected void saveMap(RedisHash map, - ExecutionHandlerContext context, ByteArrayWrapper key) { + protected void saveRedishHash(RedisHash redisHash, + ExecutionHandlerContext context, + ByteArrayWrapper key) { - if (map == null) { + if (redisHash == null) { return; } RegionProvider rp = context.getRegionProvider(); - rp.getHashRegion().put(key, map); - context.getKeyRegistrar().register(key, RedisDataType.REDIS_HASH); + rp.getDataRegion().put(key, redisHash); } } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHash.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHash.java index 706bf15..c4ece14 100644 --- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHash.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHash.java @@ -26,15 +26,15 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.geode.DataSerializable; import org.apache.geode.DataSerializer; -import org.apache.geode.Delta; import org.apache.geode.InvalidDeltaException; import org.apache.geode.cache.Region; import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.RedisData; +import org.apache.geode.redis.internal.RedisDataType; import org.apache.geode.redis.internal.executor.EmptyRedisHash; -public class RedisHash implements Delta, DataSerializable { +public class RedisHash implements RedisData { public static final RedisHash EMPTY = new EmptyRedisHash(); private HashMap<ByteArrayWrapper, ByteArrayWrapper> hash; /** @@ -101,7 +101,7 @@ public class RedisHash implements Delta, DataSerializable { } } - public synchronized int hset(Region<ByteArrayWrapper, RedisHash> region, ByteArrayWrapper key, + public synchronized int hset(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key, List<ByteArrayWrapper> fieldsToSet, boolean nx) { int fieldsAdded = 0; Iterator<ByteArrayWrapper> iterator = fieldsToSet.iterator(); @@ -127,7 +127,7 @@ public class RedisHash implements Delta, DataSerializable { return fieldsAdded; } - public synchronized int hdel(Region<ByteArrayWrapper, RedisHash> region, ByteArrayWrapper key, + public synchronized int hdel(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key, List<ByteArrayWrapper> fieldsToRemove) { int fieldsRemoved = 0; for (ByteArrayWrapper fieldToRemove : fieldsToRemove) { @@ -152,7 +152,7 @@ public class RedisHash implements Delta, DataSerializable { return result; } - private void storeChanges(Region<ByteArrayWrapper, RedisHash> region, ByteArrayWrapper key, + private void 
storeChanges(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key, boolean doingAdds) { if (hasDelta()) { if (!doingAdds && hash.isEmpty()) { @@ -201,4 +201,9 @@ public class RedisHash implements Delta, DataSerializable { public synchronized boolean containsKey(ByteArrayWrapper field) { return hash.containsKey(field); } + + @Override + public RedisDataType getType() { + return RedisDataType.REDIS_HASH; + } } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommandsFunctionExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommandsFunctionExecutor.java index f1c8e17..7340482 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommandsFunctionExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommandsFunctionExecutor.java @@ -25,15 +25,16 @@ import java.util.List; import org.apache.geode.cache.Region; import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.RedisData; import org.apache.geode.redis.internal.RedisDataType; import org.apache.geode.redis.internal.executor.CommandFunction; @SuppressWarnings("unchecked") public class RedisHashCommandsFunctionExecutor implements RedisHashCommands { - private final Region<ByteArrayWrapper, RedisHash> region; + private final Region<ByteArrayWrapper, RedisData> region; - public RedisHashCommandsFunctionExecutor(Region region) { + public RedisHashCommandsFunctionExecutor(Region<ByteArrayWrapper, RedisData> region) { this.region = region; } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/EmptyRedisSet.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/EmptyRedisSet.java index 396aaa7..284b700 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/EmptyRedisSet.java +++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/EmptyRedisSet.java @@ -22,11 +22,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import org.apache.geode.cache.Region; import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.RedisData; class EmptyRedisSet extends RedisSet { @@ -36,7 +36,7 @@ class EmptyRedisSet extends RedisSet { } @Override - synchronized Collection<ByteArrayWrapper> spop(Region<ByteArrayWrapper, RedisSet> region, + synchronized Collection<ByteArrayWrapper> spop(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key, int popCount) { return emptyList(); } @@ -58,14 +58,13 @@ class EmptyRedisSet extends RedisSet { @Override synchronized long sadd(ArrayList<ByteArrayWrapper> membersToAdd, - Region<ByteArrayWrapper, RedisSet> region, ByteArrayWrapper key) { + Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key) { throw new UnsupportedOperationException(); } @Override synchronized long srem(ArrayList<ByteArrayWrapper> membersToRemove, - Region<ByteArrayWrapper, RedisSet> region, ByteArrayWrapper key, - AtomicBoolean setWasDeleted) { + Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key) { return 0; } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSet.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSet.java index a05bd96..4804b39 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSet.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSet.java @@ -16,6 +16,7 @@ package org.apache.geode.redis.internal.executor.set; import static java.util.Collections.emptyList; +import static org.apache.geode.redis.internal.RedisDataType.REDIS_SET; import java.io.DataInput; import 
java.io.DataOutput; @@ -26,16 +27,15 @@ import java.util.HashSet; import java.util.List; import java.util.Random; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; -import org.apache.geode.DataSerializable; import org.apache.geode.DataSerializer; -import org.apache.geode.Delta; import org.apache.geode.InvalidDeltaException; import org.apache.geode.cache.Region; import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.Coder; +import org.apache.geode.redis.internal.RedisData; +import org.apache.geode.redis.internal.RedisDataType; /** * This class still uses "synchronized" to protect the @@ -44,7 +44,7 @@ import org.apache.geode.redis.internal.Coder; * class can be removed once readers are changed to * also use the {@link SynchronizedStripedExecutor}. */ -public class RedisSet implements Delta, DataSerializable { +public class RedisSet implements RedisData { public static transient RedisSet EMPTY = new EmptyRedisSet(); private HashSet<ByteArrayWrapper> members; @@ -65,6 +65,7 @@ public class RedisSet implements Delta, DataSerializable { public RedisSet() {} synchronized List<Object> sscan(Pattern matchPattern, int count, int cursor) { + List<Object> returnList = new ArrayList<>(); int size = members.size(); int beforeCursor = 0; @@ -100,14 +101,13 @@ public class RedisSet implements Delta, DataSerializable { } synchronized Collection<ByteArrayWrapper> spop( - Region<ByteArrayWrapper, RedisSet> region, ByteArrayWrapper key, int popCount) { + Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key, int popCount) { int originalSize = scard(); if (originalSize == 0) { return emptyList(); } if (popCount >= originalSize) { - // TODO: need to also cause key to be removed from the metaregion region.remove(key, this); return this.members; } @@ -218,10 +218,10 @@ public class RedisSet implements Delta, DataSerializable { * modified by this call * @param region the region this 
instance is stored in * @param key the name of the set to add to - * @return the number of members actually added; -1 if concurrent modification + * @return the number of members actually added */ synchronized long sadd(ArrayList<ByteArrayWrapper> membersToAdd, - Region<ByteArrayWrapper, RedisSet> region, + Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key) { membersToAdd.removeIf(memberToAdd -> !members.add(memberToAdd)); @@ -243,21 +243,17 @@ public class RedisSet implements Delta, DataSerializable { * modified by this call * @param region the region this instance is stored in * @param key the name of the set to remove from - * @param setWasDeleted set to true if this method deletes the set - * @return the number of members actually removed; -1 if concurrent modification + * @return the number of members actually removed */ synchronized long srem(ArrayList<ByteArrayWrapper> membersToRemove, - Region<ByteArrayWrapper, RedisSet> region, - ByteArrayWrapper key, AtomicBoolean setWasDeleted) { + Region<ByteArrayWrapper, RedisData> region, + ByteArrayWrapper key) { membersToRemove.removeIf(memberToRemove -> !members.remove(memberToRemove)); int membersRemoved = membersToRemove.size(); if (membersRemoved != 0) { if (members.isEmpty()) { region.remove(key); - if (setWasDeleted != null) { - setWasDeleted.set(true); - } } else { deltasAreAdds = false; deltas = membersToRemove; @@ -280,4 +276,9 @@ public class RedisSet implements Delta, DataSerializable { synchronized Set<ByteArrayWrapper> smembers() { return new HashSet<>(members); } + + @Override + public RedisDataType getType() { + return REDIS_SET; + } } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetCommands.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetCommands.java index de29ff0..53f06e5 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetCommands.java +++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetCommands.java @@ -19,7 +19,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import org.apache.geode.redis.internal.ByteArrayWrapper; @@ -28,8 +27,7 @@ public interface RedisSetCommands { long sadd(ByteArrayWrapper key, ArrayList<ByteArrayWrapper> membersToAdd); - long srem(ByteArrayWrapper key, ArrayList<ByteArrayWrapper> membersToAdd, - AtomicBoolean setWasDeleted); + long srem(ByteArrayWrapper key, ArrayList<ByteArrayWrapper> membersToRemove); Set<ByteArrayWrapper> smembers(ByteArrayWrapper key); diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetCommandsFunctionExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetCommandsFunctionExecutor.java index 53c0d6c..97a456c 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetCommandsFunctionExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetCommandsFunctionExecutor.java @@ -29,19 +29,19 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import org.apache.geode.cache.Region; import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.RedisData; import org.apache.geode.redis.internal.RedisDataType; import org.apache.geode.redis.internal.executor.CommandFunction; public class RedisSetCommandsFunctionExecutor implements RedisSetCommands { - private final Region<ByteArrayWrapper, RedisSet> region; + private final Region<ByteArrayWrapper, RedisData> region; - public RedisSetCommandsFunctionExecutor(Region<ByteArrayWrapper, RedisSet> region) { + public 
RedisSetCommandsFunctionExecutor(Region<ByteArrayWrapper, RedisData> region) { this.region = region; } @@ -52,15 +52,8 @@ public class RedisSetCommandsFunctionExecutor implements RedisSetCommands { @SuppressWarnings("unchecked") @Override - public long srem(ByteArrayWrapper key, ArrayList<ByteArrayWrapper> membersToRemove, - AtomicBoolean setWasDeleted) { - Object[] resultList = - CommandFunction.execute(SREM, key, membersToRemove, region); - - long membersRemoved = (long) resultList[0]; - Boolean wasDeleted = (Boolean) resultList[1]; - setWasDeleted.set(wasDeleted); - return membersRemoved; + public long srem(ByteArrayWrapper key, ArrayList<ByteArrayWrapper> membersToRemove) { + return CommandFunction.execute(SREM, key, membersToRemove, region); } @Override diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetInRegion.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetInRegion.java index ac19b22..70c1339 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetInRegion.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetInRegion.java @@ -15,15 +15,19 @@ package org.apache.geode.redis.internal.executor.set; +import static org.apache.geode.redis.internal.RedisDataType.REDIS_SET; + import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import org.apache.geode.cache.Region; import org.apache.geode.redis.internal.ByteArrayWrapper; +import org.apache.geode.redis.internal.RedisConstants; +import org.apache.geode.redis.internal.RedisData; +import org.apache.geode.redis.internal.RedisDataTypeMismatchException; /** * This class still uses "synchronized" to protect the underlying HashSet even though all writers do @@ -31,10 +35,10 @@ import org.apache.geode.redis.internal.ByteArrayWrapper; * removed 
once readers are changed to also use the {@link SynchronizedStripedExecutor}. */ public class RedisSetInRegion implements RedisSetCommands { - private final Region<ByteArrayWrapper, RedisSet> region; + private final Region<ByteArrayWrapper, RedisData> region; @SuppressWarnings("unchecked") - public RedisSetInRegion(Region<ByteArrayWrapper, RedisSet> region) { + public RedisSetInRegion(Region<ByteArrayWrapper, RedisData> region) { this.region = region; } @@ -43,7 +47,7 @@ public class RedisSetInRegion implements RedisSetCommands { ByteArrayWrapper key, ArrayList<ByteArrayWrapper> membersToAdd) { - RedisSet redisSet = region.get(key); + RedisSet redisSet = checkType(region.get(key)); if (redisSet != null) { return redisSet.sadd(membersToAdd, region, key); @@ -56,8 +60,8 @@ public class RedisSetInRegion implements RedisSetCommands { @Override public long srem( ByteArrayWrapper key, - ArrayList<ByteArrayWrapper> membersToRemove, AtomicBoolean setWasDeleted) { - return getRedisSet(key).srem(membersToRemove, region, key, setWasDeleted); + ArrayList<ByteArrayWrapper> membersToRemove) { + return getRedisSet(key).srem(membersToRemove, region, key); } @Override @@ -101,6 +105,17 @@ public class RedisSetInRegion implements RedisSetCommands { } private RedisSet getRedisSet(ByteArrayWrapper key) { - return region.getOrDefault(key, RedisSet.EMPTY); + return checkType(region.getOrDefault(key, RedisSet.EMPTY)); + } + + private RedisSet checkType(RedisData redisData) { + if (redisData == null) { + return null; + } + if (redisData.getType() != REDIS_SET) { + throw new RedisDataTypeMismatchException(RedisConstants.ERROR_WRONG_TYPE); + } + return (RedisSet) redisData; } + } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SAddExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SAddExecutor.java index ea251da..0b86071 100755 --- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SAddExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SAddExecutor.java @@ -21,7 +21,6 @@ import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.Coder; import org.apache.geode.redis.internal.Command; import org.apache.geode.redis.internal.ExecutionHandlerContext; -import org.apache.geode.redis.internal.RedisDataType; public class SAddExecutor extends SetExecutor { @@ -30,11 +29,8 @@ public class SAddExecutor extends SetExecutor { List<ByteArrayWrapper> commandElements = command.getProcessedCommandWrappers(); - // Save key - context.getKeyRegistrar().register(command.getKey(), RedisDataType.REDIS_SET); - RedisSetCommands redisSetCommands = - new RedisSetCommandsFunctionExecutor(context.getRegionProvider().getSetRegion()); + new RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion()); ArrayList<ByteArrayWrapper> membersToAdd = new ArrayList<>(commandElements.subList(2, commandElements.size())); diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SCardExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SCardExecutor.java index b3d587c..f5c313b 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SCardExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SCardExecutor.java @@ -28,7 +28,7 @@ public class SCardExecutor extends SetExecutor { ByteArrayWrapper key = command.getKey(); checkDataType(key, RedisDataType.REDIS_SET, context); RedisSetCommands redisSetCommands = - new RedisSetCommandsFunctionExecutor(context.getRegionProvider().getSetRegion()); + new RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion()); int size = redisSetCommands.scard(key); command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), size)); } diff 
--git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SIsMemberExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SIsMemberExecutor.java index bda26df..85819be 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SIsMemberExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SIsMemberExecutor.java @@ -20,12 +20,10 @@ import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.Coder; import org.apache.geode.redis.internal.Command; import org.apache.geode.redis.internal.ExecutionHandlerContext; -import org.apache.geode.redis.internal.RedisDataType; public class SIsMemberExecutor extends SetExecutor { private static final int EXISTS = 1; - private static final int NOT_EXISTS = 0; @Override @@ -33,21 +31,10 @@ public class SIsMemberExecutor extends SetExecutor { List<byte[]> commandElems = command.getProcessedCommand(); ByteArrayWrapper key = command.getKey(); - if (!context.getKeyRegistrar().isRegistered(key)) { - command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), NOT_EXISTS)); - return; - } - ByteArrayWrapper member = new ByteArrayWrapper(commandElems.get(2)); RedisSetCommands redisSetCommands = - new RedisSetCommandsFunctionExecutor(context.getRegionProvider().getSetRegion()); - boolean isMember = redisSetCommands.sismember(key, member); - if (isMember) { - command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), EXISTS)); - // save key for next quick lookup - context.getKeyRegistrar().register(key, RedisDataType.REDIS_SET); - } else { - command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), NOT_EXISTS)); - } + new RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion()); + int result = redisSetCommands.sismember(key, member) ? 
EXISTS : NOT_EXISTS; + command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), result)); } } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMembersExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMembersExecutor.java index b7089da..10b909b 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMembersExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMembersExecutor.java @@ -22,17 +22,14 @@ import org.apache.geode.redis.internal.CoderException; import org.apache.geode.redis.internal.Command; import org.apache.geode.redis.internal.ExecutionHandlerContext; import org.apache.geode.redis.internal.RedisConstants; -import org.apache.geode.redis.internal.RedisDataType; public class SMembersExecutor extends SetExecutor { @Override public void executeCommand(Command command, ExecutionHandlerContext context) { ByteArrayWrapper key = command.getKey(); - checkDataType(key, RedisDataType.REDIS_SET, context); - RedisSetCommands redisSetCommands = - new RedisSetCommandsFunctionExecutor(context.getRegionProvider().getSetRegion()); + new RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion()); Set<ByteArrayWrapper> members = redisSetCommands.smembers(key); try { diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMoveExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMoveExecutor.java index 3539cfc..4da5317 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMoveExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMoveExecutor.java @@ -25,6 +25,7 @@ import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.Coder; import org.apache.geode.redis.internal.Command; import 
org.apache.geode.redis.internal.ExecutionHandlerContext; +import org.apache.geode.redis.internal.RedisData; import org.apache.geode.redis.internal.RedisDataType; public class SMoveExecutor extends SetExecutor { @@ -44,10 +45,10 @@ public class SMoveExecutor extends SetExecutor { checkDataType(source, RedisDataType.REDIS_SET, context); checkDataType(destination, RedisDataType.REDIS_SET, context); - Region<ByteArrayWrapper, RedisSet> region = getRegion(context); + Region<ByteArrayWrapper, RedisData> region = getRegion(context); try (AutoCloseableLock regionLock = withRegionLock(context, source)) { - RedisSet sourceSet = region.get(source); + RedisData sourceSet = region.get(source); if (sourceSet == null) { command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), NOT_MOVED)); @@ -56,9 +57,7 @@ public class SMoveExecutor extends SetExecutor { boolean removed = new RedisSetInRegion(region).srem(source, - new ArrayList<>(Collections.singletonList(member)), - null) == 1; - // TODO: native redis SMOVE that empties the src set causes it to no longer exist + new ArrayList<>(Collections.singletonList(member))) == 1; if (!removed) { command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), NOT_MOVED)); @@ -67,8 +66,6 @@ public class SMoveExecutor extends SetExecutor { // TODO: this should invoke a function in case the primary for destination is remote new RedisSetInRegion(region).sadd(destination, new ArrayList<>(Collections.singletonList(member))); - context.getKeyRegistrar().register(destination, RedisDataType.REDIS_SET); - context.getKeyRegistrar().register(source, RedisDataType.REDIS_SET); command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), MOVED)); } catch (InterruptedException e) { diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SPopExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SPopExecutor.java index d3f0ce2..a05777d 100755 --- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SPopExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SPopExecutor.java @@ -36,7 +36,7 @@ public class SPopExecutor extends SetExecutor { ByteArrayWrapper key = command.getKey(); RedisSetCommands redisSetCommands = - new RedisSetCommandsFunctionExecutor(context.getRegionProvider().getSetRegion()); + new RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion()); Collection<ByteArrayWrapper> popped = redisSetCommands.spop(key, popCount); if (popped.isEmpty()) { command.setResponse(Coder.getNilResponse(context.getByteBufAllocator())); diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRandMemberExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRandMemberExecutor.java index 73180a8..bc4fbbd 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRandMemberExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRandMemberExecutor.java @@ -54,7 +54,7 @@ public class SRandMemberExecutor extends SetExecutor { } RedisSetCommands redisSetCommands = - new RedisSetCommandsFunctionExecutor(context.getRegionProvider().getSetRegion()); + new RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion()); Collection<ByteArrayWrapper> results = redisSetCommands.srandmember(key, count); try { if (results.isEmpty()) { diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRemExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRemExecutor.java index 48b6a75..de7882f 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRemExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRemExecutor.java @@ -16,13 +16,11 @@ package org.apache.geode.redis.internal.executor.set; import 
java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.Coder; import org.apache.geode.redis.internal.Command; import org.apache.geode.redis.internal.ExecutionHandlerContext; -import org.apache.geode.redis.internal.RedisDataType; public class SRemExecutor extends SetExecutor { @Override @@ -31,30 +29,18 @@ public class SRemExecutor extends SetExecutor { ByteArrayWrapper key = command.getKey(); - checkDataType(key, RedisDataType.REDIS_SET, context); - RedisSetCommands redisSetCommands = - new RedisSetCommandsFunctionExecutor(context.getRegionProvider().getSetRegion()); + new RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion()); ArrayList<ByteArrayWrapper> membersToRemove = new ArrayList<>( commandElements .subList(2, commandElements.size())); - AtomicBoolean setWasDeleted = new AtomicBoolean(); - long membersRemoved = redisSetCommands.srem( key, - membersToRemove, - setWasDeleted); - if (setWasDeleted.get()) { - context - .getKeyRegistrar() - .unregisterIfType( - key, - RedisDataType.REDIS_SET); - } + membersToRemove); command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), membersRemoved)); } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java index 11b0e38..ef2a7e8 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java @@ -98,7 +98,7 @@ public class SScanExecutor extends AbstractScanExecutor { } RedisSetCommands redisSetCommands = - new RedisSetCommandsFunctionExecutor(context.getRegionProvider().getSetRegion()); + new RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion()); 
List<Object> returnList = redisSetCommands.sscan(key, matchPattern, count, cursor); command.setResponse(Coder.getScanResponse(context.getByteBufAllocator(), returnList)); } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetExecutor.java index c5c247c..e6ec430 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetExecutor.java @@ -20,6 +20,7 @@ import org.apache.geode.cache.TimeoutException; import org.apache.geode.redis.internal.AutoCloseableLock; import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.ExecutionHandlerContext; +import org.apache.geode.redis.internal.RedisData; import org.apache.geode.redis.internal.RedisLockService; import org.apache.geode.redis.internal.executor.AbstractExecutor; @@ -31,12 +32,12 @@ public abstract class SetExecutor extends AbstractExecutor { * @param context the execution handler * @return the set Region */ - Region<ByteArrayWrapper, RedisSet> getRegion( + Region<ByteArrayWrapper, RedisData> getRegion( ExecutionHandlerContext context) { return context .getRegionProvider() - .getSetRegion(); + .getDataRegion(); } protected AutoCloseableLock withRegionLock( diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetOpExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetOpExecutor.java index 32a76fc..b6928fd 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetOpExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetOpExecutor.java @@ -26,7 +26,7 @@ import org.apache.geode.redis.internal.ByteArrayWrapper; import org.apache.geode.redis.internal.Coder; import org.apache.geode.redis.internal.Command; import 
org.apache.geode.redis.internal.ExecutionHandlerContext; -import org.apache.geode.redis.internal.RedisDataType; +import org.apache.geode.redis.internal.RedisData; import org.apache.geode.redis.internal.RegionProvider; public abstract class SetOpExecutor extends SetExecutor { @@ -65,7 +65,7 @@ public abstract class SetOpExecutor extends SetExecutor { List<byte[]> commandElems, int setsStartIndex, RegionProvider regionProvider, ByteArrayWrapper destination, ByteArrayWrapper firstSetKey) { - Region<ByteArrayWrapper, RedisSet> region = this.getRegion(context); + Region<ByteArrayWrapper, RedisData> region = this.getRegion(context); Set<ByteArrayWrapper> firstSet = new RedisSetInRegion(region).smembers(firstSetKey); List<Set<ByteArrayWrapper>> setList = new ArrayList<>(); @@ -91,7 +91,6 @@ public abstract class SetOpExecutor extends SetExecutor { if (resultSet != null) { if (!resultSet.isEmpty()) { region.put(destination, new RedisSet(resultSet)); - context.getKeyRegistrar().register(destination, RedisDataType.REDIS_SET); } command .setResponse( diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SynchronizedStripedExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SynchronizedStripedExecutor.java index a1a48d9..f45eb39 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SynchronizedStripedExecutor.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SynchronizedStripedExecutor.java @@ -45,6 +45,8 @@ public class SynchronizedStripedExecutor implements StripedExecutor { synchronized (getSync(stripeId)) { try { return callable.call(); + } catch (RuntimeException re) { + throw re; } catch (Exception e) { throw new RuntimeException(e); } diff --git a/geode-redis/src/test/java/org/apache/geode/redis/internal/RegionProviderJUnitTest.java b/geode-redis/src/test/java/org/apache/geode/redis/internal/RegionProviderJUnitTest.java index 2d6157e..7561fc8 
100644 --- a/geode-redis/src/test/java/org/apache/geode/redis/internal/RegionProviderJUnitTest.java +++ b/geode-redis/src/test/java/org/apache/geode/redis/internal/RegionProviderJUnitTest.java @@ -29,8 +29,6 @@ import org.apache.geode.cache.Cache; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionShortcut; import org.apache.geode.internal.hll.HyperLogLogPlus; -import org.apache.geode.redis.internal.executor.hash.RedisHash; -import org.apache.geode.redis.internal.executor.set.RedisSet; /** @@ -46,8 +44,7 @@ public class RegionProviderJUnitTest { private ExecutionHandlerContext context; - private Region<ByteArrayWrapper, RedisHash> hashRegion; - private Region<ByteArrayWrapper, RedisSet> setRegion; + private Region<ByteArrayWrapper, RedisData> dataRegion; /** * Setup data, objects mocks for the test case @@ -66,16 +63,14 @@ public class RegionProviderJUnitTest { Cache cache = Mockito.mock(Cache.class); Region<Object, Object> newRegion = org.mockito.Mockito.mock(Region.class); - setRegion = Mockito.mock(Region.class); + dataRegion = Mockito.mock(Region.class); Mockito.when(cache.getRegion(NEW_REGION_NM)).thenReturn(newRegion); RegionShortcut defaultShortcut = RegionShortcut.PARTITION; - hashRegion = Mockito.mock(Region.class); - regionProvider = new RegionProvider(stringsRegion, hLLRegion, keyRegistrar, expirationsMap, - expirationExecutor, defaultShortcut, hashRegion, setRegion, cache); + expirationExecutor, defaultShortcut, dataRegion, cache); context = Mockito.mock(ExecutionHandlerContext.class); } diff --git a/geode-redis/src/test/java/org/apache/geode/redis/internal/executor/AbstractExecutorJUnitTest.java b/geode-redis/src/test/java/org/apache/geode/redis/internal/executor/AbstractExecutorJUnitTest.java index 15a9c4f..ef2cca6 100644 --- a/geode-redis/src/test/java/org/apache/geode/redis/internal/executor/AbstractExecutorJUnitTest.java +++ b/geode-redis/src/test/java/org/apache/geode/redis/internal/executor/AbstractExecutorJUnitTest.java @@ 
-26,16 +26,8 @@ import org.apache.geode.redis.internal.RedisDataType; import org.apache.geode.redis.internal.RegionProvider; import org.apache.geode.redis.internal.executor.string.SetExecutor; -/** - * Test for AbstractExecutor - * - * - */ public class AbstractExecutorJUnitTest { - /** - * Test the remove entry mehtod - */ @Test public void testRemoveEntry() { // Create any instance of the AbstractExecutor