This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 6b43b76 GEODE-10048: Add framework for Redis events and BLPOP command
(#7408)
6b43b76 is described below
commit 6b43b76e3d5a9c5b0c84df8e1995d61528871068
Author: Jens Deppe <[email protected]>
AuthorDate: Tue Mar 22 19:47:00 2022 -0700
GEODE-10048: Add framework for Redis events and BLPOP command (#7408)
* GEODE-10048: Add framework for Redis events and BLPOP command
This commit adds a simple eventing framework to be used by blocking
commands as well as keyspace event notification (still to be
implemented). The main components are:
- EventListener: an interface to be immplemented by anything wishing to
receive events. Currently only implemented for blocking commands in
the form of BlockingCommandListener.
- EventDistributor: the component to which listeners are registered and
where events are received and distributed. A single EventDistributor
exists in the system and is associated with each
ExecutionHandlerContext.
- Event: not implemented as a separate class but logically consists of
the command (RedisCommandType) and key (RedisKey).
When a blocking command receives a relevant event the command is
resubmitted into the Netty pipeline. This also means that something
could happen (another command) that causes the blocking command to
re-block and not complete. This is also what happens with native Redis.
For example:
- BLPOP 0 A executes and blocks
- LPUSH A some-value
- Before LPUSH fires an event, LPOP A is received but needs to wait to
acquire the lock on A
- LPUSH fires an event which the BLPOP listener receives and resubmits
BLPOP into the pipeline
- Once LPUSH completes, LPOP is next and removes A
- BLPOP A runs and ends up blocking again because there is nothing to
pop from A
---
.../list/BLPopNativeRedisAcceptanceTest.java | 41 ++++
.../commands/executor/list/BLPopDUnitTest.java | 120 ++++++++++++
.../list/AbstractBLPopIntegrationTest.java | 201 +++++++++++++++++++
.../executor/list/BLPopIntegrationTest.java | 37 ++++
.../server/AbstractHitsMissesIntegrationTest.java | 6 +
.../eventing/BlockingCommandListenerTest.java | 182 ++++++++++++++++++
.../internal/eventing/EventDistributorTest.java | 213 +++++++++++++++++++++
.../apache/geode/codeAnalysis/excludedClasses.txt | 23 +--
.../geode/redis/internal/GeodeRedisServer.java | 12 +-
.../geode/redis/internal/RedisConstants.java | 2 +
.../geode/redis/internal/commands/Command.java | 21 +-
.../redis/internal/commands/RedisCommandType.java | 3 +
.../internal/commands/executor/RedisResponse.java | 12 ++
.../commands/executor/list/BLPopExecutor.java | 61 ++++++
.../commands/executor/list/LPushExecutor.java | 5 +-
.../commands/executor/list/RPushExecutor.java | 2 +-
.../geode/redis/internal/data/NullRedisList.java | 17 +-
.../geode/redis/internal/data/RedisList.java | 50 ++++-
.../internal/eventing/BlockingCommandListener.java | 123 ++++++++++++
.../redis/internal/eventing/EventDistributor.java | 105 ++++++++++
.../redis/internal/eventing/EventListener.java | 62 ++++++
.../redis/internal/eventing/EventResponse.java | 34 ++++
.../apache/geode/redis/internal/netty/Client.java | 6 +-
.../apache/geode/redis/internal/netty/Coder.java | 6 +
.../internal/netty/ExecutionHandlerContext.java | 46 +++--
.../redis/internal/netty/NettyRedisServer.java | 8 +-
.../redis/internal/netty/StringBytesGlossary.java | 6 +
.../redis/internal/services/RegionProvider.java | 4 +-
.../netty/ExecutionHandlerContextTest.java | 2 +-
29 files changed, 1344 insertions(+), 66 deletions(-)
diff --git
a/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopNativeRedisAcceptanceTest.java
b/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopNativeRedisAcceptanceTest.java
new file mode 100755
index 0000000..2c54bdd
--- /dev/null
+++
b/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopNativeRedisAcceptanceTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+
+import org.junit.ClassRule;
+
+import org.apache.geode.redis.NativeRedisClusterTestRule;
+
+public class BLPopNativeRedisAcceptanceTest extends
AbstractBLPopIntegrationTest {
+
+ @ClassRule
+ public static NativeRedisClusterTestRule redis = new
NativeRedisClusterTestRule();
+
+ @Override
+ public int getPort() {
+ return redis.getExposedPorts().get(0);
+ }
+
+ @Override
+ public void flushAll() {
+ redis.flushAll();
+ }
+
+ @Override
+ public void awaitEventDistributorSize(int size) throws Exception {
+ Thread.sleep(500);
+ }
+}
diff --git
a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopDUnitTest.java
b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopDUnitTest.java
new file mode 100644
index 0000000..ff1b394
--- /dev/null
+++
b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopDUnitTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.concurrent.Future;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.redis.internal.GeodeRedisService;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class BLPopDUnitTest {
+
+ @ClassRule
+ public static RedisClusterStartupRule cluster = new
RedisClusterStartupRule(4);
+
+ @Rule
+ public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+ private static final String LIST_KEY = "key";
+ private static JedisCluster jedis;
+ private static MemberVM server1;
+ private static MemberVM server2;
+ private static MemberVM server3;
+ private static int locatorPort;
+ private static int redisServerPort;
+
+ @BeforeClass
+ public static void classSetup() {
+ MemberVM locator = cluster.startLocatorVM(0);
+ locatorPort = locator.getPort();
+
+ server1 = cluster.startRedisVM(1, locatorPort);
+ server2 = cluster.startRedisVM(2, locatorPort);
+
+ redisServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
+ server3 = cluster.startRedisVM(3, Integer.toString(redisServerPort),
locatorPort);
+
+ jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, redisServerPort),
REDIS_CLIENT_TIMEOUT,
+ 20);
+ }
+
+ @After
+ public void tearDown() {
+ cluster.flushAll();
+ }
+
+ @Test
+ public void testClientRepeatsBLpopAfterServerCrash() throws Exception {
+ cluster.moveBucketForKey(LIST_KEY, "server-3");
+ Future<List<String>> blpopFuture = executor.submit(() -> jedis.blpop(0,
LIST_KEY));
+
+ try {
+ cluster.crashVM(3);
+ jedis.lpush(LIST_KEY, "value");
+ List<String> result = blpopFuture.get();
+ assertThat(result).containsExactly(LIST_KEY, "value");
+ } finally {
+ cluster.startRedisVM(3, Integer.toString(redisServerPort), locatorPort);
+ cluster.rebalanceAllRegions();
+ }
+ }
+
+ @Test
+ public void testBLPopFollowsBucketMovement() throws Exception {
+ Future<List<String>> blpopFuture = executor.submit(() -> jedis.blpop(0,
LIST_KEY));
+
+ for (int i = 0; i < 11; i++) {
+ cluster.moveBucketForKey(LIST_KEY);
+ Thread.sleep(500);
+ }
+
+ jedis.lpush(LIST_KEY, "value");
+
+ assertThat(blpopFuture.get()).containsExactly(LIST_KEY, "value");
+
+ int registeredListeners = getRegisteredListeners(server1);
+ registeredListeners += getRegisteredListeners(server2);
+ registeredListeners += getRegisteredListeners(server3);
+
+ assertThat(registeredListeners).isEqualTo(0);
+ }
+
+ private int getRegisteredListeners(MemberVM vm) {
+ return vm.invoke(() -> {
+ GeodeRedisService service =
ClusterStartupRule.getCache().getService(GeodeRedisService.class);
+ return
service.getRedisServer().getEventDistributor().getRegisteredKeys();
+ });
+ }
+
+
+}
diff --git
a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractBLPopIntegrationTest.java
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractBLPopIntegrationTest.java
new file mode 100755
index 0000000..fd9b12a
--- /dev/null
+++
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractBLPopIntegrationTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.internal.RedisConstants;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public abstract class AbstractBLPopIntegrationTest implements
RedisIntegrationTest {
+ private static final String KEY = "key";
+
+ protected JedisCluster jedis;
+
+ public abstract void awaitEventDistributorSize(int size) throws Exception;
+
+ @ClassRule
+ public static ExecutorServiceRule executor = new ExecutorServiceRule();
+
+ @Before
+ public void setUp() {
+ jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()),
REDIS_CLIENT_TIMEOUT);
+ }
+
+ @After
+ public void tearDown() {
+ flushAll();
+ jedis.close();
+ }
+
+ @Test
+ public void testInvalidArguments_throwErrors() {
+ assertAtLeastNArgs(jedis, Protocol.Command.BLPOP, 2);
+ }
+
+ @Test
+ public void testInvalidTimeout_throwsError() {
+ assertThatThrownBy(() -> jedis.sendCommand("key1", Protocol.Command.BLPOP,
"key1",
+ "0.A"))
+ .hasMessage(RedisConstants.ERROR_TIMEOUT_INVALID);
+ }
+
+ @Test
+ public void testKeysInDifferentSlots_throwsError() {
+ assertThatThrownBy(() -> jedis.sendCommand("key1", Protocol.Command.BLPOP,
"key1",
+ "key2", "0"))
+ .hasMessage(RedisConstants.ERROR_WRONG_SLOT);
+ }
+
+ @Test
+ public void testNegativeTimeout_throwsError() {
+ assertThatThrownBy(() -> jedis.blpop(-1, "key1"))
+ .hasMessage(RedisConstants.ERROR_NEGATIVE_TIMEOUT);
+ }
+
+ @Test
+ public void testBLPopForNonListKey() {
+ jedis.set("not-a-list", "value");
+
+ assertThatThrownBy(() -> jedis.blpop(0, "not-a-list"))
+ .hasMessage(RedisConstants.ERROR_WRONG_TYPE);
+ }
+
+ @Test
+ public void testBLPopWhenValueExists() {
+ jedis.lpush(KEY, "value1", "value2");
+
+ List<String> result = jedis.blpop(0, KEY);
+
+ assertThat(result).containsExactly(KEY, "value2");
+ assertThat(jedis.lpop(KEY)).isEqualTo("value1");
+ }
+
+ @Test
+ public void testBLPopDoesNotError_whenTimeoutHasExponent() {
+ jedis.lpush(KEY, "value1", "value2");
+
+ Object result = jedis.sendCommand(KEY, Protocol.Command.BLPOP, KEY,
"1E+3");
+
+ assertThat(result).isNotNull();
+ }
+
+ @Test
+ public void testBLPopWhenValueDoesNotExist() throws Exception {
+ Future<List<String>> future = executor.submit(() -> jedis.blpop(0, KEY));
+
+ awaitEventDistributorSize(1);
+ jedis.lpush(KEY, "value1", "value2");
+
+ assertThat(future.get()).containsExactly(KEY, "value2");
+ assertThat(jedis.lpop(KEY)).isEqualTo("value1");
+ }
+
+ @Test
+ public void testRPushFiresEventForBLPop() throws Exception {
+ Future<List<String>> future = executor.submit(() -> jedis.blpop(0, KEY));
+
+ awaitEventDistributorSize(1);
+ jedis.rpush(KEY, "value1", "value2");
+
+ assertThat(future.get()).containsExactly(KEY, "value1");
+ assertThat(jedis.lpop(KEY)).isEqualTo("value2");
+ }
+
+ @Test
+ public void testBLPopWhenTimeoutIsExceeded() {
+ int timeout = 10;
+ Future<List<String>> future = executor.submit(() -> jedis.blpop(timeout,
KEY));
+ GeodeAwaitility.await().atMost(timeout * 2, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertThat(future.get()).isNull());
+ }
+
+ @Test
+ public void testBLpopFirstKeyDoesNotExistButSecondOneDoes() {
+ jedis.lpush("{A}key2", "value2");
+
+ List<String> result = jedis.blpop(0, "{A}key1", "{A}key2");
+
+ assertThat(result).containsExactly("{A}key2", "value2");
+ }
+
+ @Test
+ public void testBLpopKeyOrderingIsCorrect() {
+ jedis.lpush("{A}key2", "value2");
+ jedis.lpush("{A}key3", "value3");
+
+ List<String> result = jedis.blpop(0, "{A}key1", "{A}key3", "{A}key2");
+
+ assertThat(result).containsExactly("{A}key3", "value3");
+ }
+
+ @Test
+ public void testConcurrentBLPop() throws Exception {
+ int totalElements = 10_000;
+ List<Object> accumulated = Collections.synchronizedList(new
ArrayList<>(totalElements + 2));
+ AtomicBoolean running = new AtomicBoolean(true);
+
+ Future<Void> future1 = executor.submit(() -> doBLPop(running,
accumulated));
+ Future<Void> future2 = executor.submit(() -> doBLPop(running,
accumulated));
+
+ List<String> result = new ArrayList<>();
+ for (int i = 0; i < totalElements; i++) {
+ jedis.lpush(KEY, "value" + i);
+ result.add("value" + i);
+ }
+
+ GeodeAwaitility.await()
+ .untilAsserted(() ->
assertThat(accumulated.size()).isEqualTo(totalElements));
+
+ running.set(false);
+
+ jedis.lpush(KEY, "stop1");
+ jedis.lpush(KEY, "stop2");
+ result.add("stop1");
+ result.add("stop2");
+ future1.get();
+ future2.get();
+
+ assertThat(accumulated).containsExactlyInAnyOrderElementsOf(result);
+ }
+
+ private void doBLPop(AtomicBoolean running, List<Object> accumulated) {
+ while (running.get()) {
+ accumulated.add(jedis.blpop(0, KEY).get(1));
+ }
+ }
+
+}
diff --git
a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopIntegrationTest.java
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopIntegrationTest.java
new file mode 100755
index 0000000..f77abc4
--- /dev/null
+++
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopIntegrationTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import org.junit.ClassRule;
+
+import org.apache.geode.redis.GeodeRedisServerRule;
+import org.apache.geode.redis.internal.eventing.EventDistributor;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public class BLPopIntegrationTest extends AbstractBLPopIntegrationTest {
+
+ @ClassRule
+ public static GeodeRedisServerRule server = new GeodeRedisServerRule();
+
+ @Override
+ public int getPort() {
+ return server.getPort();
+ }
+
+ public void awaitEventDistributorSize(int size) {
+ EventDistributor distributor = server.getServer().getEventDistributor();
+ GeodeAwaitility.await().until(() -> distributor.getRegisteredKeys() ==
size);
+ }
+}
diff --git
a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java
index 73546e1..50000cb 100644
---
a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java
+++
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java
@@ -575,6 +575,12 @@ public abstract class AbstractHitsMissesIntegrationTest
implements RedisIntegrat
/************* List related commands *************/
@Test
+ public void testBLPop() {
+ jedis.lpush(LIST_KEY, "element");
+ runCommandAndAssertNoStatUpdates(LIST_KEY, k -> jedis.blpop(0, k));
+ }
+
+ @Test
public void testLindex() {
runCommandAndAssertHitsAndMisses(LIST_KEY, k -> jedis.lindex(k, 1));
}
diff --git
a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/BlockingCommandListenerTest.java
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/BlockingCommandListenerTest.java
new file mode 100644
index 0000000..a984fc6
--- /dev/null
+++
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/BlockingCommandListenerTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.commands.Command;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class BlockingCommandListenerTest {
+
+ @Test
+ public void testTimeoutIsAdjusted() {
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+ List<byte[]> commandArgs = Arrays.asList("KEY".getBytes(), "0".getBytes());
+ Command command = new Command(RedisCommandType.BLPOP, commandArgs);
+ BlockingCommandListener listener =
+ new BlockingCommandListener(context, command, Collections.emptyList(),
1.0D);
+
+ listener.resubmitCommand();
+
+ ArgumentCaptor<Command> argumentCaptor =
ArgumentCaptor.forClass(Command.class);
+ verify(context, times(1)).resubmitCommand(argumentCaptor.capture());
+
+ double timeout =
Coder.bytesToDouble(argumentCaptor.getValue().getCommandArguments().get(0));
+ assertThat(timeout).isLessThan(1.0D);
+ }
+
+ @Test
+ public void testAdjustedTimeoutDoesNotBecomeNegative() {
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+ List<byte[]> commandArgs = Arrays.asList("KEY".getBytes(), "0".getBytes());
+ Command command = new Command(RedisCommandType.BLPOP, commandArgs);
+ BlockingCommandListener listener =
+ new BlockingCommandListener(context, command, Collections.emptyList(),
1e-9);
+
+ listener.resubmitCommand();
+
+ ArgumentCaptor<Command> argumentCaptor =
ArgumentCaptor.forClass(Command.class);
+ verify(context, times(1)).resubmitCommand(argumentCaptor.capture());
+
+ double timeout =
Coder.bytesToDouble(argumentCaptor.getValue().getCommandArguments().get(0));
+ assertThat(timeout).isEqualTo(1e-9);
+ }
+
+ @Test
+ public void testListenerIsRemovedAfterTimeout() {
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+ List<byte[]> commandArgs = Arrays.asList("KEY".getBytes(), "0".getBytes());
+ Command command = new Command(RedisCommandType.BLPOP, commandArgs);
+ BlockingCommandListener listener =
+ new BlockingCommandListener(context, command, Collections.emptyList(),
1e-9);
+ EventDistributor eventDistributor = new EventDistributor();
+
+ eventDistributor.registerListener(listener);
+
+ await().atMost(Duration.ofSeconds(1))
+ .untilAsserted(() ->
assertThat(eventDistributor.getRegisteredKeys()).isEqualTo(0));
+ }
+
+ @Test
+ public void
testListenersAffectedByBucketMovementAreInactiveAndDoNotProcessTimeout() {
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+ RedisKey key = new RedisKey("KEY".getBytes());
+ List<byte[]> commandArgs = Arrays.asList(key.toBytes(), "0".getBytes());
+ Command command = new Command(RedisCommandType.BLPOP, commandArgs);
+ BlockingCommandListener listener =
+ new BlockingCommandListener(context, command,
Collections.singletonList(key), 1);
+ EventDistributor eventDistributor = new EventDistributor();
+
+ eventDistributor.registerListener(listener);
+
+ eventDistributor.afterBucketRemoved(key.getBucketId(), null);
+
+ verify(context, atMost(1)).resubmitCommand(any());
+ await().atMost(Duration.ofSeconds(5))
+ .during(Duration.ofSeconds(2))
+ .untilAsserted(() -> verify(context, atMost(0)).writeToChannel(any()));
+ assertThat(eventDistributor.getRegisteredKeys()).isEqualTo(0);
+ }
+
+ @Test
+ public void testResubmitAndTimeoutDoNotBothExecute() {
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+ List<byte[]> commandArgs = Arrays.asList("KEY".getBytes(), "0".getBytes());
+ Command command = new Command(RedisCommandType.BLPOP, commandArgs);
+ BlockingCommandListener listener =
+ new BlockingCommandListener(context, command,
Arrays.asList(command.getKey()), 0);
+ AtomicReference<BlockingCommandListener> listenerRef = new
AtomicReference<>(listener);
+ EventDistributor eventDistributor = new EventDistributor();
+
+ eventDistributor.registerListener(listener);
+
+ new ConcurrentLoopingThreads(10_000,
+ i -> listenerRef.get().process(null, command.getKey()),
+ i -> listenerRef.get().timeout(eventDistributor))
+ .runWithAction(() -> {
+ // Verify that resubmitCommand and timeout have not both been
called
+ try {
+ verify(context, atLeastOnce()).resubmitCommand(any());
+ verify(context, never()).writeToChannel(any());
+ } catch (AssertionError e) {
+ verify(context, never()).resubmitCommand(any());
+ verify(context, atLeastOnce()).writeToChannel(any());
+ }
+ reset(context);
+ eventDistributor.removeListener(listenerRef.get());
+ listenerRef.set(new BlockingCommandListener(context, command,
+ Arrays.asList(command.getKey()), 0));
+ eventDistributor.registerListener(listenerRef.get());
+ });
+ }
+
+ @Test
+ public void concurrencyOfManyRegistrationsForTheSameKeys() {
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+ EventDistributor distributor = new EventDistributor();
+ RedisKey keyA = new RedisKey("keyA".getBytes());
+ RedisKey keyB = new RedisKey("keyB".getBytes());
+ RedisKey keyC = new RedisKey("keyC".getBytes());
+
+ // Should not produce any exceptions
+ new ConcurrentLoopingThreads(10_000,
+ i -> registerListener(distributor, context, keyA, keyB, keyC),
+ i -> registerListener(distributor, context, keyB, keyC, keyA),
+ i -> registerListener(distributor, context, keyC, keyA, keyB),
+ i -> distributor.afterBucketRemoved(keyA.getBucketId(), null),
+ i -> distributor.afterBucketRemoved(keyB.getBucketId(), null),
+ i -> distributor.afterBucketRemoved(keyC.getBucketId(), null),
+ i -> distributor.fireEvent(null, keyA),
+ i -> distributor.fireEvent(null, keyB),
+ i -> distributor.fireEvent(null, keyC))
+ .run();
+
+ await().atMost(Duration.ofSeconds(1))
+ .untilAsserted(() ->
assertThat(distributor.getRegisteredKeys()).isEqualTo(0));
+ }
+
+ private void registerListener(EventDistributor distributor,
ExecutionHandlerContext context,
+ RedisKey... keys) {
+ List<byte[]> commandArgs = Arrays.asList("KEY".getBytes(), "0".getBytes());
+ Command command = new Command(RedisCommandType.BLPOP, commandArgs);
+ BlockingCommandListener listener =
+ new BlockingCommandListener(context, command, Arrays.asList(keys),
1e-9);
+ distributor.registerListener(listener);
+ }
+}
diff --git
a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/EventDistributorTest.java
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/EventDistributorTest.java
new file mode 100644
index 0000000..d8a4d1c
--- /dev/null
+++
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/EventDistributorTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+public class EventDistributorTest {
+
+ public static class TestEventListener implements EventListener {
+ private final List<RedisKey> keys;
+ private int fired = 0;
+
+ public TestEventListener(RedisKey... keys) {
+ this.keys = Arrays.stream(keys).collect(Collectors.toList());
+ }
+
+ public int getFired() {
+ return fired;
+ }
+
+ @Override
+ public EventResponse process(RedisCommandType commandType, RedisKey key) {
+ fired += 1;
+ return EventResponse.REMOVE_AND_STOP;
+ }
+
+ @Override
+ public List<RedisKey> keys() {
+ return keys;
+ }
+
+ @Override
+ public void resubmitCommand() {}
+
+ @Override
+ public void scheduleTimeout(ScheduledExecutorService executor,
EventDistributor distributor) {}
+ }
+
+ @Test
+ public void firingEventRemovesListener() {
+ RedisKey keyA = new RedisKey("a".getBytes());
+ RedisKey keyB = new RedisKey("b".getBytes());
+ EventDistributor distributor = new EventDistributor();
+ TestEventListener listener = new TestEventListener(keyA, keyB);
+ distributor.registerListener(listener);
+
+ distributor.fireEvent(null, keyA);
+ assertThat(listener.getFired()).isEqualTo(1);
+ assertThat(distributor.getRegisteredKeys()).isEqualTo(0);
+ }
+
+ @Test
+ public void firingEventRemovesFirstListener_whenMultipleExist() {
+ RedisKey keyA = new RedisKey("a".getBytes());
+ RedisKey keyB = new RedisKey("b".getBytes());
+ EventDistributor distributor = new EventDistributor();
+ TestEventListener listener1 = new TestEventListener(keyA, keyB);
+ TestEventListener listener2 = new TestEventListener(keyA, keyB);
+ distributor.registerListener(listener1);
+ distributor.registerListener(listener2);
+
+ assertThat(distributor.getRegisteredKeys()).isEqualTo(4);
+
+ distributor.fireEvent(null, keyA);
+ assertThat(listener1.getFired()).isEqualTo(1);
+ assertThat(listener2.getFired()).isEqualTo(0);
+ assertThat(distributor.getRegisteredKeys()).isEqualTo(2);
+
+ distributor.fireEvent(null, keyA);
+ assertThat(listener1.getFired()).isEqualTo(1);
+ assertThat(listener2.getFired()).isEqualTo(1);
+ assertThat(distributor.getRegisteredKeys()).isEqualTo(0);
+ }
+
+ @Test
+ public void listenerIsRemovedAfterBucketMoves() {
+ RedisKey keyA = new RedisKey("a".getBytes());
+ RedisKey keyB = new RedisKey("b".getBytes());
+ EventDistributor distributor = new EventDistributor();
+ TestEventListener listener1 = new TestEventListener(keyA);
+ distributor.registerListener(listener1);
+ distributor.registerListener(new TestEventListener(keyB));
+
+ assertThat(distributor.getRegisteredKeys()).isEqualTo(2);
+
+ distributor.afterBucketRemoved(keyA.getBucketId(), null);
+
+ assertThat(distributor.getRegisteredKeys()).isEqualTo(1);
+ }
+
+ @Test
+ public void concurrencyOfManyRegistrationsAndBucketMovementForTheSameKeys() {
+ EventDistributor distributor = new EventDistributor();
+ RedisKey keyA = new RedisKey("keyA".getBytes());
+ RedisKey keyB = new RedisKey("keyB".getBytes());
+ RedisKey keyC = new RedisKey("keyC".getBytes());
+
+ // Should not produce any exceptions
+ new ConcurrentLoopingThreads(10_000,
+ i -> distributor.registerListener(new TestEventListener(keyA, keyB,
keyC)),
+ i -> distributor.registerListener(new TestEventListener(keyB, keyC,
keyA)),
+ i -> distributor.registerListener(new TestEventListener(keyC, keyA,
keyB)),
+ i -> distributor.afterBucketRemoved(keyA.getBucketId(), null),
+ i -> distributor.afterBucketRemoved(keyB.getBucketId(), null),
+ i -> distributor.afterBucketRemoved(keyC.getBucketId(), null),
+ i -> distributor.fireEvent(null, keyA),
+ i -> distributor.fireEvent(null, keyB),
+ i -> distributor.fireEvent(null, keyC))
+ .runInLockstep();
+
+ distributor.fireEvent(null, keyA);
+ distributor.fireEvent(null, keyB);
+ distributor.fireEvent(null, keyC);
+ assertThat(distributor.getRegisteredKeys()).isEqualTo(0);
+ }
+
+ @Test
+ public void concurrencyOfManyRegistrationAndRemovalOfSameListener() {
+ EventDistributor distributor = new EventDistributor();
+ RedisKey keyA = new RedisKey("keyA".getBytes());
+ RedisKey keyB = new RedisKey("keyB".getBytes());
+ RedisKey keyC = new RedisKey("keyC".getBytes());
+ AtomicReference<EventListener> listenerRef1 =
+ new AtomicReference<>(new TestEventListener(keyA, keyB, keyC));
+ AtomicReference<EventListener> listenerRef2 =
+ new AtomicReference<>(new TestEventListener(keyB, keyC, keyA));
+ AtomicReference<EventListener> listenerRef3 =
+ new AtomicReference<>(new TestEventListener(keyC, keyA, keyB));
+
+ // Should not produce any exceptions
+ new ConcurrentLoopingThreads(10_000,
+ i -> distributor.registerListener(listenerRef1.get()),
+ i -> distributor.registerListener(listenerRef2.get()),
+ i -> distributor.registerListener(listenerRef3.get()),
+ i -> distributor.removeListener(listenerRef1.get()),
+ i -> distributor.removeListener(listenerRef2.get()),
+ i -> distributor.removeListener(listenerRef3.get()),
+ i -> distributor.fireEvent(null, keyA),
+ i -> distributor.fireEvent(null, keyB),
+ i -> distributor.fireEvent(null, keyC))
+ .runWithAction(() -> {
+ listenerRef1.set(new TestEventListener(keyA, keyB, keyC));
+ listenerRef2.set(new TestEventListener(keyB, keyC, keyA));
+ listenerRef3.set(new TestEventListener(keyC, keyA, keyB));
+ });
+
+ distributor.fireEvent(null, keyA);
+ distributor.fireEvent(null, keyB);
+ distributor.fireEvent(null, keyC);
+ assertThat(distributor.getRegisteredKeys()).isEqualTo(0);
+ }
+
+ /**
+ * When a key points to a Queue (of listeners) that becomes empty that key is
+ * removed. Without correct synchronization we could add a new listener to a
queue
+ * just before the key (and thus the queue) is removed. This test ensures
that is
+ * not happening.
+ */
+ @Test
+ public void ensureNotRegisteringListenerOnQueueJustRemoved() {
+ EventDistributor distributor = new EventDistributor();
+ RedisKey keyA = new RedisKey("keyA".getBytes());
+ AtomicReference<EventListener> listenerRef1 =
+ new AtomicReference<>(new TestEventListener(keyA));
+ AtomicReference<EventListener> listenerRef2 =
+ new AtomicReference<>(new TestEventListener(keyA));
+
+ distributor.registerListener(listenerRef1.get());
+ AtomicInteger iteration = new AtomicInteger(0);
+
+ new ConcurrentLoopingThreads(10_000,
+ iteration::set,
+ i -> distributor.registerListener(listenerRef2.get()),
+ i -> distributor.removeListener(listenerRef1.get()))
+ .runWithAction(() -> {
+ assertThat(distributor.getRegisteredKeys())
+ .as("Iteration = " + iteration.get()).isEqualTo(1);
+ distributor.removeListener(listenerRef2.get());
+
+ listenerRef1.set(new TestEventListener(keyA));
+ listenerRef2.set(new TestEventListener(keyA));
+
+ distributor.registerListener(listenerRef1.get());
+ });
+ }
+
+}
diff --git
a/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
b/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index fee9c78..29d0c21 100644
---
a/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++
b/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -1,20 +1,21 @@
-org/apache/geode/redis/internal/services/cluster/RedisMemberInfoRetrievalFunction
-org/apache/geode/redis/internal/data/collections/Bytes2ObjectOpenHashMap
-org/apache/geode/redis/internal/data/collections/SizeableByteArrayList
-org/apache/geode/redis/internal/data/collections/SizeableBytes2ObjectOpenCustomHashMapWithCursor
-org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursor
+org/apache/geode/redis/internal/RedisException
+org/apache/geode/redis/internal/commands/RedisCommandType
+org/apache/geode/redis/internal/commands/RedisCommandType$Category
+org/apache/geode/redis/internal/commands/RedisCommandType$Flag
+org/apache/geode/redis/internal/commands/executor/sortedset/ZAggregator
+org/apache/geode/redis/internal/commands/executor/string/BitOpExecutor$BitOp
org/apache/geode/redis/internal/data/RedisCrossSlotException
org/apache/geode/redis/internal/data/RedisDataMovedException
org/apache/geode/redis/internal/data/RedisDataTypeMismatchException
-org/apache/geode/redis/internal/commands/executor/sortedset/ZAggregator
-org/apache/geode/redis/internal/RedisException
org/apache/geode/redis/internal/data/RedisHash$Hash
org/apache/geode/redis/internal/data/RedisKeyExistsException
org/apache/geode/redis/internal/data/RedisSet$MemberSet
org/apache/geode/redis/internal/data/RedisSortedSet$MemberMap
-org/apache/geode/redis/internal/commands/executor/string/BitOpExecutor$BitOp
+org/apache/geode/redis/internal/data/collections/Bytes2ObjectOpenHashMap
+org/apache/geode/redis/internal/data/collections/SizeableByteArrayList
+org/apache/geode/redis/internal/data/collections/SizeableBytes2ObjectOpenCustomHashMapWithCursor
+org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursor
+org/apache/geode/redis/internal/eventing/EventResponse
org/apache/geode/redis/internal/pubsub/Publisher$PublishFunction
+org/apache/geode/redis/internal/services/cluster/RedisMemberInfoRetrievalFunction
org/apache/geode/redis/internal/services/locking/StripedExecutorService$State
-org/apache/geode/redis/internal/commands/RedisCommandType
-org/apache/geode/redis/internal/commands/RedisCommandType$Category
-org/apache/geode/redis/internal/commands/RedisCommandType$Flag
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
index 8db6c30..29a0ed5 100644
---
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
@@ -27,6 +27,7 @@ import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.internal.statistics.StatisticsClockFactory;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.eventing.EventDistributor;
import org.apache.geode.redis.internal.netty.Coder;
import org.apache.geode.redis.internal.netty.NettyRedisServer;
import org.apache.geode.redis.internal.pubsub.PubSub;
@@ -62,6 +63,7 @@ public class GeodeRedisServer {
private final RegionProvider regionProvider;
private final PubSub pubSub;
private final RedisStats redisStats;
+ private final EventDistributor eventDistributor;
private boolean shutdown;
/**
@@ -81,7 +83,8 @@ public class GeodeRedisServer {
StripedCoordinator stripedCoordinator = new LockingStripedCoordinator();
RedisMemberInfoRetrievalFunction infoFunction =
RedisMemberInfoRetrievalFunction.register();
- regionProvider = new RegionProvider(cache, stripedCoordinator, redisStats);
+ eventDistributor = new EventDistributor();
+ regionProvider = new RegionProvider(cache, stripedCoordinator, redisStats,
eventDistributor);
pubSub = new PubSubImpl(new Subscriptions(redisStats), regionProvider,
redisStats);
activeExpirationManager = new ActiveExpirationManager(regionProvider);
@@ -92,7 +95,7 @@ public class GeodeRedisServer {
nettyRedisServer = new NettyRedisServer(() ->
cache.getInternalDistributedSystem().getConfig(),
regionProvider, pubSub,
this::allowUnsupportedCommands, port, bindAddress, redisStats,
- member, securityService);
+ member, securityService, eventDistributor);
infoFunction.initialize(member, bindAddress, nettyRedisServer.getPort());
}
@@ -102,6 +105,11 @@ public class GeodeRedisServer {
return ((PubSubImpl) pubSub).getSubscriptionCount();
}
+ @VisibleForTesting
+ public EventDistributor getEventDistributor() {
+ return eventDistributor;
+ }
+
private static RedisStats createStats(InternalCache cache) {
InternalDistributedSystem system = cache.getInternalDistributedSystem();
StatisticsClock statisticsClock =
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
index acb1093..1be3ccc 100644
---
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
@@ -104,4 +104,6 @@ public class RedisConstants {
"ERR wrong number of arguments for '%s' command";
public static final String ERROR_BITOP_NOT_MUST_USE_SINGLE_KEY =
"ERR BITOP NOT must be called with a single source key.";
+ public static final String ERROR_TIMEOUT_INVALID = "ERR timeout is not a
float or out of range";
+ public static final String ERROR_NEGATIVE_TIMEOUT = "ERR timeout is
negative";
}
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/Command.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/Command.java
index 8758a00..a44b7f8 100755
---
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/Command.java
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/Command.java
@@ -22,8 +22,6 @@ import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.stream.Collectors;
-import io.netty.channel.ChannelHandlerContext;
-
import org.apache.geode.redis.internal.commands.executor.RedisResponse;
import org.apache.geode.redis.internal.data.RedisKey;
import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -47,6 +45,16 @@ public class Command {
commandType = null;
}
+ public Command(RedisCommandType command, List<byte[]> commandElems) {
+ if (commandElems == null || commandElems.isEmpty()) {
+ throw new IllegalArgumentException(
+ "List of command elements cannot be empty -> List:" + commandElems);
+ }
+
+ this.commandType = command;
+ this.commandElems = commandElems;
+ }
+
/**
* Constructor for {@link Command}. Must initialize Command with a {@link
SocketChannel} and a
* {@link List} of command elements
@@ -222,13 +230,4 @@ public class Command {
return result;
}
- private ChannelHandlerContext channelHandlerContext;
-
- public void setChannelHandlerContext(ChannelHandlerContext ctx) {
- channelHandlerContext = ctx;
- }
-
- public ChannelHandlerContext getChannelHandlerContext() {
- return channelHandlerContext;
- }
}
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java
index d5e7d58..e734c06 100755
---
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java
@@ -78,6 +78,7 @@ import
org.apache.geode.redis.internal.commands.executor.key.RestoreExecutor;
import org.apache.geode.redis.internal.commands.executor.key.ScanExecutor;
import org.apache.geode.redis.internal.commands.executor.key.TTLExecutor;
import org.apache.geode.redis.internal.commands.executor.key.TypeExecutor;
+import org.apache.geode.redis.internal.commands.executor.list.BLPopExecutor;
import org.apache.geode.redis.internal.commands.executor.list.LIndexExecutor;
import org.apache.geode.redis.internal.commands.executor.list.LInsertExecutor;
import org.apache.geode.redis.internal.commands.executor.list.LLenExecutor;
@@ -388,6 +389,8 @@ public enum RedisCommandType {
/************** Lists *****************/
+ BLPOP(new BLPopExecutor(), Category.LIST, SUPPORTED,
+ new Parameter().min(3).lastKey(-2).flags(WRITE, NOSCRIPT)),
LINDEX(new LIndexExecutor(), Category.LIST, SUPPORTED, new
Parameter().exact(3).flags(READONLY)),
LINSERT(new LInsertExecutor(), Category.LIST, SUPPORTED,
new Parameter().exact(5).flags(WRITE, DENYOOM)),
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/RedisResponse.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/RedisResponse.java
index 22a8fae..2577814 100644
---
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/RedisResponse.java
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/RedisResponse.java
@@ -42,6 +42,11 @@ import org.apache.geode.redis.internal.netty.CoderException;
public class RedisResponse {
+ /**
+ * Static value returned by blocking commands. Effectively a no-op.
+ */
+ public static final RedisResponse BLOCKED = new RedisResponse(x -> null);
+
private final Function<ByteBuf, ByteBuf> coderCallback;
private Runnable afterWriteCallback;
@@ -119,6 +124,13 @@ public class RedisResponse {
}
@Immutable
+ private static final RedisResponse NIL_ARRAY = new
RedisResponse(Coder::getNilArrayResponse);
+
+ public static RedisResponse nilArray() {
+ return NIL_ARRAY;
+ }
+
+ @Immutable
private static final RedisResponse EMPTY = new
RedisResponse(Coder::getEmptyResponse);
public static RedisResponse flattenedArray(Collection<Collection<?>>
nestedCollection) {
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/BLPopExecutor.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/BLPopExecutor.java
new file mode 100755
index 0000000..2981aa0
--- /dev/null
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/BLPopExecutor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static
org.apache.geode.redis.internal.RedisConstants.ERROR_NEGATIVE_TIMEOUT;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.geode.redis.internal.RedisConstants;
+import org.apache.geode.redis.internal.commands.Command;
+import org.apache.geode.redis.internal.commands.executor.CommandExecutor;
+import org.apache.geode.redis.internal.commands.executor.RedisResponse;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.data.RedisList;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class BLPopExecutor implements CommandExecutor {
+
+ @Override
+ public RedisResponse executeCommand(Command command, ExecutionHandlerContext
context) {
+ List<byte[]> arguments = command.getCommandArguments();
+ double timeoutSeconds;
+ try {
+ timeoutSeconds = Coder.bytesToDouble(arguments.get(arguments.size() -
1));
+ } catch (NumberFormatException e) {
+ return RedisResponse.error(RedisConstants.ERROR_TIMEOUT_INVALID);
+ }
+
+ if (timeoutSeconds < 0) {
+ return RedisResponse.error(ERROR_NEGATIVE_TIMEOUT);
+ }
+
+ int keyCount = arguments.size() - 1;
+ List<RedisKey> keys = new ArrayList<>(keyCount);
+ for (int i = 0; i < keyCount; i++) {
+ keys.add(new RedisKey(arguments.get(i)));
+ }
+ // The order of keys is important and, since locking may alter the order
passed into
+ // lockedExecute, we create a copy here.
+ List<RedisKey> keysForLocking = new ArrayList<>(keys);
+
+ List<byte[]> popped = context.lockedExecute(keys.get(0), keysForLocking,
+ () -> RedisList.blpop(context, command, keys, timeoutSeconds));
+
+ return popped == null ? RedisResponse.BLOCKED :
RedisResponse.array(popped, true);
+ }
+}
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/LPushExecutor.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/LPushExecutor.java
index 8e4b7e3..01763fd 100755
---
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/LPushExecutor.java
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/LPushExecutor.java
@@ -16,11 +16,9 @@ package
org.apache.geode.redis.internal.commands.executor.list;
import java.util.List;
-import org.apache.geode.cache.Region;
import org.apache.geode.redis.internal.commands.Command;
import org.apache.geode.redis.internal.commands.executor.CommandExecutor;
import org.apache.geode.redis.internal.commands.executor.RedisResponse;
-import org.apache.geode.redis.internal.data.RedisData;
import org.apache.geode.redis.internal.data.RedisKey;
import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -30,13 +28,12 @@ public class LPushExecutor implements CommandExecutor {
public final RedisResponse executeCommand(final Command command,
final ExecutionHandlerContext context) {
List<byte[]> commandElements = command.getProcessedCommand();
- Region<RedisKey, RedisData> region = context.getRegion();
RedisKey key = command.getKey();
List<byte[]> elementsToAdd = commandElements.subList(2,
commandElements.size());
final long newLength = context.listLockedExecute(key, false,
- list -> list.lpush(elementsToAdd, region, key,
shouldPushOnlyIfKeyExists()));
+ list -> list.lpush(context, elementsToAdd, key,
shouldPushOnlyIfKeyExists()));
return RedisResponse.integer(newLength);
}
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/RPushExecutor.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/RPushExecutor.java
index fc313df..7549aba 100755
---
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/RPushExecutor.java
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/RPushExecutor.java
@@ -36,7 +36,7 @@ public class RPushExecutor implements CommandExecutor {
List<byte[]> elementsToAdd = commandElements.subList(2,
commandElements.size());
final long newLength = context.listLockedExecute(key, false,
- list -> list.rpush(elementsToAdd, region, key,
shouldPushOnlyIfKeyExists()));
+ list -> list.rpush(context, elementsToAdd, key,
shouldPushOnlyIfKeyExists()));
return RedisResponse.integer(newLength);
}
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisList.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisList.java
index c1f14a7..f14fc5a 100644
---
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisList.java
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisList.java
@@ -20,6 +20,8 @@ import java.util.Collections;
import java.util.List;
import org.apache.geode.cache.Region;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
class NullRedisList extends RedisList {
@@ -44,22 +46,23 @@ class NullRedisList extends RedisList {
}
@Override
- public long lpush(List<byte[]> elementsToAdd, Region<RedisKey, RedisData>
region, RedisKey key,
- final boolean onlyIfExists) {
+ public long lpush(ExecutionHandlerContext context, List<byte[]>
elementsToAdd, RedisKey key,
+ boolean onlyIfExists) {
if (onlyIfExists) {
return 0;
}
-
RedisList newList = new RedisList();
for (byte[] element : elementsToAdd) {
newList.elementPushHead(element);
}
- region.create(key, newList);
+ context.getRegion().create(key, newList);
+ context.fireEvent(RedisCommandType.LPUSH, key);
+
return elementsToAdd.size();
}
@Override
- public long rpush(List<byte[]> elementsToAdd, Region<RedisKey, RedisData>
region, RedisKey key,
+ public long rpush(ExecutionHandlerContext context, List<byte[]>
elementsToAdd, RedisKey key,
final boolean onlyIfExists) {
if (onlyIfExists) {
return 0;
@@ -69,7 +72,9 @@ class NullRedisList extends RedisList {
for (byte[] element : elementsToAdd) {
newList.elementPushTail(element);
}
- region.create(key, newList);
+ context.getRegion().create(key, newList);
+ context.fireEvent(RedisCommandType.RPUSH, key);
+
return elementsToAdd.size();
}
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
index 64d233c..37fbfa9 100644
---
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
@@ -36,12 +36,17 @@ import
org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.redis.internal.RedisException;
+import org.apache.geode.redis.internal.commands.Command;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
import org.apache.geode.redis.internal.data.collections.SizeableByteArrayList;
import org.apache.geode.redis.internal.data.delta.AddByteArrays;
import org.apache.geode.redis.internal.data.delta.AddByteArraysTail;
import org.apache.geode.redis.internal.data.delta.InsertByteArray;
import org.apache.geode.redis.internal.data.delta.RemoveElementsByIndex;
import org.apache.geode.redis.internal.data.delta.ReplaceByteArrayAtOffset;
+import org.apache.geode.redis.internal.eventing.BlockingCommandListener;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+import org.apache.geode.redis.internal.services.RegionProvider;
public class RedisList extends AbstractRedisData {
protected static final int REDIS_LIST_OVERHEAD =
memoryOverhead(RedisList.class);
@@ -158,40 +163,44 @@ public class RedisList extends AbstractRedisData {
}
/**
- * @param elementsToAdd elements to add to this list; NOTE this list may be
modified by this call
- * @param region the region this instance is stored in
- * @param key the name of the list to add to
+ * @param context the context of the executing command
+ * @param elementsToAdd elements to add to this list
+ * @param key the name of the set to add to
* @param onlyIfExists if true then the elements should only be added if the
key already exists
* and holds a list, otherwise no operation is performed.
* @return the length of the list after the operation
*/
- public long lpush(List<byte[]> elementsToAdd, Region<RedisKey, RedisData>
region,
- RedisKey key, final boolean onlyIfExists) {
+ public long lpush(ExecutionHandlerContext context, List<byte[]>
elementsToAdd,
+ RedisKey key, boolean onlyIfExists) {
byte newVersion;
synchronized (this) {
newVersion = incrementAndGetVersion();
elementsPushHead(elementsToAdd);
}
- storeChanges(region, key, new AddByteArrays(elementsToAdd, newVersion));
+ storeChanges(context.getRegion(), key, new AddByteArrays(elementsToAdd,
newVersion));
+ context.fireEvent(RedisCommandType.LPUSH, key);
+
return elementList.size();
}
/**
+ * @param context the context of the executing command
* @param elementsToAdd elements to add to this list;
- * @param region the region this instance is stored in
* @param key the name of the list to add to
* @param onlyIfExists if true then the elements should only be added if the
key already exists
* and holds a list, otherwise no operation is performed.
* @return the length of the list after the operation
*/
- public long rpush(List<byte[]> elementsToAdd, Region<RedisKey, RedisData>
region,
- RedisKey key, final boolean onlyIfExists) {
+ public long rpush(ExecutionHandlerContext context, List<byte[]>
elementsToAdd, RedisKey key,
+ boolean onlyIfExists) {
byte newVersion;
synchronized (this) {
newVersion = incrementAndGetVersion();
elementsToAdd.forEach(this::elementPushTail);
}
- storeChanges(region, key, new AddByteArraysTail(newVersion,
elementsToAdd));
+ storeChanges(context.getRegion(), key, new AddByteArraysTail(newVersion,
elementsToAdd));
+ context.fireEvent(RedisCommandType.RPUSH, key);
+
return elementList.size();
}
@@ -214,6 +223,27 @@ public class RedisList extends AbstractRedisData {
return popped;
}
+ public static List<byte[]> blpop(ExecutionHandlerContext context, Command
command,
+ List<RedisKey> keys, double timeoutSeconds) {
+ RegionProvider regionProvider = context.getRegionProvider();
+ for (RedisKey key : keys) {
+ RedisList list = regionProvider.getTypedRedisData(REDIS_LIST, key,
false);
+ if (!list.isNull()) {
+ byte[] poppedValue = list.lpop(context.getRegion(), key);
+
+ // return the key and value
+ List<byte[]> result = new ArrayList<>(2);
+ result.add(key.toBytes());
+ result.add(poppedValue);
+ return result;
+ }
+ }
+
+ context.registerListener(new BlockingCommandListener(context, command,
keys, timeoutSeconds));
+
+ return null;
+ }
+
/**
* @return the number of elements in the list
*/
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/BlockingCommandListener.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/BlockingCommandListener.java
new file mode 100644
index 0000000..bce6756
--- /dev/null
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/BlockingCommandListener.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.redis.internal.commands.Command;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.commands.executor.RedisResponse;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class BlockingCommandListener implements EventListener {
+
+ private final ExecutionHandlerContext context;
+ private final Command command;
+ private final List<RedisKey> keys;
+ private final double timeoutSeconds;
+ private final long timeSubmitted;
+ private final AtomicBoolean active = new AtomicBoolean(true);
+
+ /**
+ * Constructor to create an instance of a BlockingCommandListener in
response to a blocking
+ * command. When receiving a relevant event, blocking commands simply
resubmit the command
+ * into the Netty pipeline.
+ *
+ * @param context the associated ExecutionHandlerContext
+ * @param command the blocking command associated with this listener
+ * @param keys the list of keys the command is interested in
+ * @param timeoutSeconds the timeout for the command to block in seconds
+ */
+ public BlockingCommandListener(ExecutionHandlerContext context, Command
command,
+ List<RedisKey> keys, double timeoutSeconds) {
+ this.context = context;
+ this.command = command;
+ this.timeoutSeconds = timeoutSeconds;
+ this.keys = Collections.unmodifiableList(keys);
+ timeSubmitted = System.nanoTime();
+ }
+
+ @Override
+ public List<RedisKey> keys() {
+ return keys;
+ }
+
+ @Override
+ public EventResponse process(RedisCommandType commandType, RedisKey key) {
+ if (!keys.contains(key)) {
+ return EventResponse.CONTINUE;
+ }
+
+ resubmitCommand();
+ return EventResponse.REMOVE_AND_STOP;
+ }
+
+ @Override
+ public void resubmitCommand() {
+ if (!active.compareAndSet(true, false)) {
+ return;
+ }
+
+ // Recalculate the timeout since we've already been waiting
+ double adjustedTimeoutSeconds = 0;
+ if (timeoutSeconds > 0.0D) {
+ long timeoutNanos = (long) (timeoutSeconds * 1e9);
+ long adjustedTimeoutNanos = timeoutNanos - (System.nanoTime() -
timeSubmitted);
+ adjustedTimeoutNanos = Math.max(1, adjustedTimeoutNanos);
+ adjustedTimeoutSeconds = ((double) adjustedTimeoutNanos) / 1e9;
+ }
+
+ // The commands we are currently supporting all have the timeout at the
end of the argument
+ // list. Some newer Redis 7 commands (BLMPOP and BZMPOP) have the timeout
as the first argument
+ // after the command. We'll need to adjust this once those commands are
supported.
+ List<byte[]> commandArguments = command.getCommandArguments();
+ commandArguments.set(commandArguments.size() - 1,
Coder.doubleToBytes(adjustedTimeoutSeconds));
+
+ context.resubmitCommand(command);
+ }
+
+ @Override
+ public void scheduleTimeout(ScheduledExecutorService executor,
EventDistributor distributor) {
+ if (timeoutSeconds == 0 || !active.get()) {
+ return;
+ }
+
+ long timeoutNanos = (long) (timeoutSeconds * 1e9);
+ executor.schedule(() -> timeout(distributor), timeoutNanos,
TimeUnit.NANOSECONDS);
+ }
+
+ @VisibleForTesting
+ void timeout(EventDistributor distributor) {
+ if (active.compareAndSet(true, false)) {
+ distributor.removeListener(this);
+ context.writeToChannel(RedisResponse.nilArray());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "BlockingCommandListener [command:" +
command.getCommandType().name()
+ + " keys:" + keys + "]";
+ }
+
+}
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java
new file mode 100644
index 0000000..1a806bc
--- /dev/null
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.partition.PartitionListenerAdapter;
+import org.apache.geode.internal.lang.utils.JavaWorkarounds;
+import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+public class EventDistributor extends PartitionListenerAdapter {
+
+ private final Map<RedisKey, Queue<EventListener>> listeners = new
ConcurrentHashMap<>();
+
+ private final ScheduledThreadPoolExecutor timerExecutor =
+ new ScheduledThreadPoolExecutor(1,
+ new LoggingThreadFactory("GeodeForRedisEventTimer-", true));
+
+ public EventDistributor() {
+ timerExecutor.setRemoveOnCancelPolicy(true);
+ }
+
+ public synchronized void registerListener(EventListener listener) {
+ for (RedisKey key : listener.keys()) {
+ JavaWorkarounds.computeIfAbsent(listeners, key, k -> new
LinkedBlockingQueue<>())
+ .add(listener);
+ }
+
+ listener.scheduleTimeout(timerExecutor, this);
+ }
+
+ public void fireEvent(RedisCommandType command, RedisKey key) {
+ Queue<EventListener> listenerList = listeners.get(key);
+ if (listenerList == null) {
+ return;
+ }
+
+ for (EventListener listener : listenerList) {
+ if (listener.process(command, key) == EventResponse.REMOVE_AND_STOP) {
+ removeListener(listener);
+ break;
+ }
+ }
+ }
+
+ /**
+ * The total number of keys registered by all listeners (includes
duplicates).
+ */
+ @VisibleForTesting
+ public int getRegisteredKeys() {
+ return listeners.values().stream().mapToInt(Collection::size).sum();
+ }
+
+ @Override
+ public void afterBucketRemoved(int bucketId, Iterable<?> keys) {
+ Set<EventListener> resubmittingList = new HashSet<>();
+ for (Map.Entry<RedisKey, Queue<EventListener>> entry :
listeners.entrySet()) {
+ if (entry.getKey().getBucketId() == bucketId) {
+ resubmittingList.addAll(entry.getValue());
+ }
+ }
+
+ resubmittingList.forEach(x -> {
+ removeListener(x);
+ x.resubmitCommand();
+ });
+ }
+
+ public synchronized void removeListener(EventListener listener) {
+ for (RedisKey key : listener.keys()) {
+ Queue<EventListener> listenerList = listeners.get(key);
+ if (listenerList == null) {
+ continue;
+ }
+ listenerList.remove(listener);
+ if (listenerList.isEmpty()) {
+ listeners.remove(key);
+ }
+ }
+ }
+
+}
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventListener.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventListener.java
new file mode 100644
index 0000000..c0e1f54
--- /dev/null
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventListener.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+/**
+ * Interface intended to be implemented in order to receive Redis events.
Specifically this would be
+ * keyspace events or blocking commands. EventListeners are registered with the
+ * {@link EventDistributor}.
+ */
+public interface EventListener {
+
+ /**
+ * Receive and process an event. This method should execute very quickly.
The return value
+ * determines additional process steps for the given event.
+ *
+ * @param commandType the command triggering the event
+ * @param key the key triggering the event
+ * @return response determining subsequent processing steps
+ */
+ EventResponse process(RedisCommandType commandType, RedisKey key);
+
+ /**
+ * Return the list of keys this listener is interested in.
+ */
+ List<RedisKey> keys();
+
+ /**
+ * Method to resubmit a command if appropriate. This is only relevant for
listeners that process
+ * events for blocking commands. Listeners that handle keyspace event
notification will not use
+ * this.
+ */
+ void resubmitCommand();
+
+ /**
+ * Schedule the removal of this listener using the passed in {@link
ScheduledExecutorService}
+ * and {@link EventDistributor}. The implementation is responsible for
cancelling the timeout if
+ * necessary.
+ *
+ * @param executor the ScheduledExecutorService to use for scheduling
+ * @param distributor the EventDistributor which would be used to remove
this listener
+ */
+ void scheduleTimeout(ScheduledExecutorService executor, EventDistributor
distributor);
+}
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventResponse.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventResponse.java
new file mode 100644
index 0000000..538e49d
--- /dev/null
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventResponse.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+/**
+ * Response returned by {@link EventListener#process(RedisCommandType,
RedisKey)}
+ */
+public enum EventResponse {
+ /**
+ * Response indicating that processing should continue.
+ */
+ CONTINUE,
+ /**
+ * Response indicating that processing should stop and the listener should be
+ * removed.
+ */
+ REMOVE_AND_STOP
+}
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java
index 50e71d3..1094636 100644
---
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java
@@ -155,11 +155,11 @@ public class Client {
return result;
}
- public ChannelFuture writeToChannel(RedisResponse response) {
+ public void writeToChannel(RedisResponse response) {
if (!logger.isDebugEnabled() && !response.hasAfterWriteCallback()) {
- return channel.writeAndFlush(response.encode(byteBufAllocator));
+ channel.writeAndFlush(response.encode(byteBufAllocator));
} else {
- return channel.writeAndFlush(response.encode(byteBufAllocator))
+ channel.writeAndFlush(response.encode(byteBufAllocator))
.addListener((ChannelFutureListener) f -> {
response.afterWrite();
logResponse(response, channel.remoteAddress(), f.cause());
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
index 62efc01..ee24e66 100644
---
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
@@ -27,6 +27,7 @@ import static
org.apache.geode.redis.internal.netty.StringBytesGlossary.INF;
import static
org.apache.geode.redis.internal.netty.StringBytesGlossary.INFINITY;
import static
org.apache.geode.redis.internal.netty.StringBytesGlossary.INTEGER_ID;
import static org.apache.geode.redis.internal.netty.StringBytesGlossary.NIL;
+import static
org.apache.geode.redis.internal.netty.StringBytesGlossary.NIL_ARRAY;
import static org.apache.geode.redis.internal.netty.StringBytesGlossary.N_INF;
import static
org.apache.geode.redis.internal.netty.StringBytesGlossary.N_INFINITY;
import static
org.apache.geode.redis.internal.netty.StringBytesGlossary.N_INF_STRING;
@@ -238,6 +239,11 @@ public class Coder {
return buffer;
}
+ public static ByteBuf getNilArrayResponse(ByteBuf buffer) {
+ buffer.writeBytes(NIL_ARRAY);
+ return buffer;
+ }
+
public static ByteBuf getEmptyResponse(ByteBuf buffer) {
return buffer;
}
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index 1c37039..5bb01a7 100644
---
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -31,9 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.logging.log4j.Logger;
import org.apache.shiro.subject.Subject;
@@ -62,6 +60,8 @@ import org.apache.geode.redis.internal.data.RedisList;
import org.apache.geode.redis.internal.data.RedisSet;
import org.apache.geode.redis.internal.data.RedisSortedSet;
import org.apache.geode.redis.internal.data.RedisString;
+import org.apache.geode.redis.internal.eventing.EventDistributor;
+import org.apache.geode.redis.internal.eventing.EventListener;
import org.apache.geode.redis.internal.pubsub.PubSub;
import org.apache.geode.redis.internal.services.RegionProvider;
import org.apache.geode.redis.internal.services.locking.RedisSecurityService;
@@ -103,9 +103,10 @@ public class ExecutionHandlerContext extends
ChannelInboundHandlerAdapter {
private final RedisStats redisStats;
private final DistributedMember member;
private final RedisSecurityService securityService;
+ private final EventDistributor eventDistributor;
private BigInteger scanCursor;
private final AtomicBoolean channelInactive = new AtomicBoolean();
- private final ChannelId channelId;
+ private final Channel channel;
private final int serverPort;
@@ -122,7 +123,9 @@ public class ExecutionHandlerContext extends
ChannelInboundHandlerAdapter {
String username,
int serverPort,
DistributedMember member,
- RedisSecurityService securityService) {
+ RedisSecurityService securityService,
+ EventDistributor eventDistributor) {
+ this.channel = channel;
this.regionProvider = regionProvider;
this.pubsub = pubsub;
this.allowUnsupportedSupplier = allowUnsupportedSupplier;
@@ -132,15 +135,17 @@ public class ExecutionHandlerContext extends
ChannelInboundHandlerAdapter {
this.serverPort = serverPort;
this.member = member;
this.securityService = securityService;
+ this.eventDistributor = eventDistributor;
scanCursor = new BigInteger("0");
- channelId = channel.id();
redisStats.addClient();
channel.closeFuture().addListener(future -> logout());
}
- public ChannelFuture writeToChannel(RedisResponse response) {
- return client.writeToChannel(response);
+ public void writeToChannel(RedisResponse response) {
+ if (response != RedisResponse.BLOCKED) {
+ client.writeToChannel(response);
+ }
}
/**
@@ -149,13 +154,12 @@ public class ExecutionHandlerContext extends
ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Command command = (Command) msg;
- command.setChannelHandlerContext(ctx);
if (!channelInactive.get()) {
try {
- executeCommand(command);
+ executeCommand(ctx, command);
redisStats.incCommandsProcessed();
} catch (Throwable ex) {
- exceptionCaught(command.getChannelHandlerContext(), ex);
+ exceptionCaught(ctx, ex);
}
}
}
@@ -232,7 +236,14 @@ public class ExecutionHandlerContext extends
ChannelInboundHandlerAdapter {
}
}
- private void executeCommand(Command command) throws Exception {
+ public void resubmitCommand(Command command) {
+ ChannelHandlerContext ctx =
+ channel.pipeline().context(ByteToCommandDecoder.class.getSimpleName());
+ ctx.fireChannelRead(command);
+ }
+
+ private void executeCommand(ChannelHandlerContext channelContext, Command
command)
+ throws Exception {
try {
if (logger.isDebugEnabled()) {
logger.debug("Executing Redis command: {} - {}", command,
@@ -277,7 +288,7 @@ public class ExecutionHandlerContext extends
ChannelInboundHandlerAdapter {
}
if (command.isOfType(RedisCommandType.QUIT)) {
- channelInactive(command.getChannelHandlerContext());
+ channelInactive(channelContext);
}
} catch (Exception e) {
if (!(e instanceof RedisDataMovedException)) {
@@ -306,7 +317,7 @@ public class ExecutionHandlerContext extends
ChannelInboundHandlerAdapter {
*/
private void logout() {
if (subject != null) {
- securityService.logout(channelId);
+ securityService.logout(channel.id());
subject = null;
}
}
@@ -393,7 +404,7 @@ public class ExecutionHandlerContext extends
ChannelInboundHandlerAdapter {
}
public Subject login(Properties properties) {
- subject = securityService.login(channelId, properties);
+ subject = securityService.login(channel.id(), properties);
return subject;
}
@@ -408,6 +419,13 @@ public class ExecutionHandlerContext extends
ChannelInboundHandlerAdapter {
}
}
+ public void registerListener(EventListener listener) {
+ eventDistributor.registerListener(listener);
+ }
+
+ public void fireEvent(RedisCommandType command, RedisKey key) {
+ eventDistributor.fireEvent(command, key);
+ }
public Region<RedisKey, RedisData> getRegion() {
return getRegionProvider().getLocalDataRegion();
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
index ff3c5f5..0d85e19 100644
---
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
@@ -63,6 +63,7 @@ import
org.apache.geode.internal.security.SecurableCommunicationChannel;
import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.management.ManagementException;
+import org.apache.geode.redis.internal.eventing.EventDistributor;
import org.apache.geode.redis.internal.pubsub.PubSub;
import org.apache.geode.redis.internal.services.RegionProvider;
import org.apache.geode.redis.internal.services.locking.RedisSecurityService;
@@ -87,12 +88,14 @@ public class NettyRedisServer {
private final int serverPort;
private final DistributedMember member;
private final RedisSecurityService securityService;
+ private final EventDistributor eventDistributor;
private final int writeTimeoutSeconds;
public NettyRedisServer(Supplier<DistributionConfig> configSupplier,
RegionProvider regionProvider, PubSub pubsub, Supplier<Boolean>
allowUnsupportedSupplier,
int port, String requestedAddress, RedisStats redisStats,
- DistributedMember member, RedisSecurityService securityService) {
+ DistributedMember member, RedisSecurityService securityService,
+ EventDistributor eventDistributor) {
this.configSupplier = configSupplier;
this.regionProvider = regionProvider;
this.pubsub = pubsub;
@@ -100,6 +103,7 @@ public class NettyRedisServer {
this.redisStats = redisStats;
this.member = member;
this.securityService = securityService;
+ this.eventDistributor = eventDistributor;
writeTimeoutSeconds =
getIntegerSystemProperty(WRITE_TIMEOUT_SECONDS,
DEFAULT_REDIS_WRITE_TIMEOUT_SECONDS, 1);
@@ -170,7 +174,7 @@ public class NettyRedisServer {
pipeline.addLast(ExecutionHandlerContext.class.getSimpleName(),
new ExecutionHandlerContext(socketChannel, regionProvider, pubsub,
allowUnsupportedSupplier, redisStats, redisUsername,
- getPort(), member, securityService));
+ getPort(), member, securityService, eventDistributor));
}
};
}
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
index 62bd890..4f21013 100644
---
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
@@ -62,6 +62,12 @@ public class StringBytesGlossary {
public static final byte[] NIL = stringToBytes("$-1\r\n");
/**
+ * byte array of a nil array response
+ */
+ @MakeImmutable
+ public static final byte[] NIL_ARRAY = stringToBytes("*-1\r\n");
+
+ /**
* byte array of an empty array
*/
@MakeImmutable
diff --git
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/services/RegionProvider.java
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/services/RegionProvider.java
index a130736..3fd6278 100644
---
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/services/RegionProvider.java
+++
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/services/RegionProvider.java
@@ -33,6 +33,7 @@ import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.partition.PartitionListener;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionConfig;
@@ -82,7 +83,7 @@ public class RegionProvider {
private final String redisRegionName;
public RegionProvider(InternalCache cache, StripedCoordinator
stripedCoordinator,
- RedisStats redisStats) {
+ RedisStats redisStats, PartitionListener partitionListener) {
this.stripedCoordinator = stripedCoordinator;
this.redisStats = redisStats;
@@ -95,6 +96,7 @@ public class RegionProvider {
attributesFactory.setRedundantCopies(config.getRedisRedundantCopies());
attributesFactory.setPartitionResolver(new RedisPartitionResolver());
attributesFactory.setTotalNumBuckets(REDIS_REGION_BUCKETS);
+ attributesFactory.addPartitionListener(partitionListener);
redisDataRegionFactory.setPartitionAttributes(attributesFactory.create());
redisRegionName =
diff --git
a/geode-for-redis/src/test/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContextTest.java
b/geode-for-redis/src/test/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContextTest.java
index 6e4895b..fefe3e1 100644
---
a/geode-for-redis/src/test/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContextTest.java
+++
b/geode-for-redis/src/test/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContextTest.java
@@ -42,7 +42,7 @@ public class ExecutionHandlerContextTest {
when(channel.closeFuture()).thenReturn(mock(ChannelFuture.class));
RedisStats redisStats = mock(RedisStats.class);
return new ExecutionHandlerContext(channel, null, null, null, redisStats,
null, 0, null,
- securityService);
+ securityService, null);
}
@Test