This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6b43b76  GEODE-10048: Add framework for Redis events and BLPOP command 
(#7408)
6b43b76 is described below

commit 6b43b76e3d5a9c5b0c84df8e1995d61528871068
Author: Jens Deppe <jde...@vmware.com>
AuthorDate: Tue Mar 22 19:47:00 2022 -0700

    GEODE-10048: Add framework for Redis events and BLPOP command (#7408)
    
    * GEODE-10048: Add framework for Redis events and BLPOP command
    
    This commit adds a simple eventing framework to be used by blocking
    commands as well as keyspace event notification (still to be
    implemented). The main components are:
    
    - EventListener: an interface to be immplemented by anything wishing to
      receive events. Currently only implemented for blocking commands in
      the form of BlockingCommandListener.
    - EventDistributor: the component to which listeners are registered and
      where events are received and distributed. A single EventDistributor
      exists in the system and is associated with each
      ExecutionHandlerContext.
    - Event: not implemented as a separate class but logically consists of
      the command (RedisCommandType) and key (RedisKey).
    
    When a blocking command receives a relevant event the command is
    resubmitted into the Netty pipeline. This also means that something
    could happen (another command) that causes the blocking command to
    re-block and not complete. This is also what happens with native Redis.
    For example:
    
    - BLPOP 0 A executes and blocks
    - LPUSH A some-value
    - Before LPUSH fires an event, LPOP A is received but needs to wait to
      acquire the lock on A
    - LPUSH fires an event which the BLPOP listener receives and resubmits
      BLPOP into the pipeline
    - Once LPUSH completes, LPOP is next and removes A
    - BLPOP A runs and ends up blocking again because there is nothing to
      pop from A
---
 .../list/BLPopNativeRedisAcceptanceTest.java       |  41 ++++
 .../commands/executor/list/BLPopDUnitTest.java     | 120 ++++++++++++
 .../list/AbstractBLPopIntegrationTest.java         | 201 +++++++++++++++++++
 .../executor/list/BLPopIntegrationTest.java        |  37 ++++
 .../server/AbstractHitsMissesIntegrationTest.java  |   6 +
 .../eventing/BlockingCommandListenerTest.java      | 182 ++++++++++++++++++
 .../internal/eventing/EventDistributorTest.java    | 213 +++++++++++++++++++++
 .../apache/geode/codeAnalysis/excludedClasses.txt  |  23 +--
 .../geode/redis/internal/GeodeRedisServer.java     |  12 +-
 .../geode/redis/internal/RedisConstants.java       |   2 +
 .../geode/redis/internal/commands/Command.java     |  21 +-
 .../redis/internal/commands/RedisCommandType.java  |   3 +
 .../internal/commands/executor/RedisResponse.java  |  12 ++
 .../commands/executor/list/BLPopExecutor.java      |  61 ++++++
 .../commands/executor/list/LPushExecutor.java      |   5 +-
 .../commands/executor/list/RPushExecutor.java      |   2 +-
 .../geode/redis/internal/data/NullRedisList.java   |  17 +-
 .../geode/redis/internal/data/RedisList.java       |  50 ++++-
 .../internal/eventing/BlockingCommandListener.java | 123 ++++++++++++
 .../redis/internal/eventing/EventDistributor.java  | 105 ++++++++++
 .../redis/internal/eventing/EventListener.java     |  62 ++++++
 .../redis/internal/eventing/EventResponse.java     |  34 ++++
 .../apache/geode/redis/internal/netty/Client.java  |   6 +-
 .../apache/geode/redis/internal/netty/Coder.java   |   6 +
 .../internal/netty/ExecutionHandlerContext.java    |  46 +++--
 .../redis/internal/netty/NettyRedisServer.java     |   8 +-
 .../redis/internal/netty/StringBytesGlossary.java  |   6 +
 .../redis/internal/services/RegionProvider.java    |   4 +-
 .../netty/ExecutionHandlerContextTest.java         |   2 +-
 29 files changed, 1344 insertions(+), 66 deletions(-)

diff --git 
a/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopNativeRedisAcceptanceTest.java
 
b/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopNativeRedisAcceptanceTest.java
new file mode 100755
index 0000000..2c54bdd
--- /dev/null
+++ 
b/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopNativeRedisAcceptanceTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+
+import org.junit.ClassRule;
+
+import org.apache.geode.redis.NativeRedisClusterTestRule;
+
+public class BLPopNativeRedisAcceptanceTest extends 
AbstractBLPopIntegrationTest {
+
+  @ClassRule
+  public static NativeRedisClusterTestRule redis = new 
NativeRedisClusterTestRule();
+
+  @Override
+  public int getPort() {
+    return redis.getExposedPorts().get(0);
+  }
+
+  @Override
+  public void flushAll() {
+    redis.flushAll();
+  }
+
+  @Override
+  public void awaitEventDistributorSize(int size) throws Exception {
+    Thread.sleep(500);
+  }
+}
diff --git 
a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopDUnitTest.java
 
b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopDUnitTest.java
new file mode 100644
index 0000000..ff1b394
--- /dev/null
+++ 
b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopDUnitTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.concurrent.Future;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.redis.internal.GeodeRedisService;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class BLPopDUnitTest {
+
+  @ClassRule
+  public static RedisClusterStartupRule cluster = new 
RedisClusterStartupRule(4);
+
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  private static final String LIST_KEY = "key";
+  private static JedisCluster jedis;
+  private static MemberVM server1;
+  private static MemberVM server2;
+  private static MemberVM server3;
+  private static int locatorPort;
+  private static int redisServerPort;
+
+  @BeforeClass
+  public static void classSetup() {
+    MemberVM locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+
+    server1 = cluster.startRedisVM(1, locatorPort);
+    server2 = cluster.startRedisVM(2, locatorPort);
+
+    redisServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
+    server3 = cluster.startRedisVM(3, Integer.toString(redisServerPort), 
locatorPort);
+
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, redisServerPort), 
REDIS_CLIENT_TIMEOUT,
+        20);
+  }
+
+  @After
+  public void tearDown() {
+    cluster.flushAll();
+  }
+
+  @Test
+  public void testClientRepeatsBLpopAfterServerCrash() throws Exception {
+    cluster.moveBucketForKey(LIST_KEY, "server-3");
+    Future<List<String>> blpopFuture = executor.submit(() -> jedis.blpop(0, 
LIST_KEY));
+
+    try {
+      cluster.crashVM(3);
+      jedis.lpush(LIST_KEY, "value");
+      List<String> result = blpopFuture.get();
+      assertThat(result).containsExactly(LIST_KEY, "value");
+    } finally {
+      cluster.startRedisVM(3, Integer.toString(redisServerPort), locatorPort);
+      cluster.rebalanceAllRegions();
+    }
+  }
+
+  @Test
+  public void testBLPopFollowsBucketMovement() throws Exception {
+    Future<List<String>> blpopFuture = executor.submit(() -> jedis.blpop(0, 
LIST_KEY));
+
+    for (int i = 0; i < 11; i++) {
+      cluster.moveBucketForKey(LIST_KEY);
+      Thread.sleep(500);
+    }
+
+    jedis.lpush(LIST_KEY, "value");
+
+    assertThat(blpopFuture.get()).containsExactly(LIST_KEY, "value");
+
+    int registeredListeners = getRegisteredListeners(server1);
+    registeredListeners += getRegisteredListeners(server2);
+    registeredListeners += getRegisteredListeners(server3);
+
+    assertThat(registeredListeners).isEqualTo(0);
+  }
+
+  private int getRegisteredListeners(MemberVM vm) {
+    return vm.invoke(() -> {
+      GeodeRedisService service = 
ClusterStartupRule.getCache().getService(GeodeRedisService.class);
+      return 
service.getRedisServer().getEventDistributor().getRegisteredKeys();
+    });
+  }
+
+
+}
diff --git 
a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractBLPopIntegrationTest.java
 
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractBLPopIntegrationTest.java
new file mode 100755
index 0000000..fd9b12a
--- /dev/null
+++ 
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractBLPopIntegrationTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.internal.RedisConstants;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public abstract class AbstractBLPopIntegrationTest implements 
RedisIntegrationTest {
+  private static final String KEY = "key";
+
+  protected JedisCluster jedis;
+
+  public abstract void awaitEventDistributorSize(int size) throws Exception;
+
+  @ClassRule
+  public static ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void testInvalidArguments_throwErrors() {
+    assertAtLeastNArgs(jedis, Protocol.Command.BLPOP, 2);
+  }
+
+  @Test
+  public void testInvalidTimeout_throwsError() {
+    assertThatThrownBy(() -> jedis.sendCommand("key1", Protocol.Command.BLPOP, 
"key1",
+        "0.A"))
+            .hasMessage(RedisConstants.ERROR_TIMEOUT_INVALID);
+  }
+
+  @Test
+  public void testKeysInDifferentSlots_throwsError() {
+    assertThatThrownBy(() -> jedis.sendCommand("key1", Protocol.Command.BLPOP, 
"key1",
+        "key2", "0"))
+            .hasMessage(RedisConstants.ERROR_WRONG_SLOT);
+  }
+
+  @Test
+  public void testNegativeTimeout_throwsError() {
+    assertThatThrownBy(() -> jedis.blpop(-1, "key1"))
+        .hasMessage(RedisConstants.ERROR_NEGATIVE_TIMEOUT);
+  }
+
+  @Test
+  public void testBLPopForNonListKey() {
+    jedis.set("not-a-list", "value");
+
+    assertThatThrownBy(() -> jedis.blpop(0, "not-a-list"))
+        .hasMessage(RedisConstants.ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void testBLPopWhenValueExists() {
+    jedis.lpush(KEY, "value1", "value2");
+
+    List<String> result = jedis.blpop(0, KEY);
+
+    assertThat(result).containsExactly(KEY, "value2");
+    assertThat(jedis.lpop(KEY)).isEqualTo("value1");
+  }
+
+  @Test
+  public void testBLPopDoesNotError_whenTimeoutHasExponent() {
+    jedis.lpush(KEY, "value1", "value2");
+
+    Object result = jedis.sendCommand(KEY, Protocol.Command.BLPOP, KEY, 
"1E+3");
+
+    assertThat(result).isNotNull();
+  }
+
+  @Test
+  public void testBLPopWhenValueDoesNotExist() throws Exception {
+    Future<List<String>> future = executor.submit(() -> jedis.blpop(0, KEY));
+
+    awaitEventDistributorSize(1);
+    jedis.lpush(KEY, "value1", "value2");
+
+    assertThat(future.get()).containsExactly(KEY, "value2");
+    assertThat(jedis.lpop(KEY)).isEqualTo("value1");
+  }
+
+  @Test
+  public void testRPushFiresEventForBLPop() throws Exception {
+    Future<List<String>> future = executor.submit(() -> jedis.blpop(0, KEY));
+
+    awaitEventDistributorSize(1);
+    jedis.rpush(KEY, "value1", "value2");
+
+    assertThat(future.get()).containsExactly(KEY, "value1");
+    assertThat(jedis.lpop(KEY)).isEqualTo("value2");
+  }
+
+  @Test
+  public void testBLPopWhenTimeoutIsExceeded() {
+    int timeout = 10;
+    Future<List<String>> future = executor.submit(() -> jedis.blpop(timeout, 
KEY));
+    GeodeAwaitility.await().atMost(timeout * 2, TimeUnit.SECONDS)
+        .untilAsserted(() -> assertThat(future.get()).isNull());
+  }
+
+  @Test
+  public void testBLpopFirstKeyDoesNotExistButSecondOneDoes() {
+    jedis.lpush("{A}key2", "value2");
+
+    List<String> result = jedis.blpop(0, "{A}key1", "{A}key2");
+
+    assertThat(result).containsExactly("{A}key2", "value2");
+  }
+
+  @Test
+  public void testBLpopKeyOrderingIsCorrect() {
+    jedis.lpush("{A}key2", "value2");
+    jedis.lpush("{A}key3", "value3");
+
+    List<String> result = jedis.blpop(0, "{A}key1", "{A}key3", "{A}key2");
+
+    assertThat(result).containsExactly("{A}key3", "value3");
+  }
+
+  @Test
+  public void testConcurrentBLPop() throws Exception {
+    int totalElements = 10_000;
+    List<Object> accumulated = Collections.synchronizedList(new 
ArrayList<>(totalElements + 2));
+    AtomicBoolean running = new AtomicBoolean(true);
+
+    Future<Void> future1 = executor.submit(() -> doBLPop(running, 
accumulated));
+    Future<Void> future2 = executor.submit(() -> doBLPop(running, 
accumulated));
+
+    List<String> result = new ArrayList<>();
+    for (int i = 0; i < totalElements; i++) {
+      jedis.lpush(KEY, "value" + i);
+      result.add("value" + i);
+    }
+
+    GeodeAwaitility.await()
+        .untilAsserted(() -> 
assertThat(accumulated.size()).isEqualTo(totalElements));
+
+    running.set(false);
+
+    jedis.lpush(KEY, "stop1");
+    jedis.lpush(KEY, "stop2");
+    result.add("stop1");
+    result.add("stop2");
+    future1.get();
+    future2.get();
+
+    assertThat(accumulated).containsExactlyInAnyOrderElementsOf(result);
+  }
+
+  private void doBLPop(AtomicBoolean running, List<Object> accumulated) {
+    while (running.get()) {
+      accumulated.add(jedis.blpop(0, KEY).get(1));
+    }
+  }
+
+}
diff --git 
a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopIntegrationTest.java
 
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopIntegrationTest.java
new file mode 100755
index 0000000..f77abc4
--- /dev/null
+++ 
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/BLPopIntegrationTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import org.junit.ClassRule;
+
+import org.apache.geode.redis.GeodeRedisServerRule;
+import org.apache.geode.redis.internal.eventing.EventDistributor;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public class BLPopIntegrationTest extends AbstractBLPopIntegrationTest {
+
+  @ClassRule
+  public static GeodeRedisServerRule server = new GeodeRedisServerRule();
+
+  @Override
+  public int getPort() {
+    return server.getPort();
+  }
+
+  public void awaitEventDistributorSize(int size) {
+    EventDistributor distributor = server.getServer().getEventDistributor();
+    GeodeAwaitility.await().until(() -> distributor.getRegisteredKeys() == 
size);
+  }
+}
diff --git 
a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java
 
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java
index 73546e1..50000cb 100644
--- 
a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java
+++ 
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java
@@ -575,6 +575,12 @@ public abstract class AbstractHitsMissesIntegrationTest 
implements RedisIntegrat
 
   /************* List related commands *************/
   @Test
+  public void testBLPop() {
+    jedis.lpush(LIST_KEY, "element");
+    runCommandAndAssertNoStatUpdates(LIST_KEY, k -> jedis.blpop(0, k));
+  }
+
+  @Test
   public void testLindex() {
     runCommandAndAssertHitsAndMisses(LIST_KEY, k -> jedis.lindex(k, 1));
   }
diff --git 
a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/BlockingCommandListenerTest.java
 
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/BlockingCommandListenerTest.java
new file mode 100644
index 0000000..a984fc6
--- /dev/null
+++ 
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/BlockingCommandListenerTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.commands.Command;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class BlockingCommandListenerTest {
+
+  @Test
+  public void testTimeoutIsAdjusted() {
+    ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+    List<byte[]> commandArgs = Arrays.asList("KEY".getBytes(), "0".getBytes());
+    Command command = new Command(RedisCommandType.BLPOP, commandArgs);
+    BlockingCommandListener listener =
+        new BlockingCommandListener(context, command, Collections.emptyList(), 
1.0D);
+
+    listener.resubmitCommand();
+
+    ArgumentCaptor<Command> argumentCaptor = 
ArgumentCaptor.forClass(Command.class);
+    verify(context, times(1)).resubmitCommand(argumentCaptor.capture());
+
+    double timeout = 
Coder.bytesToDouble(argumentCaptor.getValue().getCommandArguments().get(0));
+    assertThat(timeout).isLessThan(1.0D);
+  }
+
+  @Test
+  public void testAdjustedTimeoutDoesNotBecomeNegative() {
+    ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+    List<byte[]> commandArgs = Arrays.asList("KEY".getBytes(), "0".getBytes());
+    Command command = new Command(RedisCommandType.BLPOP, commandArgs);
+    BlockingCommandListener listener =
+        new BlockingCommandListener(context, command, Collections.emptyList(), 
1e-9);
+
+    listener.resubmitCommand();
+
+    ArgumentCaptor<Command> argumentCaptor = 
ArgumentCaptor.forClass(Command.class);
+    verify(context, times(1)).resubmitCommand(argumentCaptor.capture());
+
+    double timeout = 
Coder.bytesToDouble(argumentCaptor.getValue().getCommandArguments().get(0));
+    assertThat(timeout).isEqualTo(1e-9);
+  }
+
+  @Test
+  public void testListenerIsRemovedAfterTimeout() {
+    ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+    List<byte[]> commandArgs = Arrays.asList("KEY".getBytes(), "0".getBytes());
+    Command command = new Command(RedisCommandType.BLPOP, commandArgs);
+    BlockingCommandListener listener =
+        new BlockingCommandListener(context, command, Collections.emptyList(), 
1e-9);
+    EventDistributor eventDistributor = new EventDistributor();
+
+    eventDistributor.registerListener(listener);
+
+    await().atMost(Duration.ofSeconds(1))
+        .untilAsserted(() -> 
assertThat(eventDistributor.getRegisteredKeys()).isEqualTo(0));
+  }
+
+  @Test
+  public void 
testListenersAffectedByBucketMovementAreInactiveAndDoNotProcessTimeout() {
+    ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+    RedisKey key = new RedisKey("KEY".getBytes());
+    List<byte[]> commandArgs = Arrays.asList(key.toBytes(), "0".getBytes());
+    Command command = new Command(RedisCommandType.BLPOP, commandArgs);
+    BlockingCommandListener listener =
+        new BlockingCommandListener(context, command, 
Collections.singletonList(key), 1);
+    EventDistributor eventDistributor = new EventDistributor();
+
+    eventDistributor.registerListener(listener);
+
+    eventDistributor.afterBucketRemoved(key.getBucketId(), null);
+
+    verify(context, atMost(1)).resubmitCommand(any());
+    await().atMost(Duration.ofSeconds(5))
+        .during(Duration.ofSeconds(2))
+        .untilAsserted(() -> verify(context, atMost(0)).writeToChannel(any()));
+    assertThat(eventDistributor.getRegisteredKeys()).isEqualTo(0);
+  }
+
+  @Test
+  public void testResubmitAndTimeoutDoNotBothExecute() {
+    ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+    List<byte[]> commandArgs = Arrays.asList("KEY".getBytes(), "0".getBytes());
+    Command command = new Command(RedisCommandType.BLPOP, commandArgs);
+    BlockingCommandListener listener =
+        new BlockingCommandListener(context, command, 
Arrays.asList(command.getKey()), 0);
+    AtomicReference<BlockingCommandListener> listenerRef = new 
AtomicReference<>(listener);
+    EventDistributor eventDistributor = new EventDistributor();
+
+    eventDistributor.registerListener(listener);
+
+    new ConcurrentLoopingThreads(10_000,
+        i -> listenerRef.get().process(null, command.getKey()),
+        i -> listenerRef.get().timeout(eventDistributor))
+            .runWithAction(() -> {
+              // Verify that resubmitCommand and timeout have not both been 
called
+              try {
+                verify(context, atLeastOnce()).resubmitCommand(any());
+                verify(context, never()).writeToChannel(any());
+              } catch (AssertionError e) {
+                verify(context, never()).resubmitCommand(any());
+                verify(context, atLeastOnce()).writeToChannel(any());
+              }
+              reset(context);
+              eventDistributor.removeListener(listenerRef.get());
+              listenerRef.set(new BlockingCommandListener(context, command,
+                  Arrays.asList(command.getKey()), 0));
+              eventDistributor.registerListener(listenerRef.get());
+            });
+  }
+
+  @Test
+  public void concurrencyOfManyRegistrationsForTheSameKeys() {
+    ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+    EventDistributor distributor = new EventDistributor();
+    RedisKey keyA = new RedisKey("keyA".getBytes());
+    RedisKey keyB = new RedisKey("keyB".getBytes());
+    RedisKey keyC = new RedisKey("keyC".getBytes());
+
+    // Should not produce any exceptions
+    new ConcurrentLoopingThreads(10_000,
+        i -> registerListener(distributor, context, keyA, keyB, keyC),
+        i -> registerListener(distributor, context, keyB, keyC, keyA),
+        i -> registerListener(distributor, context, keyC, keyA, keyB),
+        i -> distributor.afterBucketRemoved(keyA.getBucketId(), null),
+        i -> distributor.afterBucketRemoved(keyB.getBucketId(), null),
+        i -> distributor.afterBucketRemoved(keyC.getBucketId(), null),
+        i -> distributor.fireEvent(null, keyA),
+        i -> distributor.fireEvent(null, keyB),
+        i -> distributor.fireEvent(null, keyC))
+            .run();
+
+    await().atMost(Duration.ofSeconds(1))
+        .untilAsserted(() -> 
assertThat(distributor.getRegisteredKeys()).isEqualTo(0));
+  }
+
+  private void registerListener(EventDistributor distributor, 
ExecutionHandlerContext context,
+      RedisKey... keys) {
+    List<byte[]> commandArgs = Arrays.asList("KEY".getBytes(), "0".getBytes());
+    Command command = new Command(RedisCommandType.BLPOP, commandArgs);
+    BlockingCommandListener listener =
+        new BlockingCommandListener(context, command, Arrays.asList(keys), 
1e-9);
+    distributor.registerListener(listener);
+  }
+}
diff --git 
a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/EventDistributorTest.java
 
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/EventDistributorTest.java
new file mode 100644
index 0000000..d8a4d1c
--- /dev/null
+++ 
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/eventing/EventDistributorTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+public class EventDistributorTest {
+
+  public static class TestEventListener implements EventListener {
+    private final List<RedisKey> keys;
+    private int fired = 0;
+
+    public TestEventListener(RedisKey... keys) {
+      this.keys = Arrays.stream(keys).collect(Collectors.toList());
+    }
+
+    public int getFired() {
+      return fired;
+    }
+
+    @Override
+    public EventResponse process(RedisCommandType commandType, RedisKey key) {
+      fired += 1;
+      return EventResponse.REMOVE_AND_STOP;
+    }
+
+    @Override
+    public List<RedisKey> keys() {
+      return keys;
+    }
+
+    @Override
+    public void resubmitCommand() {}
+
+    @Override
+    public void scheduleTimeout(ScheduledExecutorService executor, 
EventDistributor distributor) {}
+  }
+
+  @Test
+  public void firingEventRemovesListener() {
+    RedisKey keyA = new RedisKey("a".getBytes());
+    RedisKey keyB = new RedisKey("b".getBytes());
+    EventDistributor distributor = new EventDistributor();
+    TestEventListener listener = new TestEventListener(keyA, keyB);
+    distributor.registerListener(listener);
+
+    distributor.fireEvent(null, keyA);
+    assertThat(listener.getFired()).isEqualTo(1);
+    assertThat(distributor.getRegisteredKeys()).isEqualTo(0);
+  }
+
+  @Test
+  public void firingEventRemovesFirstListener_whenMultipleExist() {
+    RedisKey keyA = new RedisKey("a".getBytes());
+    RedisKey keyB = new RedisKey("b".getBytes());
+    EventDistributor distributor = new EventDistributor();
+    TestEventListener listener1 = new TestEventListener(keyA, keyB);
+    TestEventListener listener2 = new TestEventListener(keyA, keyB);
+    distributor.registerListener(listener1);
+    distributor.registerListener(listener2);
+
+    assertThat(distributor.getRegisteredKeys()).isEqualTo(4);
+
+    distributor.fireEvent(null, keyA);
+    assertThat(listener1.getFired()).isEqualTo(1);
+    assertThat(listener2.getFired()).isEqualTo(0);
+    assertThat(distributor.getRegisteredKeys()).isEqualTo(2);
+
+    distributor.fireEvent(null, keyA);
+    assertThat(listener1.getFired()).isEqualTo(1);
+    assertThat(listener2.getFired()).isEqualTo(1);
+    assertThat(distributor.getRegisteredKeys()).isEqualTo(0);
+  }
+
+  @Test
+  public void listenerIsRemovedAfterBucketMoves() {
+    RedisKey keyA = new RedisKey("a".getBytes());
+    RedisKey keyB = new RedisKey("b".getBytes());
+    EventDistributor distributor = new EventDistributor();
+    TestEventListener listener1 = new TestEventListener(keyA);
+    distributor.registerListener(listener1);
+    distributor.registerListener(new TestEventListener(keyB));
+
+    assertThat(distributor.getRegisteredKeys()).isEqualTo(2);
+
+    distributor.afterBucketRemoved(keyA.getBucketId(), null);
+
+    assertThat(distributor.getRegisteredKeys()).isEqualTo(1);
+  }
+
+  @Test
+  public void concurrencyOfManyRegistrationsAndBucketMovementForTheSameKeys() {
+    EventDistributor distributor = new EventDistributor();
+    RedisKey keyA = new RedisKey("keyA".getBytes());
+    RedisKey keyB = new RedisKey("keyB".getBytes());
+    RedisKey keyC = new RedisKey("keyC".getBytes());
+
+    // Should not produce any exceptions
+    new ConcurrentLoopingThreads(10_000,
+        i -> distributor.registerListener(new TestEventListener(keyA, keyB, 
keyC)),
+        i -> distributor.registerListener(new TestEventListener(keyB, keyC, 
keyA)),
+        i -> distributor.registerListener(new TestEventListener(keyC, keyA, 
keyB)),
+        i -> distributor.afterBucketRemoved(keyA.getBucketId(), null),
+        i -> distributor.afterBucketRemoved(keyB.getBucketId(), null),
+        i -> distributor.afterBucketRemoved(keyC.getBucketId(), null),
+        i -> distributor.fireEvent(null, keyA),
+        i -> distributor.fireEvent(null, keyB),
+        i -> distributor.fireEvent(null, keyC))
+            .runInLockstep();
+
+    distributor.fireEvent(null, keyA);
+    distributor.fireEvent(null, keyB);
+    distributor.fireEvent(null, keyC);
+    assertThat(distributor.getRegisteredKeys()).isEqualTo(0);
+  }
+
+  @Test
+  public void concurrencyOfManyRegistrationAndRemovalOfSameListener() {
+    EventDistributor distributor = new EventDistributor();
+    RedisKey keyA = new RedisKey("keyA".getBytes());
+    RedisKey keyB = new RedisKey("keyB".getBytes());
+    RedisKey keyC = new RedisKey("keyC".getBytes());
+    AtomicReference<EventListener> listenerRef1 =
+        new AtomicReference<>(new TestEventListener(keyA, keyB, keyC));
+    AtomicReference<EventListener> listenerRef2 =
+        new AtomicReference<>(new TestEventListener(keyB, keyC, keyA));
+    AtomicReference<EventListener> listenerRef3 =
+        new AtomicReference<>(new TestEventListener(keyC, keyA, keyB));
+
+    // Should not produce any exceptions
+    new ConcurrentLoopingThreads(10_000,
+        i -> distributor.registerListener(listenerRef1.get()),
+        i -> distributor.registerListener(listenerRef2.get()),
+        i -> distributor.registerListener(listenerRef3.get()),
+        i -> distributor.removeListener(listenerRef1.get()),
+        i -> distributor.removeListener(listenerRef2.get()),
+        i -> distributor.removeListener(listenerRef3.get()),
+        i -> distributor.fireEvent(null, keyA),
+        i -> distributor.fireEvent(null, keyB),
+        i -> distributor.fireEvent(null, keyC))
+            .runWithAction(() -> {
+              listenerRef1.set(new TestEventListener(keyA, keyB, keyC));
+              listenerRef2.set(new TestEventListener(keyB, keyC, keyA));
+              listenerRef3.set(new TestEventListener(keyC, keyA, keyB));
+            });
+
+    distributor.fireEvent(null, keyA);
+    distributor.fireEvent(null, keyB);
+    distributor.fireEvent(null, keyC);
+    assertThat(distributor.getRegisteredKeys()).isEqualTo(0);
+  }
+
+  /**
+   * When a key points to a Queue (of listeners) that becomes empty that key is
+   * removed. Without correct synchronization we could add a new listener to a 
queue
+   * just before the key (and thus the queue) is removed. This test ensures 
that is
+   * not happening.
+   */
+  @Test
+  public void ensureNotRegisteringListenerOnQueueJustRemoved() {
+    EventDistributor distributor = new EventDistributor();
+    RedisKey keyA = new RedisKey("keyA".getBytes());
+    AtomicReference<EventListener> listenerRef1 =
+        new AtomicReference<>(new TestEventListener(keyA));
+    AtomicReference<EventListener> listenerRef2 =
+        new AtomicReference<>(new TestEventListener(keyA));
+
+    distributor.registerListener(listenerRef1.get());
+    AtomicInteger iteration = new AtomicInteger(0);
+
+    new ConcurrentLoopingThreads(10_000,
+        iteration::set,
+        i -> distributor.registerListener(listenerRef2.get()),
+        i -> distributor.removeListener(listenerRef1.get()))
+            .runWithAction(() -> {
+              assertThat(distributor.getRegisteredKeys())
+                  .as("Iteration = " + iteration.get()).isEqualTo(1);
+              distributor.removeListener(listenerRef2.get());
+
+              listenerRef1.set(new TestEventListener(keyA));
+              listenerRef2.set(new TestEventListener(keyA));
+
+              distributor.registerListener(listenerRef1.get());
+            });
+  }
+
+}
diff --git 
a/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
 
b/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index fee9c78..29d0c21 100644
--- 
a/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ 
b/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -1,20 +1,21 @@
-org/apache/geode/redis/internal/services/cluster/RedisMemberInfoRetrievalFunction
-org/apache/geode/redis/internal/data/collections/Bytes2ObjectOpenHashMap
-org/apache/geode/redis/internal/data/collections/SizeableByteArrayList
-org/apache/geode/redis/internal/data/collections/SizeableBytes2ObjectOpenCustomHashMapWithCursor
-org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursor
+org/apache/geode/redis/internal/RedisException
+org/apache/geode/redis/internal/commands/RedisCommandType
+org/apache/geode/redis/internal/commands/RedisCommandType$Category
+org/apache/geode/redis/internal/commands/RedisCommandType$Flag
+org/apache/geode/redis/internal/commands/executor/sortedset/ZAggregator
+org/apache/geode/redis/internal/commands/executor/string/BitOpExecutor$BitOp
 org/apache/geode/redis/internal/data/RedisCrossSlotException
 org/apache/geode/redis/internal/data/RedisDataMovedException
 org/apache/geode/redis/internal/data/RedisDataTypeMismatchException
-org/apache/geode/redis/internal/commands/executor/sortedset/ZAggregator
-org/apache/geode/redis/internal/RedisException
 org/apache/geode/redis/internal/data/RedisHash$Hash
 org/apache/geode/redis/internal/data/RedisKeyExistsException
 org/apache/geode/redis/internal/data/RedisSet$MemberSet
 org/apache/geode/redis/internal/data/RedisSortedSet$MemberMap
-org/apache/geode/redis/internal/commands/executor/string/BitOpExecutor$BitOp
+org/apache/geode/redis/internal/data/collections/Bytes2ObjectOpenHashMap
+org/apache/geode/redis/internal/data/collections/SizeableByteArrayList
+org/apache/geode/redis/internal/data/collections/SizeableBytes2ObjectOpenCustomHashMapWithCursor
+org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursor
+org/apache/geode/redis/internal/eventing/EventResponse
 org/apache/geode/redis/internal/pubsub/Publisher$PublishFunction
+org/apache/geode/redis/internal/services/cluster/RedisMemberInfoRetrievalFunction
 org/apache/geode/redis/internal/services/locking/StripedExecutorService$State
-org/apache/geode/redis/internal/commands/RedisCommandType
-org/apache/geode/redis/internal/commands/RedisCommandType$Category
-org/apache/geode/redis/internal/commands/RedisCommandType$Flag
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
index 8db6c30..29a0ed5 100644
--- 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
@@ -27,6 +27,7 @@ import org.apache.geode.internal.statistics.StatisticsClock;
 import org.apache.geode.internal.statistics.StatisticsClockFactory;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.eventing.EventDistributor;
 import org.apache.geode.redis.internal.netty.Coder;
 import org.apache.geode.redis.internal.netty.NettyRedisServer;
 import org.apache.geode.redis.internal.pubsub.PubSub;
@@ -62,6 +63,7 @@ public class GeodeRedisServer {
   private final RegionProvider regionProvider;
   private final PubSub pubSub;
   private final RedisStats redisStats;
+  private final EventDistributor eventDistributor;
   private boolean shutdown;
 
   /**
@@ -81,7 +83,8 @@ public class GeodeRedisServer {
     StripedCoordinator stripedCoordinator = new LockingStripedCoordinator();
     RedisMemberInfoRetrievalFunction infoFunction = 
RedisMemberInfoRetrievalFunction.register();
 
-    regionProvider = new RegionProvider(cache, stripedCoordinator, redisStats);
+    eventDistributor = new EventDistributor();
+    regionProvider = new RegionProvider(cache, stripedCoordinator, redisStats, 
eventDistributor);
     pubSub = new PubSubImpl(new Subscriptions(redisStats), regionProvider, 
redisStats);
 
     activeExpirationManager = new ActiveExpirationManager(regionProvider);
@@ -92,7 +95,7 @@ public class GeodeRedisServer {
     nettyRedisServer = new NettyRedisServer(() -> 
cache.getInternalDistributedSystem().getConfig(),
         regionProvider, pubSub,
         this::allowUnsupportedCommands, port, bindAddress, redisStats,
-        member, securityService);
+        member, securityService, eventDistributor);
 
     infoFunction.initialize(member, bindAddress, nettyRedisServer.getPort());
   }
@@ -102,6 +105,11 @@ public class GeodeRedisServer {
     return ((PubSubImpl) pubSub).getSubscriptionCount();
   }
 
+  @VisibleForTesting
+  public EventDistributor getEventDistributor() {
+    return eventDistributor;
+  }
+
   private static RedisStats createStats(InternalCache cache) {
     InternalDistributedSystem system = cache.getInternalDistributedSystem();
     StatisticsClock statisticsClock =
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
index acb1093..1be3ccc 100644
--- 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
@@ -104,4 +104,6 @@ public class RedisConstants {
       "ERR wrong number of arguments for '%s' command";
   public static final String ERROR_BITOP_NOT_MUST_USE_SINGLE_KEY =
       "ERR BITOP NOT must be called with a single source key.";
+  public static final String ERROR_TIMEOUT_INVALID = "ERR timeout is not a 
float or out of range";
+  public static final String ERROR_NEGATIVE_TIMEOUT = "ERR timeout is 
negative";
 }
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/Command.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/Command.java
index 8758a00..a44b7f8 100755
--- 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/Command.java
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/Command.java
@@ -22,8 +22,6 @@ import java.nio.channels.SocketChannel;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import io.netty.channel.ChannelHandlerContext;
-
 import org.apache.geode.redis.internal.commands.executor.RedisResponse;
 import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -47,6 +45,16 @@ public class Command {
     commandType = null;
   }
 
+  public Command(RedisCommandType command, List<byte[]> commandElems) {
+    if (commandElems == null || commandElems.isEmpty()) {
+      throw new IllegalArgumentException(
+          "List of command elements cannot be empty -> List:" + commandElems);
+    }
+
+    this.commandType = command;
+    this.commandElems = commandElems;
+  }
+
   /**
    * Constructor for {@link Command}. Must initialize Command with a {@link 
SocketChannel} and a
    * {@link List} of command elements
@@ -222,13 +230,4 @@ public class Command {
     return result;
   }
 
-  private ChannelHandlerContext channelHandlerContext;
-
-  public void setChannelHandlerContext(ChannelHandlerContext ctx) {
-    channelHandlerContext = ctx;
-  }
-
-  public ChannelHandlerContext getChannelHandlerContext() {
-    return channelHandlerContext;
-  }
 }
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java
index d5e7d58..e734c06 100755
--- 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java
@@ -78,6 +78,7 @@ import 
org.apache.geode.redis.internal.commands.executor.key.RestoreExecutor;
 import org.apache.geode.redis.internal.commands.executor.key.ScanExecutor;
 import org.apache.geode.redis.internal.commands.executor.key.TTLExecutor;
 import org.apache.geode.redis.internal.commands.executor.key.TypeExecutor;
+import org.apache.geode.redis.internal.commands.executor.list.BLPopExecutor;
 import org.apache.geode.redis.internal.commands.executor.list.LIndexExecutor;
 import org.apache.geode.redis.internal.commands.executor.list.LInsertExecutor;
 import org.apache.geode.redis.internal.commands.executor.list.LLenExecutor;
@@ -388,6 +389,8 @@ public enum RedisCommandType {
 
   /************** Lists *****************/
 
+  BLPOP(new BLPopExecutor(), Category.LIST, SUPPORTED,
+      new Parameter().min(3).lastKey(-2).flags(WRITE, NOSCRIPT)),
   LINDEX(new LIndexExecutor(), Category.LIST, SUPPORTED, new 
Parameter().exact(3).flags(READONLY)),
   LINSERT(new LInsertExecutor(), Category.LIST, SUPPORTED,
       new Parameter().exact(5).flags(WRITE, DENYOOM)),
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/RedisResponse.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/RedisResponse.java
index 22a8fae..2577814 100644
--- 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/RedisResponse.java
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/RedisResponse.java
@@ -42,6 +42,11 @@ import org.apache.geode.redis.internal.netty.CoderException;
 
 public class RedisResponse {
 
+  /**
+   * Static value returned by blocking commands. Effectively a no-op.
+   */
+  public static final RedisResponse BLOCKED = new RedisResponse(x -> null);
+
   private final Function<ByteBuf, ByteBuf> coderCallback;
 
   private Runnable afterWriteCallback;
@@ -119,6 +124,13 @@ public class RedisResponse {
   }
 
   @Immutable
+  private static final RedisResponse NIL_ARRAY = new 
RedisResponse(Coder::getNilArrayResponse);
+
+  public static RedisResponse nilArray() {
+    return NIL_ARRAY;
+  }
+
+  @Immutable
   private static final RedisResponse EMPTY = new 
RedisResponse(Coder::getEmptyResponse);
 
   public static RedisResponse flattenedArray(Collection<Collection<?>> 
nestedCollection) {
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/BLPopExecutor.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/BLPopExecutor.java
new file mode 100755
index 0000000..2981aa0
--- /dev/null
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/BLPopExecutor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.list;
+
+import static 
org.apache.geode.redis.internal.RedisConstants.ERROR_NEGATIVE_TIMEOUT;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.geode.redis.internal.RedisConstants;
+import org.apache.geode.redis.internal.commands.Command;
+import org.apache.geode.redis.internal.commands.executor.CommandExecutor;
+import org.apache.geode.redis.internal.commands.executor.RedisResponse;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.data.RedisList;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class BLPopExecutor implements CommandExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext 
context) {
+    List<byte[]> arguments = command.getCommandArguments();
+    double timeoutSeconds;
+    try {
+      timeoutSeconds = Coder.bytesToDouble(arguments.get(arguments.size() - 
1));
+    } catch (NumberFormatException e) {
+      return RedisResponse.error(RedisConstants.ERROR_TIMEOUT_INVALID);
+    }
+
+    if (timeoutSeconds < 0) {
+      return RedisResponse.error(ERROR_NEGATIVE_TIMEOUT);
+    }
+
+    int keyCount = arguments.size() - 1;
+    List<RedisKey> keys = new ArrayList<>(keyCount);
+    for (int i = 0; i < keyCount; i++) {
+      keys.add(new RedisKey(arguments.get(i)));
+    }
+    // The order of keys is important and, since locking may alter the order 
passed into
+    // lockedExecute, we create a copy here.
+    List<RedisKey> keysForLocking = new ArrayList<>(keys);
+
+    List<byte[]> popped = context.lockedExecute(keys.get(0), keysForLocking,
+        () -> RedisList.blpop(context, command, keys, timeoutSeconds));
+
+    return popped == null ? RedisResponse.BLOCKED : 
RedisResponse.array(popped, true);
+  }
+}
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/LPushExecutor.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/LPushExecutor.java
index 8e4b7e3..01763fd 100755
--- 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/LPushExecutor.java
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/LPushExecutor.java
@@ -16,11 +16,9 @@ package 
org.apache.geode.redis.internal.commands.executor.list;
 
 import java.util.List;
 
-import org.apache.geode.cache.Region;
 import org.apache.geode.redis.internal.commands.Command;
 import org.apache.geode.redis.internal.commands.executor.CommandExecutor;
 import org.apache.geode.redis.internal.commands.executor.RedisResponse;
-import org.apache.geode.redis.internal.data.RedisData;
 import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
 
@@ -30,13 +28,12 @@ public class LPushExecutor implements CommandExecutor {
   public final RedisResponse executeCommand(final Command command,
       final ExecutionHandlerContext context) {
     List<byte[]> commandElements = command.getProcessedCommand();
-    Region<RedisKey, RedisData> region = context.getRegion();
     RedisKey key = command.getKey();
 
     List<byte[]> elementsToAdd = commandElements.subList(2, 
commandElements.size());
 
     final long newLength = context.listLockedExecute(key, false,
-        list -> list.lpush(elementsToAdd, region, key, 
shouldPushOnlyIfKeyExists()));
+        list -> list.lpush(context, elementsToAdd, key, 
shouldPushOnlyIfKeyExists()));
 
     return RedisResponse.integer(newLength);
   }
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/RPushExecutor.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/RPushExecutor.java
index fc313df..7549aba 100755
--- 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/RPushExecutor.java
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/list/RPushExecutor.java
@@ -36,7 +36,7 @@ public class RPushExecutor implements CommandExecutor {
     List<byte[]> elementsToAdd = commandElements.subList(2, 
commandElements.size());
 
     final long newLength = context.listLockedExecute(key, false,
-        list -> list.rpush(elementsToAdd, region, key, 
shouldPushOnlyIfKeyExists()));
+        list -> list.rpush(context, elementsToAdd, key, 
shouldPushOnlyIfKeyExists()));
 
     return RedisResponse.integer(newLength);
   }
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisList.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisList.java
index c1f14a7..f14fc5a 100644
--- 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisList.java
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisList.java
@@ -20,6 +20,8 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
 
 class NullRedisList extends RedisList {
 
@@ -44,22 +46,23 @@ class NullRedisList extends RedisList {
   }
 
   @Override
-  public long lpush(List<byte[]> elementsToAdd, Region<RedisKey, RedisData> 
region, RedisKey key,
-      final boolean onlyIfExists) {
+  public long lpush(ExecutionHandlerContext context, List<byte[]> 
elementsToAdd, RedisKey key,
+      boolean onlyIfExists) {
     if (onlyIfExists) {
       return 0;
     }
-
     RedisList newList = new RedisList();
     for (byte[] element : elementsToAdd) {
       newList.elementPushHead(element);
     }
-    region.create(key, newList);
+    context.getRegion().create(key, newList);
+    context.fireEvent(RedisCommandType.LPUSH, key);
+
     return elementsToAdd.size();
   }
 
   @Override
-  public long rpush(List<byte[]> elementsToAdd, Region<RedisKey, RedisData> 
region, RedisKey key,
+  public long rpush(ExecutionHandlerContext context, List<byte[]> 
elementsToAdd, RedisKey key,
       final boolean onlyIfExists) {
     if (onlyIfExists) {
       return 0;
@@ -69,7 +72,9 @@ class NullRedisList extends RedisList {
     for (byte[] element : elementsToAdd) {
       newList.elementPushTail(element);
     }
-    region.create(key, newList);
+    context.getRegion().create(key, newList);
+    context.fireEvent(RedisCommandType.RPUSH, key);
+
     return elementsToAdd.size();
   }
 
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
index 64d233c..37fbfa9 100644
--- 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
@@ -36,12 +36,17 @@ import 
org.apache.geode.internal.serialization.DeserializationContext;
 import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.internal.serialization.SerializationContext;
 import org.apache.geode.redis.internal.RedisException;
+import org.apache.geode.redis.internal.commands.Command;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
 import org.apache.geode.redis.internal.data.collections.SizeableByteArrayList;
 import org.apache.geode.redis.internal.data.delta.AddByteArrays;
 import org.apache.geode.redis.internal.data.delta.AddByteArraysTail;
 import org.apache.geode.redis.internal.data.delta.InsertByteArray;
 import org.apache.geode.redis.internal.data.delta.RemoveElementsByIndex;
 import org.apache.geode.redis.internal.data.delta.ReplaceByteArrayAtOffset;
+import org.apache.geode.redis.internal.eventing.BlockingCommandListener;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+import org.apache.geode.redis.internal.services.RegionProvider;
 
 public class RedisList extends AbstractRedisData {
   protected static final int REDIS_LIST_OVERHEAD = 
memoryOverhead(RedisList.class);
@@ -158,40 +163,44 @@ public class RedisList extends AbstractRedisData {
   }
 
   /**
-   * @param elementsToAdd elements to add to this list; NOTE this list may be 
modified by this call
-   * @param region the region this instance is stored in
-   * @param key the name of the list to add to
+   * @param context the context of the executing command
+   * @param elementsToAdd elements to add to this list
+   * @param key the name of the set to add to
    * @param onlyIfExists if true then the elements should only be added if the 
key already exists
    *        and holds a list, otherwise no operation is performed.
    * @return the length of the list after the operation
    */
-  public long lpush(List<byte[]> elementsToAdd, Region<RedisKey, RedisData> 
region,
-      RedisKey key, final boolean onlyIfExists) {
+  public long lpush(ExecutionHandlerContext context, List<byte[]> 
elementsToAdd,
+      RedisKey key, boolean onlyIfExists) {
     byte newVersion;
     synchronized (this) {
       newVersion = incrementAndGetVersion();
       elementsPushHead(elementsToAdd);
     }
-    storeChanges(region, key, new AddByteArrays(elementsToAdd, newVersion));
+    storeChanges(context.getRegion(), key, new AddByteArrays(elementsToAdd, 
newVersion));
+    context.fireEvent(RedisCommandType.LPUSH, key);
+
     return elementList.size();
   }
 
   /**
+   * @param context the context of the executing command
    * @param elementsToAdd elements to add to this list;
-   * @param region the region this instance is stored in
    * @param key the name of the list to add to
    * @param onlyIfExists if true then the elements should only be added if the 
key already exists
    *        and holds a list, otherwise no operation is performed.
    * @return the length of the list after the operation
    */
-  public long rpush(List<byte[]> elementsToAdd, Region<RedisKey, RedisData> 
region,
-      RedisKey key, final boolean onlyIfExists) {
+  public long rpush(ExecutionHandlerContext context, List<byte[]> 
elementsToAdd, RedisKey key,
+      boolean onlyIfExists) {
     byte newVersion;
     synchronized (this) {
       newVersion = incrementAndGetVersion();
       elementsToAdd.forEach(this::elementPushTail);
     }
-    storeChanges(region, key, new AddByteArraysTail(newVersion, 
elementsToAdd));
+    storeChanges(context.getRegion(), key, new AddByteArraysTail(newVersion, 
elementsToAdd));
+    context.fireEvent(RedisCommandType.RPUSH, key);
+
     return elementList.size();
   }
 
@@ -214,6 +223,27 @@ public class RedisList extends AbstractRedisData {
     return popped;
   }
 
+  public static List<byte[]> blpop(ExecutionHandlerContext context, Command 
command,
+      List<RedisKey> keys, double timeoutSeconds) {
+    RegionProvider regionProvider = context.getRegionProvider();
+    for (RedisKey key : keys) {
+      RedisList list = regionProvider.getTypedRedisData(REDIS_LIST, key, 
false);
+      if (!list.isNull()) {
+        byte[] poppedValue = list.lpop(context.getRegion(), key);
+
+        // return the key and value
+        List<byte[]> result = new ArrayList<>(2);
+        result.add(key.toBytes());
+        result.add(poppedValue);
+        return result;
+      }
+    }
+
+    context.registerListener(new BlockingCommandListener(context, command, 
keys, timeoutSeconds));
+
+    return null;
+  }
+
   /**
    * @return the number of elements in the list
    */
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/BlockingCommandListener.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/BlockingCommandListener.java
new file mode 100644
index 0000000..bce6756
--- /dev/null
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/BlockingCommandListener.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.redis.internal.commands.Command;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.commands.executor.RedisResponse;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class BlockingCommandListener implements EventListener {
+
+  private final ExecutionHandlerContext context;
+  private final Command command;
+  private final List<RedisKey> keys;
+  private final double timeoutSeconds;
+  private final long timeSubmitted;
+  private final AtomicBoolean active = new AtomicBoolean(true);
+
+  /**
+   * Constructor to create an instance of a BlockingCommandListener in 
response to a blocking
+   * command. When receiving a relevant event, blocking commands simply 
resubmit the command
+   * into the Netty pipeline.
+   *
+   * @param context the associated ExecutionHandlerContext
+   * @param command the blocking command associated with this listener
+   * @param keys the list of keys the command is interested in
+   * @param timeoutSeconds the timeout for the command to block in seconds
+   */
+  public BlockingCommandListener(ExecutionHandlerContext context, Command 
command,
+      List<RedisKey> keys, double timeoutSeconds) {
+    this.context = context;
+    this.command = command;
+    this.timeoutSeconds = timeoutSeconds;
+    this.keys = Collections.unmodifiableList(keys);
+    timeSubmitted = System.nanoTime();
+  }
+
+  @Override
+  public List<RedisKey> keys() {
+    return keys;
+  }
+
+  @Override
+  public EventResponse process(RedisCommandType commandType, RedisKey key) {
+    if (!keys.contains(key)) {
+      return EventResponse.CONTINUE;
+    }
+
+    resubmitCommand();
+    return EventResponse.REMOVE_AND_STOP;
+  }
+
+  @Override
+  public void resubmitCommand() {
+    if (!active.compareAndSet(true, false)) {
+      return;
+    }
+
+    // Recalculate the timeout since we've already been waiting
+    double adjustedTimeoutSeconds = 0;
+    if (timeoutSeconds > 0.0D) {
+      long timeoutNanos = (long) (timeoutSeconds * 1e9);
+      long adjustedTimeoutNanos = timeoutNanos - (System.nanoTime() - 
timeSubmitted);
+      adjustedTimeoutNanos = Math.max(1, adjustedTimeoutNanos);
+      adjustedTimeoutSeconds = ((double) adjustedTimeoutNanos) / 1e9;
+    }
+
+    // The commands we are currently supporting all have the timeout at the 
end of the argument
+    // list. Some newer Redis 7 commands (BLMPOP and BZMPOP) have the timeout 
as the first argument
+    // after the command. We'll need to adjust this once those commands are 
supported.
+    List<byte[]> commandArguments = command.getCommandArguments();
+    commandArguments.set(commandArguments.size() - 1, 
Coder.doubleToBytes(adjustedTimeoutSeconds));
+
+    context.resubmitCommand(command);
+  }
+
+  @Override
+  public void scheduleTimeout(ScheduledExecutorService executor, 
EventDistributor distributor) {
+    if (timeoutSeconds == 0 || !active.get()) {
+      return;
+    }
+
+    long timeoutNanos = (long) (timeoutSeconds * 1e9);
+    executor.schedule(() -> timeout(distributor), timeoutNanos, 
TimeUnit.NANOSECONDS);
+  }
+
+  @VisibleForTesting
+  void timeout(EventDistributor distributor) {
+    if (active.compareAndSet(true, false)) {
+      distributor.removeListener(this);
+      context.writeToChannel(RedisResponse.nilArray());
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "BlockingCommandListener [command:" + 
command.getCommandType().name()
+        + " keys:" + keys + "]";
+  }
+
+}
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java
new file mode 100644
index 0000000..1a806bc
--- /dev/null
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventDistributor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.partition.PartitionListenerAdapter;
+import org.apache.geode.internal.lang.utils.JavaWorkarounds;
+import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+public class EventDistributor extends PartitionListenerAdapter {
+
+  private final Map<RedisKey, Queue<EventListener>> listeners = new 
ConcurrentHashMap<>();
+
+  private final ScheduledThreadPoolExecutor timerExecutor =
+      new ScheduledThreadPoolExecutor(1,
+          new LoggingThreadFactory("GeodeForRedisEventTimer-", true));
+
+  public EventDistributor() {
+    timerExecutor.setRemoveOnCancelPolicy(true);
+  }
+
+  public synchronized void registerListener(EventListener listener) {
+    for (RedisKey key : listener.keys()) {
+      JavaWorkarounds.computeIfAbsent(listeners, key, k -> new 
LinkedBlockingQueue<>())
+          .add(listener);
+    }
+
+    listener.scheduleTimeout(timerExecutor, this);
+  }
+
+  public void fireEvent(RedisCommandType command, RedisKey key) {
+    Queue<EventListener> listenerList = listeners.get(key);
+    if (listenerList == null) {
+      return;
+    }
+
+    for (EventListener listener : listenerList) {
+      if (listener.process(command, key) == EventResponse.REMOVE_AND_STOP) {
+        removeListener(listener);
+        break;
+      }
+    }
+  }
+
+  /**
+   * The total number of keys registered by all listeners (includes 
duplicates).
+   */
+  @VisibleForTesting
+  public int getRegisteredKeys() {
+    return listeners.values().stream().mapToInt(Collection::size).sum();
+  }
+
+  @Override
+  public void afterBucketRemoved(int bucketId, Iterable<?> keys) {
+    Set<EventListener> resubmittingList = new HashSet<>();
+    for (Map.Entry<RedisKey, Queue<EventListener>> entry : 
listeners.entrySet()) {
+      if (entry.getKey().getBucketId() == bucketId) {
+        resubmittingList.addAll(entry.getValue());
+      }
+    }
+
+    resubmittingList.forEach(x -> {
+      removeListener(x);
+      x.resubmitCommand();
+    });
+  }
+
+  public synchronized void removeListener(EventListener listener) {
+    for (RedisKey key : listener.keys()) {
+      Queue<EventListener> listenerList = listeners.get(key);
+      if (listenerList == null) {
+        continue;
+      }
+      listenerList.remove(listener);
+      if (listenerList.isEmpty()) {
+        listeners.remove(key);
+      }
+    }
+  }
+
+}
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventListener.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventListener.java
new file mode 100644
index 0000000..c0e1f54
--- /dev/null
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventListener.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+/**
+ * Interface intended to be implemented in order to receive Redis events. 
Specifically this would be
+ * keyspace events or blocking commands. EventListeners are registered with the
+ * {@link EventDistributor}.
+ */
+public interface EventListener {
+
+  /**
+   * Receive and process an event. This method should execute very quickly. 
The return value
+   * determines additional process steps for the given event.
+   *
+   * @param commandType the command triggering the event
+   * @param key the key triggering the event
+   * @return response determining subsequent processing steps
+   */
+  EventResponse process(RedisCommandType commandType, RedisKey key);
+
+  /**
+   * Return the list of keys this listener is interested in.
+   */
+  List<RedisKey> keys();
+
+  /**
+   * Method to resubmit a command if appropriate. This is only relevant for 
listeners that process
+   * events for blocking commands. Listeners that handle keyspace event 
notification will not use
+   * this.
+   */
+  void resubmitCommand();
+
+  /**
+   * Schedule the removal of this listener using the passed in {@link 
ScheduledExecutorService}
+   * and {@link EventDistributor}. The implementation is responsible for 
cancelling the timeout if
+   * necessary.
+   *
+   * @param executor the ScheduledExecutorService to use for scheduling
+   * @param distributor the EventDistributor which would be used to remove 
this listener
+   */
+  void scheduleTimeout(ScheduledExecutorService executor, EventDistributor 
distributor);
+}
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventResponse.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventResponse.java
new file mode 100644
index 0000000..538e49d
--- /dev/null
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/eventing/EventResponse.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.eventing;
+
+import org.apache.geode.redis.internal.commands.RedisCommandType;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+/**
+ * Response returned by {@link EventListener#process(RedisCommandType, 
RedisKey)}
+ */
+public enum EventResponse {
+  /**
+   * Response indicating that processing should continue.
+   */
+  CONTINUE,
+  /**
+   * Response indicating that processing should stop and the listener should be
+   * removed.
+   */
+  REMOVE_AND_STOP
+}
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java
index 50e71d3..1094636 100644
--- 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java
@@ -155,11 +155,11 @@ public class Client {
     return result;
   }
 
-  public ChannelFuture writeToChannel(RedisResponse response) {
+  public void writeToChannel(RedisResponse response) {
     if (!logger.isDebugEnabled() && !response.hasAfterWriteCallback()) {
-      return channel.writeAndFlush(response.encode(byteBufAllocator));
+      channel.writeAndFlush(response.encode(byteBufAllocator));
     } else {
-      return channel.writeAndFlush(response.encode(byteBufAllocator))
+      channel.writeAndFlush(response.encode(byteBufAllocator))
           .addListener((ChannelFutureListener) f -> {
             response.afterWrite();
             logResponse(response, channel.remoteAddress(), f.cause());
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
index 62efc01..ee24e66 100644
--- 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
@@ -27,6 +27,7 @@ import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.INF;
 import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.INFINITY;
 import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.INTEGER_ID;
 import static org.apache.geode.redis.internal.netty.StringBytesGlossary.NIL;
+import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.NIL_ARRAY;
 import static org.apache.geode.redis.internal.netty.StringBytesGlossary.N_INF;
 import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.N_INFINITY;
 import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.N_INF_STRING;
@@ -238,6 +239,11 @@ public class Coder {
     return buffer;
   }
 
+  public static ByteBuf getNilArrayResponse(ByteBuf buffer) {
+    buffer.writeBytes(NIL_ARRAY);
+    return buffer;
+  }
+
   public static ByteBuf getEmptyResponse(ByteBuf buffer) {
     return buffer;
   }
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index 1c37039..5bb01a7 100644
--- 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -31,9 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelId;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.logging.log4j.Logger;
 import org.apache.shiro.subject.Subject;
@@ -62,6 +60,8 @@ import org.apache.geode.redis.internal.data.RedisList;
 import org.apache.geode.redis.internal.data.RedisSet;
 import org.apache.geode.redis.internal.data.RedisSortedSet;
 import org.apache.geode.redis.internal.data.RedisString;
+import org.apache.geode.redis.internal.eventing.EventDistributor;
+import org.apache.geode.redis.internal.eventing.EventListener;
 import org.apache.geode.redis.internal.pubsub.PubSub;
 import org.apache.geode.redis.internal.services.RegionProvider;
 import org.apache.geode.redis.internal.services.locking.RedisSecurityService;
@@ -103,9 +103,10 @@ public class ExecutionHandlerContext extends 
ChannelInboundHandlerAdapter {
   private final RedisStats redisStats;
   private final DistributedMember member;
   private final RedisSecurityService securityService;
+  private final EventDistributor eventDistributor;
   private BigInteger scanCursor;
   private final AtomicBoolean channelInactive = new AtomicBoolean();
-  private final ChannelId channelId;
+  private final Channel channel;
 
   private final int serverPort;
 
@@ -122,7 +123,9 @@ public class ExecutionHandlerContext extends 
ChannelInboundHandlerAdapter {
       String username,
       int serverPort,
       DistributedMember member,
-      RedisSecurityService securityService) {
+      RedisSecurityService securityService,
+      EventDistributor eventDistributor) {
+    this.channel = channel;
     this.regionProvider = regionProvider;
     this.pubsub = pubsub;
     this.allowUnsupportedSupplier = allowUnsupportedSupplier;
@@ -132,15 +135,17 @@ public class ExecutionHandlerContext extends 
ChannelInboundHandlerAdapter {
     this.serverPort = serverPort;
     this.member = member;
     this.securityService = securityService;
+    this.eventDistributor = eventDistributor;
     scanCursor = new BigInteger("0");
-    channelId = channel.id();
     redisStats.addClient();
 
     channel.closeFuture().addListener(future -> logout());
   }
 
-  public ChannelFuture writeToChannel(RedisResponse response) {
-    return client.writeToChannel(response);
+  public void writeToChannel(RedisResponse response) {
+    if (response != RedisResponse.BLOCKED) {
+      client.writeToChannel(response);
+    }
   }
 
   /**
@@ -149,13 +154,12 @@ public class ExecutionHandlerContext extends 
ChannelInboundHandlerAdapter {
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
     Command command = (Command) msg;
-    command.setChannelHandlerContext(ctx);
     if (!channelInactive.get()) {
       try {
-        executeCommand(command);
+        executeCommand(ctx, command);
         redisStats.incCommandsProcessed();
       } catch (Throwable ex) {
-        exceptionCaught(command.getChannelHandlerContext(), ex);
+        exceptionCaught(ctx, ex);
       }
     }
   }
@@ -232,7 +236,14 @@ public class ExecutionHandlerContext extends 
ChannelInboundHandlerAdapter {
     }
   }
 
-  private void executeCommand(Command command) throws Exception {
+  public void resubmitCommand(Command command) {
+    ChannelHandlerContext ctx =
+        channel.pipeline().context(ByteToCommandDecoder.class.getSimpleName());
+    ctx.fireChannelRead(command);
+  }
+
+  private void executeCommand(ChannelHandlerContext channelContext, Command 
command)
+      throws Exception {
     try {
       if (logger.isDebugEnabled()) {
         logger.debug("Executing Redis command: {} - {}", command,
@@ -277,7 +288,7 @@ public class ExecutionHandlerContext extends 
ChannelInboundHandlerAdapter {
       }
 
       if (command.isOfType(RedisCommandType.QUIT)) {
-        channelInactive(command.getChannelHandlerContext());
+        channelInactive(channelContext);
       }
     } catch (Exception e) {
       if (!(e instanceof RedisDataMovedException)) {
@@ -306,7 +317,7 @@ public class ExecutionHandlerContext extends 
ChannelInboundHandlerAdapter {
    */
   private void logout() {
     if (subject != null) {
-      securityService.logout(channelId);
+      securityService.logout(channel.id());
       subject = null;
     }
   }
@@ -393,7 +404,7 @@ public class ExecutionHandlerContext extends 
ChannelInboundHandlerAdapter {
   }
 
   public Subject login(Properties properties) {
-    subject = securityService.login(channelId, properties);
+    subject = securityService.login(channel.id(), properties);
     return subject;
   }
 
@@ -408,6 +419,13 @@ public class ExecutionHandlerContext extends 
ChannelInboundHandlerAdapter {
     }
   }
 
+  public void registerListener(EventListener listener) {
+    eventDistributor.registerListener(listener);
+  }
+
+  public void fireEvent(RedisCommandType command, RedisKey key) {
+    eventDistributor.fireEvent(command, key);
+  }
 
   public Region<RedisKey, RedisData> getRegion() {
     return getRegionProvider().getLocalDataRegion();
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
index ff3c5f5..0d85e19 100644
--- 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
@@ -63,6 +63,7 @@ import 
org.apache.geode.internal.security.SecurableCommunicationChannel;
 import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.management.ManagementException;
+import org.apache.geode.redis.internal.eventing.EventDistributor;
 import org.apache.geode.redis.internal.pubsub.PubSub;
 import org.apache.geode.redis.internal.services.RegionProvider;
 import org.apache.geode.redis.internal.services.locking.RedisSecurityService;
@@ -87,12 +88,14 @@ public class NettyRedisServer {
   private final int serverPort;
   private final DistributedMember member;
   private final RedisSecurityService securityService;
+  private final EventDistributor eventDistributor;
   private final int writeTimeoutSeconds;
 
   public NettyRedisServer(Supplier<DistributionConfig> configSupplier,
       RegionProvider regionProvider, PubSub pubsub, Supplier<Boolean> 
allowUnsupportedSupplier,
       int port, String requestedAddress, RedisStats redisStats,
-      DistributedMember member, RedisSecurityService securityService) {
+      DistributedMember member, RedisSecurityService securityService,
+      EventDistributor eventDistributor) {
     this.configSupplier = configSupplier;
     this.regionProvider = regionProvider;
     this.pubsub = pubsub;
@@ -100,6 +103,7 @@ public class NettyRedisServer {
     this.redisStats = redisStats;
     this.member = member;
     this.securityService = securityService;
+    this.eventDistributor = eventDistributor;
 
     writeTimeoutSeconds =
         getIntegerSystemProperty(WRITE_TIMEOUT_SECONDS, 
DEFAULT_REDIS_WRITE_TIMEOUT_SECONDS, 1);
@@ -170,7 +174,7 @@ public class NettyRedisServer {
         pipeline.addLast(ExecutionHandlerContext.class.getSimpleName(),
             new ExecutionHandlerContext(socketChannel, regionProvider, pubsub,
                 allowUnsupportedSupplier, redisStats, redisUsername,
-                getPort(), member, securityService));
+                getPort(), member, securityService, eventDistributor));
       }
     };
   }
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
index 62bd890..4f21013 100644
--- 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
@@ -62,6 +62,12 @@ public class StringBytesGlossary {
   public static final byte[] NIL = stringToBytes("$-1\r\n");
 
   /**
+   * byte array of a nil array response
+   */
+  @MakeImmutable
+  public static final byte[] NIL_ARRAY = stringToBytes("*-1\r\n");
+
+  /**
    * byte array of an empty array
    */
   @MakeImmutable
diff --git 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/services/RegionProvider.java
 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/services/RegionProvider.java
index a130736..3fd6278 100644
--- 
a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/services/RegionProvider.java
+++ 
b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/services/RegionProvider.java
@@ -33,6 +33,7 @@ import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.partition.PartitionListener;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.DistributionConfig;
@@ -82,7 +83,7 @@ public class RegionProvider {
   private final String redisRegionName;
 
   public RegionProvider(InternalCache cache, StripedCoordinator 
stripedCoordinator,
-      RedisStats redisStats) {
+      RedisStats redisStats, PartitionListener partitionListener) {
     this.stripedCoordinator = stripedCoordinator;
     this.redisStats = redisStats;
 
@@ -95,6 +96,7 @@ public class RegionProvider {
     attributesFactory.setRedundantCopies(config.getRedisRedundantCopies());
     attributesFactory.setPartitionResolver(new RedisPartitionResolver());
     attributesFactory.setTotalNumBuckets(REDIS_REGION_BUCKETS);
+    attributesFactory.addPartitionListener(partitionListener);
     redisDataRegionFactory.setPartitionAttributes(attributesFactory.create());
 
     redisRegionName =
diff --git 
a/geode-for-redis/src/test/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContextTest.java
 
b/geode-for-redis/src/test/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContextTest.java
index 6e4895b..fefe3e1 100644
--- 
a/geode-for-redis/src/test/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContextTest.java
+++ 
b/geode-for-redis/src/test/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContextTest.java
@@ -42,7 +42,7 @@ public class ExecutionHandlerContextTest {
     when(channel.closeFuture()).thenReturn(mock(ChannelFuture.class));
     RedisStats redisStats = mock(RedisStats.class);
     return new ExecutionHandlerContext(channel, null, null, null, redisStats, 
null, 0, null,
-        securityService);
+        securityService, null);
   }
 
   @Test

Reply via email to