This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9f8a2ff  GEODE-7800: Add Redis PSUBSCRIBE and PUNSUBSCRIBE commands 
(#4705)
9f8a2ff is described below

commit 9f8a2ff2b43c183b4824dd5ab764ecd2243cb2e1
Author: Sarah Abbey <[email protected]>
AuthorDate: Sat Feb 15 21:59:42 2020 -0500

    GEODE-7800: Add Redis PSUBSCRIBE and PUNSUBSCRIBE commands (#4705)
    
    * GEODE-7800: Add Redis PSUBSCRIBE and PUNSUBSCRIBE commands
    
    Similar to `SUBSCRIBE` and `UNSUBSCRIBE`, `PSUBSCRIBE` allows a client
    to subscribe to a pattern. For example: `PSUBSCRIBE sal*s`
    
    The subscription pattern is in the form of a glob supporting `*`, `?`
    and ranges. https://redis.io/commands/psubscribe
    
    Pattern subscriptions must be unsubscribed verbatim; i.e., the above
    subscription cannot be unsubscribed using the pattern `s*`, but must
    be unsubscribed using the complete subscribed pattern, namely `sal*s`.
    
    When clients subscribe to overlapping patterns (or channels) they will
    receive a message for every matched subscription. Matches for a single
    client are not conflated.
    
        Co-authored-by: Sarah Abbey <[email protected]>
        Co-authored-by: John Hutchison <[email protected]>
        Co-authored-by: Jens Deppe <[email protected]>
    
    * Fixes class names in sanctioned-geode-redis-serializables.txt
    
    * Fixes flaky test
    
    * Adds license to PublishResult
    
    * Clean code using spotlessApply (spA) and make test reliable
    
    Stop propagating exceptions that get thrown when disconnecting Jedis
    
    * Cleans up subscribers and publishers after each test or after the whole
      class as needed
    
    * Re-add ignoreExceptions to the waitFor test helper method
    
    Co-authored-by: Jens Deppe <[email protected]>
    Co-authored-by: Venkateswara Prasath Durairaj 
<[email protected]>
---
 .../java/org/apache/geode/redis/PubSubTest.java    | 216 +++++++++++++--------
 .../apache/geode/redis/mocks/MockSubscriber.java   |  14 +-
 .../org/apache/geode/redis/GeodeRedisServer.java   |   3 +-
 .../{Subscriber.java => AbstractSubscription.java} |  46 +++--
 .../{Client.java => ChannelSubscription.java}      |  38 ++--
 .../org/apache/geode/redis/internal/Client.java    |   2 +-
 .../{Client.java => PatternSubscription.java}      |  38 ++--
 .../org/apache/geode/redis/internal/PubSub.java    |  21 ++
 .../apache/geode/redis/internal/PubSubImpl.java    |  91 ++++-----
 .../internal/{Client.java => PublishResult.java}   |  39 ++--
 .../geode/redis/internal/RedisCommandType.java     |  56 ++++++
 .../internal/{Client.java => Subscription.java}    |  56 +++---
 .../apache/geode/redis/internal/Subscriptions.java |  84 ++++++++
 .../executor/pubsub/PsubscribeExecutor.java        |  70 +++++++
 .../internal/executor/pubsub/PublishExecutor.java  |   4 +-
 ...lishExecutor.java => PunsubscribeExecutor.java} |  45 +++--
 .../internal/org/apache/hadoop/fs/GlobPattern.java |  18 ++
 .../sanctioned-geode-redis-serializables.txt       |   2 +
 .../geode/redis/internal/PubSubImplJUnitTest.java  |  52 +++++
 .../geode/redis/internal/SubscriptionsTest.java    | 170 ++++++++++++++++
 20 files changed, 780 insertions(+), 285 deletions(-)

diff --git 
a/geode-redis/src/integrationTest/java/org/apache/geode/redis/PubSubTest.java 
b/geode-redis/src/integrationTest/java/org/apache/geode/redis/PubSubTest.java
index 66f67a6..7897e36 100644
--- 
a/geode-redis/src/integrationTest/java/org/apache/geode/redis/PubSubTest.java
+++ 
b/geode-redis/src/integrationTest/java/org/apache/geode/redis/PubSubTest.java
@@ -18,13 +18,14 @@ package org.apache.geode.redis;
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.AfterClass;
@@ -41,10 +42,11 @@ import org.apache.geode.test.junit.categories.RedisTest;
 
 @Category({RedisTest.class})
 public class PubSubTest {
-  private static Jedis jedis;
   private static GeodeRedisServer server;
   private static GemFireCache cache;
   private static Random rand;
+  private static Jedis publisher;
+  private static Jedis subscriber;
   private static int port = 6379;
 
   @BeforeClass
@@ -57,26 +59,25 @@ public class PubSubTest {
     cache = cf.create();
     port = AvailablePortHelper.getRandomAvailableTCPPort();
     server = new GeodeRedisServer("localhost", port);
+    subscriber = new Jedis("localhost", port);
+    publisher = new Jedis("localhost", port);
 
     server.start();
-    jedis = new Jedis("localhost", port, 10000000);
   }
 
   @AfterClass
   public static void tearDown() {
-    jedis.close();
+    subscriber.close();
+    publisher.close();
     cache.close();
     server.shutdown();
   }
 
   @Test
-  public void testOneSubscriberOneChannel() throws InterruptedException {
-    Jedis subscriber = new Jedis("localhost", port);
-    Jedis publisher = new Jedis("localhost", port);
+  public void testOneSubscriberOneChannel() {
     List<String> expectedMessages = Arrays.asList("hello");
 
-    CountDownLatch latch = new CountDownLatch(1);
-    MockSubscriber mockSubscriber = new MockSubscriber(latch);
+    MockSubscriber mockSubscriber = new MockSubscriber();
 
     Runnable runnable = () -> {
       subscriber.subscribe(mockSubscriber, "salutations");
@@ -84,41 +85,29 @@ public class PubSubTest {
 
     Thread subscriberThread = new Thread(runnable);
     subscriberThread.start();
-
-    assertThat(latch.await(2, TimeUnit.SECONDS))
-        .as("channel subscriptions were not received")
-        .isTrue();
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
 
     Long result = publisher.publish("salutations", "hello");
     assertThat(result).isEqualTo(1);
 
     mockSubscriber.unsubscribe("salutations");
-
-    subscriberThread.join(2000);
-
-    assertThat(subscriberThread.isAlive())
-        .as("subscriber thread should not be alive")
-        .isFalse();
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+    waitFor(() -> !subscriberThread.isAlive());
 
     
assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(expectedMessages);
   }
 
   @Test
-  public void testOneSubscriberSubscribingToTwoChannels() throws Exception {
-    Jedis subscriber = new Jedis("localhost", port);
-    Jedis publisher = new Jedis("localhost", port);
+  public void testOneSubscriberSubscribingToTwoChannels() {
     List<String> expectedMessages = Arrays.asList("hello", "howdy");
-    CountDownLatch latch = new CountDownLatch(2);
-    MockSubscriber mockSubscriber = new MockSubscriber(latch);
+    MockSubscriber mockSubscriber = new MockSubscriber();
 
     Runnable runnable = () -> subscriber.subscribe(mockSubscriber, 
"salutations", "yuletide");
 
     Thread subscriberThread = new Thread(runnable);
     subscriberThread.start();
 
-    assertThat(latch.await(2, TimeUnit.SECONDS))
-        .as("channel subscriptions were not received")
-        .isTrue();
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
 
     Long result = publisher.publish("salutations", "hello");
     assertThat(result).isEqualTo(1);
@@ -126,28 +115,21 @@ public class PubSubTest {
     result = publisher.publish("yuletide", "howdy");
     assertThat(result).isEqualTo(1);
     mockSubscriber.unsubscribe("salutations");
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
     mockSubscriber.unsubscribe("yuletide");
-
-    subscriberThread.join(2000);
-
-    assertThat(subscriberThread.isAlive())
-        .as("subscriber thread should not be alive")
-        .isFalse();
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+    waitFor(() -> !subscriberThread.isAlive());
 
     
assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(expectedMessages);
   }
 
   @Test
-  public void testTwoSubscribersOneChannel() throws Exception {
-    Jedis subscriber1 = new Jedis("localhost", port);
+  public void testTwoSubscribersOneChannel() {
     Jedis subscriber2 = new Jedis("localhost", port);
-    Jedis publisher = new Jedis("localhost", port);
-    CountDownLatch latch = new CountDownLatch(2);
-    MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
-    MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
-
-    Runnable runnable1 = () -> subscriber1.subscribe(mockSubscriber1, 
"salutations");
+    MockSubscriber mockSubscriber1 = new MockSubscriber();
+    MockSubscriber mockSubscriber2 = new MockSubscriber();
 
+    Runnable runnable1 = () -> subscriber.subscribe(mockSubscriber1, 
"salutations");
     Runnable runnable2 = () -> subscriber2.subscribe(mockSubscriber2, 
"salutations");
 
     Thread subscriber1Thread = new Thread(runnable1);
@@ -155,78 +137,104 @@ public class PubSubTest {
     Thread subscriber2Thread = new Thread(runnable2);
     subscriber2Thread.start();
 
-    assertThat(latch.await(2, TimeUnit.SECONDS))
-        .as("channel subscriptions were not received")
-        .isTrue();
+    waitFor(() -> mockSubscriber1.getSubscribedChannels() == 1);
+    waitFor(() -> mockSubscriber2.getSubscribedChannels() == 1);
 
     Long result = publisher.publish("salutations", "hello");
     assertThat(result).isEqualTo(2);
     mockSubscriber1.unsubscribe("salutations");
-
-    subscriber1Thread.join(2000);
-
-    assertThat(subscriber1Thread.isAlive())
-        .as("subscriber1 thread should not be alive")
-        .isFalse();
+    waitFor(() -> mockSubscriber1.getSubscribedChannels() == 0);
+    waitFor(() -> !subscriber1Thread.isAlive());
 
     result = publisher.publish("salutations", "goodbye");
     assertThat(result).isEqualTo(1);
     mockSubscriber2.unsubscribe("salutations");
-
-    subscriber2Thread.join(2000);
-    assertThat(subscriber2Thread.isAlive())
-        .as("subscriber2 thread should not be alive")
-        .isFalse();
+    waitFor(() -> mockSubscriber2.getSubscribedChannels() == 0);
+    waitFor(() -> !subscriber2Thread.isAlive());
 
     
assertThat(mockSubscriber1.getReceivedMessages()).isEqualTo(Collections.singletonList("hello"));
     
assertThat(mockSubscriber2.getReceivedMessages()).isEqualTo(Arrays.asList("hello",
 "goodbye"));
+
+    subscriber2.close();
   }
 
   @Test
-  public void testPublishToNonexistentChannel() throws Exception {
-    Jedis publisher = new Jedis("localhost", port);
+  public void testPublishToNonexistentChannel() {
     Long result = publisher.publish("thisChannelDoesn'tExist", "hello");
     assertThat(result).isEqualTo(0);
   }
 
   @Test
-  public void testOneSubscriberOneChannelTwoTimes() throws Exception {
-    Jedis subscriber = new Jedis("localhost", port);
-    Jedis publisher = new Jedis("localhost", port);
-    CountDownLatch latch = new CountDownLatch(2);
-    MockSubscriber mockSubscriber = new MockSubscriber(latch);
+  public void testOneSubscriberOneChannelTwoTimes() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
 
     Runnable runnable1 = () -> subscriber.subscribe(mockSubscriber, 
"salutations", "salutations");
 
     Thread subscriberThread = new Thread(runnable1);
     subscriberThread.start();
 
-    assertThat(latch.await(2, TimeUnit.SECONDS))
-        .as("channel subscriptions were not received")
-        .isTrue();
-    assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1);
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
 
     Long result = publisher.publish("salutations", "hello");
     assertThat(result).isEqualTo(1);
 
     mockSubscriber.unsubscribe("salutations");
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+
+    waitFor(() -> !subscriberThread.isAlive());
+    
assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(Collections.singletonList("hello"));
+  }
 
-    subscriberThread.join(2000);
+  @Test
+  public void testDeadSubscriber() {
+    Jedis deadSubscriber = new Jedis("localhost", port);
 
-    assertThat(subscriberThread.isAlive())
-        .as("subscriber1 thread should not be alive")
-        .isFalse();
+    MockSubscriber mockSubscriber = new MockSubscriber();
 
-    
assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(Collections.singletonList("hello"));
+    Runnable runnable = () -> {
+      deadSubscriber.subscribe(mockSubscriber, "salutations");
+    };
+
+    Thread subscriberThread = new Thread(runnable);
+    subscriberThread.start();
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+
+    deadSubscriber.close();
+
+    waitFor(() -> !deadSubscriber.isConnected());
+    Long result = publisher.publish("salutations", "hello");
+
+    assertThat(result).isEqualTo(0);
+    assertThat(mockSubscriber.getReceivedMessages()).isEmpty();
   }
 
   @Test
-  public void testDeadSubscriber() throws InterruptedException {
-    Jedis subscriber = new Jedis("localhost", port);
-    Jedis publisher = new Jedis("localhost", port);
+  public void testPatternSubscribe() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
+
+    Runnable runnable = () -> {
+      subscriber.psubscribe(mockSubscriber, "sal*s");
+    };
+
+    Thread subscriberThread = new Thread(runnable);
+    subscriberThread.start();
+
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
 
-    CountDownLatch latch = new CountDownLatch(1);
-    MockSubscriber mockSubscriber = new MockSubscriber(latch);
+    Long result = publisher.publish("salutations", "hello");
+    assertThat(result).isEqualTo(1);
+
+    assertThat(mockSubscriber.getReceivedMessages()).hasSize(1);
+    assertThat(mockSubscriber.getReceivedMessages()).contains("hello");
+
+    mockSubscriber.punsubscribe("sal*s");
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+    waitFor(() -> !subscriberThread.isAlive());
+  }
+
+  @Test
+  public void testPatternAndRegularSubscribe() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
 
     Runnable runnable = () -> {
       subscriber.subscribe(mockSubscriber, "salutations");
@@ -234,21 +242,57 @@ public class PubSubTest {
 
     Thread subscriberThread = new Thread(runnable);
     subscriberThread.start();
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
 
-    assertThat(latch.await(2, TimeUnit.SECONDS))
-        .as("channel subscriptions were not received")
-        .isTrue();
+    mockSubscriber.psubscribe("sal*s");
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
 
-    subscriber.close();
-    subscriberThread.join(2000);
+    Long result = publisher.publish("salutations", "hello");
+    assertThat(result).isEqualTo(2);
+
+    mockSubscriber.punsubscribe("sal*s");
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+
+    mockSubscriber.unsubscribe("salutations");
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+
+    waitFor(() -> !subscriberThread.isAlive());
+
+    assertThat(mockSubscriber.getReceivedMessages()).containsExactly("hello", 
"hello");
+  }
+
+  @Test
+  public void testPatternWithoutAGlob() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
 
-    assertThat(subscriberThread.isAlive())
-        .as("subscriber thread should not be alive")
-        .isFalse();
+    Runnable runnable = () -> {
+      subscriber.subscribe(mockSubscriber, "salutations");
+    };
+
+    Thread subscriberThread = new Thread(runnable);
+    subscriberThread.start();
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+
+    mockSubscriber.psubscribe("salutations");
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
 
     Long result = publisher.publish("salutations", "hello");
-    assertThat(result).isEqualTo(0);
+    assertThat(result).isEqualTo(2);
 
-    assertThat(mockSubscriber.getReceivedMessages()).isEmpty();
+    mockSubscriber.punsubscribe("salutations");
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+
+    mockSubscriber.unsubscribe("salutations");
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+
+    waitFor(() -> !subscriberThread.isAlive());
+
+    assertThat(mockSubscriber.getReceivedMessages()).containsExactly("hello", 
"hello");
+  }
+
+  private void waitFor(Callable<Boolean> booleanCallable) {
+    await().atMost(1, TimeUnit.SECONDS)
+        .ignoreExceptions() // ignoring socket closed exceptions
+        .until(booleanCallable);
   }
 }
diff --git 
a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
 
b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
index 74eda78..a1e9d41 100644
--- 
a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
+++ 
b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
@@ -18,29 +18,23 @@ package org.apache.geode.redis.mocks;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
 
 import redis.clients.jedis.JedisPubSub;
 
 public class MockSubscriber extends JedisPubSub {
-  private CountDownLatch latch;
-  private List<String> receivedMessages = new ArrayList<String>();
-
-  public MockSubscriber(CountDownLatch latch) {
-    this.latch = latch;
-  }
+  private List<String> receivedMessages = new ArrayList<>();
 
   public List<String> getReceivedMessages() {
     return receivedMessages;
   }
 
   @Override
-  public void onSubscribe(String channel, int subscribedChannels) {
-    latch.countDown();
+  public void onMessage(String channel, String message) {
+    receivedMessages.add(message);
   }
 
   @Override
-  public void onMessage(String channel, String message) {
+  public void onPMessage(String pattern, String channel, String message) {
     receivedMessages.add(message);
   }
 }
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServer.java 
b/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServer.java
index 75a7e6a..0c639e5 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServer.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServer.java
@@ -75,6 +75,7 @@ import org.apache.geode.redis.internal.PubSub;
 import org.apache.geode.redis.internal.PubSubImpl;
 import org.apache.geode.redis.internal.RedisDataType;
 import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.Subscriptions;
 
 /**
  * The GeodeRedisServer is a server that understands the Redis protocol. As 
commands are sent to the
@@ -450,7 +451,7 @@ public class GeodeRedisServer {
         throw assErr;
       }
       this.keyRegistrar = new KeyRegistrar(redisMetaData);
-      this.pubSub = new PubSubImpl();
+      this.pubSub = new PubSubImpl(new Subscriptions());
       this.regionCache = new RegionProvider(stringsRegion, hLLRegion, 
this.keyRegistrar,
           expirationFutures, expirationExecutor, this.DEFAULT_REGION_TYPE);
       redisMetaData.put(REDIS_META_DATA_REGION, RedisDataType.REDIS_PROTECTED);
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriber.java 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java
similarity index 68%
rename from 
geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriber.java
rename to 
geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java
index 07bcfc0..c8514d5 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriber.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java
@@ -25,32 +25,39 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
-class Subscriber {
+public abstract class AbstractSubscription implements Subscription {
   private static final Logger logger = LogService.getLogger();
-  public final Client client;
-  public final String channel;
-  private ExecutionHandlerContext context;
+  private final Client client;
+  private final ExecutionHandlerContext context;
 
-  public Subscriber(Client client, String channel, ExecutionHandlerContext 
context) {
+  AbstractSubscription(Client client, ExecutionHandlerContext context) {
+    if (client == null) {
+      throw new IllegalArgumentException("client cannot be null");
+    }
+    if (context == null) {
+      throw new IllegalArgumentException("context cannot be null");
+    }
     this.client = client;
-    this.channel = channel;
     this.context = context;
   }
 
-  public boolean isEqualTo(String channel, Client client) {
-    if (channel == null || client == null) {
-      return false;
-    }
-    return channel.equals(this.channel) && client.equals(this.client);
-  }
-
-  public boolean publishMessage(String channel, String message) {
+  @Override
+  public PublishResult publishMessage(String channel, String message) {
     ByteBuf messageByteBuffer = constructResponse(channel, message);
     if (messageByteBuffer == null) {
-      return false;
+      return new PublishResult(client, false);
     }
 
-    return writeToChannelSynchronously(messageByteBuffer);
+    return new PublishResult(client, 
writeToChannelSynchronously(messageByteBuffer));
+  }
+
+  Client getClient() {
+    return client;
+  }
+
+  @Override
+  public boolean matchesClient(Client client) {
+    return this.client.equals(client);
   }
 
   private ByteBuf constructResponse(String channel, String message) {
@@ -66,9 +73,9 @@ class Subscriber {
   }
 
   /**
-   * This method turns the response into a synchronous call. We want to
-   * determine if the response, to the client, resulted in an error - for 
example if the client has
-   * disconnected and the write fails. In such cases we need to be able to 
notify the caller.
+   * This method turns the response into a synchronous call. We want to 
determine if the response,
+   * to the client, resulted in an error - for example if the client has 
disconnected and the write
+   * fails. In such cases we need to be able to notify the caller.
    */
   private boolean writeToChannelSynchronously(ByteBuf messageByteBuffer) {
     ChannelFuture channelFuture = context.writeToChannel(messageByteBuffer);
@@ -78,6 +85,7 @@ class Subscriber {
     } catch (InterruptedException | ExecutionException e) {
       return false;
     }
+
     return channelFuture.cause() == null;
   }
 }
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/ChannelSubscription.java
similarity index 55%
copy from geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
copy to 
geode-redis/src/main/java/org/apache/geode/redis/internal/ChannelSubscription.java
index 494bce3..b64f132 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/ChannelSubscription.java
@@ -11,39 +11,35 @@
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
  * the License.
+ *
  */
 
 package org.apache.geode.redis.internal;
 
-import java.util.Objects;
-
-import io.netty.channel.Channel;
+/**
+ * This class represents a single channel subscription as created by the 
SUBSCRIBE command
+ */
+class ChannelSubscription extends AbstractSubscription {
+  private String channel;
 
-class Client {
-  private Channel channel;
+  public ChannelSubscription(Client client, String channel, 
ExecutionHandlerContext context) {
+    super(client, context);
 
-  public Client(Channel remoteAddress) {
-    this.channel = remoteAddress;
+    if (channel == null) {
+      throw new IllegalArgumentException("channel cannot be null");
+    }
+    this.channel = channel;
   }
 
   @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    Client client = (Client) o;
-    return Objects.equals(channel, client.channel);
+  public boolean isEqualTo(Object channelOrPattern, Client client) {
+    return this.channel != null && this.channel.equals(channelOrPattern)
+        && this.getClient().equals(client);
   }
 
   @Override
-  public int hashCode() {
-    return Objects.hash(channel);
+  public boolean matches(String channel) {
+    return this.channel.equals(channel);
   }
 
-  public boolean isDead() {
-    return !this.channel.isOpen();
-  }
 }
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
index 494bce3..44a31a7 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
@@ -19,7 +19,7 @@ import java.util.Objects;
 
 import io.netty.channel.Channel;
 
-class Client {
+public class Client {
   private Channel channel;
 
   public Client(Channel remoteAddress) {
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/PatternSubscription.java
similarity index 53%
copy from geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
copy to 
geode-redis/src/main/java/org/apache/geode/redis/internal/PatternSubscription.java
index 494bce3..212eeaa 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/PatternSubscription.java
@@ -11,39 +11,37 @@
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
  * the License.
+ *
  */
 
 package org.apache.geode.redis.internal;
 
-import java.util.Objects;
+import org.apache.geode.redis.internal.org.apache.hadoop.fs.GlobPattern;
 
-import io.netty.channel.Channel;
+/**
+ * This class represents a pattern subscription as created by the PSUBSCRIBE 
command
+ */
+class PatternSubscription extends AbstractSubscription {
+  final GlobPattern pattern;
 
-class Client {
-  private Channel channel;
+  public PatternSubscription(Client client, GlobPattern pattern, 
ExecutionHandlerContext context) {
+    super(client, context);
 
-  public Client(Channel remoteAddress) {
-    this.channel = remoteAddress;
+    if (pattern == null) {
+      throw new IllegalArgumentException("pattern cannot be null");
+    }
+    this.pattern = pattern;
   }
 
   @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    Client client = (Client) o;
-    return Objects.equals(channel, client.channel);
+  public boolean isEqualTo(Object channelOrPattern, Client client) {
+    return this.pattern != null && this.pattern.equals(channelOrPattern)
+        && this.getClient().equals(client);
   }
 
   @Override
-  public int hashCode() {
-    return Objects.hash(channel);
+  public boolean matches(String channel) {
+    return pattern.matches(channel);
   }
 
-  public boolean isDead() {
-    return !this.channel.isOpen();
-  }
 }
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/PubSub.java 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/PubSub.java
index 78a695e..4462d83 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/PubSub.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/PubSub.java
@@ -16,6 +16,8 @@
 
 package org.apache.geode.redis.internal;
 
+import org.apache.geode.redis.internal.org.apache.hadoop.fs.GlobPattern;
+
 /**
  * Interface that represents the ability to Publish, Subscribe and Unsubscribe 
from channels.
  */
@@ -41,6 +43,16 @@ public interface PubSub {
   long subscribe(String channel, ExecutionHandlerContext context, Client 
client);
 
   /**
+   * Subscribe to a pattern
+   *
+   * @param pattern glob pattern to subscribe to
+   * @param context ExecutionHandlerContext which will handle the client 
response
+   * @param client a Client instance making the request
+   * @return the number of channels subscribed to
+   */
+  long psubscribe(GlobPattern pattern, ExecutionHandlerContext context, Client 
client);
+
+  /**
    * Unsubscribe a client from a channel
    *
    * @param channel the channel to unsubscribe from
@@ -48,4 +60,13 @@ public interface PubSub {
    * @return the number of channels still subscribed to by the client
    */
   long unsubscribe(String channel, Client client);
+
+  /**
+   * Unsubscribe from a previously subscribed pattern
+   *
+   * @param pattern the pattern to unsubscribe from
+   * @param client the Client which is to be unsubscribed
+   * @return the number of channels still subscribed to by the client
+   */
+  long punsubscribe(GlobPattern pattern, Client client);
 }
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/PubSubImpl.java 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/PubSubImpl.java
index 46cee7b..5582cdf 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/PubSubImpl.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/PubSubImpl.java
@@ -16,15 +16,16 @@
 
 package org.apache.geode.redis.internal;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.redis.internal.org.apache.hadoop.fs.GlobPattern;
 
 /**
  * Concrete class that manages publish and subscribe functionality. Since 
Redis subscriptions
@@ -34,9 +35,11 @@ import org.apache.geode.cache.execute.ResultCollector;
 public class PubSubImpl implements PubSub {
   public static final String REDIS_PUB_SUB_FUNCTION_ID = 
"redisPubSubFunctionID";
 
-  private Subscribers subscribers = new Subscribers();
+  private final Subscriptions subscriptions;
+
+  public PubSubImpl(Subscriptions subscriptions) {
+    this.subscriptions = subscriptions;
 
-  public PubSubImpl() {
     registerPublishFunction();
   }
 
@@ -54,12 +57,23 @@ public class PubSubImpl implements PubSub {
 
   @Override
   public long subscribe(String channel, ExecutionHandlerContext context, 
Client client) {
-    if (subscribers.exists(channel, client)) {
-      return subscribers.findSubscribers(client).size();
+    if (subscriptions.exists(channel, client)) {
+      return subscriptions.findSubscriptions(client).size();
+    }
+    Subscription subscription = new ChannelSubscription(client, channel, 
context);
+    subscriptions.add(subscription);
+    return subscriptions.findSubscriptions(client).size();
+  }
+
+  @Override
+  public long psubscribe(GlobPattern pattern, ExecutionHandlerContext context, 
Client client) {
+    if (subscriptions.exists(pattern, client)) {
+      return subscriptions.findSubscriptions(client).size();
     }
-    Subscriber subscriber = new Subscriber(client, channel, context);
-    subscribers.add(subscriber);
-    return subscribers.findSubscribers(client).size();
+    Subscription subscription = new PatternSubscription(client, pattern, 
context);
+    subscriptions.add(subscription);
+
+    return subscriptions.findSubscriptions(client).size();
   }
 
   private void registerPublishFunction() {
@@ -80,57 +94,36 @@ public class PubSubImpl implements PubSub {
 
   @Override
   public long unsubscribe(String channel, Client client) {
-    this.subscribers.remove(channel, client);
-    return this.subscribers.findSubscribers(client).size();
+    this.subscriptions.remove(channel, client);
+    return this.subscriptions.findSubscriptions(client).size();
   }
 
-  private long publishMessageToSubscribers(String channel, String message) {
-    Map<Boolean, List<Subscriber>> results = this.subscribers
-        .findSubscribers(channel)
+  /**
+   * Removes the client's subscription to the given pattern and reports how many subscriptions,
+   * channel or pattern, the client still holds afterwards.
+   *
+   * @param pattern the pattern to unsubscribe from
+   * @param client the client giving up the subscription
+   * @return the number of subscriptions remaining for {@code client}
+   */
+  @Override
+  public long punsubscribe(GlobPattern pattern, Client client) {
+    subscriptions.remove(pattern, client);
+    List<Subscription> remaining = subscriptions.findSubscriptions(client);
+    return remaining.size();
+  }
+
+  @VisibleForTesting
+  long publishMessageToSubscribers(String channel, String message) {
+
+    Map<Boolean, List<PublishResult>> results = this.subscriptions
+        .findSubscriptions(channel)
         .stream()
-        .collect(
-            Collectors.partitioningBy(subscriber -> 
subscriber.publishMessage(channel, message)));
+        .map(subscription -> subscription.publishMessage(channel, message))
+        .collect(Collectors.partitioningBy(PublishResult::isSuccessful));
 
     prune(results.get(false));
 
     return results.get(true).size();
   }
 
-  private void prune(List<Subscriber> failedSubscribers) {
-    failedSubscribers.forEach(subscriber -> {
-      if (subscriber.client.isDead()) {
-        subscribers.remove(subscriber.client);
+  private void prune(List<PublishResult> failedSubscriptions) {
+    failedSubscriptions.forEach(publishResult -> {
+      Client client = publishResult.getClient();
+      if (client.isDead()) {
+        subscriptions.remove(client);
       }
     });
   }
-
-  private class Subscribers {
-    List<Subscriber> subscribers = new ArrayList<>();
-
-    private boolean exists(String channel, Client client) {
-      return subscribers.stream().anyMatch(subscriber -> 
subscriber.isEqualTo(channel, client));
-    }
-
-    private List<Subscriber> findSubscribers(Client client) {
-      return subscribers.stream().filter(subscriber -> 
subscriber.client.equals(client))
-          .collect(Collectors.toList());
-    }
-
-    private List<Subscriber> findSubscribers(String channel) {
-      return subscribers.stream().filter(subscriber -> 
subscriber.channel.equals(channel))
-          .collect(Collectors.toList());
-    }
-
-    public void add(Subscriber subscriber) {
-      this.subscribers.add(subscriber);
-    }
-
-    public void remove(String channel, Client client) {
-      this.subscribers.removeIf(subscriber -> subscriber.isEqualTo(channel, 
client));
-    }
-
-    public void remove(Client client) {
-      this.subscribers.removeIf(subscriber -> 
subscriber.client.equals(client));
-    }
-  }
 }
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/PublishResult.java
similarity index 59%
copy from geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
copy to 
geode-redis/src/main/java/org/apache/geode/redis/internal/PublishResult.java
index 494bce3..d4aa15b 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/PublishResult.java
@@ -15,35 +15,24 @@
 
 package org.apache.geode.redis.internal;
 
-import java.util.Objects;
-
-import io.netty.channel.Channel;
-
-class Client {
-  private Channel channel;
-
-  public Client(Channel remoteAddress) {
-    this.channel = remoteAddress;
-  }
+/**
+ * Represents the results of publishing a message to a subscription. Contains 
the client the message
+ * was published to as well as whether or not the message was published 
successfully.
+ */
+public class PublishResult {
+  private final Client client;
+  private final boolean result;
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    Client client = (Client) o;
-    return Objects.equals(channel, client.channel);
+  public PublishResult(Client client, boolean result) {
+    this.client = client;
+    this.result = result;
   }
 
-  @Override
-  public int hashCode() {
-    return Objects.hash(channel);
+  public Client getClient() {
+    return client;
   }
 
-  public boolean isDead() {
-    return !this.channel.isOpen();
+  public boolean isSuccessful() {
+    return result;
   }
 }
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
index fc7a0ab..aa8bb2a 100755
--- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
@@ -66,7 +66,9 @@ import 
org.apache.geode.redis.internal.executor.list.LTrimExecutor;
 import org.apache.geode.redis.internal.executor.list.RPopExecutor;
 import org.apache.geode.redis.internal.executor.list.RPushExecutor;
 import org.apache.geode.redis.internal.executor.list.RPushXExecutor;
+import org.apache.geode.redis.internal.executor.pubsub.PsubscribeExecutor;
 import org.apache.geode.redis.internal.executor.pubsub.PublishExecutor;
+import org.apache.geode.redis.internal.executor.pubsub.PunsubscribeExecutor;
 import org.apache.geode.redis.internal.executor.pubsub.SubscribeExecutor;
 import org.apache.geode.redis.internal.executor.pubsub.UnsubscribeExecutor;
 import org.apache.geode.redis.internal.executor.set.SAddExecutor;
@@ -2624,6 +2626,10 @@ public enum RedisCommandType {
       return this.dataType;
     }
   },
+
+  /**
+   * PUBLISH channel message
+   */
   PUBLISH {
     private Executor executor;
 
@@ -2642,6 +2648,10 @@ public enum RedisCommandType {
       return this.dataType;
     }
   },
+
+  /**
+   * UNSUBSCRIBE channel...
+   */
   UNSUBSCRIBE {
     private Executor executor;
 
@@ -2661,6 +2671,52 @@ public enum RedisCommandType {
     }
   },
 
+  /**
+   * PSUBSCRIBE pattern...
+   * <p>
+   * Subscribe the client to the given glob pattern(s). Handled by {@link PsubscribeExecutor}.
+   */
+  PSUBSCRIBE {
+    private Executor executor;
+
+    @Override
+    public Executor getExecutor() {
+      // Created on first request and reused afterwards.
+      if (executor == null) {
+        executor = new PsubscribeExecutor();
+      }
+      return executor;
+    }
+
+    private final RedisDataType dataType = RedisDataType.REDIS_PUBSUB;
+
+    @Override
+    public RedisDataType getDataType() {
+      return this.dataType;
+    }
+  },
+
+  /**
+   * PUNSUBSCRIBE pattern...
+   * <p>
+   * Remove the client's subscription to a glob pattern. Handled by {@link PunsubscribeExecutor}.
+   * NOTE(review): the executor added with this command reads only the first pattern argument.
+   */
+  PUNSUBSCRIBE {
+    private Executor executor;
+
+    @Override
+    public Executor getExecutor() {
+      // Created on first request and reused afterwards.
+      if (executor == null) {
+        executor = new PunsubscribeExecutor();
+      }
+      return executor;
+    }
+
+    private final RedisDataType dataType = RedisDataType.REDIS_PUBSUB;
+
+    @Override
+    public RedisDataType getDataType() {
+      return this.dataType;
+    }
+  },
+
 
   /**************************************
    * Geospatial commands ****************
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscription.java
similarity index 54%
copy from geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
copy to 
geode-redis/src/main/java/org/apache/geode/redis/internal/Subscription.java
index 494bce3..46be25a 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscription.java
@@ -11,39 +11,33 @@
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
  * the License.
+ *
  */
 
 package org.apache.geode.redis.internal;
 
-import java.util.Objects;
-
-import io.netty.channel.Channel;
-
-class Client {
-  private Channel channel;
-
-  public Client(Channel remoteAddress) {
-    this.channel = remoteAddress;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    Client client = (Client) o;
-    return Objects.equals(channel, client.channel);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(channel);
-  }
-
-  public boolean isDead() {
-    return !this.channel.isOpen();
-  }
+/**
+ * Interface that represents the relationship between a channel or pattern and a client.
+ */
+public interface Subscription {
+  /**
+   * Equality of a subscription is represented by a combination of client and one of channel or
+   * pattern.
+   *
+   * @param channelOrPattern the channel or pattern to compare with this subscription's own
+   * @param client the client to compare with this subscription's own
+   * @return whether this subscription is the one identified by the given pair
+   */
+  boolean isEqualTo(Object channelOrPattern, Client client);
+
+  /**
+   * Will publish a message to the designated channel.
+   *
+   * @param channel the channel the message is being published on
+   * @param message the message to publish
+   * @return a result carrying the client and whether the message was published successfully
+   */
+  PublishResult publishMessage(String channel, String message);
+
+  /**
+   * Verifies that the subscription is established with the designated client.
+   *
+   * @param client the client to check
+   */
+  boolean matchesClient(Client client);
+
+  /**
+   * Verifies that the subscription channel or pattern matches the designated channel.
+   *
+   * @param channel the name of the channel to test against this subscription
+   */
+  boolean matches(String channel);
 }
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriptions.java 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriptions.java
new file mode 100644
index 0000000..b66770a
--- /dev/null
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriptions.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Class that manages both channel and pattern subscriptions.
+ * <p>
+ * A single instance holds the subscriptions of every client, so all access to the backing list
+ * is serialized on this object. The {@code findSubscriptions} methods return independent copies
+ * that callers may iterate without holding the lock.
+ */
+public class Subscriptions {
+  private final List<Subscription> subscriptions = new ArrayList<>();
+
+  /**
+   * Check whether a given client has already subscribed to a channel or pattern
+   *
+   * @param channelOrPattern channel or pattern
+   * @param client a client connection
+   * @return {@code true} if a subscription for this channel/pattern and client is registered
+   */
+  public synchronized boolean exists(Object channelOrPattern, Client client) {
+    return subscriptions.stream()
+        .anyMatch(subscription -> subscription.isEqualTo(channelOrPattern, client));
+  }
+
+  /**
+   * Return all subscriptions for a given client
+   *
+   * @param client the subscribed client
+   * @return a new list holding the client's subscriptions; empty if there are none
+   */
+  public synchronized List<Subscription> findSubscriptions(Client client) {
+    return subscriptions.stream()
+        .filter(subscription -> subscription.matchesClient(client))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Return all subscriptions, channel or pattern, that match a given channel name
+   *
+   * @param channelOrPattern the channel name each subscription is tested against
+   * @return a new list holding the matching subscriptions; empty if there are none
+   */
+  public synchronized List<Subscription> findSubscriptions(String channelOrPattern) {
+    return subscriptions.stream()
+        .filter(subscription -> subscription.matches(channelOrPattern))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Add a new subscription
+   */
+  public synchronized void add(Subscription subscription) {
+    subscriptions.add(subscription);
+  }
+
+  /**
+   * Remove all subscriptions for a given client
+   */
+  public synchronized void remove(Client client) {
+    subscriptions.removeIf(subscription -> subscription.matchesClient(client));
+  }
+
+  /**
+   * Remove a single subscription
+   *
+   * @param channel the channel or pattern of the subscription to remove
+   * @param client the client owning the subscription
+   */
+  public synchronized void remove(Object channel, Client client) {
+    subscriptions.removeIf(subscription -> subscription.isEqualTo(channel, client));
+  }
+}
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
new file mode 100644
index 0000000..5a91146
--- /dev/null
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal.executor.pubsub;
+
+import java.util.ArrayList;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.Coder;
+import org.apache.geode.redis.internal.CoderException;
+import org.apache.geode.redis.internal.Command;
+import org.apache.geode.redis.internal.ExecutionHandlerContext;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.org.apache.hadoop.fs.GlobPattern;
+
+/**
+ * Executor for the PSUBSCRIBE command: subscribes the calling client to each pattern argument
+ * and replies with one {@code psubscribe} array per pattern.
+ */
+public class PsubscribeExecutor extends AbstractExecutor {
+  private static final Logger logger = LogService.getLogger();
+
+  /**
+   * Registers a pattern subscription for every argument after the command name and sets the
+   * combined reply on the command.
+   *
+   * @param command the parsed PSUBSCRIBE command; element 0 is the command name
+   * @param context the handler context supplying the pub/sub registry, client and allocator
+   */
+  @Override
+  public void executeCommand(Command command, ExecutionHandlerContext context) {
+    ArrayList<ArrayList<Object>> items = new ArrayList<>();
+    for (int i = 1; i < command.getProcessedCommand().size(); i++) {
+      ArrayList<Object> item = new ArrayList<>();
+      byte[] pattern = command.getProcessedCommand().get(i);
+      long subscribedChannels =
+          context.getPubSub().psubscribe(
+              new GlobPattern(new String(pattern)), context, context.getClient());
+
+      // Reply layout: message kind, the pattern as received, the client's subscription count.
+      item.add("psubscribe");
+      item.add(pattern);
+      item.add(subscribedChannels);
+
+      items.add(item);
+    }
+
+    writeResponse(command, context, items);
+  }
+
+  /**
+   * Encodes each reply item and appends it to a single buffer that becomes the command response.
+   * An item that cannot be encoded is logged and left out of the response.
+   */
+  private void writeResponse(Command command, ExecutionHandlerContext context,
+      ArrayList<ArrayList<Object>> items) {
+    ByteBuf aggregatedResponse = context.getByteBufAllocator().buffer();
+    for (ArrayList<Object> item : items) {
+      ByteBuf response;
+      try {
+        response = Coder.getArrayResponse(context.getByteBufAllocator(), item);
+      } catch (CoderException e) {
+        logger.warn("Error encoding subscribe response", e);
+        // Nothing was encoded for this item; writing a null buffer would throw.
+        continue;
+      }
+      // NOTE(review): writeBytes copies the content; confirm whether the per-item buffer
+      // returned by Coder should be released here.
+      aggregatedResponse.writeBytes(response);
+    }
+    command.setResponse(aggregatedResponse);
+  }
+
+}
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
index 0f8b585..75b6b26 100644
--- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
@@ -22,10 +22,10 @@ import io.netty.buffer.ByteBuf;
 import org.apache.geode.redis.internal.Coder;
 import org.apache.geode.redis.internal.Command;
 import org.apache.geode.redis.internal.ExecutionHandlerContext;
-import org.apache.geode.redis.internal.Executor;
 import org.apache.geode.redis.internal.RedisConstants.ArityDef;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
 
-public class PublishExecutor implements Executor {
+public class PublishExecutor extends AbstractExecutor {
 
 
   @Override
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java
similarity index 53%
copy from 
geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
copy to 
geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java
index 0f8b585..0f1c1a3 100644
--- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java
@@ -11,41 +11,46 @@
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
  * the License.
+ *
  */
 
 package org.apache.geode.redis.internal.executor.pubsub;
 
-import java.util.List;
+import java.util.ArrayList;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.logging.log4j.Logger;
 
+import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.redis.internal.Coder;
+import org.apache.geode.redis.internal.CoderException;
 import org.apache.geode.redis.internal.Command;
 import org.apache.geode.redis.internal.ExecutionHandlerContext;
-import org.apache.geode.redis.internal.Executor;
-import org.apache.geode.redis.internal.RedisConstants.ArityDef;
-
-public class PublishExecutor implements Executor {
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.org.apache.hadoop.fs.GlobPattern;
 
+public class PunsubscribeExecutor extends AbstractExecutor {
+  private static final Logger logger = LogService.getLogger();
 
   @Override
   public void executeCommand(Command command, ExecutionHandlerContext context) 
{
-    List<byte[]> args = command.getProcessedCommand();
-    if (args.size() != 3) {
-      
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), 
ArityDef.PUBLISH));
-      return;
+    byte[] pattern = command.getProcessedCommand().get(1);
+    long subscriptionCount =
+        context
+            .getPubSub()
+            .punsubscribe(new GlobPattern(new String(pattern)), 
context.getClient());
+
+    ArrayList<Object> items = new ArrayList<>();
+    items.add("punsubscribe");
+    items.add(pattern);
+    items.add(subscriptionCount);
+
+    ByteBuf response = null;
+    try {
+      response = Coder.getArrayResponse(context.getByteBufAllocator(), items);
+    } catch (CoderException e) {
+      logger.warn("Error encoding unsubscribe response", e);
     }
-
-    String channelName = new String(args.get(1));
-    String message = new String(args.get(2));
-    long publishCount = context.getPubSub().publish(channelName, message);
-
-    writeResponse(command, context, publishCount);
-  }
-
-  private void writeResponse(Command command, ExecutionHandlerContext context,
-      long publishCount) {
-    ByteBuf response = Coder.getIntegerResponse(context.getByteBufAllocator(), 
publishCount);
     command.setResponse(response);
   }
 }
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/org/apache/hadoop/fs/GlobPattern.java
 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/org/apache/hadoop/fs/GlobPattern.java
index 94bc21b..7cedb5c 100644
--- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/org/apache/hadoop/fs/GlobPattern.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/org/apache/hadoop/fs/GlobPattern.java
@@ -15,6 +15,7 @@
 
 package org.apache.geode.redis.internal.org.apache.hadoop.fs;
 
+import java.util.Objects;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
@@ -159,6 +160,23 @@ public class GlobPattern {
     return hasWildcard;
   }
 
+  /**
+   * Two glob patterns are equal when their compiled regular expressions have the same text.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o instanceof GlobPattern) {
+      String otherRegex = ((GlobPattern) o).compiled.pattern();
+      return compiled.pattern().equals(otherRegex);
+    }
+    return false;
+  }
+
+  /**
+   * Hashes the text of the compiled regular expression, the same value {@link #equals(Object)}
+   * compares, so that equal glob patterns always produce equal hash codes. Hashing the
+   * {@code Pattern} object itself would use its identity hash and break that guarantee.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hash(compiled.pattern());
+  }
+
   private static void error(String message, String pattern, int pos) {
     throw new PatternSyntaxException(message, pattern, pos);
   }
diff --git 
a/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt
 
b/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt
index c98a91f..1883d15 100755
--- 
a/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt
+++ 
b/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt
@@ -31,6 +31,8 @@ 
org/apache/geode/redis/internal/RedisCommandType$12,false,dataType:org/apache/ge
 
org/apache/geode/redis/internal/RedisCommandType$120,false,dataType:org/apache/geode/redis/internal/RedisDataType,executor:org/apache/geode/redis/internal/Executor
 
org/apache/geode/redis/internal/RedisCommandType$121,false,dataType:org/apache/geode/redis/internal/RedisDataType,executor:org/apache/geode/redis/internal/Executor
 
org/apache/geode/redis/internal/RedisCommandType$122,false,dataType:org/apache/geode/redis/internal/RedisDataType,executor:org/apache/geode/redis/internal/Executor
+org/apache/geode/redis/internal/RedisCommandType$123,false,dataType:org/apache/geode/redis/internal/RedisDataType,executor:org/apache/geode/redis/internal/Executor
+org/apache/geode/redis/internal/RedisCommandType$124,false,dataType:org/apache/geode/redis/internal/RedisDataType,executor:org/apache/geode/redis/internal/Executor
 
org/apache/geode/redis/internal/RedisCommandType$13,false,dataType:org/apache/geode/redis/internal/RedisDataType,executor:org/apache/geode/redis/internal/Executor
 
org/apache/geode/redis/internal/RedisCommandType$14,false,dataType:org/apache/geode/redis/internal/RedisDataType,executor:org/apache/geode/redis/internal/Executor
 
org/apache/geode/redis/internal/RedisCommandType$15,false,dataType:org/apache/geode/redis/internal/RedisDataType,executor:org/apache/geode/redis/internal/Executor
diff --git 
a/geode-redis/src/test/java/org/apache/geode/redis/internal/PubSubImplJUnitTest.java
 
b/geode-redis/src/test/java/org/apache/geode/redis/internal/PubSubImplJUnitTest.java
new file mode 100644
index 0000000..e667da0
--- /dev/null
+++ 
b/geode-redis/src/test/java/org/apache/geode/redis/internal/PubSubImplJUnitTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+
+
+/**
+ * Unit test for {@link PubSubImpl} that drives publishing directly against an injected
+ * {@link Subscriptions} instance.
+ */
+public class PubSubImplJUnitTest {
+
+  /**
+   * A failed publish to a client that reports itself dead must count as zero successful
+   * deliveries and must leave no subscriptions registered for that client.
+   */
+  @Test
+  public void testSubscriptionWithDeadClientIsPruned() {
+    Subscriptions subscriptions = new Subscriptions();
+    ExecutionHandlerContext mockExecutionHandlerContext = 
mock(ExecutionHandlerContext.class);
+
+    Client deadClient = mock(Client.class);
+    when(deadClient.isDead()).thenReturn(true);
+
+    ChannelSubscription subscription =
+        spy(new ChannelSubscription(deadClient,
+            "sally", mockExecutionHandlerContext));
+
+    // Force the publish to fail so the subscription lands in the "failed" partition.
+    doReturn(new PublishResult(deadClient, 
false)).when(subscription).publishMessage(any(), any());
+    subscriptions.add(subscription);
+
+    PubSubImpl subject = new PubSubImpl(subscriptions);
+
+    Long numberOfSubscriptions = subject.publishMessageToSubscribers("sally", 
"message");
+
+    assertThat(numberOfSubscriptions).isEqualTo(0);
+    assertThat(subscriptions.findSubscriptions(deadClient)).isEmpty();
+  }
+}
diff --git 
a/geode-redis/src/test/java/org/apache/geode/redis/internal/SubscriptionsTest.java
 
b/geode-redis/src/test/java/org/apache/geode/redis/internal/SubscriptionsTest.java
new file mode 100644
index 0000000..f216a04
--- /dev/null
+++ 
b/geode-redis/src/test/java/org/apache/geode/redis/internal/SubscriptionsTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+import io.netty.channel.Channel;
+import org.junit.Test;
+
+import org.apache.geode.redis.internal.org.apache.hadoop.fs.GlobPattern;
+
+/**
+ * Unit tests for {@link Subscriptions}: adding channel and pattern subscriptions, checking whether
+ * a given channel name or glob pattern is registered for a client, looking subscriptions up by
+ * client, and removing them.
+ */
+public class SubscriptionsTest {
+
+  /** Creates a {@link Client} wrapping a freshly mocked netty {@link Channel}. */
+  private static Client newClient() {
+    return new Client(mock(Channel.class));
+  }
+
+  /** Creates a mocked {@link ExecutionHandlerContext} to hand to subscription constructors. */
+  private static ExecutionHandlerContext newContext() {
+    return mock(ExecutionHandlerContext.class);
+  }
+
+  /** A channel subscription is found under its own channel name and not under another name. */
+  @Test
+  public void correctlyIdentifiesChannelSubscriber() {
+    Client subscriber = newClient();
+    Subscriptions subscriptions = new Subscriptions();
+    subscriptions.add(new ChannelSubscription(subscriber, "subscriptions", newContext()));
+
+    assertThat(subscriptions.exists("subscriptions", subscriber)).isTrue();
+    assertThat(subscriptions.exists("unknown", subscriber)).isFalse();
+  }
+
+  /** A pattern subscription is found when queried with the glob it was registered with. */
+  @Test
+  public void correctlyIdentifiesPatternSubscriber() {
+    GlobPattern glob = new GlobPattern("sub*s");
+    Client subscriber = newClient();
+    Subscriptions subscriptions = new Subscriptions();
+    subscriptions.add(new PatternSubscription(subscriber, glob, newContext()));
+
+    assertThat(subscriptions.exists(glob, subscriber)).isTrue();
+  }
+
+  /**
+   * With only a channel subscription registered, querying by glob pattern reports nothing, both
+   * for a wildcard glob and for a glob whose text equals the channel name.
+   */
+  @Test
+  public void doesNotMisidentifyChannelAsPattern() {
+    Client subscriber = newClient();
+    Subscriptions subscriptions = new Subscriptions();
+    subscriptions.add(new ChannelSubscription(subscriber, "subscriptions", newContext()));
+
+    GlobPattern wildcardGlob = new GlobPattern("sub*s");
+    GlobPattern literalGlob = new GlobPattern("subscriptions");
+
+    assertThat(subscriptions.exists(wildcardGlob, subscriber)).isFalse();
+    assertThat(subscriptions.exists(literalGlob, subscriber)).isFalse();
+  }
+
+  /** With a channel and a pattern subscription for one client, each is found by its own key. */
+  @Test
+  public void doesNotMisidentifyWhenBothTypesArePresent() {
+    Client subscriber = newClient();
+    ExecutionHandlerContext context = newContext();
+    GlobPattern glob = new GlobPattern("sub*s");
+
+    Subscriptions subscriptions = new Subscriptions();
+    subscriptions.add(new ChannelSubscription(subscriber, "subscriptions", context));
+    subscriptions.add(new PatternSubscription(subscriber, glob, context));
+
+    assertThat(subscriptions.exists(glob, subscriber)).isTrue();
+    assertThat(subscriptions.exists("subscriptions", subscriber)).isTrue();
+  }
+
+
+  /** Sanity check that two separately created mock channels do not compare equal. */
+  @Test
+  public void verifyDifferentMockChannelsNotEqual() {
+    Channel first = mock(Channel.class);
+    Channel second = mock(Channel.class);
+
+    assertThat(first).isNotEqualTo(second);
+    assertThat(first.equals(second)).isFalse();
+  }
+
+  /** Looking up by client returns only the subscription added for that client. */
+  @Test
+  public void findSubscribers() {
+    ExecutionHandlerContext context = newContext();
+    Client clientOne = newClient();
+    Client clientTwo = newClient();
+
+    ChannelSubscription ownedByOne =
+        new ChannelSubscription(clientOne, "subscriptions", context);
+    ChannelSubscription ownedByTwo =
+        new ChannelSubscription(clientTwo, "monkeys", context);
+
+    Subscriptions subscriptions = new Subscriptions();
+    subscriptions.add(ownedByOne);
+    subscriptions.add(ownedByTwo);
+
+    assertThat(subscriptions.findSubscriptions(clientOne)).containsExactly(ownedByOne);
+  }
+
+  /** Removing by client drops that client's subscription and leaves the other client's one. */
+  @Test
+  public void removeByClient() {
+    ExecutionHandlerContext context = newContext();
+    Client clientOne = newClient();
+    Client clientTwo = newClient();
+
+    ChannelSubscription ownedByOne =
+        new ChannelSubscription(clientOne, "subscriptions", context);
+    ChannelSubscription ownedByTwo =
+        new ChannelSubscription(clientTwo, "monkeys", context);
+
+    Subscriptions subscriptions = new Subscriptions();
+    subscriptions.add(ownedByOne);
+    subscriptions.add(ownedByTwo);
+
+    subscriptions.remove(clientOne);
+
+    assertThat(subscriptions.findSubscriptions(clientOne)).isEmpty();
+    assertThat(subscriptions.findSubscriptions(clientTwo)).containsExactly(ownedByTwo);
+  }
+
+  /**
+   * Removing by pattern and client drops only the pattern subscription; both channel
+   * subscriptions of the same client remain, including the one whose channel name has the same
+   * text as the removed pattern.
+   */
+  @Test
+  public void removeByClientAndPattern() {
+    Client client = newClient();
+    ExecutionHandlerContext context = newContext();
+    GlobPattern pattern = new GlobPattern("monkeys");
+
+    ChannelSubscription subscriptionsChannel =
+        new ChannelSubscription(client, "subscriptions", context);
+    PatternSubscription monkeysPattern = new PatternSubscription(client, pattern, context);
+    ChannelSubscription monkeysChannel = new ChannelSubscription(client, "monkeys", context);
+
+    Subscriptions subscriptions = new Subscriptions();
+    subscriptions.add(subscriptionsChannel);
+    subscriptions.add(monkeysPattern);
+    subscriptions.add(monkeysChannel);
+
+    subscriptions.remove(pattern, client);
+
+    assertThat(subscriptions.findSubscriptions(client))
+        .containsExactlyInAnyOrder(subscriptionsChannel, monkeysChannel);
+  }
+}

Reply via email to