This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 16a3d1e  GEODE-8516: Add Redis tests for multiple subscriptions for 
the same client (#5535)
16a3d1e is described below

commit 16a3d1e20d73e85f97ec870f75053dcada3b8102
Author: Sarah Abbey <41928668+sabbeypivo...@users.noreply.github.com>
AuthorDate: Wed Sep 23 12:27:40 2020 -0400

    GEODE-8516: Add Redis tests for multiple subscriptions for the same client 
(#5535)
    
    Fixed test flakiness by waiting for successful subscribe and psubscribe 
notifications in the Lettuce client.
---
 .../LettucePubSubNativeRedisAcceptanceTest.java    |  32 +++
 .../SubscriptionsNativeRedisAcceptanceTest.java    |  25 ++
 .../apache/geode/redis/mocks/MockSubscriber.java   |  42 ++-
 .../pubsub/LettucePubSubIntegrationTest.java       | 283 ++++++++++++++++++++-
 .../pubsub/SubscriptionsIntegrationTest.java       |  56 +++-
 .../geode/redis/internal/RedisCommandType.java     |  14 +
 .../internal/netty/ExecutionHandlerContext.java    |   7 +
 7 files changed, 454 insertions(+), 5 deletions(-)

diff --git 
a/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubNativeRedisAcceptanceTest.java
 
b/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubNativeRedisAcceptanceTest.java
new file mode 100755
index 0000000..966c8d5
--- /dev/null
+++ 
b/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubNativeRedisAcceptanceTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.pubsub;
+
+
+import io.lettuce.core.RedisClient;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+import org.apache.geode.NativeRedisTestRule;
+
+public class LettucePubSubNativeRedisAcceptanceTest extends 
LettucePubSubIntegrationTest {
+  @ClassRule
+  public static NativeRedisTestRule redis = new NativeRedisTestRule();
+
+  @Before
+  public void before() {
+    client = RedisClient.create("redis://localhost:" + redis.getPort());
+  }
+}
diff --git 
a/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/SubscriptionsNativeRedisAcceptanceTest.java
 
b/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/SubscriptionsNativeRedisAcceptanceTest.java
new file mode 100755
index 0000000..5eeef87
--- /dev/null
+++ 
b/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/SubscriptionsNativeRedisAcceptanceTest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.pubsub;
+
+
+import org.junit.ClassRule;
+
+import org.apache.geode.NativeRedisTestRule;
+
+public class SubscriptionsNativeRedisAcceptanceTest extends 
SubscriptionsIntegrationTest {
+  @ClassRule
+  public static NativeRedisTestRule redis = new NativeRedisTestRule();
+}
diff --git 
a/geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
 
b/geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
index e55e4cb..63cfa74 100644
--- 
a/geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
+++ 
b/geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
@@ -25,12 +25,16 @@ import java.util.concurrent.TimeUnit;
 import redis.clients.jedis.Client;
 import redis.clients.jedis.JedisPubSub;
 
+
 public class MockSubscriber extends JedisPubSub {
 
   private final CountDownLatch subscriptionLatch;
+  private final CountDownLatch psubscriptionLatch;
   private final CountDownLatch unsubscriptionLatch;
   private final List<String> receivedMessages = 
Collections.synchronizedList(new ArrayList<>());
   private final List<String> receivedPMessages = 
Collections.synchronizedList(new ArrayList<>());
+  private final List<String> receivedPings = Collections.synchronizedList(new 
ArrayList<>());
+  private final List<String> receivedEvents = Collections.synchronizedList(new 
ArrayList<>());
   public final List<UnsubscribeInfo> unsubscribeInfos =
       Collections.synchronizedList(new ArrayList<>());
   public final List<UnsubscribeInfo> punsubscribeInfos =
@@ -42,11 +46,13 @@ public class MockSubscriber extends JedisPubSub {
   }
 
   public MockSubscriber(CountDownLatch subscriptionLatch) {
-    this(subscriptionLatch, new CountDownLatch(1));
+    this(subscriptionLatch, new CountDownLatch(1), new CountDownLatch(1));
   }
 
-  public MockSubscriber(CountDownLatch subscriptionLatch, CountDownLatch 
unsubscriptionLatch) {
+  public MockSubscriber(CountDownLatch subscriptionLatch, CountDownLatch 
unsubscriptionLatch,
+      CountDownLatch psubscriptionLatch) {
     this.subscriptionLatch = subscriptionLatch;
+    this.psubscriptionLatch = psubscriptionLatch;
     this.unsubscriptionLatch = unsubscriptionLatch;
   }
 
@@ -75,16 +81,26 @@ public class MockSubscriber extends JedisPubSub {
     return new ArrayList<>(receivedPMessages);
   }
 
+  public List<String> getReceivedPings() {
+    return new ArrayList<>(receivedPings); // snapshot copy; never expose the live list
+  }
+
+  public List<String> getReceivedEvents() {
+    return new ArrayList<>(receivedEvents); // snapshot copy; never expose the live list
+  }
+
   @Override
   public void onMessage(String channel, String message) {
     switchThreadName(String.format("MESSAGE %s %s", channel, message));
     receivedMessages.add(message);
+    receivedEvents.add("message");
   }
 
   @Override
   public void onPMessage(String pattern, String channel, String message) {
     switchThreadName(String.format("PMESSAGE %s %s %s", pattern, channel, 
message));
     receivedPMessages.add(message);
+    receivedEvents.add("pmessage");
   }
 
   @Override
@@ -93,6 +109,18 @@ public class MockSubscriber extends JedisPubSub {
     subscriptionLatch.countDown();
   }
 
+  @Override
+  public void onPSubscribe(String pattern, int subscribedChannels) {
+    switchThreadName(String.format("PSUBSCRIBE %s", pattern));
+    psubscriptionLatch.countDown();
+  }
+
+  @Override
+  public void onPong(String pattern) {
+    switchThreadName(String.format("PONG %s", pattern));
+    receivedPings.add(pattern);
+  }
+
   private static final int AWAIT_TIMEOUT_MILLIS = 30000;
 
   public void awaitSubscribe(String channel) {
@@ -105,6 +133,16 @@ public class MockSubscriber extends JedisPubSub {
     }
   }
 
+  public void awaitPSubscribe(String pattern) {
+    try {
+      if (!psubscriptionLatch.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+        throw new RuntimeException("awaitPSubscribe timed out for pattern: " + pattern);
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override
   public void onUnsubscribe(String channel, int subscribedChannels) {
     switchThreadName(String.format("UNSUBSCRIBE %s %d", channel, 
subscribedChannels));
diff --git 
a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubIntegrationTest.java
 
b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubIntegrationTest.java
index 23c0073..2ecd177 100644
--- 
a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubIntegrationTest.java
+++ 
b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubIntegrationTest.java
@@ -18,24 +18,34 @@ package org.apache.geode.redis.internal.executor.pubsub;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
 
 import io.lettuce.core.RedisClient;
 import io.lettuce.core.RedisFuture;
+import io.lettuce.core.pubsub.RedisPubSubAdapter;
+import io.lettuce.core.pubsub.RedisPubSubListener;
 import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.geode.redis.GeodeRedisServerRule;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
 public class LettucePubSubIntegrationTest {
 
   private static final String CHANNEL = "best-channel";
-  private RedisClient client;
+  private static final String PATTERN = "best-*";
+  protected RedisClient client;
 
   @ClassRule
   public static GeodeRedisServerRule server = new GeodeRedisServerRule();
@@ -53,6 +63,277 @@ public class LettucePubSubIntegrationTest {
     client.shutdown();
   }
 
+  @Test
+  public void multiSubscribeSameClient() {
+    StatefulRedisPubSubConnection<String, String> subscriber = client.connectPubSub();
+    StatefulRedisPubSubConnection<String, String> publisher = client.connectPubSub();
+    List<Map<String, String>> messages = Collections.synchronizedList(new ArrayList<>());
+    AtomicLong subscriptionCount = new AtomicLong(0);
+
+    RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {
+      @Override
+      public void subscribed(String channel, long count) {
+        subscriptionCount.set(count);
+      }
+
+      @Override
+      public void message(String channel, String message) {
+        Map<String, String> channelMessageMap = new HashMap<>();
+        channelMessageMap.put(channel, message);
+        messages.add(channelMessageMap);
+      }
+    };
+
+    subscriber.addListener(listener);
+    subscriber.sync().subscribe(CHANNEL);
+    subscriber.sync().subscribe(CHANNEL); // duplicate: the awaited count below stays at 2
+    subscriber.sync().subscribe("newChannel!");
+    GeodeAwaitility.await().untilAsserted(() -> assertThat(subscriptionCount.get()).isEqualTo(2));
+
+    long publishCount1 = publisher.sync().publish(CHANNEL, "message!");
+    long publishCount2 = publisher.sync().publish("newChannel!", "message from new channel");
+
+
+    Map<String, String> expectedMap1 = new HashMap<>();
+    expectedMap1.put(CHANNEL, "message!");
+    Map<String, String> expectedMap2 = new HashMap<>();
+    expectedMap2.put("newChannel!", "message from new channel");
+
+    assertThat(publishCount1).isEqualTo(1);
+    assertThat(publishCount2).isEqualTo(1);
+    GeodeAwaitility.await().untilAsserted(() -> assertThat(messages).hasSize(2));
+    assertThat(messages).containsExactly(expectedMap1, expectedMap2);
+
+    subscriber.sync().unsubscribe();
+  }
+
+  @Test
+  public void multiPsubscribeSameClient() {
+    StatefulRedisPubSubConnection<String, String> subscriber = client.connectPubSub();
+    StatefulRedisPubSubConnection<String, String> publisher = client.connectPubSub();
+    List<Map<String, String>> messages = Collections.synchronizedList(new ArrayList<>());
+    AtomicLong psubscriptionCount = new AtomicLong(0);
+
+    RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {
+      @Override
+      public void psubscribed(String pattern, long count) {
+        psubscriptionCount.set(count);
+      }
+
+      @Override
+      public void message(String pattern, String channel, String message) {
+        Map<String, String> patternMessageMap = new HashMap<>();
+        patternMessageMap.put(pattern, message);
+        messages.add(patternMessageMap);
+      }
+    };
+
+    subscriber.addListener(listener);
+    subscriber.sync().psubscribe(PATTERN);
+    subscriber.sync().psubscribe(PATTERN); // duplicate: the awaited count below stays at 2
+    subscriber.sync().psubscribe("new-*");
+    GeodeAwaitility.await().untilAsserted(() -> assertThat(psubscriptionCount.get()).isEqualTo(2));
+
+    long publishCount1 = publisher.sync().publish(CHANNEL, "message!");
+    long publishCount2 = publisher.sync().publish("new-channel!", "message from new channel");
+
+
+    Map<String, String> expectedMap1 = new HashMap<>();
+    expectedMap1.put(PATTERN, "message!");
+    Map<String, String> expectedMap2 = new HashMap<>();
+    expectedMap2.put("new-*", "message from new channel");
+
+    assertThat(publishCount1).isEqualTo(1);
+    assertThat(publishCount2).isEqualTo(1);
+    GeodeAwaitility.await().untilAsserted(() -> assertThat(messages).hasSize(2));
+    assertThat(messages).containsExactly(expectedMap1, expectedMap2);
+
+    subscriber.sync().punsubscribe(PATTERN, "new-*"); // patterns need PUNSUBSCRIBE
+  }
+
+  @Test
+  @Ignore("GEODE-8498")
+  public void subscribePsubscribeSameClient() throws InterruptedException {
+    StatefulRedisPubSubConnection<String, String> subscriber = 
client.connectPubSub();
+    StatefulRedisPubSubConnection<String, String> publisher = 
client.connectPubSub();
+    List<String> messages = Collections.synchronizedList(new ArrayList<>());
+    CountDownLatch subscriberLatch = new CountDownLatch(1);
+    CountDownLatch psubscriberLatch = new CountDownLatch(1);
+
+    RedisPubSubListener<String, String> listener = new 
RedisPubSubAdapter<String, String>() {
+      @Override
+      public void subscribed(String channel, long count) {
+        subscriberLatch.countDown();
+      }
+
+      @Override
+      public void psubscribed(String pattern, long count) {
+        psubscriberLatch.countDown();
+      }
+
+      @Override
+      public void message(String channel, String message) {
+        messages.add("message");
+      }
+
+      @Override
+      public void message(String pattern, String channel, String message) {
+        messages.add("pmessage");
+      }
+    };
+    subscriber.addListener(listener);
+    subscriber.sync().subscribe(CHANNEL);
+    subscriberLatch.await();
+    subscriber.sync().psubscribe("best-*");
+    psubscriberLatch.await();
+    long publishCount = publisher.sync().publish(CHANNEL, "message!");
+
+
+    assertThat(publishCount).isEqualTo(2);
+    GeodeAwaitility.await().untilAsserted(() -> 
assertThat(messages).hasSize(2));
+    assertThat(messages).containsExactly("message", "pmessage");
+
+    subscriber.sync().unsubscribe();
+  }
+
+  @Test
+  public void multiUnsubscribe() throws InterruptedException {
+    StatefulRedisPubSubConnection<String, String> subscriber = 
client.connectPubSub();
+    List<Map> counts = Collections.synchronizedList(new ArrayList<>());
+    CountDownLatch subscriberLatch = new CountDownLatch(2);
+
+    RedisPubSubListener<String, String> listener = new 
RedisPubSubAdapter<String, String>() {
+      @Override
+      public void subscribed(String channel, long count) {
+        subscriberLatch.countDown();
+      }
+
+      @Override
+      public void unsubscribed(String channel, long remainingSubscriptions) {
+        Map<String, Long> channelCount = new HashMap<>();
+        channelCount.put(channel, remainingSubscriptions);
+        counts.add(channelCount);
+      }
+    };
+    subscriber.addListener(listener);
+    subscriber.sync().subscribe(CHANNEL);
+    subscriber.sync().subscribe("new-channel!");
+    subscriberLatch.await();
+    subscriber.sync().unsubscribe(CHANNEL);
+    subscriber.sync().unsubscribe(CHANNEL);
+    subscriber.sync().unsubscribe("new-channel!");
+
+    Map<String, Long> expectedMap1 = new HashMap<>();
+    expectedMap1.put(CHANNEL, 1L);
+    Map<String, Long> expectedMap2 = new HashMap<>();
+    expectedMap2.put(CHANNEL, 1L);
+    Map<String, Long> expectedMap3 = new HashMap<>();
+    expectedMap3.put("new-channel!", 0L);
+
+    GeodeAwaitility.await().untilAsserted(() -> assertThat(counts).hasSize(3));
+    assertThat(counts).containsExactly(expectedMap1, expectedMap2, 
expectedMap3);
+  }
+
+  @Test
+  public void multiPunsubscribe() throws InterruptedException {
+    StatefulRedisPubSubConnection<String, String> subscriber = 
client.connectPubSub();
+    List<Map> counts = Collections.synchronizedList(new ArrayList<>());
+    CountDownLatch psubscriberLatch = new CountDownLatch(2);
+    RedisPubSubListener<String, String> listener = new 
RedisPubSubAdapter<String, String>() {
+      @Override
+      public void psubscribed(String pattern, long count) {
+        psubscriberLatch.countDown();
+      }
+
+      @Override
+      public void punsubscribed(String pattern, long remainingSubscriptions) {
+        Map<String, Long> patternCount = new HashMap<>();
+        patternCount.put(pattern, remainingSubscriptions);
+        counts.add(patternCount);
+      }
+    };
+
+    subscriber.addListener(listener);
+    subscriber.sync().psubscribe(PATTERN);
+    subscriber.sync().psubscribe("new-*");
+    psubscriberLatch.await();
+    subscriber.sync().punsubscribe(PATTERN);
+    subscriber.sync().punsubscribe(PATTERN);
+    subscriber.sync().punsubscribe("new-*");
+
+    Map<String, Long> expectedMap1 = new HashMap<>();
+    expectedMap1.put(PATTERN, 1L);
+    Map<String, Long> expectedMap2 = new HashMap<>();
+    expectedMap2.put(PATTERN, 1L);
+    Map<String, Long> expectedMap3 = new HashMap<>();
+    expectedMap3.put("new-*", 0L);
+
+    GeodeAwaitility.await().untilAsserted(() -> assertThat(counts).hasSize(3));
+    assertThat(counts).containsExactly(expectedMap1, expectedMap2, 
expectedMap3);
+  }
+
+  @Test
+  public void unsubscribePunsubscribe() throws InterruptedException {
+    StatefulRedisPubSubConnection<String, String> subscriber = 
client.connectPubSub();
+    List<String> counts = Collections.synchronizedList(new ArrayList<>());
+    CountDownLatch subscriberLatch = new CountDownLatch(1);
+    CountDownLatch psubscriberLatch = new CountDownLatch(1);
+    RedisPubSubListener<String, String> listener = new 
RedisPubSubAdapter<String, String>() {
+
+      @Override
+      public void subscribed(String channel, long count) {
+        subscriberLatch.countDown();
+      }
+
+      @Override
+      public void psubscribed(String pattern, long count) {
+        psubscriberLatch.countDown();
+      }
+
+      @Override
+      public void unsubscribed(String channel, long numUnsubscribed) {
+        counts.add("unsubscribe");
+      }
+
+      @Override
+      public void punsubscribed(String pattern, long numPunsubscribed) {
+        counts.add("punsubscribe");
+      }
+    };
+    subscriber.addListener(listener);
+    subscriber.sync().subscribe(CHANNEL);
+    subscriberLatch.await();
+    subscriber.sync().psubscribe("best-*");
+    psubscriberLatch.await();
+
+    subscriber.sync().unsubscribe(CHANNEL);
+    subscriber.sync().punsubscribe("best-*");
+
+    GeodeAwaitility.await().untilAsserted(() -> assertThat(counts).hasSize(2));
+    assertThat(counts).containsExactly("unsubscribe", "punsubscribe");
+  }
+
+  @Test
+  public void quitWhileSubscribe() throws InterruptedException {
+    StatefulRedisPubSubConnection<String, String> subscriber = 
client.connectPubSub();
+    StatefulRedisPubSubConnection<String, String> publisher = 
client.connectPubSub();
+    CountDownLatch subscriberLatch = new CountDownLatch(1);
+    RedisPubSubListener<String, String> listener = new 
RedisPubSubAdapter<String, String>() {
+      @Override
+      public void subscribed(String channel, long count) {
+        subscriberLatch.countDown();
+      }
+    };
+    subscriber.addListener(listener);
+    subscriber.sync().subscribe(CHANNEL);
+    subscriberLatch.await();
+
+    String quitResponse = subscriber.sync().quit();
+    assertThat(quitResponse).isEqualTo("OK");
+
+    long publishCount = publisher.sync().publish(CHANNEL, "hello there");
+    assertThat(publishCount).isEqualTo(0L);
+  }
 
   @Test
   public void 
concurrentPublishersToMultipleSubscribers_doNotLosePublishMessages()
diff --git 
a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/SubscriptionsIntegrationTest.java
 
b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/SubscriptionsIntegrationTest.java
index 76f6e28..e272e60 100644
--- 
a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/SubscriptionsIntegrationTest.java
+++ 
b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/SubscriptionsIntegrationTest.java
@@ -17,10 +17,13 @@ package org.apache.geode.redis.internal.executor.pubsub;
 
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Protocol;
 
 import org.apache.geode.redis.GeodeRedisServerRule;
 import org.apache.geode.redis.mocks.MockSubscriber;
@@ -36,11 +39,60 @@ public class SubscriptionsIntegrationTest {
   public static ExecutorServiceRule executor = new ExecutorServiceRule();
 
   @Test
-  public void testForLeakedSubscriptions() {
+  @Ignore("GEODE-8515")
+  public void pingWhileSubscribed() {
+    Jedis client = new Jedis("localhost", server.getPort());
+    MockSubscriber mockSubscriber = new MockSubscriber();
 
+    executor.submit(() -> client.subscribe(mockSubscriber, "same"));
+    mockSubscriber.awaitSubscribe("same");
+    mockSubscriber.ping();
+    GeodeAwaitility.await()
+        .untilAsserted(() -> 
assertThat(mockSubscriber.getReceivedPings().size()).isEqualTo(1));
+    assertThat(mockSubscriber.getReceivedPings().get(0)).isEqualTo("");
+    mockSubscriber.unsubscribe();
+    client.close();
+  }
+
+  @Test
+  public void multiSubscribe() {
+    Jedis client = new Jedis("localhost", server.getPort());
+    MockSubscriber mockSubscriber = new MockSubscriber();
+
+    executor.submit(() -> client.subscribe(mockSubscriber, "same"));
+    mockSubscriber.awaitSubscribe("same");
+    mockSubscriber.psubscribe("sam*");
+    mockSubscriber.awaitPSubscribe("sam*");
+
+    Jedis publisher = new Jedis("localhost", server.getPort());
+    long publishCount = publisher.publish("same", "message");
+
+    assertThat(publishCount).isEqualTo(2L);
+    GeodeAwaitility.await()
+        .untilAsserted(() -> 
assertThat(mockSubscriber.getReceivedMessages()).hasSize(1));
+    GeodeAwaitility.await()
+        .untilAsserted(() -> 
assertThat(mockSubscriber.getReceivedPMessages()).hasSize(1));
+    assertThat(mockSubscriber.getReceivedEvents()).containsExactly("message", 
"pmessage");
+    mockSubscriber.unsubscribe();
+    client.close();
+  }
+
+  @Test
+  public void unallowedCommandsWhileSubscribed() {
+    Jedis client = new Jedis("localhost", server.getPort());
+
+    client.sendCommand(Protocol.Command.SUBSCRIBE, "hello");
+
+    assertThatThrownBy(() -> client.set("not", 
"supported")).hasMessageContaining(
+        "ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this 
context");
+    client.sendCommand(Protocol.Command.UNSUBSCRIBE);
+    client.close();
+  }
+
+  @Test
+  public void leakedSubscriptions() {
     for (int i = 0; i < 100; i++) {
       Jedis client = new Jedis("localhost", server.getPort());
-      client.ping();
       MockSubscriber mockSubscriber = new MockSubscriber();
       executor.submit(() -> client.subscribe(mockSubscriber, "same"));
       mockSubscriber.awaitSubscribe("same");
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
index 3c2f5ff..c8653eb 100755
--- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
@@ -404,6 +404,20 @@ public enum RedisCommandType {
     return supportLevel == UNIMPLEMENTED;
   }
 
+  public boolean isAllowedWhileSubscribed() {
+    switch (this) {
+      case SUBSCRIBE:
+      case PSUBSCRIBE:
+      case UNSUBSCRIBE:
+      case PUNSUBSCRIBE:
+      case PING:
+      case QUIT:
+        return true;
+      default:
+        return false;
+    }
+  }
+
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext executionHandlerContext) {
     parameterRequirements.checkParameters(command, executionHandlerContext);
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index 30b987c..fd9013e 100644
--- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -289,6 +289,13 @@ public class ExecutionHandlerContext extends 
ChannelInboundHandlerAdapter {
         return;
       }
 
+      if (!getPubSub().findSubscribedChannels(getClient()).isEmpty()
+          && !command.getCommandType().isAllowedWhileSubscribed()) {
+        writeToChannel(RedisResponse
+            .error("only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"));
+        return; // reject only; falling through would still execute the command below
+      }
+
       final long start = redisStats.startCommand(command.getCommandType());
       try {
         writeToChannel(command.execute(this));

Reply via email to