This is an automated email from the ASF dual-hosted git repository. jensdeppe pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 6b43b76 GEODE-10048: Add framework for Redis events and BLPOP command (#7408) 6b43b76 is described below commit 6b43b76e3d5a9c5b0c84df8e1995d61528871068 Author: Jens Deppe <jde...@vmware.com> AuthorDate: Tue Mar 22 19:47:00 2022 -0700 GEODE-10048: Add framework for Redis events and BLPOP command (#7408) * GEODE-10048: Add framework for Redis events and BLPOP command This commit adds a simple eventing framework to be used by blocking commands as well as keyspace event notification (still to be implemented). The main components are: - EventListener: an interface to be implemented by anything wishing to receive events. Currently only implemented for blocking commands in the form of BlockingCommandListener. - EventDistributor: the component to which listeners are registered and where events are received and distributed. A single EventDistributor exists in the system and is associated with each ExecutionHandlerContext. - Event: not implemented as a separate class but logically consists of the command (RedisCommandType) and key (RedisKey). When a blocking command receives a relevant event the command is resubmitted into the Netty pipeline. This also means that something could happen (another command) that causes the blocking command to re-block and not complete. This is also what happens with native Redis. 
For example: - BLPOP 0 A executes and blocks - LPUSH A some-value - Before LPUSH fires an event, LPOP A is received but needs to wait to acquire the lock on A - LPUSH fires an event which the BLPOP listener receives and resubmits BLPOP into the pipeline - Once LPUSH completes, LPOP is next and removes A - BLPOP A runs and ends up blocking again because there is nothing to pop from A --- .../list/BLPopNativeRedisAcceptanceTest.java | 41 ++++ .../commands/executor/list/BLPopDUnitTest.java | 120 ++++++++++++ .../list/AbstractBLPopIntegrationTest.java | 201 +++++++++++++++++++ .../executor/list/BLPopIntegrationTest.java | 37 ++++ .../server/AbstractHitsMissesIntegrationTest.java | 6 + .../eventing/BlockingCommandListenerTest.java | 182 ++++++++++++++++++ .../internal/eventing/EventDistributorTest.java | 213 +++++++++++++++++++++ .../apache/geode/codeAnalysis/excludedClasses.txt | 23 +-- .../geode/redis/internal/GeodeRedisServer.java | 12 +- .../geode/redis/internal/RedisConstants.java | 2 + .../geode/redis/internal/commands/Command.java | 21 +- .../redis/internal/commands/RedisCommandType.java | 3 + .../internal/commands/executor/RedisResponse.java | 12 ++ .../commands/executor/list/BLPopExecutor.java | 61 ++++++ .../commands/executor/list/LPushExecutor.java | 5 +- .../commands/executor/list/RPushExecutor.java | 2 +- .../geode/redis/internal/data/NullRedisList.java | 17 +- .../geode/redis/internal/data/RedisList.java | 50 ++++- .../internal/eventing/BlockingCommandListener.java | 123 ++++++++++++ .../redis/internal/eventing/EventDistributor.java | 105 ++++++++++ .../redis/internal/eventing/EventListener.java | 62 ++++++ .../redis/internal/eventing/EventResponse.java | 34 ++++ .../apache/geode/redis/internal/netty/Client.java | 6 +- .../apache/geode/redis/internal/netty/Coder.java | 6 + .../internal/netty/ExecutionHandlerContext.java | 46 +++-- .../redis/internal/netty/NettyRedisServer.java | 8 +- .../redis/internal/netty/StringBytesGlossary.java | 6 + 
.../redis/internal/services/RegionProvider.java | 4 +- .../netty/ExecutionHandlerContextTest.java | 2 +- 29 files changed, 1344 insertions(+), 66 deletions(-) diff --git a/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopNativeRedisAcceptanceTest.java b/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopNativeRedisAcceptanceTest.java new file mode 100755 index 0000000..2c54bdd --- /dev/null +++ b/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopNativeRedisAcceptanceTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.redis.internal.commands.executor.list; + + +import org.junit.ClassRule; + +import org.apache.geode.redis.NativeRedisClusterTestRule; + +public class BLPopNativeRedisAcceptanceTest extends AbstractBLPopIntegrationTest { + + @ClassRule + public static NativeRedisClusterTestRule redis = new NativeRedisClusterTestRule(); + + @Override + public int getPort() { + return redis.getExposedPorts().get(0); + } + + @Override + public void flushAll() { + redis.flushAll(); + } + + @Override + public void awaitEventDistributorSize(int size) throws Exception { + Thread.sleep(500); + } +} diff --git a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopDUnitTest.java b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopDUnitTest.java new file mode 100644 index 0000000..ff1b394 --- /dev/null +++ b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopDUnitTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.geode.redis.internal.commands.executor.list; + +import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS; +import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.concurrent.Future; + +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; + +import org.apache.geode.internal.AvailablePortHelper; +import org.apache.geode.redis.internal.GeodeRedisService; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.dunit.rules.RedisClusterStartupRule; +import org.apache.geode.test.junit.rules.ExecutorServiceRule; + +public class BLPopDUnitTest { + + @ClassRule + public static RedisClusterStartupRule cluster = new RedisClusterStartupRule(4); + + @Rule + public ExecutorServiceRule executor = new ExecutorServiceRule(); + + private static final String LIST_KEY = "key"; + private static JedisCluster jedis; + private static MemberVM server1; + private static MemberVM server2; + private static MemberVM server3; + private static int locatorPort; + private static int redisServerPort; + + @BeforeClass + public static void classSetup() { + MemberVM locator = cluster.startLocatorVM(0); + locatorPort = locator.getPort(); + + server1 = cluster.startRedisVM(1, locatorPort); + server2 = cluster.startRedisVM(2, locatorPort); + + redisServerPort = AvailablePortHelper.getRandomAvailableTCPPort(); + server3 = cluster.startRedisVM(3, Integer.toString(redisServerPort), locatorPort); + + jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, redisServerPort), REDIS_CLIENT_TIMEOUT, + 20); + } + + @After + public void tearDown() { + cluster.flushAll(); + } + + 
@Test + public void testClientRepeatsBLpopAfterServerCrash() throws Exception { + cluster.moveBucketForKey(LIST_KEY, "server-3"); + Future<List<String>> blpopFuture = executor.submit(() -> jedis.blpop(0, LIST_KEY)); + + try { + cluster.crashVM(3); + jedis.lpush(LIST_KEY, "value"); + List<String> result = blpopFuture.get(); + assertThat(result).containsExactly(LIST_KEY, "value"); + } finally { + cluster.startRedisVM(3, Integer.toString(redisServerPort), locatorPort); + cluster.rebalanceAllRegions(); + } + } + + @Test + public void testBLPopFollowsBucketMovement() throws Exception { + Future<List<String>> blpopFuture = executor.submit(() -> jedis.blpop(0, LIST_KEY)); + + for (int i = 0; i < 11; i++) { + cluster.moveBucketForKey(LIST_KEY); + Thread.sleep(500); + } + + jedis.lpush(LIST_KEY, "value"); + + assertThat(blpopFuture.get()).containsExactly(LIST_KEY, "value"); + + int registeredListeners = getRegisteredListeners(server1); + registeredListeners += getRegisteredListeners(server2); + registeredListeners += getRegisteredListeners(server3); + + assertThat(registeredListeners).isEqualTo(0); + } + + private int getRegisteredListeners(MemberVM vm) { + return vm.invoke(() -> { + GeodeRedisService service = ClusterStartupRule.getCache().getService(GeodeRedisService.class); + return service.getRedisServer().getEventDistributor().getRegisteredKeys(); + }); + } + + +} diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractBLPopIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractBLPopIntegrationTest.java new file mode 100755 index 0000000..fd9b12a --- /dev/null +++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractBLPopIntegrationTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. 
See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.commands.executor.list; + +import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs; +import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS; +import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Protocol; + +import org.apache.geode.redis.RedisIntegrationTest; +import org.apache.geode.redis.internal.RedisConstants; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.junit.rules.ExecutorServiceRule; + +public abstract class AbstractBLPopIntegrationTest implements RedisIntegrationTest { + private static final String KEY = "key"; + + protected JedisCluster jedis; + + 
public abstract void awaitEventDistributorSize(int size) throws Exception; + + @ClassRule + public static ExecutorServiceRule executor = new ExecutorServiceRule(); + + @Before + public void setUp() { + jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), REDIS_CLIENT_TIMEOUT); + } + + @After + public void tearDown() { + flushAll(); + jedis.close(); + } + + @Test + public void testInvalidArguments_throwErrors() { + assertAtLeastNArgs(jedis, Protocol.Command.BLPOP, 2); + } + + @Test + public void testInvalidTimeout_throwsError() { + assertThatThrownBy(() -> jedis.sendCommand("key1", Protocol.Command.BLPOP, "key1", + "0.A")) + .hasMessage(RedisConstants.ERROR_TIMEOUT_INVALID); + } + + @Test + public void testKeysInDifferentSlots_throwsError() { + assertThatThrownBy(() -> jedis.sendCommand("key1", Protocol.Command.BLPOP, "key1", + "key2", "0")) + .hasMessage(RedisConstants.ERROR_WRONG_SLOT); + } + + @Test + public void testNegativeTimeout_throwsError() { + assertThatThrownBy(() -> jedis.blpop(-1, "key1")) + .hasMessage(RedisConstants.ERROR_NEGATIVE_TIMEOUT); + } + + @Test + public void testBLPopForNonListKey() { + jedis.set("not-a-list", "value"); + + assertThatThrownBy(() -> jedis.blpop(0, "not-a-list")) + .hasMessage(RedisConstants.ERROR_WRONG_TYPE); + } + + @Test + public void testBLPopWhenValueExists() { + jedis.lpush(KEY, "value1", "value2"); + + List<String> result = jedis.blpop(0, KEY); + + assertThat(result).containsExactly(KEY, "value2"); + assertThat(jedis.lpop(KEY)).isEqualTo("value1"); + } + + @Test + public void testBLPopDoesNotError_whenTimeoutHasExponent() { + jedis.lpush(KEY, "value1", "value2"); + + Object result = jedis.sendCommand(KEY, Protocol.Command.BLPOP, KEY, "1E+3"); + + assertThat(result).isNotNull(); + } + + @Test + public void testBLPopWhenValueDoesNotExist() throws Exception { + Future<List<String>> future = executor.submit(() -> jedis.blpop(0, KEY)); + + awaitEventDistributorSize(1); + jedis.lpush(KEY, "value1", "value2"); + 
+ assertThat(future.get()).containsExactly(KEY, "value2"); + assertThat(jedis.lpop(KEY)).isEqualTo("value1"); + } + + @Test + public void testRPushFiresEventForBLPop() throws Exception { + Future<List<String>> future = executor.submit(() -> jedis.blpop(0, KEY)); + + awaitEventDistributorSize(1); + jedis.rpush(KEY, "value1", "value2"); + + assertThat(future.get()).containsExactly(KEY, "value1"); + assertThat(jedis.lpop(KEY)).isEqualTo("value2"); + } + + @Test + public void testBLPopWhenTimeoutIsExceeded() { + int timeout = 10; + Future<List<String>> future = executor.submit(() -> jedis.blpop(timeout, KEY)); + GeodeAwaitility.await().atMost(timeout * 2, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(future.get()).isNull()); + } + + @Test + public void testBLpopFirstKeyDoesNotExistButSecondOneDoes() { + jedis.lpush("{A}key2", "value2"); + + List<String> result = jedis.blpop(0, "{A}key1", "{A}key2"); + + assertThat(result).containsExactly("{A}key2", "value2"); + } + + @Test + public void testBLpopKeyOrderingIsCorrect() { + jedis.lpush("{A}key2", "value2"); + jedis.lpush("{A}key3", "value3"); + + List<String> result = jedis.blpop(0, "{A}key1", "{A}key3", "{A}key2"); + + assertThat(result).containsExactly("{A}key3", "value3"); + } + + @Test + public void testConcurrentBLPop() throws Exception { + int totalElements = 10_000; + List<Object> accumulated = Collections.synchronizedList(new ArrayList<>(totalElements + 2)); + AtomicBoolean running = new AtomicBoolean(true); + + Future<Void> future1 = executor.submit(() -> doBLPop(running, accumulated)); + Future<Void> future2 = executor.submit(() -> doBLPop(running, accumulated)); + + List<String> result = new ArrayList<>(); + for (int i = 0; i < totalElements; i++) { + jedis.lpush(KEY, "value" + i); + result.add("value" + i); + } + + GeodeAwaitility.await() + .untilAsserted(() -> assertThat(accumulated.size()).isEqualTo(totalElements)); + + running.set(false); + + jedis.lpush(KEY, "stop1"); + jedis.lpush(KEY, "stop2"); + 
result.add("stop1"); + result.add("stop2"); + future1.get(); + future2.get(); + + assertThat(accumulated).containsExactlyInAnyOrderElementsOf(result); + } + + private void doBLPop(AtomicBoolean running, List<Object> accumulated) { + while (running.get()) { + accumulated.add(jedis.blpop(0, KEY).get(1)); + } + } + +} diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopIntegrationTest.java new file mode 100755 index 0000000..f77abc4 --- /dev/null +++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopIntegrationTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.redis.internal.commands.executor.list; + +import org.junit.ClassRule; + +import org.apache.geode.redis.GeodeRedisServerRule; +import org.apache.geode.redis.internal.eventing.EventDistributor; +import org.apache.geode.test.awaitility.GeodeAwaitility; + +public class BLPopIntegrationTest extends AbstractBLPopIntegrationTest { + + @ClassRule + public static GeodeRedisServerRule server = new GeodeRedisServerRule(); + + @Override + public int getPort() { + return server.getPort(); + } + + public void awaitEventDistributorSize(int size) { + EventDistributor distributor = server.getServer().getEventDistributor(); + GeodeAwaitility.await().until(() -> distributor.getRegisteredKeys() == size); + } +} diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java index 73546e1..50000cb 100644 --- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java +++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java @@ -575,6 +575,12 @@ public abstract class AbstractHitsMissesIntegrationTest implements RedisIntegrat /************* List related commands *************/ @Test + public void testBLPop() { + jedis.lpush(LIST_KEY, "element"); + runCommandAndAssertNoStatUpdates(LIST_KEY, k -> jedis.blpop(0, k)); + } + + @Test public void testLindex() { runCommandAndAssertHitsAndMisses(LIST_KEY, k -> jedis.lindex(k, 1)); } diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/BlockingCommandListenerTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/BlockingCommandListenerTest.java new file 
mode 100644 index 0000000..a984fc6 --- /dev/null +++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/BlockingCommandListenerTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.eventing; + +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import org.apache.geode.redis.ConcurrentLoopingThreads; +import org.apache.geode.redis.internal.commands.Command; +import org.apache.geode.redis.internal.commands.RedisCommandType; +import org.apache.geode.redis.internal.data.RedisKey; +import 
org.apache.geode.redis.internal.netty.Coder; +import org.apache.geode.redis.internal.netty.ExecutionHandlerContext; + +public class BlockingCommandListenerTest { + + @Test + public void testTimeoutIsAdjusted() { + ExecutionHandlerContext context = mock(ExecutionHandlerContext.class); + List<byte[]> commandArgs = Arrays.asList("KEY".getBytes(), "0".getBytes()); + Command command = new Command(RedisCommandType.BLPOP, commandArgs); + BlockingCommandListener listener = + new BlockingCommandListener(context, command, Collections.emptyList(), 1.0D); + + listener.resubmitCommand(); + + ArgumentCaptor<Command> argumentCaptor = ArgumentCaptor.forClass(Command.class); + verify(context, times(1)).resubmitCommand(argumentCaptor.capture()); + + double timeout = Coder.bytesToDouble(argumentCaptor.getValue().getCommandArguments().get(0)); + assertThat(timeout).isLessThan(1.0D); + } + + @Test + public void testAdjustedTimeoutDoesNotBecomeNegative() { + ExecutionHandlerContext context = mock(ExecutionHandlerContext.class); + List<byte[]> commandArgs = Arrays.asList("KEY".getBytes(), "0".getBytes()); + Command command = new Command(RedisCommandType.BLPOP, commandArgs); + BlockingCommandListener listener = + new BlockingCommandListener(context, command, Collections.emptyList(), 1e-9); + + listener.resubmitCommand(); + + ArgumentCaptor<Command> argumentCaptor = ArgumentCaptor.forClass(Command.class); + verify(context, times(1)).resubmitCommand(argumentCaptor.capture()); + + double timeout = Coder.bytesToDouble(argumentCaptor.getValue().getCommandArguments().get(0)); + assertThat(timeout).isEqualTo(1e-9); + } + + @Test + public void testListenerIsRemovedAfterTimeout() { + ExecutionHandlerContext context = mock(ExecutionHandlerContext.class); + List<byte[]> commandArgs = Arrays.asList("KEY".getBytes(), "0".getBytes()); + Command command = new Command(RedisCommandType.BLPOP, commandArgs); + BlockingCommandListener listener = + new BlockingCommandListener(context, command, 
Collections.emptyList(), 1e-9); + EventDistributor eventDistributor = new EventDistributor(); + + eventDistributor.registerListener(listener); + + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> assertThat(eventDistributor.getRegisteredKeys()).isEqualTo(0)); + } + + @Test + public void testListenersAffectedByBucketMovementAreInactiveAndDoNotProcessTimeout() { + ExecutionHandlerContext context = mock(ExecutionHandlerContext.class); + RedisKey key = new RedisKey("KEY".getBytes()); + List<byte[]> commandArgs = Arrays.asList(key.toBytes(), "0".getBytes()); + Command command = new Command(RedisCommandType.BLPOP, commandArgs); + BlockingCommandListener listener = + new BlockingCommandListener(context, command, Collections.singletonList(key), 1); + EventDistributor eventDistributor = new EventDistributor(); + + eventDistributor.registerListener(listener); + + eventDistributor.afterBucketRemoved(key.getBucketId(), null); + + verify(context, atMost(1)).resubmitCommand(any()); + await().atMost(Duration.ofSeconds(5)) + .during(Duration.ofSeconds(2)) + .untilAsserted(() -> verify(context, atMost(0)).writeToChannel(any())); + assertThat(eventDistributor.getRegisteredKeys()).isEqualTo(0); + } + + @Test + public void testResubmitAndTimeoutDoNotBothExecute() { + ExecutionHandlerContext context = mock(ExecutionHandlerContext.class); + List<byte[]> commandArgs = Arrays.asList("KEY".getBytes(), "0".getBytes()); + Command command = new Command(RedisCommandType.BLPOP, commandArgs); + BlockingCommandListener listener = + new BlockingCommandListener(context, command, Arrays.asList(command.getKey()), 0); + AtomicReference<BlockingCommandListener> listenerRef = new AtomicReference<>(listener); + EventDistributor eventDistributor = new EventDistributor(); + + eventDistributor.registerListener(listener); + + new ConcurrentLoopingThreads(10_000, + i -> listenerRef.get().process(null, command.getKey()), + i -> listenerRef.get().timeout(eventDistributor)) + .runWithAction(() -> { + 
// Verify that resubmitCommand and timeout have not both been called + try { + verify(context, atLeastOnce()).resubmitCommand(any()); + verify(context, never()).writeToChannel(any()); + } catch (AssertionError e) { + verify(context, never()).resubmitCommand(any()); + verify(context, atLeastOnce()).writeToChannel(any()); + } + reset(context); + eventDistributor.removeListener(listenerRef.get()); + listenerRef.set(new BlockingCommandListener(context, command, + Arrays.asList(command.getKey()), 0)); + eventDistributor.registerListener(listenerRef.get()); + }); + } + + @Test + public void concurrencyOfManyRegistrationsForTheSameKeys() { + ExecutionHandlerContext context = mock(ExecutionHandlerContext.class); + EventDistributor distributor = new EventDistributor(); + RedisKey keyA = new RedisKey("keyA".getBytes()); + RedisKey keyB = new RedisKey("keyB".getBytes()); + RedisKey keyC = new RedisKey("keyC".getBytes()); + + // Should not produce any exceptions + new ConcurrentLoopingThreads(10_000, + i -> registerListener(distributor, context, keyA, keyB, keyC), + i -> registerListener(distributor, context, keyB, keyC, keyA), + i -> registerListener(distributor, context, keyC, keyA, keyB), + i -> distributor.afterBucketRemoved(keyA.getBucketId(), null), + i -> distributor.afterBucketRemoved(keyB.getBucketId(), null), + i -> distributor.afterBucketRemoved(keyC.getBucketId(), null), + i -> distributor.fireEvent(null, keyA), + i -> distributor.fireEvent(null, keyB), + i -> distributor.fireEvent(null, keyC)) + .run(); + + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> assertThat(distributor.getRegisteredKeys()).isEqualTo(0)); + } + + private void registerListener(EventDistributor distributor, ExecutionHandlerContext context, + RedisKey... 
keys) { + List<byte[]> commandArgs = Arrays.asList("KEY".getBytes(), "0".getBytes()); + Command command = new Command(RedisCommandType.BLPOP, commandArgs); + BlockingCommandListener listener = + new BlockingCommandListener(context, command, Arrays.asList(keys), 1e-9); + distributor.registerListener(listener); + } +} diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/EventDistributorTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/EventDistributorTest.java new file mode 100644 index 0000000..d8a4d1c --- /dev/null +++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/EventDistributorTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.geode.redis.internal.eventing; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.junit.Test; + +import org.apache.geode.redis.ConcurrentLoopingThreads; +import org.apache.geode.redis.internal.commands.RedisCommandType; +import org.apache.geode.redis.internal.data.RedisKey; + +public class EventDistributorTest { + + public static class TestEventListener implements EventListener { + private final List<RedisKey> keys; + private int fired = 0; + + public TestEventListener(RedisKey... keys) { + this.keys = Arrays.stream(keys).collect(Collectors.toList()); + } + + public int getFired() { + return fired; + } + + @Override + public EventResponse process(RedisCommandType commandType, RedisKey key) { + fired += 1; + return EventResponse.REMOVE_AND_STOP; + } + + @Override + public List<RedisKey> keys() { + return keys; + } + + @Override + public void resubmitCommand() {} + + @Override + public void scheduleTimeout(ScheduledExecutorService executor, EventDistributor distributor) {} + } + + @Test + public void firingEventRemovesListener() { + RedisKey keyA = new RedisKey("a".getBytes()); + RedisKey keyB = new RedisKey("b".getBytes()); + EventDistributor distributor = new EventDistributor(); + TestEventListener listener = new TestEventListener(keyA, keyB); + distributor.registerListener(listener); + + distributor.fireEvent(null, keyA); + assertThat(listener.getFired()).isEqualTo(1); + assertThat(distributor.getRegisteredKeys()).isEqualTo(0); + } + + @Test + public void firingEventRemovesFirstListener_whenMultipleExist() { + RedisKey keyA = new RedisKey("a".getBytes()); + RedisKey keyB = new RedisKey("b".getBytes()); + EventDistributor distributor = new EventDistributor(); + 
TestEventListener listener1 = new TestEventListener(keyA, keyB); + TestEventListener listener2 = new TestEventListener(keyA, keyB); + distributor.registerListener(listener1); + distributor.registerListener(listener2); + + assertThat(distributor.getRegisteredKeys()).isEqualTo(4); + + distributor.fireEvent(null, keyA); + assertThat(listener1.getFired()).isEqualTo(1); + assertThat(listener2.getFired()).isEqualTo(0); + assertThat(distributor.getRegisteredKeys()).isEqualTo(2); + + distributor.fireEvent(null, keyA); + assertThat(listener1.getFired()).isEqualTo(1); + assertThat(listener2.getFired()).isEqualTo(1); + assertThat(distributor.getRegisteredKeys()).isEqualTo(0); + } + + @Test + public void listenerIsRemovedAfterBucketMoves() { + RedisKey keyA = new RedisKey("a".getBytes()); + RedisKey keyB = new RedisKey("b".getBytes()); + EventDistributor distributor = new EventDistributor(); + TestEventListener listener1 = new TestEventListener(keyA); + distributor.registerListener(listener1); + distributor.registerListener(new TestEventListener(keyB)); + + assertThat(distributor.getRegisteredKeys()).isEqualTo(2); + + distributor.afterBucketRemoved(keyA.getBucketId(), null); + + assertThat(distributor.getRegisteredKeys()).isEqualTo(1); + } + + @Test + public void concurrencyOfManyRegistrationsAndBucketMovementForTheSameKeys() { + EventDistributor distributor = new EventDistributor(); + RedisKey keyA = new RedisKey("keyA".getBytes()); + RedisKey keyB = new RedisKey("keyB".getBytes()); + RedisKey keyC = new RedisKey("keyC".getBytes()); + + // Should not produce any exceptions + new ConcurrentLoopingThreads(10_000, + i -> distributor.registerListener(new TestEventListener(keyA, keyB, keyC)), + i -> distributor.registerListener(new TestEventListener(keyB, keyC, keyA)), + i -> distributor.registerListener(new TestEventListener(keyC, keyA, keyB)), + i -> distributor.afterBucketRemoved(keyA.getBucketId(), null), + i -> distributor.afterBucketRemoved(keyB.getBucketId(), null), + i -> 
distributor.afterBucketRemoved(keyC.getBucketId(), null), + i -> distributor.fireEvent(null, keyA), + i -> distributor.fireEvent(null, keyB), + i -> distributor.fireEvent(null, keyC)) + .runInLockstep(); + + distributor.fireEvent(null, keyA); + distributor.fireEvent(null, keyB); + distributor.fireEvent(null, keyC); + assertThat(distributor.getRegisteredKeys()).isEqualTo(0); + } + + @Test + public void concurrencyOfManyRegistrationAndRemovalOfSameListener() { + EventDistributor distributor = new EventDistributor(); + RedisKey keyA = new RedisKey("keyA".getBytes()); + RedisKey keyB = new RedisKey("keyB".getBytes()); + RedisKey keyC = new RedisKey("keyC".getBytes()); + AtomicReference<EventListener> listenerRef1 = + new AtomicReference<>(new TestEventListener(keyA, keyB, keyC)); + AtomicReference<EventListener> listenerRef2 = + new AtomicReference<>(new TestEventListener(keyB, keyC, keyA)); + AtomicReference<EventListener> listenerRef3 = + new AtomicReference<>(new TestEventListener(keyC, keyA, keyB)); + + // Should not produce any exceptions + new ConcurrentLoopingThreads(10_000, + i -> distributor.registerListener(listenerRef1.get()), + i -> distributor.registerListener(listenerRef2.get()), + i -> distributor.registerListener(listenerRef3.get()), + i -> distributor.removeListener(listenerRef1.get()), + i -> distributor.removeListener(listenerRef2.get()), + i -> distributor.removeListener(listenerRef3.get()), + i -> distributor.fireEvent(null, keyA), + i -> distributor.fireEvent(null, keyB), + i -> distributor.fireEvent(null, keyC)) + .runWithAction(() -> { + listenerRef1.set(new TestEventListener(keyA, keyB, keyC)); + listenerRef2.set(new TestEventListener(keyB, keyC, keyA)); + listenerRef3.set(new TestEventListener(keyC, keyA, keyB)); + }); + + distributor.fireEvent(null, keyA); + distributor.fireEvent(null, keyB); + distributor.fireEvent(null, keyC); + assertThat(distributor.getRegisteredKeys()).isEqualTo(0); + } + + /** + * When a key points to a Queue (of 
listeners) that becomes empty that key is + * removed. Without correct synchronization we could add a new listener to a queue + * just before the key (and thus the queue) is removed. This test ensures that is + * not happening. + */ + @Test + public void ensureNotRegisteringListenerOnQueueJustRemoved() { + EventDistributor distributor = new EventDistributor(); + RedisKey keyA = new RedisKey("keyA".getBytes()); + AtomicReference<EventListener> listenerRef1 = + new AtomicReference<>(new TestEventListener(keyA)); + AtomicReference<EventListener> listenerRef2 = + new AtomicReference<>(new TestEventListener(keyA)); + + distributor.registerListener(listenerRef1.get()); + AtomicInteger iteration = new AtomicInteger(0); + + new ConcurrentLoopingThreads(10_000, + iteration::set, + i -> distributor.registerListener(listenerRef2.get()), + i -> distributor.removeListener(listenerRef1.get())) + .runWithAction(() -> { + assertThat(distributor.getRegisteredKeys()) + .as("Iteration = " + iteration.get()).isEqualTo(1); + distributor.removeListener(listenerRef2.get()); + + listenerRef1.set(new TestEventListener(keyA)); + listenerRef2.set(new TestEventListener(keyA)); + + distributor.registerListener(listenerRef1.get()); + }); + } + +} diff --git a/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt index fee9c78..29d0c21 100644 --- a/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt +++ b/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt @@ -1,20 +1,21 @@ -org/apache/geode/redis/internal/services/cluster/RedisMemberInfoRetrievalFunction -org/apache/geode/redis/internal/data/collections/Bytes2ObjectOpenHashMap -org/apache/geode/redis/internal/data/collections/SizeableByteArrayList 
-org/apache/geode/redis/internal/data/collections/SizeableBytes2ObjectOpenCustomHashMapWithCursor -org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursor +org/apache/geode/redis/internal/RedisException +org/apache/geode/redis/internal/commands/RedisCommandType +org/apache/geode/redis/internal/commands/RedisCommandType$Category +org/apache/geode/redis/internal/commands/RedisCommandType$Flag +org/apache/geode/redis/internal/commands/executor/sortedset/ZAggregator +org/apache/geode/redis/internal/commands/executor/string/BitOpExecutor$BitOp org/apache/geode/redis/internal/data/RedisCrossSlotException org/apache/geode/redis/internal/data/RedisDataMovedException org/apache/geode/redis/internal/data/RedisDataTypeMismatchException -org/apache/geode/redis/internal/commands/executor/sortedset/ZAggregator -org/apache/geode/redis/internal/RedisException org/apache/geode/redis/internal/data/RedisHash$Hash org/apache/geode/redis/internal/data/RedisKeyExistsException org/apache/geode/redis/internal/data/RedisSet$MemberSet org/apache/geode/redis/internal/data/RedisSortedSet$MemberMap -org/apache/geode/redis/internal/commands/executor/string/BitOpExecutor$BitOp +org/apache/geode/redis/internal/data/collections/Bytes2ObjectOpenHashMap +org/apache/geode/redis/internal/data/collections/SizeableByteArrayList +org/apache/geode/redis/internal/data/collections/SizeableBytes2ObjectOpenCustomHashMapWithCursor +org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursor +org/apache/geode/redis/internal/eventing/EventResponse org/apache/geode/redis/internal/pubsub/Publisher$PublishFunction +org/apache/geode/redis/internal/services/cluster/RedisMemberInfoRetrievalFunction org/apache/geode/redis/internal/services/locking/StripedExecutorService$State -org/apache/geode/redis/internal/commands/RedisCommandType -org/apache/geode/redis/internal/commands/RedisCommandType$Category 
-org/apache/geode/redis/internal/commands/RedisCommandType$Flag diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java index 8db6c30..29a0ed5 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java @@ -27,6 +27,7 @@ import org.apache.geode.internal.statistics.StatisticsClock; import org.apache.geode.internal.statistics.StatisticsClockFactory; import org.apache.geode.logging.internal.log4j.api.LogService; import org.apache.geode.redis.internal.data.RedisKey; +import org.apache.geode.redis.internal.eventing.EventDistributor; import org.apache.geode.redis.internal.netty.Coder; import org.apache.geode.redis.internal.netty.NettyRedisServer; import org.apache.geode.redis.internal.pubsub.PubSub; @@ -62,6 +63,7 @@ public class GeodeRedisServer { private final RegionProvider regionProvider; private final PubSub pubSub; private final RedisStats redisStats; + private final EventDistributor eventDistributor; private boolean shutdown; /** @@ -81,7 +83,8 @@ public class GeodeRedisServer { StripedCoordinator stripedCoordinator = new LockingStripedCoordinator(); RedisMemberInfoRetrievalFunction infoFunction = RedisMemberInfoRetrievalFunction.register(); - regionProvider = new RegionProvider(cache, stripedCoordinator, redisStats); + eventDistributor = new EventDistributor(); + regionProvider = new RegionProvider(cache, stripedCoordinator, redisStats, eventDistributor); pubSub = new PubSubImpl(new Subscriptions(redisStats), regionProvider, redisStats); activeExpirationManager = new ActiveExpirationManager(regionProvider); @@ -92,7 +95,7 @@ public class GeodeRedisServer { nettyRedisServer = new NettyRedisServer(() -> cache.getInternalDistributedSystem().getConfig(), regionProvider, pubSub, this::allowUnsupportedCommands, port, 
bindAddress, redisStats, - member, securityService); + member, securityService, eventDistributor); infoFunction.initialize(member, bindAddress, nettyRedisServer.getPort()); } @@ -102,6 +105,11 @@ public class GeodeRedisServer { return ((PubSubImpl) pubSub).getSubscriptionCount(); } + @VisibleForTesting + public EventDistributor getEventDistributor() { + return eventDistributor; + } + private static RedisStats createStats(InternalCache cache) { InternalDistributedSystem system = cache.getInternalDistributedSystem(); StatisticsClock statisticsClock = diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java index acb1093..1be3ccc 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java @@ -104,4 +104,6 @@ public class RedisConstants { "ERR wrong number of arguments for '%s' command"; public static final String ERROR_BITOP_NOT_MUST_USE_SINGLE_KEY = "ERR BITOP NOT must be called with a single source key."; + public static final String ERROR_TIMEOUT_INVALID = "ERR timeout is not a float or out of range"; + public static final String ERROR_NEGATIVE_TIMEOUT = "ERR timeout is negative"; } diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/Command.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/Command.java index 8758a00..a44b7f8 100755 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/Command.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/Command.java @@ -22,8 +22,6 @@ import java.nio.channels.SocketChannel; import java.util.List; import java.util.stream.Collectors; -import io.netty.channel.ChannelHandlerContext; - import org.apache.geode.redis.internal.commands.executor.RedisResponse; import 
org.apache.geode.redis.internal.data.RedisKey; import org.apache.geode.redis.internal.netty.ExecutionHandlerContext; @@ -47,6 +45,16 @@ public class Command { commandType = null; } + public Command(RedisCommandType command, List<byte[]> commandElems) { + if (commandElems == null || commandElems.isEmpty()) { + throw new IllegalArgumentException( + "List of command elements cannot be empty -> List:" + commandElems); + } + + this.commandType = command; + this.commandElems = commandElems; + } + /** * Constructor for {@link Command}. Must initialize Command with a {@link SocketChannel} and a * {@link List} of command elements @@ -222,13 +230,4 @@ public class Command { return result; } - private ChannelHandlerContext channelHandlerContext; - - public void setChannelHandlerContext(ChannelHandlerContext ctx) { - channelHandlerContext = ctx; - } - - public ChannelHandlerContext getChannelHandlerContext() { - return channelHandlerContext; - } } diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java index d5e7d58..e734c06 100755 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java @@ -78,6 +78,7 @@ import org.apache.geode.redis.internal.commands.executor.key.RestoreExecutor; import org.apache.geode.redis.internal.commands.executor.key.ScanExecutor; import org.apache.geode.redis.internal.commands.executor.key.TTLExecutor; import org.apache.geode.redis.internal.commands.executor.key.TypeExecutor; +import org.apache.geode.redis.internal.commands.executor.list.BLPopExecutor; import org.apache.geode.redis.internal.commands.executor.list.LIndexExecutor; import org.apache.geode.redis.internal.commands.executor.list.LInsertExecutor; import 
org.apache.geode.redis.internal.commands.executor.list.LLenExecutor; @@ -388,6 +389,8 @@ public enum RedisCommandType { /************** Lists *****************/ + BLPOP(new BLPopExecutor(), Category.LIST, SUPPORTED, + new Parameter().min(3).lastKey(-2).flags(WRITE, NOSCRIPT)), LINDEX(new LIndexExecutor(), Category.LIST, SUPPORTED, new Parameter().exact(3).flags(READONLY)), LINSERT(new LInsertExecutor(), Category.LIST, SUPPORTED, new Parameter().exact(5).flags(WRITE, DENYOOM)), diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/RedisResponse.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/RedisResponse.java index 22a8fae..2577814 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/RedisResponse.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/RedisResponse.java @@ -42,6 +42,11 @@ import org.apache.geode.redis.internal.netty.CoderException; public class RedisResponse { + /** + * Static value returned by blocking commands. Effectively a no-op. 
+ */ + public static final RedisResponse BLOCKED = new RedisResponse(x -> null); + private final Function<ByteBuf, ByteBuf> coderCallback; private Runnable afterWriteCallback; @@ -119,6 +124,13 @@ public class RedisResponse { } @Immutable + private static final RedisResponse NIL_ARRAY = new RedisResponse(Coder::getNilArrayResponse); + + public static RedisResponse nilArray() { + return NIL_ARRAY; + } + + @Immutable private static final RedisResponse EMPTY = new RedisResponse(Coder::getEmptyResponse); public static RedisResponse flattenedArray(Collection<Collection<?>> nestedCollection) { diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/BLPopExecutor.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/BLPopExecutor.java new file mode 100755 index 0000000..2981aa0 --- /dev/null +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/BLPopExecutor.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.redis.internal.commands.executor.list; + +import static org.apache.geode.redis.internal.RedisConstants.ERROR_NEGATIVE_TIMEOUT; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.geode.redis.internal.RedisConstants; +import org.apache.geode.redis.internal.commands.Command; +import org.apache.geode.redis.internal.commands.executor.CommandExecutor; +import org.apache.geode.redis.internal.commands.executor.RedisResponse; +import org.apache.geode.redis.internal.data.RedisKey; +import org.apache.geode.redis.internal.data.RedisList; +import org.apache.geode.redis.internal.netty.Coder; +import org.apache.geode.redis.internal.netty.ExecutionHandlerContext; + +public class BLPopExecutor implements CommandExecutor { + + @Override + public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) { + List<byte[]> arguments = command.getCommandArguments(); + double timeoutSeconds; + try { + timeoutSeconds = Coder.bytesToDouble(arguments.get(arguments.size() - 1)); + } catch (NumberFormatException e) { + return RedisResponse.error(RedisConstants.ERROR_TIMEOUT_INVALID); + } + + if (timeoutSeconds < 0) { + return RedisResponse.error(ERROR_NEGATIVE_TIMEOUT); + } + + int keyCount = arguments.size() - 1; + List<RedisKey> keys = new ArrayList<>(keyCount); + for (int i = 0; i < keyCount; i++) { + keys.add(new RedisKey(arguments.get(i))); + } + // The order of keys is important and, since locking may alter the order passed into + // lockedExecute, we create a copy here. + List<RedisKey> keysForLocking = new ArrayList<>(keys); + + List<byte[]> popped = context.lockedExecute(keys.get(0), keysForLocking, + () -> RedisList.blpop(context, command, keys, timeoutSeconds)); + + return popped == null ? 
RedisResponse.BLOCKED : RedisResponse.array(popped, true); + } +} diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/LPushExecutor.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/LPushExecutor.java index 8e4b7e3..01763fd 100755 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/LPushExecutor.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/LPushExecutor.java @@ -16,11 +16,9 @@ package org.apache.geode.redis.internal.commands.executor.list; import java.util.List; -import org.apache.geode.cache.Region; import org.apache.geode.redis.internal.commands.Command; import org.apache.geode.redis.internal.commands.executor.CommandExecutor; import org.apache.geode.redis.internal.commands.executor.RedisResponse; -import org.apache.geode.redis.internal.data.RedisData; import org.apache.geode.redis.internal.data.RedisKey; import org.apache.geode.redis.internal.netty.ExecutionHandlerContext; @@ -30,13 +28,12 @@ public class LPushExecutor implements CommandExecutor { public final RedisResponse executeCommand(final Command command, final ExecutionHandlerContext context) { List<byte[]> commandElements = command.getProcessedCommand(); - Region<RedisKey, RedisData> region = context.getRegion(); RedisKey key = command.getKey(); List<byte[]> elementsToAdd = commandElements.subList(2, commandElements.size()); final long newLength = context.listLockedExecute(key, false, - list -> list.lpush(elementsToAdd, region, key, shouldPushOnlyIfKeyExists())); + list -> list.lpush(context, elementsToAdd, key, shouldPushOnlyIfKeyExists())); return RedisResponse.integer(newLength); } diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/RPushExecutor.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/RPushExecutor.java index fc313df..7549aba 
100755 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/RPushExecutor.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/RPushExecutor.java @@ -36,7 +36,7 @@ public class RPushExecutor implements CommandExecutor { List<byte[]> elementsToAdd = commandElements.subList(2, commandElements.size()); final long newLength = context.listLockedExecute(key, false, - list -> list.rpush(elementsToAdd, region, key, shouldPushOnlyIfKeyExists())); + list -> list.rpush(context, elementsToAdd, key, shouldPushOnlyIfKeyExists())); return RedisResponse.integer(newLength); } diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisList.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisList.java index c1f14a7..f14fc5a 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisList.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisList.java @@ -20,6 +20,8 @@ import java.util.Collections; import java.util.List; import org.apache.geode.cache.Region; +import org.apache.geode.redis.internal.commands.RedisCommandType; +import org.apache.geode.redis.internal.netty.ExecutionHandlerContext; class NullRedisList extends RedisList { @@ -44,22 +46,23 @@ class NullRedisList extends RedisList { } @Override - public long lpush(List<byte[]> elementsToAdd, Region<RedisKey, RedisData> region, RedisKey key, - final boolean onlyIfExists) { + public long lpush(ExecutionHandlerContext context, List<byte[]> elementsToAdd, RedisKey key, + boolean onlyIfExists) { if (onlyIfExists) { return 0; } - RedisList newList = new RedisList(); for (byte[] element : elementsToAdd) { newList.elementPushHead(element); } - region.create(key, newList); + context.getRegion().create(key, newList); + context.fireEvent(RedisCommandType.LPUSH, key); + return elementsToAdd.size(); } @Override - public long 
rpush(List<byte[]> elementsToAdd, Region<RedisKey, RedisData> region, RedisKey key, + public long rpush(ExecutionHandlerContext context, List<byte[]> elementsToAdd, RedisKey key, final boolean onlyIfExists) { if (onlyIfExists) { return 0; @@ -69,7 +72,9 @@ class NullRedisList extends RedisList { for (byte[] element : elementsToAdd) { newList.elementPushTail(element); } - region.create(key, newList); + context.getRegion().create(key, newList); + context.fireEvent(RedisCommandType.RPUSH, key); + return elementsToAdd.size(); } diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java index 64d233c..37fbfa9 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java @@ -36,12 +36,17 @@ import org.apache.geode.internal.serialization.DeserializationContext; import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.internal.serialization.SerializationContext; import org.apache.geode.redis.internal.RedisException; +import org.apache.geode.redis.internal.commands.Command; +import org.apache.geode.redis.internal.commands.RedisCommandType; import org.apache.geode.redis.internal.data.collections.SizeableByteArrayList; import org.apache.geode.redis.internal.data.delta.AddByteArrays; import org.apache.geode.redis.internal.data.delta.AddByteArraysTail; import org.apache.geode.redis.internal.data.delta.InsertByteArray; import org.apache.geode.redis.internal.data.delta.RemoveElementsByIndex; import org.apache.geode.redis.internal.data.delta.ReplaceByteArrayAtOffset; +import org.apache.geode.redis.internal.eventing.BlockingCommandListener; +import org.apache.geode.redis.internal.netty.ExecutionHandlerContext; +import org.apache.geode.redis.internal.services.RegionProvider; public class RedisList extends 
AbstractRedisData { protected static final int REDIS_LIST_OVERHEAD = memoryOverhead(RedisList.class); @@ -158,40 +163,44 @@ public class RedisList extends AbstractRedisData { } /** - * @param elementsToAdd elements to add to this list; NOTE this list may be modified by this call - * @param region the region this instance is stored in - * @param key the name of the list to add to + * @param context the context of the executing command + * @param elementsToAdd elements to add to this list + * @param key the name of the list to add to * @param onlyIfExists if true then the elements should only be added if the key already exists * and holds a list, otherwise no operation is performed. * @return the length of the list after the operation */ - public long lpush(List<byte[]> elementsToAdd, Region<RedisKey, RedisData> region, - RedisKey key, final boolean onlyIfExists) { + public long lpush(ExecutionHandlerContext context, List<byte[]> elementsToAdd, + RedisKey key, boolean onlyIfExists) { byte newVersion; synchronized (this) { newVersion = incrementAndGetVersion(); elementsPushHead(elementsToAdd); } - storeChanges(region, key, new AddByteArrays(elementsToAdd, newVersion)); + storeChanges(context.getRegion(), key, new AddByteArrays(elementsToAdd, newVersion)); + context.fireEvent(RedisCommandType.LPUSH, key); + return elementList.size(); } /** + * @param context the context of the executing command * @param elementsToAdd elements to add to this list; - * @param region the region this instance is stored in * @param key the name of the list to add to * @param onlyIfExists if true then the elements should only be added if the key already exists * and holds a list, otherwise no operation is performed.
* @return the length of the list after the operation */ - public long rpush(List<byte[]> elementsToAdd, Region<RedisKey, RedisData> region, - RedisKey key, final boolean onlyIfExists) { + public long rpush(ExecutionHandlerContext context, List<byte[]> elementsToAdd, RedisKey key, + boolean onlyIfExists) { byte newVersion; synchronized (this) { newVersion = incrementAndGetVersion(); elementsToAdd.forEach(this::elementPushTail); } - storeChanges(region, key, new AddByteArraysTail(newVersion, elementsToAdd)); + storeChanges(context.getRegion(), key, new AddByteArraysTail(newVersion, elementsToAdd)); + context.fireEvent(RedisCommandType.RPUSH, key); + return elementList.size(); } @@ -214,6 +223,27 @@ public class RedisList extends AbstractRedisData { return popped; } + public static List<byte[]> blpop(ExecutionHandlerContext context, Command command, + List<RedisKey> keys, double timeoutSeconds) { + RegionProvider regionProvider = context.getRegionProvider(); + for (RedisKey key : keys) { + RedisList list = regionProvider.getTypedRedisData(REDIS_LIST, key, false); + if (!list.isNull()) { + byte[] poppedValue = list.lpop(context.getRegion(), key); + + // return the key and value + List<byte[]> result = new ArrayList<>(2); + result.add(key.toBytes()); + result.add(poppedValue); + return result; + } + } + + context.registerListener(new BlockingCommandListener(context, command, keys, timeoutSeconds)); + + return null; + } + /** * @return the number of elements in the list */ diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/BlockingCommandListener.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/BlockingCommandListener.java new file mode 100644 index 0000000..bce6756 --- /dev/null +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/BlockingCommandListener.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. 
See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.eventing; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.geode.annotations.VisibleForTesting; +import org.apache.geode.redis.internal.commands.Command; +import org.apache.geode.redis.internal.commands.RedisCommandType; +import org.apache.geode.redis.internal.commands.executor.RedisResponse; +import org.apache.geode.redis.internal.data.RedisKey; +import org.apache.geode.redis.internal.netty.Coder; +import org.apache.geode.redis.internal.netty.ExecutionHandlerContext; + +public class BlockingCommandListener implements EventListener { + + private final ExecutionHandlerContext context; + private final Command command; + private final List<RedisKey> keys; + private final double timeoutSeconds; + private final long timeSubmitted; + private final AtomicBoolean active = new AtomicBoolean(true); + + /** + * Constructor to create an instance of a BlockingCommandListener in response to a blocking + * command. When receiving a relevant event, blocking commands simply resubmit the command + * into the Netty pipeline. 
+ * + * @param context the associated ExecutionHandlerContext + * @param command the blocking command associated with this listener + * @param keys the list of keys the command is interested in + * @param timeoutSeconds the timeout for the command to block in seconds + */ + public BlockingCommandListener(ExecutionHandlerContext context, Command command, + List<RedisKey> keys, double timeoutSeconds) { + this.context = context; + this.command = command; + this.timeoutSeconds = timeoutSeconds; + this.keys = Collections.unmodifiableList(keys); + timeSubmitted = System.nanoTime(); + } + + @Override + public List<RedisKey> keys() { + return keys; + } + + @Override + public EventResponse process(RedisCommandType commandType, RedisKey key) { + if (!keys.contains(key)) { + return EventResponse.CONTINUE; + } + + resubmitCommand(); + return EventResponse.REMOVE_AND_STOP; + } + + @Override + public void resubmitCommand() { + if (!active.compareAndSet(true, false)) { + return; + } + + // Recalculate the timeout since we've already been waiting + double adjustedTimeoutSeconds = 0; + if (timeoutSeconds > 0.0D) { + long timeoutNanos = (long) (timeoutSeconds * 1e9); + long adjustedTimeoutNanos = timeoutNanos - (System.nanoTime() - timeSubmitted); + adjustedTimeoutNanos = Math.max(1, adjustedTimeoutNanos); + adjustedTimeoutSeconds = ((double) adjustedTimeoutNanos) / 1e9; + } + + // The commands we are currently supporting all have the timeout at the end of the argument + // list. Some newer Redis 7 commands (BLMPOP and BZMPOP) have the timeout as the first argument + // after the command. We'll need to adjust this once those commands are supported. 
+ List<byte[]> commandArguments = command.getCommandArguments(); + commandArguments.set(commandArguments.size() - 1, Coder.doubleToBytes(adjustedTimeoutSeconds)); + + context.resubmitCommand(command); + } + + @Override + public void scheduleTimeout(ScheduledExecutorService executor, EventDistributor distributor) { + if (timeoutSeconds == 0 || !active.get()) { + return; + } + + long timeoutNanos = (long) (timeoutSeconds * 1e9); + executor.schedule(() -> timeout(distributor), timeoutNanos, TimeUnit.NANOSECONDS); + } + + @VisibleForTesting + void timeout(EventDistributor distributor) { + if (active.compareAndSet(true, false)) { + distributor.removeListener(this); + context.writeToChannel(RedisResponse.nilArray()); + } + } + + @Override + public String toString() { + return "BlockingCommandListener [command:" + command.getCommandType().name() + + " keys:" + keys + "]"; + } + +} diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java new file mode 100644 index 0000000..1a806bc --- /dev/null +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. 
See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.eventing; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import org.apache.geode.annotations.VisibleForTesting; +import org.apache.geode.cache.partition.PartitionListenerAdapter; +import org.apache.geode.internal.lang.utils.JavaWorkarounds; +import org.apache.geode.logging.internal.executors.LoggingThreadFactory; +import org.apache.geode.redis.internal.commands.RedisCommandType; +import org.apache.geode.redis.internal.data.RedisKey; + +public class EventDistributor extends PartitionListenerAdapter { + + private final Map<RedisKey, Queue<EventListener>> listeners = new ConcurrentHashMap<>(); + + private final ScheduledThreadPoolExecutor timerExecutor = + new ScheduledThreadPoolExecutor(1, + new LoggingThreadFactory("GeodeForRedisEventTimer-", true)); + + public EventDistributor() { + timerExecutor.setRemoveOnCancelPolicy(true); + } + + public synchronized void registerListener(EventListener listener) { + for (RedisKey key : listener.keys()) { + JavaWorkarounds.computeIfAbsent(listeners, key, k -> new LinkedBlockingQueue<>()) + .add(listener); + } + + listener.scheduleTimeout(timerExecutor, this); + } + + public void fireEvent(RedisCommandType command, RedisKey key) { + Queue<EventListener> listenerList = listeners.get(key); + if (listenerList == null) { + return; + } + + for (EventListener listener : listenerList) { + if (listener.process(command, key) == EventResponse.REMOVE_AND_STOP) { + removeListener(listener); + break; + } + } + } + + /** + * The total number of keys registered by all listeners (includes duplicates). 
+ */ + @VisibleForTesting + public int getRegisteredKeys() { + return listeners.values().stream().mapToInt(Collection::size).sum(); + } + + @Override + public void afterBucketRemoved(int bucketId, Iterable<?> keys) { + Set<EventListener> resubmittingList = new HashSet<>(); + for (Map.Entry<RedisKey, Queue<EventListener>> entry : listeners.entrySet()) { + if (entry.getKey().getBucketId() == bucketId) { + resubmittingList.addAll(entry.getValue()); + } + } + + resubmittingList.forEach(x -> { + removeListener(x); + x.resubmitCommand(); + }); + } + + public synchronized void removeListener(EventListener listener) { + for (RedisKey key : listener.keys()) { + Queue<EventListener> listenerList = listeners.get(key); + if (listenerList == null) { + continue; + } + listenerList.remove(listener); + if (listenerList.isEmpty()) { + listeners.remove(key); + } + } + } + +} diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventListener.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventListener.java new file mode 100644 index 0000000..c0e1f54 --- /dev/null +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventListener.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. 
See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.eventing; + +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.geode.redis.internal.commands.RedisCommandType; +import org.apache.geode.redis.internal.data.RedisKey; + +/** + * Interface intended to be implemented in order to receive Redis events. Specifically this would be + * keyspace events or blocking commands. EventListeners are registered with the + * {@link EventDistributor}. + */ +public interface EventListener { + + /** + * Receive and process an event. This method should execute very quickly. The return value + * determines additional process steps for the given event. + * + * @param commandType the command triggering the event + * @param key the key triggering the event + * @return response determining subsequent processing steps + */ + EventResponse process(RedisCommandType commandType, RedisKey key); + + /** + * Return the list of keys this listener is interested in. + */ + List<RedisKey> keys(); + + /** + * Method to resubmit a command if appropriate. This is only relevant for listeners that process + * events for blocking commands. Listeners that handle keyspace event notification will not use + * this. + */ + void resubmitCommand(); + + /** + * Schedule the removal of this listener using the passed in {@link ScheduledExecutorService} + * and {@link EventDistributor}. The implementation is responsible for cancelling the timeout if + * necessary. 
+ * + * @param executor the ScheduledExecutorService to use for scheduling + * @param distributor the EventDistributor which would be used to remove this listener + */ + void scheduleTimeout(ScheduledExecutorService executor, EventDistributor distributor); +} diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventResponse.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventResponse.java new file mode 100644 index 0000000..538e49d --- /dev/null +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventResponse.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.eventing; + +import org.apache.geode.redis.internal.commands.RedisCommandType; +import org.apache.geode.redis.internal.data.RedisKey; + +/** + * Response returned by {@link EventListener#process(RedisCommandType, RedisKey)} + */ +public enum EventResponse { + /** + * Response indicating that processing should continue. + */ + CONTINUE, + /** + * Response indicating that processing should stop and the listener should be + * removed. 
+ */ + REMOVE_AND_STOP +} diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java index 50e71d3..1094636 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java @@ -155,11 +155,11 @@ public class Client { return result; } - public ChannelFuture writeToChannel(RedisResponse response) { + public void writeToChannel(RedisResponse response) { if (!logger.isDebugEnabled() && !response.hasAfterWriteCallback()) { - return channel.writeAndFlush(response.encode(byteBufAllocator)); + channel.writeAndFlush(response.encode(byteBufAllocator)); } else { - return channel.writeAndFlush(response.encode(byteBufAllocator)) + channel.writeAndFlush(response.encode(byteBufAllocator)) .addListener((ChannelFutureListener) f -> { response.afterWrite(); logResponse(response, channel.remoteAddress(), f.cause()); diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java index 62efc01..ee24e66 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java @@ -27,6 +27,7 @@ import static org.apache.geode.redis.internal.netty.StringBytesGlossary.INF; import static org.apache.geode.redis.internal.netty.StringBytesGlossary.INFINITY; import static org.apache.geode.redis.internal.netty.StringBytesGlossary.INTEGER_ID; import static org.apache.geode.redis.internal.netty.StringBytesGlossary.NIL; +import static org.apache.geode.redis.internal.netty.StringBytesGlossary.NIL_ARRAY; import static org.apache.geode.redis.internal.netty.StringBytesGlossary.N_INF; import static org.apache.geode.redis.internal.netty.StringBytesGlossary.N_INFINITY; 
import static org.apache.geode.redis.internal.netty.StringBytesGlossary.N_INF_STRING; @@ -238,6 +239,11 @@ public class Coder { return buffer; } + public static ByteBuf getNilArrayResponse(ByteBuf buffer) { + buffer.writeBytes(NIL_ARRAY); + return buffer; + } + public static ByteBuf getEmptyResponse(ByteBuf buffer) { return buffer; } diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java index 1c37039..5bb01a7 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java @@ -31,9 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelId; import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.logging.log4j.Logger; import org.apache.shiro.subject.Subject; @@ -62,6 +60,8 @@ import org.apache.geode.redis.internal.data.RedisList; import org.apache.geode.redis.internal.data.RedisSet; import org.apache.geode.redis.internal.data.RedisSortedSet; import org.apache.geode.redis.internal.data.RedisString; +import org.apache.geode.redis.internal.eventing.EventDistributor; +import org.apache.geode.redis.internal.eventing.EventListener; import org.apache.geode.redis.internal.pubsub.PubSub; import org.apache.geode.redis.internal.services.RegionProvider; import org.apache.geode.redis.internal.services.locking.RedisSecurityService; @@ -103,9 +103,10 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { private final RedisStats redisStats; private final DistributedMember member; private final RedisSecurityService securityService; + private final 
EventDistributor eventDistributor; private BigInteger scanCursor; private final AtomicBoolean channelInactive = new AtomicBoolean(); - private final ChannelId channelId; + private final Channel channel; private final int serverPort; @@ -122,7 +123,9 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { String username, int serverPort, DistributedMember member, - RedisSecurityService securityService) { + RedisSecurityService securityService, + EventDistributor eventDistributor) { + this.channel = channel; this.regionProvider = regionProvider; this.pubsub = pubsub; this.allowUnsupportedSupplier = allowUnsupportedSupplier; @@ -132,15 +135,17 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { this.serverPort = serverPort; this.member = member; this.securityService = securityService; + this.eventDistributor = eventDistributor; scanCursor = new BigInteger("0"); - channelId = channel.id(); redisStats.addClient(); channel.closeFuture().addListener(future -> logout()); } - public ChannelFuture writeToChannel(RedisResponse response) { - return client.writeToChannel(response); + public void writeToChannel(RedisResponse response) { + if (response != RedisResponse.BLOCKED) { + client.writeToChannel(response); + } } /** @@ -149,13 +154,12 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { Command command = (Command) msg; - command.setChannelHandlerContext(ctx); if (!channelInactive.get()) { try { - executeCommand(command); + executeCommand(ctx, command); redisStats.incCommandsProcessed(); } catch (Throwable ex) { - exceptionCaught(command.getChannelHandlerContext(), ex); + exceptionCaught(ctx, ex); } } } @@ -232,7 +236,14 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { } } - private void executeCommand(Command command) throws Exception { + public void resubmitCommand(Command command) { + 
ChannelHandlerContext ctx = + channel.pipeline().context(ByteToCommandDecoder.class.getSimpleName()); + ctx.fireChannelRead(command); + } + + private void executeCommand(ChannelHandlerContext channelContext, Command command) + throws Exception { try { if (logger.isDebugEnabled()) { logger.debug("Executing Redis command: {} - {}", command, @@ -277,7 +288,7 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { } if (command.isOfType(RedisCommandType.QUIT)) { - channelInactive(command.getChannelHandlerContext()); + channelInactive(channelContext); } } catch (Exception e) { if (!(e instanceof RedisDataMovedException)) { @@ -306,7 +317,7 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { */ private void logout() { if (subject != null) { - securityService.logout(channelId); + securityService.logout(channel.id()); subject = null; } } @@ -393,7 +404,7 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { } public Subject login(Properties properties) { - subject = securityService.login(channelId, properties); + subject = securityService.login(channel.id(), properties); return subject; } @@ -408,6 +419,13 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { } } + public void registerListener(EventListener listener) { + eventDistributor.registerListener(listener); + } + + public void fireEvent(RedisCommandType command, RedisKey key) { + eventDistributor.fireEvent(command, key); + } public Region<RedisKey, RedisData> getRegion() { return getRegionProvider().getLocalDataRegion(); diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java index ff3c5f5..0d85e19 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java +++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java @@ -63,6 +63,7 @@ import org.apache.geode.internal.security.SecurableCommunicationChannel; import org.apache.geode.logging.internal.executors.LoggingThreadFactory; import org.apache.geode.logging.internal.log4j.api.LogService; import org.apache.geode.management.ManagementException; +import org.apache.geode.redis.internal.eventing.EventDistributor; import org.apache.geode.redis.internal.pubsub.PubSub; import org.apache.geode.redis.internal.services.RegionProvider; import org.apache.geode.redis.internal.services.locking.RedisSecurityService; @@ -87,12 +88,14 @@ public class NettyRedisServer { private final int serverPort; private final DistributedMember member; private final RedisSecurityService securityService; + private final EventDistributor eventDistributor; private final int writeTimeoutSeconds; public NettyRedisServer(Supplier<DistributionConfig> configSupplier, RegionProvider regionProvider, PubSub pubsub, Supplier<Boolean> allowUnsupportedSupplier, int port, String requestedAddress, RedisStats redisStats, - DistributedMember member, RedisSecurityService securityService) { + DistributedMember member, RedisSecurityService securityService, + EventDistributor eventDistributor) { this.configSupplier = configSupplier; this.regionProvider = regionProvider; this.pubsub = pubsub; @@ -100,6 +103,7 @@ public class NettyRedisServer { this.redisStats = redisStats; this.member = member; this.securityService = securityService; + this.eventDistributor = eventDistributor; writeTimeoutSeconds = getIntegerSystemProperty(WRITE_TIMEOUT_SECONDS, DEFAULT_REDIS_WRITE_TIMEOUT_SECONDS, 1); @@ -170,7 +174,7 @@ public class NettyRedisServer { pipeline.addLast(ExecutionHandlerContext.class.getSimpleName(), new ExecutionHandlerContext(socketChannel, regionProvider, pubsub, allowUnsupportedSupplier, redisStats, redisUsername, - getPort(), member, securityService)); + getPort(), member, 
securityService, eventDistributor)); } }; } diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java index 62bd890..4f21013 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java @@ -62,6 +62,12 @@ public class StringBytesGlossary { public static final byte[] NIL = stringToBytes("$-1\r\n"); /** + * byte array of a nil array response + */ + @MakeImmutable + public static final byte[] NIL_ARRAY = stringToBytes("*-1\r\n"); + + /** * byte array of an empty array */ @MakeImmutable diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/services/RegionProvider.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/services/RegionProvider.java index a130736..3fd6278 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/services/RegionProvider.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/services/RegionProvider.java @@ -33,6 +33,7 @@ import org.apache.geode.cache.PartitionAttributesFactory; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.partition.PartitionListener; import org.apache.geode.cache.partition.PartitionRegionHelper; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.DistributionConfig; @@ -82,7 +83,7 @@ public class RegionProvider { private final String redisRegionName; public RegionProvider(InternalCache cache, StripedCoordinator stripedCoordinator, - RedisStats redisStats) { + RedisStats redisStats, PartitionListener partitionListener) { this.stripedCoordinator = stripedCoordinator; this.redisStats = redisStats; @@ -95,6 
+96,7 @@ public class RegionProvider { attributesFactory.setRedundantCopies(config.getRedisRedundantCopies()); attributesFactory.setPartitionResolver(new RedisPartitionResolver()); attributesFactory.setTotalNumBuckets(REDIS_REGION_BUCKETS); + attributesFactory.addPartitionListener(partitionListener); redisDataRegionFactory.setPartitionAttributes(attributesFactory.create()); redisRegionName = diff --git a/geode-for-redis/src/test/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContextTest.java b/geode-for-redis/src/test/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContextTest.java index 6e4895b..fefe3e1 100644 --- a/geode-for-redis/src/test/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContextTest.java +++ b/geode-for-redis/src/test/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContextTest.java @@ -42,7 +42,7 @@ public class ExecutionHandlerContextTest { when(channel.closeFuture()).thenReturn(mock(ChannelFuture.class)); RedisStats redisStats = mock(RedisStats.class); return new ExecutionHandlerContext(channel, null, null, null, redisStats, null, 0, null, - securityService); + securityService, null); } @Test