This is an automated email from the ASF dual-hosted git repository. dschneider pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 16a3d1e GEODE-8516: Add Redis tests for multiple subscriptions for the same client (#5535) 16a3d1e is described below commit 16a3d1e20d73e85f97ec870f75053dcada3b8102 Author: Sarah Abbey <41928668+sabbeypivo...@users.noreply.github.com> AuthorDate: Wed Sep 23 12:27:40 2020 -0400 GEODE-8516: Add Redis tests for multiple subscriptions for the same client (#5535) Fixed test flakiness by awaiting for successful subscribe and psubscribe notification in the Lettuce client. --- .../LettucePubSubNativeRedisAcceptanceTest.java | 32 +++ .../SubscriptionsNativeRedisAcceptanceTest.java | 25 ++ .../apache/geode/redis/mocks/MockSubscriber.java | 42 ++- .../pubsub/LettucePubSubIntegrationTest.java | 283 ++++++++++++++++++++- .../pubsub/SubscriptionsIntegrationTest.java | 56 +++- .../geode/redis/internal/RedisCommandType.java | 14 + .../internal/netty/ExecutionHandlerContext.java | 7 + 7 files changed, 454 insertions(+), 5 deletions(-) diff --git a/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubNativeRedisAcceptanceTest.java b/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubNativeRedisAcceptanceTest.java new file mode 100755 index 0000000..966c8d5 --- /dev/null +++ b/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubNativeRedisAcceptanceTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.pubsub; + + +import io.lettuce.core.RedisClient; +import org.junit.Before; +import org.junit.ClassRule; + +import org.apache.geode.NativeRedisTestRule; + +public class LettucePubSubNativeRedisAcceptanceTest extends LettucePubSubIntegrationTest { + @ClassRule + public static NativeRedisTestRule redis = new NativeRedisTestRule(); + + @Before + public void before() { + client = RedisClient.create("redis://localhost:" + redis.getPort()); + } +} diff --git a/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/SubscriptionsNativeRedisAcceptanceTest.java b/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/SubscriptionsNativeRedisAcceptanceTest.java new file mode 100755 index 0000000..5eeef87 --- /dev/null +++ b/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/SubscriptionsNativeRedisAcceptanceTest.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.redis.internal.executor.pubsub; + + +import org.junit.ClassRule; + +import org.apache.geode.NativeRedisTestRule; + +public class SubscriptionsNativeRedisAcceptanceTest extends SubscriptionsIntegrationTest { + @ClassRule + public static NativeRedisTestRule redis = new NativeRedisTestRule(); +} diff --git a/geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockSubscriber.java b/geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockSubscriber.java index e55e4cb..63cfa74 100644 --- a/geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockSubscriber.java +++ b/geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockSubscriber.java @@ -25,12 +25,16 @@ import java.util.concurrent.TimeUnit; import redis.clients.jedis.Client; import redis.clients.jedis.JedisPubSub; + public class MockSubscriber extends JedisPubSub { private final CountDownLatch subscriptionLatch; + private final CountDownLatch psubscriptionLatch; private final CountDownLatch unsubscriptionLatch; private final List<String> receivedMessages = Collections.synchronizedList(new ArrayList<>()); private final List<String> receivedPMessages = Collections.synchronizedList(new ArrayList<>()); + private final List<String> receivedPings = Collections.synchronizedList(new ArrayList<>()); + private final List<String> receivedEvents = Collections.synchronizedList(new ArrayList<>()); public final List<UnsubscribeInfo> unsubscribeInfos = Collections.synchronizedList(new ArrayList<>()); public final List<UnsubscribeInfo> punsubscribeInfos = @@ -42,11 +46,13 @@ public class MockSubscriber extends JedisPubSub { } public MockSubscriber(CountDownLatch subscriptionLatch) { - this(subscriptionLatch, new CountDownLatch(1)); + this(subscriptionLatch, new CountDownLatch(1), new CountDownLatch(1)); } - public MockSubscriber(CountDownLatch subscriptionLatch, CountDownLatch unsubscriptionLatch) { + public MockSubscriber(CountDownLatch subscriptionLatch, CountDownLatch unsubscriptionLatch, + CountDownLatch psubscriptionLatch) { this.subscriptionLatch = subscriptionLatch; + this.psubscriptionLatch = psubscriptionLatch; this.unsubscriptionLatch = unsubscriptionLatch; } @@ -75,16 +81,26 @@ public class MockSubscriber extends JedisPubSub { return new ArrayList<>(receivedPMessages); } + public List<String> getReceivedPings() { + return receivedPings; + } + + public List<String> getReceivedEvents() { + return receivedEvents; + } + @Override public void onMessage(String channel, String message) { switchThreadName(String.format("MESSAGE %s %s", channel, message)); receivedMessages.add(message); + receivedEvents.add("message"); } @Override public void onPMessage(String pattern, String channel, String message) { switchThreadName(String.format("PMESSAGE %s %s %s", pattern, channel, message)); receivedPMessages.add(message); + receivedEvents.add("pmessage"); } @Override @@ -93,6 +109,18 @@ public class MockSubscriber extends JedisPubSub { subscriptionLatch.countDown(); } + @Override + public void onPSubscribe(String pattern, int subscribedChannels) { + switchThreadName(String.format("PSUBSCRIBE %s", pattern)); + psubscriptionLatch.countDown(); + } + + @Override + public void onPong(String pattern) { + switchThreadName(String.format("PONG %s", pattern)); + receivedPings.add(pattern); + } + private static final int AWAIT_TIMEOUT_MILLIS = 30000; public void awaitSubscribe(String channel) { @@ -105,6 +133,16 @@ public class MockSubscriber extends JedisPubSub { } } + public void awaitPSubscribe(String pattern) { + try { + if (!psubscriptionLatch.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + throw new RuntimeException("awaitSubscribe timed out for channel: " + pattern); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + @Override public void onUnsubscribe(String channel, int subscribedChannels) { switchThreadName(String.format("UNSUBSCRIBE %s %d", channel, subscribedChannels)); diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubIntegrationTest.java index 23c0073..2ecd177 100644 --- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubIntegrationTest.java +++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubIntegrationTest.java @@ -18,24 +18,34 @@ package org.apache.geode.redis.internal.executor.pubsub; import static org.assertj.core.api.Assertions.assertThat; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisFuture; +import io.lettuce.core.pubsub.RedisPubSubAdapter; +import io.lettuce.core.pubsub.RedisPubSubListener; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Test; import org.apache.geode.redis.GeodeRedisServerRule; +import org.apache.geode.test.awaitility.GeodeAwaitility; import org.apache.geode.test.junit.rules.ExecutorServiceRule; public class LettucePubSubIntegrationTest { private static final String CHANNEL = "best-channel"; - private RedisClient client; + private static final String PATTERN = "best-*"; + protected RedisClient client; @ClassRule public static GeodeRedisServerRule server = new GeodeRedisServerRule(); @@ -53,6 +63,277 @@ public class LettucePubSubIntegrationTest { client.shutdown(); } + @Test + public void multiSubscribeSameClient() { + StatefulRedisPubSubConnection<String, String> subscriber = client.connectPubSub(); + StatefulRedisPubSubConnection<String, String> publisher = client.connectPubSub(); + List<Map> messages = Collections.synchronizedList(new ArrayList<>()); + AtomicLong subscriptionCount = new AtomicLong(0); + + RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() { + @Override + public void subscribed(String channel, long count) { + subscriptionCount.getAndSet(count); + } + + @Override + public void message(String channel, String message) { + Map<String, String> channelMessageMap = new HashMap<>(); + channelMessageMap.put(channel, message); + messages.add(channelMessageMap); + } + }; + + subscriber.addListener(listener); + subscriber.sync().subscribe(CHANNEL); + subscriber.sync().subscribe(CHANNEL); + subscriber.sync().subscribe("newChannel!"); + GeodeAwaitility.await().untilAsserted(() -> assertThat(subscriptionCount.get()).isEqualTo(2)); + + long publishCount1 = publisher.sync().publish(CHANNEL, "message!"); + long publishCount2 = publisher.sync().publish("newChannel!", "message from new channel"); + + + Map<String, String> expectedMap1 = new HashMap<>(); + expectedMap1.put(CHANNEL, "message!"); + Map<String, String> expectedMap2 = new HashMap<>(); + expectedMap2.put("newChannel!", "message from new channel"); + + assertThat(publishCount1).isEqualTo(1); + assertThat(publishCount2).isEqualTo(1); + GeodeAwaitility.await().untilAsserted(() -> assertThat(messages).hasSize(2)); + assertThat(messages).containsExactly(expectedMap1, expectedMap2); + + subscriber.sync().unsubscribe(); + } + + @Test + public void multiPsubscribeSameClient() { + StatefulRedisPubSubConnection<String, String> subscriber = client.connectPubSub(); + StatefulRedisPubSubConnection<String, String> publisher = client.connectPubSub(); + List<Map> messages = Collections.synchronizedList(new ArrayList<>()); + AtomicLong psubscriptionCount = new AtomicLong(0); + + RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() { + @Override + public void psubscribed(String pattern, long count) { + psubscriptionCount.getAndSet(count); + } + + @Override + public void message(String pattern, String channel, String message) { + Map<String, String> patternMessageMap = new HashMap<>(); + patternMessageMap.put(pattern, message); + messages.add(patternMessageMap); + } + }; + + subscriber.addListener(listener); + subscriber.sync().psubscribe(PATTERN); + subscriber.sync().psubscribe(PATTERN); + subscriber.sync().psubscribe("new-*"); + GeodeAwaitility.await().untilAsserted(() -> assertThat(psubscriptionCount.get()).isEqualTo(2)); + + long publishCount1 = publisher.sync().publish(CHANNEL, "message!"); + long publishCount2 = publisher.sync().publish("new-channel!", "message from new channel"); + + + Map<String, String> expectedMap1 = new HashMap<>(); + expectedMap1.put(PATTERN, "message!"); + Map<String, String> expectedMap2 = new HashMap<>(); + expectedMap2.put("new-*", "message from new channel"); + + assertThat(publishCount1).isEqualTo(1); + assertThat(publishCount2).isEqualTo(1); + GeodeAwaitility.await().untilAsserted(() -> assertThat(messages).hasSize(2)); + assertThat(messages).containsExactly(expectedMap1, expectedMap2); + + subscriber.sync().unsubscribe(); + } + + @Test + @Ignore("GEODE-8498") + public void subscribePsubscribeSameClient() throws InterruptedException { + StatefulRedisPubSubConnection<String, String> subscriber = client.connectPubSub(); + StatefulRedisPubSubConnection<String, String> publisher = client.connectPubSub(); + List<String> messages = Collections.synchronizedList(new ArrayList<>()); + CountDownLatch subscriberLatch = new CountDownLatch(1); + CountDownLatch psubscriberLatch = new CountDownLatch(1); + + RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() { + @Override + public void subscribed(String channel, long count) { + subscriberLatch.countDown(); + } + + @Override + public void psubscribed(String pattern, long count) { + psubscriberLatch.countDown(); + } + + @Override + public void message(String channel, String message) { + messages.add("message"); + } + + @Override + public void message(String pattern, String channel, String message) { + messages.add("pmessage"); + } + }; + subscriber.addListener(listener); + subscriber.sync().subscribe(CHANNEL); + subscriberLatch.await(); + subscriber.sync().psubscribe("best-*"); + psubscriberLatch.await(); + long publishCount = publisher.sync().publish(CHANNEL, "message!"); + + + assertThat(publishCount).isEqualTo(2); + GeodeAwaitility.await().untilAsserted(() -> assertThat(messages).hasSize(2)); + assertThat(messages).containsExactly("message", "pmessage"); + + subscriber.sync().unsubscribe(); + } + + @Test + public void multiUnsubscribe() throws InterruptedException { + StatefulRedisPubSubConnection<String, String> subscriber = client.connectPubSub(); + List<Map> counts = Collections.synchronizedList(new ArrayList<>()); + CountDownLatch subscriberLatch = new CountDownLatch(2); + + RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() { + @Override + public void subscribed(String channel, long count) { + subscriberLatch.countDown(); + } + + @Override + public void unsubscribed(String channel, long remainingSubscriptions) { + Map<String, Long> channelCount = new HashMap<>(); + channelCount.put(channel, remainingSubscriptions); + counts.add(channelCount); + } + }; + subscriber.addListener(listener); + subscriber.sync().subscribe(CHANNEL); + subscriber.sync().subscribe("new-channel!"); + subscriberLatch.await(); + subscriber.sync().unsubscribe(CHANNEL); + subscriber.sync().unsubscribe(CHANNEL); + subscriber.sync().unsubscribe("new-channel!"); + + Map<String, Long> expectedMap1 = new HashMap<>(); + expectedMap1.put(CHANNEL, 1L); + Map<String, Long> expectedMap2 = new HashMap<>(); + expectedMap2.put(CHANNEL, 1L); + Map<String, Long> expectedMap3 = new HashMap<>(); + expectedMap3.put("new-channel!", 0L); + + GeodeAwaitility.await().untilAsserted(() -> assertThat(counts).hasSize(3)); + assertThat(counts).containsExactly(expectedMap1, expectedMap2, expectedMap3); + } + + @Test + public void multiPunsubscribe() throws InterruptedException { + StatefulRedisPubSubConnection<String, String> subscriber = client.connectPubSub(); + List<Map> counts = Collections.synchronizedList(new ArrayList<>()); + CountDownLatch psubscriberLatch = new CountDownLatch(2); + RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() { + @Override + public void psubscribed(String pattern, long count) { + psubscriberLatch.countDown(); + } + + @Override + public void punsubscribed(String pattern, long remainingSubscriptions) { + Map<String, Long> patternCount = new HashMap<>(); + patternCount.put(pattern, remainingSubscriptions); + counts.add(patternCount); + } + }; + + subscriber.addListener(listener); + subscriber.sync().psubscribe(PATTERN); + subscriber.sync().psubscribe("new-*"); + psubscriberLatch.await(); + subscriber.sync().punsubscribe(PATTERN); + subscriber.sync().punsubscribe(PATTERN); + subscriber.sync().punsubscribe("new-*"); + + Map<String, Long> expectedMap1 = new HashMap<>(); + expectedMap1.put(PATTERN, 1L); + Map<String, Long> expectedMap2 = new HashMap<>(); + expectedMap2.put(PATTERN, 1L); + Map<String, Long> expectedMap3 = new HashMap<>(); + expectedMap3.put("new-*", 0L); + + GeodeAwaitility.await().untilAsserted(() -> assertThat(counts).hasSize(3)); + assertThat(counts).containsExactly(expectedMap1, expectedMap2, expectedMap3); + } + + @Test + public void unsubscribePunsubscribe() throws InterruptedException { + StatefulRedisPubSubConnection<String, String> subscriber = client.connectPubSub(); + List<String> counts = Collections.synchronizedList(new ArrayList<>()); + CountDownLatch subscriberLatch = new CountDownLatch(1); + CountDownLatch psubscriberLatch = new CountDownLatch(1); + RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() { + + @Override + public void subscribed(String channel, long count) { + subscriberLatch.countDown(); + } + + @Override + public void psubscribed(String pattern, long count) { + psubscriberLatch.countDown(); + } + + @Override + public void unsubscribed(String channel, long numUnsubscribed) { + counts.add("unsubscribe"); + } + + @Override + public void punsubscribed(String pattern, long numPunsubscribed) { + counts.add("punsubscribe"); + } + }; + subscriber.addListener(listener); + subscriber.sync().subscribe(CHANNEL); + subscriberLatch.await(); + subscriber.sync().psubscribe("best-*"); + psubscriberLatch.await(); + + subscriber.sync().unsubscribe(CHANNEL); + subscriber.sync().punsubscribe("best-*"); + + GeodeAwaitility.await().untilAsserted(() -> assertThat(counts).hasSize(2)); + assertThat(counts).containsExactly("unsubscribe", "punsubscribe"); + } + + @Test + public void quitWhileSubscribe() throws InterruptedException { + StatefulRedisPubSubConnection<String, String> subscriber = client.connectPubSub(); + StatefulRedisPubSubConnection<String, String> publisher = client.connectPubSub(); + CountDownLatch subscriberLatch = new CountDownLatch(1); + RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() { + @Override + public void subscribed(String channel, long count) { + subscriberLatch.countDown(); + } + }; + subscriber.addListener(listener); + subscriber.sync().subscribe(CHANNEL); + subscriberLatch.await(); + + String quitResponse = subscriber.sync().quit(); + assertThat(quitResponse).isEqualTo("OK"); + + long publishCount = publisher.sync().publish(CHANNEL, "hello there"); + assertThat(publishCount).isEqualTo(0L); + } @Test public void concurrentPublishersToMultipleSubscribers_doNotLosePublishMessages() diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/SubscriptionsIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/SubscriptionsIntegrationTest.java index 76f6e28..e272e60 100644 --- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/SubscriptionsIntegrationTest.java +++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/SubscriptionsIntegrationTest.java @@ -17,10 +17,13 @@ package org.apache.geode.redis.internal.executor.pubsub; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Test; import redis.clients.jedis.Jedis; +import redis.clients.jedis.Protocol; import org.apache.geode.redis.GeodeRedisServerRule; import org.apache.geode.redis.mocks.MockSubscriber; @@ -36,11 +39,60 @@ public class SubscriptionsIntegrationTest { public static ExecutorServiceRule executor = new ExecutorServiceRule(); @Test - public void testForLeakedSubscriptions() { + @Ignore("GEODE-8515") + public void pingWhileSubscribed() { + Jedis client = new Jedis("localhost", server.getPort()); + MockSubscriber mockSubscriber = new MockSubscriber(); + executor.submit(() -> client.subscribe(mockSubscriber, "same")); + mockSubscriber.awaitSubscribe("same"); + mockSubscriber.ping(); + GeodeAwaitility.await() + .untilAsserted(() -> assertThat(mockSubscriber.getReceivedPings().size()).isEqualTo(1)); + assertThat(mockSubscriber.getReceivedPings().get(0)).isEqualTo(""); + mockSubscriber.unsubscribe(); + client.close(); + } + + @Test + public void multiSubscribe() { + Jedis client = new Jedis("localhost", server.getPort()); + MockSubscriber mockSubscriber = new MockSubscriber(); + + executor.submit(() -> client.subscribe(mockSubscriber, "same")); + mockSubscriber.awaitSubscribe("same"); + mockSubscriber.psubscribe("sam*"); + mockSubscriber.awaitPSubscribe("sam*"); + + Jedis publisher = new Jedis("localhost", server.getPort()); + long publishCount = publisher.publish("same", "message"); + + assertThat(publishCount).isEqualTo(2L); + GeodeAwaitility.await() + .untilAsserted(() -> assertThat(mockSubscriber.getReceivedMessages()).hasSize(1)); + GeodeAwaitility.await() + .untilAsserted(() -> assertThat(mockSubscriber.getReceivedPMessages()).hasSize(1)); + assertThat(mockSubscriber.getReceivedEvents()).containsExactly("message", "pmessage"); + mockSubscriber.unsubscribe(); + client.close(); + } + + @Test + public void unallowedCommandsWhileSubscribed() { + Jedis client = new Jedis("localhost", server.getPort()); + + client.sendCommand(Protocol.Command.SUBSCRIBE, "hello"); + + assertThatThrownBy(() -> client.set("not", "supported")).hasMessageContaining( + "ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"); + client.sendCommand(Protocol.Command.UNSUBSCRIBE); + client.close(); + } + + @Test + public void leakedSubscriptions() { for (int i = 0; i < 100; i++) { Jedis client = new Jedis("localhost", server.getPort()); - client.ping(); MockSubscriber mockSubscriber = new MockSubscriber(); executor.submit(() -> client.subscribe(mockSubscriber, "same")); mockSubscriber.awaitSubscribe("same"); diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java index 3c2f5ff..c8653eb 100755 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java @@ -404,6 +404,20 @@ public enum RedisCommandType { return supportLevel == UNIMPLEMENTED; } + public boolean isAllowedWhileSubscribed() { + switch (this) { + case SUBSCRIBE: + case PSUBSCRIBE: + case UNSUBSCRIBE: + case PUNSUBSCRIBE: + case PING: + case QUIT: + return true; + default: + return false; + } + } + public RedisResponse executeCommand(Command command, ExecutionHandlerContext executionHandlerContext) { parameterRequirements.checkParameters(command, executionHandlerContext); diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java index 30b987c..fd9013e 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java @@ -289,6 +289,13 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { return; } + if (!getPubSub().findSubscribedChannels(getClient()).isEmpty()) { + if (!command.getCommandType().isAllowedWhileSubscribed()) { + writeToChannel(RedisResponse + .error("only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context")); + } + } + final long start = redisStats.startCommand(command.getCommandType()); try { writeToChannel(command.execute(this));