This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5e2baea  Feature/expand pubsub support (#5284)
5e2baea is described below

commit 5e2baea7516b5e900f2699afddb63c59a20a1ab3
Author: Darrel Schneider <dschnei...@pivotal.io>
AuthorDate: Thu Jun 25 05:47:48 2020 -0700

    Feature/expand pubsub support (#5284)
    
    Co-authored-by: Jens Deppe <jde...@pivotal.io>
    Co-authored-by: Sarah <sab...@pivotal.io>
    Co-authored-by: Darrel Schneider <dschnei...@pivotal.io>
---
 geode-redis/build.gradle                           |   1 +
 .../executor/pubsub/PubSubIntegrationTest.java     | 329 +++++++++++++++++++++
 .../geode/redis/mocks/DummySubscription.java       |   5 +
 .../apache/geode/redis/mocks/MockSubscriber.java   |  53 ++++
 .../geode/redis/internal/GeodeRedisServer.java     |   4 +-
 .../geode/redis/internal/RedisCommandType.java     |   4 +-
 .../executor/pubsub/PunsubscribeExecutor.java      |  56 +++-
 .../executor/pubsub/UnsubscribeExecutor.java       |  55 +++-
 .../apache/geode/redis/internal/netty/Coder.java   |   2 +-
 .../internal/netty/ExecutionHandlerContext.java    |   7 +
 .../redis/internal/pubsub/ChannelSubscription.java |   4 +
 .../redis/internal/pubsub/PatternSubscription.java |   4 +
 .../apache/geode/redis/internal/pubsub/PubSub.java |  11 +
 .../geode/redis/internal/pubsub/PubSubImpl.java    |  29 +-
 .../geode/redis/internal/pubsub/Subscription.java  |   6 +
 .../geode/redis/internal/pubsub/Subscriptions.java |  39 ++-
 geode-redis/src/test/resources/expected-pom.xml    |  11 +
 17 files changed, 569 insertions(+), 51 deletions(-)

diff --git a/geode-redis/build.gradle b/geode-redis/build.gradle
index 6932b29..bd58ce7 100644
--- a/geode-redis/build.gradle
+++ b/geode-redis/build.gradle
@@ -38,6 +38,7 @@ dependencies {
   implementation('io.netty:netty-all')
   implementation('org.apache.logging.log4j:log4j-api')
   implementation('commons-codec:commons-codec')
+  implementation('org.apache.commons:commons-lang3')
 
   testImplementation(project(':geode-junit'))
   testImplementation('org.mockito:mockito-core')
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java
index 7389d5b..20c4a60 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -29,6 +30,7 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Protocol;
 
 import org.apache.geode.redis.GeodeRedisServerRule;
 import org.apache.geode.redis.mocks.MockBinarySubscriber;
@@ -66,6 +68,27 @@ public class PubSubIntegrationTest {
   }
 
   @Test
+  @SuppressWarnings("unchecked")
+  public void punsubscribe_whenNonexistent() {
+    assertThat((List<Object>) subscriber.sendCommand(Protocol.Command.PUNSUBSCRIBE, "Nonexistent"))
+        .containsExactly("punsubscribe".getBytes(), "Nonexistent".getBytes(), 0L);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void unsubscribe_whenNoSubscriptionsExist_shouldNotHang() {
+    assertThat((List<Object>) subscriber.sendCommand(Protocol.Command.UNSUBSCRIBE))
+        .containsExactly("unsubscribe".getBytes(), null, 0L);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void punsubscribe_whenNoSubscriptionsExist_shouldNotHang() {
+    assertThat((List<Object>) subscriber.sendCommand(Protocol.Command.PUNSUBSCRIBE))
+        .containsExactly("punsubscribe".getBytes(), null, 0L);
+  }
+
+  @Test
   public void testOneSubscriberOneChannel() {
     List<String> expectedMessages = Arrays.asList("hello");
 
@@ -90,6 +113,170 @@ public class PubSubIntegrationTest {
   }
 
   @Test
+  public void punsubscribe_givenSubscribe_doesNotReduceSubscriptions() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
+
+    Runnable runnable = () -> {
+      subscriber.subscribe(mockSubscriber, "salutations");
+    };
+
+    Thread subscriberThread = new Thread(runnable);
+    subscriberThread.start();
+    try {
+      waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+
+      mockSubscriber.punsubscribe("salutations");
+      waitFor(() -> mockSubscriber.punsubscribeInfos.size() == 1);
+
+      assertThat(mockSubscriber.punsubscribeInfos.get(0).channel).isEqualTo("salutations");
+      assertThat(mockSubscriber.punsubscribeInfos.get(0).count).isEqualTo(1);
+      assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1);
+    } finally {
+      // now cleanup the actual subscription
+      mockSubscriber.unsubscribe("salutations");
+      waitFor(() -> !subscriberThread.isAlive());
+    }
+  }
+
+  @Test
+  public void unsubscribe_givenPsubscribe_doesNotReduceSubscriptions() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
+
+    Runnable runnable = () -> {
+      subscriber.psubscribe(mockSubscriber, "salutations");
+    };
+
+    Thread subscriberThread = new Thread(runnable);
+    subscriberThread.start();
+    try {
+      waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+
+      mockSubscriber.unsubscribe("salutations");
+      waitFor(() -> mockSubscriber.unsubscribeInfos.size() == 1);
+
+      assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("salutations");
+      assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(1);
+      assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1);
+    } finally {
+      // now cleanup the actual subscription
+      mockSubscriber.punsubscribe("salutations");
+      waitFor(() -> !subscriberThread.isAlive());
+    }
+  }
+
+  @Test
+  public void unsubscribe_onNonExistentSubscription_doesNotReduceSubscriptions() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
+    Runnable runnable = () -> {
+      subscriber.subscribe(mockSubscriber, "salutations");
+    };
+
+    Thread subscriberThread = new Thread(runnable);
+    subscriberThread.start();
+    try {
+      waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+      mockSubscriber.unsubscribe("NonExistent");
+      waitFor(() -> mockSubscriber.unsubscribeInfos.size() == 1);
+
+      assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("NonExistent");
+      assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(1);
+      assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1);
+    } finally {
+      // now cleanup the actual subscription
+      mockSubscriber.unsubscribe("salutations");
+      waitFor(() -> !subscriberThread.isAlive());
+    }
+  }
+
+  @Test
+  public void unsubscribe_whenGivenAnEmptyString() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
+    Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "salutations");
+
+    Thread subscriberThread = new Thread(runnable);
+    subscriberThread.start();
+    try {
+      waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+      mockSubscriber.unsubscribe("");
+      waitFor(() -> mockSubscriber.unsubscribeInfos.size() == 1);
+
+      assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("");
+      assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(1);
+      assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1);
+    } finally {
+      // now cleanup the actual subscription
+      mockSubscriber.unsubscribe();
+      waitFor(() -> !subscriberThread.isAlive());
+    }
+  }
+
+  @Test
+  public void unsubscribeWithEmptyChannel_doesNotUnsubscribeExistingChannels() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
+    Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "salutations");
+
+    Thread subscriberThread = new Thread(runnable);
+    subscriberThread.start();
+    try {
+      waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+      mockSubscriber.unsubscribe("");
+      waitFor(() -> mockSubscriber.unsubscribeInfos.size() == 1);
+
+      Long result = publisher.publish("salutations", "heyho");
+      waitFor(() -> mockSubscriber.getReceivedMessages().size() == 1);
+
+      assertThat(result).isEqualTo(1);
+      assertThat(mockSubscriber.getReceivedMessages().get(0)).isEqualTo("heyho");
+    } finally {
+      // now cleanup the actual subscription
+      mockSubscriber.unsubscribe();
+      waitFor(() -> !subscriberThread.isAlive());
+    }
+  }
+
+  @Test
+  public void canSubscribeToAnEmptyString() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
+    Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "");
+
+    Thread subscriberThread = new Thread(runnable);
+    subscriberThread.start();
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+    Long result = publisher.publish("", "blank");
+    assertThat(result).isEqualTo(1);
+
+    mockSubscriber.unsubscribe("");
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+    waitFor(() -> !subscriberThread.isAlive());
+
+    assertThat(mockSubscriber.getReceivedMessages()).containsExactly("blank");
+  }
+
+  @Test
+  public void punsubscribe_onNonExistentSubscription_doesNotReduceSubscriptions() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
+    Runnable runnable = () -> {
+      subscriber.psubscribe(mockSubscriber, "salutations");
+    };
+
+    Thread subscriberThread = new Thread(runnable);
+    subscriberThread.start();
+    try {
+      waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+      mockSubscriber.punsubscribe("NonExistent");
+      waitFor(() -> mockSubscriber.punsubscribeInfos.size() == 1);
+
+      assertThat(mockSubscriber.punsubscribeInfos.get(0).channel).isEqualTo("NonExistent");
+      assertThat(mockSubscriber.punsubscribeInfos.get(0).count).isEqualTo(1);
+      assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1);
+    } finally {
+      // now cleanup the actual subscription
+      mockSubscriber.punsubscribe("salutations");
+      waitFor(() -> !subscriberThread.isAlive());
+    }
+  }
+
+  @Test
   public void testPublishBinaryData() {
     byte[] expectedMessage = new byte[256];
     for (int i = 0; i < 256; i++) {
@@ -117,6 +304,33 @@ public class PubSubIntegrationTest {
   }
 
   @Test
+  public void testSubscribeAndPublishUsingBinaryData() {
+    byte[] binaryBlob = new byte[256];
+    for (int i = 0; i < 256; i++) {
+      binaryBlob[i] = (byte) i;
+    }
+
+    MockBinarySubscriber mockSubscriber = new MockBinarySubscriber();
+
+    Runnable runnable = () -> {
+      subscriber.subscribe(mockSubscriber, binaryBlob);
+    };
+
+    Thread subscriberThread = new Thread(runnable);
+    subscriberThread.start();
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+
+    Long result = publisher.publish(binaryBlob, binaryBlob);
+    assertThat(result).isEqualTo(1);
+
+    mockSubscriber.unsubscribe(binaryBlob);
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+    waitFor(() -> !subscriberThread.isAlive());
+
+    assertThat(mockSubscriber.getReceivedMessages().get(0)).isEqualTo(binaryBlob);
+  }
+
+  @Test
   public void testOneSubscriberSubscribingToTwoChannels() {
     List<String> expectedMessages = Arrays.asList("hello", "howdy");
     MockSubscriber mockSubscriber = new MockSubscriber();
@@ -135,14 +349,111 @@ public class PubSubIntegrationTest {
     assertThat(result).isEqualTo(1);
     mockSubscriber.unsubscribe("salutations");
     waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+    assertThat(mockSubscriber.unsubscribeInfos).hasSize(1);
+    assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("salutations");
+    assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(1);
+    mockSubscriber.unsubscribeInfos.clear();
     mockSubscriber.unsubscribe("yuletide");
     waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+    assertThat(mockSubscriber.unsubscribeInfos).hasSize(1);
+    assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("yuletide");
+    assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(0);
     waitFor(() -> !subscriberThread.isAlive());
 
     assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(expectedMessages);
   }
 
   @Test
+  public void testSubscribingAndUnsubscribingFromMultipleChannels() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
+
+    Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "salutations", "yuletide");
+
+    Thread subscriberThread = new Thread(runnable);
+    subscriberThread.start();
+
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
+
+    mockSubscriber.unsubscribe("yuletide", "salutations");
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+    waitFor(() -> !subscriberThread.isAlive());
+
+    List<String> unsubscribedChannels = mockSubscriber.unsubscribeInfos.stream()
+        .map(x -> x.channel).collect(Collectors.toList());
+    assertThat(unsubscribedChannels).containsExactlyInAnyOrder("salutations", "yuletide");
+
+    List<Integer> channelCounts = mockSubscriber.unsubscribeInfos.stream()
+        .map(x -> x.count).collect(Collectors.toList());
+    assertThat(channelCounts).containsExactlyInAnyOrder(1, 0);
+
+  }
+
+  @Test
+  public void testUnsubscribingImplicitlyFromAllChannels() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
+
+    Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "salutations", "yuletide");
+
+    Thread subscriberThread = new Thread(runnable);
+    subscriberThread.start();
+
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
+
+    mockSubscriber.unsubscribe();
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+    waitFor(() -> !subscriberThread.isAlive());
+
+    List<String> unsubscribedChannels = mockSubscriber.unsubscribeInfos.stream()
+        .map(x -> x.channel).collect(Collectors.toList());
+    assertThat(unsubscribedChannels).containsExactlyInAnyOrder("salutations", "yuletide");
+
+    List<Integer> channelCounts = mockSubscriber.unsubscribeInfos.stream()
+        .map(x -> x.count).collect(Collectors.toList());
+    assertThat(channelCounts).containsExactlyInAnyOrder(1, 0);
+
+    Long result = publisher.publish("salutations", "greetings");
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void testPsubscribingAndPunsubscribingFromMultipleChannels() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
+
+    Runnable runnable = () -> subscriber.psubscribe(mockSubscriber, "sal*", "yul*");
+
+    Thread subscriberThread = new Thread(runnable);
+    subscriberThread.start();
+
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
+
+    mockSubscriber.punsubscribe("yul*", "sal*");
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+    waitFor(() -> !subscriberThread.isAlive());
+    assertThat(mockSubscriber.punsubscribeInfos).containsExactly(
+        new MockSubscriber.UnsubscribeInfo("yul*", 1),
+        new MockSubscriber.UnsubscribeInfo("sal*", 0));
+  }
+
+  @Test
+  public void testPunsubscribingImplicitlyFromAllChannels() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
+
+    Runnable runnable = () -> subscriber.psubscribe(mockSubscriber, "sal*", "yul*");
+
+    Thread subscriberThread = new Thread(runnable);
+    subscriberThread.start();
+
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
+
+    mockSubscriber.punsubscribe();
+    waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+    waitFor(() -> !subscriberThread.isAlive());
+    assertThat(mockSubscriber.punsubscribeInfos).containsExactly(
+        new MockSubscriber.UnsubscribeInfo("sal*", 1),
+        new MockSubscriber.UnsubscribeInfo("yul*", 0));
+  }
+
+  @Test
   public void testTwoSubscribersOneChannel() {
     Jedis subscriber2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
     MockSubscriber mockSubscriber1 = new MockSubscriber();
@@ -164,12 +475,18 @@ public class PubSubIntegrationTest {
     mockSubscriber1.unsubscribe("salutations");
     waitFor(() -> mockSubscriber1.getSubscribedChannels() == 0);
     waitFor(() -> !subscriber1Thread.isAlive());
+    assertThat(mockSubscriber1.unsubscribeInfos).hasSize(1);
+    assertThat(mockSubscriber1.unsubscribeInfos.get(0).channel).isEqualTo("salutations");
+    assertThat(mockSubscriber1.unsubscribeInfos.get(0).count).isEqualTo(0);
 
     result = publisher.publish("salutations", "goodbye");
     assertThat(result).isEqualTo(1);
     mockSubscriber2.unsubscribe("salutations");
     waitFor(() -> mockSubscriber2.getSubscribedChannels() == 0);
     waitFor(() -> !subscriber2Thread.isAlive());
+    assertThat(mockSubscriber2.unsubscribeInfos).hasSize(1);
+    assertThat(mockSubscriber2.unsubscribeInfos.get(0).channel).isEqualTo("salutations");
+    assertThat(mockSubscriber2.unsubscribeInfos.get(0).count).isEqualTo(0);
 
     assertThat(mockSubscriber1.getReceivedMessages()).isEqualTo(Collections.singletonList("hello"));
     assertThat(mockSubscriber2.getReceivedMessages()).isEqualTo(Arrays.asList("hello", "goodbye"));
@@ -201,6 +518,8 @@ public class PubSubIntegrationTest {
     waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
 
     waitFor(() -> !subscriberThread.isAlive());
+    assertThat(mockSubscriber.unsubscribeInfos)
+        .containsExactly(new MockSubscriber.UnsubscribeInfo("salutations", 0));
     assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(Collections.singletonList("hello"));
   }
 
@@ -252,6 +571,8 @@ public class PubSubIntegrationTest {
     mockSubscriber.punsubscribe("sal*s");
     waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
     waitFor(() -> !subscriberThread.isAlive());
+    assertThat(mockSubscriber.punsubscribeInfos)
+        .containsExactly(new MockSubscriber.UnsubscribeInfo("sal*s", 0));
   }
 
   @Test
@@ -283,6 +604,8 @@ public class PubSubIntegrationTest {
     mockSubscriber.punsubscribe("sal*s");
     waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
     waitFor(() -> !subscriberThread.isAlive());
+    assertThat(mockSubscriber.punsubscribeInfos)
+        .containsExactly(new MockSubscriber.UnsubscribeInfo("sal*s", 0));
   }
 
   @Test
@@ -305,11 +628,17 @@ public class PubSubIntegrationTest {
 
     mockSubscriber.punsubscribe("sal*s");
     waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+    assertThat(mockSubscriber.punsubscribeInfos).hasSize(1);
+    assertThat(mockSubscriber.punsubscribeInfos.get(0).channel).isEqualTo("sal*s");
+    assertThat(mockSubscriber.punsubscribeInfos.get(0).count).isEqualTo(1);
 
     mockSubscriber.unsubscribe("salutations");
     waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
 
     waitFor(() -> !subscriberThread.isAlive());
+    assertThat(mockSubscriber.unsubscribeInfos).hasSize(1);
+    assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("salutations");
+    assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(0);
 
     assertThat(mockSubscriber.getReceivedMessages()).containsExactly("hello");
     assertThat(mockSubscriber.getReceivedPMessages()).containsExactly("hello");
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java
index 52db20e..d3ba73b 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java
@@ -47,4 +47,9 @@ public class DummySubscription implements Subscription {
   public List<Object> createResponse(String channel, byte[] message) {
     return null;
   }
+
+  @Override
+  public String getChannelName() {
+    return null;
+  }
 }
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
index 5ba976e..e0ecb43 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
@@ -19,6 +19,7 @@ package org.apache.geode.redis.mocks;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 import redis.clients.jedis.JedisPubSub;
 
@@ -34,6 +35,58 @@ public class MockSubscriber extends JedisPubSub {
     return new ArrayList<>(receivedPMessages);
   }
 
+  public final List<UnsubscribeInfo> unsubscribeInfos =
+      Collections.synchronizedList(new ArrayList<>());
+  public final List<UnsubscribeInfo> punsubscribeInfos =
+      Collections.synchronizedList(new ArrayList<>());
+
+  @Override
+  public void onUnsubscribe(String channel, int subscribedChannels) {
+    unsubscribeInfos.add(new UnsubscribeInfo(channel, subscribedChannels));
+  }
+
+  @Override
+  public void onPUnsubscribe(String pattern, int subscribedChannels) {
+    punsubscribeInfos.add(new UnsubscribeInfo(pattern, subscribedChannels));
+  }
+
+  public static class UnsubscribeInfo {
+    public final String channel;
+    public final int count;
+
+    public UnsubscribeInfo(String channel, int count) {
+      this.channel = channel;
+      this.count = count;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof UnsubscribeInfo)) {
+        return false;
+      }
+      UnsubscribeInfo that = (UnsubscribeInfo) o;
+      return count == that.count &&
+          Objects.equals(channel, that.channel);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(channel, count);
+    }
+
+    @Override
+    public String toString() {
+      return "UnsubscribeInfo{" +
+          "channel='" + channel + '\'' +
+          ", count=" + count +
+          '}';
+    }
+  }
+
+
   @Override
   public void onMessage(String channel, String message) {
     receivedMessages.add(message);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
index 0b66d64..9a860bf 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
@@ -46,6 +46,7 @@ import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.timeout.WriteTimeoutHandler;
 import io.netty.util.concurrent.Future;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.annotations.Experimental;
@@ -416,7 +417,8 @@ public class GeodeRedisServer {
 
     InternalDistributedSystem system = (InternalDistributedSystem) cache.getDistributedSystem();
     String redisPassword = system.getConfig().getRedisPassword();
-    final byte[] redisPasswordBytes = Coder.stringToBytes(redisPassword);
+    final byte[] redisPasswordBytes =
+        StringUtils.isBlank(redisPassword) ? null : Coder.stringToBytes(redisPassword);
     ServerBootstrap serverBootstrap = new ServerBootstrap();
 
     serverBootstrap.group(bossGroup, workerGroup).channel(socketClass)
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
index 4ec0ac6..2c7044b 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
@@ -164,9 +164,9 @@ public enum RedisCommandType {
 
   SUBSCRIBE(new SubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(2)),
   PUBLISH(new PublishExecutor(), SUPPORTED, new ExactParameterRequirements(3)),
-  UNSUBSCRIBE(new UnsubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(2)),
+  UNSUBSCRIBE(new UnsubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(1)),
   PSUBSCRIBE(new PsubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(2)),
-  PUNSUBSCRIBE(new PunsubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(2)),
+  PUNSUBSCRIBE(new PunsubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(1)),
 
   UNKNOWN(new UnknownExecutor(), SUPPORTED),
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java
index 9b49127..b71ca68 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java
@@ -17,10 +17,14 @@
 package org.apache.geode.redis.internal.executor.pubsub;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
 import org.apache.geode.redis.internal.executor.GlobPattern;
 import org.apache.geode.redis.internal.executor.RedisResponse;
@@ -33,17 +37,45 @@ public class PunsubscribeExecutor extends AbstractExecutor {
   @Override
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
-    byte[] pattern = command.getProcessedCommand().get(1);
-    long subscriptionCount =
-        context
-            .getPubSub()
-            .punsubscribe(new GlobPattern(new String(pattern)), context.getClient());
-
-    ArrayList<Object> items = new ArrayList<>();
-    items.add("punsubscribe");
-    items.add(pattern);
-    items.add(subscriptionCount);
-
-    return RedisResponse.array(items);
+
+    List<String> channelNames = extractChannelNames(command);
+    if (channelNames.isEmpty()) {
+      channelNames = context.getPubSub().findSubscribedChannels(context.getClient());
+    }
+
+    Collection<Collection<?>> response = punsubscribe(context, channelNames);
+
+    return RedisResponse.flattenedArray(response);
+  }
+
+  private List<String> extractChannelNames(Command command) {
+    return command.getProcessedCommandWrappers().stream()
+        .skip(1)
+        .map(ByteArrayWrapper::toString)
+        .collect(Collectors.toList());
+  }
+
+  private Collection<Collection<?>> punsubscribe(ExecutionHandlerContext context,
+      List<String> channelNames) {
+    Collection<Collection<?>> response = new ArrayList<>();
+
+    if (channelNames.isEmpty()) {
+      response.add(createItem(null, 0));
+    } else {
+      for (String channel : channelNames) {
+        long subscriptionCount =
+            context.getPubSub().punsubscribe(new GlobPattern(channel), context.getClient());
+        response.add(createItem(channel, subscriptionCount));
+      }
+    }
+    return response;
+  }
+
+  private ArrayList<Object> createItem(String channel, long subscriptionCount) {
+    ArrayList<Object> oneItem = new ArrayList<>();
+    oneItem.add("punsubscribe");
+    oneItem.add(channel);
+    oneItem.add(subscriptionCount);
+    return oneItem;
   }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/UnsubscribeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/UnsubscribeExecutor.java
index 853a29e..09d1a9d 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/UnsubscribeExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/UnsubscribeExecutor.java
@@ -16,34 +16,63 @@
 package org.apache.geode.redis.internal.executor.pubsub;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.stream.Collectors;
 
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
 
 public class UnsubscribeExecutor extends AbstractExecutor {
-  private static final Logger logger = LogService.getLogger();
 
   @Override
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
-    List<byte[]> commandElems = command.getProcessedCommand();
 
-    byte[] channelName = commandElems.get(1);
-    long subscriptionCount =
-        context.getPubSub().unsubscribe(new String(channelName), context.getClient());
+    List<String> channelNames = extractChannelNames(command);
+    if (channelNames.isEmpty()) {
+      channelNames = context.getPubSub().findSubscribedChannels(context.getClient());
+    }
+
+    Collection<Collection<?>> response = unsubscribe(context, channelNames);
+
+    return RedisResponse.flattenedArray(response);
+  }
+
+  private List<String> extractChannelNames(Command command) {
+    return command.getProcessedCommandWrappers().stream()
+        .skip(1)
+        .map(ByteArrayWrapper::toString)
+        .collect(Collectors.toList());
+  }
+
+  private Collection<Collection<?>> unsubscribe(ExecutionHandlerContext context,
+      List<String> channelNames) {
+    Collection<Collection<?>> response = new ArrayList<>();
+
+    if (channelNames.isEmpty()) {
+      response.add(createItem(null, 0));
+    } else {
+      for (String channel : channelNames) {
+        long subscriptionCount =
+            context.getPubSub().unsubscribe(channel, context.getClient());
 
-    ArrayList<Object> items = new ArrayList<>();
-    items.add("unsubscribe");
-    items.add(channelName);
-    items.add(subscriptionCount);
+        response.add(createItem(channel, subscriptionCount));
+      }
+    }
+
+    return response;
+  }
 
-    return RedisResponse.array(items);
+  private ArrayList<Object> createItem(String channelName, long subscriptionCount) {
+    ArrayList<Object> oneItem = new ArrayList<>();
+    oneItem.add("unsubscribe");
+    oneItem.add(channelName);
+    oneItem.add(subscriptionCount);
+    return oneItem;
   }
 
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
index 0f02a7c..6c06c08 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
@@ -352,7 +352,7 @@ public class Coder {
   }
 
   public static byte[] stringToBytes(String string) {
-    if (string == null || string.equals("")) {
+    if (string == null) {
       return null;
     }
     try {
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index 297abb3..7ce9528 100644
--- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -182,6 +182,13 @@ public class ExecutionHandlerContext extends 
ChannelInboundHandlerAdapter {
       return;
     }
 
+    if (command.isOfType(RedisCommandType.SELECT)
+        || command.isOfType(RedisCommandType.CONFIG)
+        || command.isOfType(RedisCommandType.PUBSUB)) {
+      writeToChannel(RedisResponse.ok());
+      return;
+    }
+
     if (command.isUnsupported() && !allowUnsupportedCommands()) {
       writeToChannel(
           RedisResponse.error(command.getCommandType() + RedisConstants.ERROR_UNSUPPORTED_COMMAND));
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java
index dcf2e50..9e0e4b8 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java
@@ -53,4 +53,8 @@ class ChannelSubscription extends AbstractSubscription {
     return this.channel.equals(channel);
   }
 
+  @Override
+  public String getChannelName() {
+    return channel;
+  }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java
index d19c870..2fbcdfe 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java
@@ -54,4 +54,8 @@ class PatternSubscription extends AbstractSubscription {
     return pattern.matches(channel);
   }
 
+  @Override
+  public String getChannelName() {
+    return pattern.globPattern();
+  }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java
index 22b9e40..69aebc4 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java
@@ -16,6 +16,8 @@
 
 package org.apache.geode.redis.internal.pubsub;
 
+import java.util.List;
+
 import org.apache.geode.cache.Region;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisData;
@@ -77,4 +79,13 @@ public interface PubSub {
    * @return the number of channels still subscribed to by the client
    */
   long punsubscribe(GlobPattern pattern, Client client);
+
+  /**
+   * Return a list of channel names that a client has subscribed to
+   *
+   * @param client the Client which is to be queried
+   * @return the list of channels
+   */
+  List<String> findSubscribedChannels(Client client);
+
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
index 71deb07..d6b5788 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
@@ -19,6 +19,7 @@ package org.apache.geode.redis.internal.pubsub;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.logging.log4j.Logger;
 
@@ -86,23 +87,12 @@ public class PubSubImpl implements PubSub {
 
   @Override
   public long subscribe(String channel, ExecutionHandlerContext context, Client client) {
-    if (subscriptions.exists(channel, client)) {
-      return subscriptions.findSubscriptions(client).size();
-    }
-    Subscription subscription = new ChannelSubscription(client, channel, context);
-    subscriptions.add(subscription);
-    return subscriptions.findSubscriptions(client).size();
+    return subscriptions.subscribe(channel, context, client);
   }
 
   @Override
   public long psubscribe(GlobPattern pattern, ExecutionHandlerContext context, Client client) {
-    if (subscriptions.exists(pattern, client)) {
-      return subscriptions.findSubscriptions(client).size();
-    }
-    Subscription subscription = new PatternSubscription(client, pattern, context);
-    subscriptions.add(subscription);
-
-    return subscriptions.findSubscriptions(client).size();
+    return subscriptions.psubscribe(pattern, context, client);
   }
 
   private void registerPublishFunction() {
@@ -135,14 +125,19 @@ public class PubSubImpl implements PubSub {
 
   @Override
   public long unsubscribe(String channel, Client client) {
-    subscriptions.remove(channel, client);
-    return subscriptions.findSubscriptions(client).size();
+    return subscriptions.unsubscribe(channel, client);
   }
 
   @Override
   public long punsubscribe(GlobPattern pattern, Client client) {
-    subscriptions.remove(pattern, client);
-    return subscriptions.findSubscriptions(client).size();
+    return subscriptions.unsubscribe(pattern, client);
+  }
+
+  @Override
+  public List<String> findSubscribedChannels(Client client) {
+    return subscriptions.findSubscriptions(client).stream()
+        .map(Subscription::getChannelName)
+        .collect(Collectors.toList());
   }
 
   @VisibleForTesting
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java
index 4490917..7695a17 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java
@@ -51,4 +51,10 @@ public interface Subscription {
    * The response dependent on the type of the subscription
    */
   List<Object> createResponse(String channel, byte[] message);
+
+  /**
+   * Return the subscription name. In the case of a pattern the string representation of the
+   * pattern is returned.
+   */
+  String getChannelName();
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java
index fc73a79..8f7e732 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java
@@ -20,7 +20,10 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
 
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.redis.internal.executor.GlobPattern;
 import org.apache.geode.redis.internal.netty.Client;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
 
 /**
  * Class that manages both channel and pattern subscriptions.
@@ -34,7 +37,8 @@ public class Subscriptions {
    * @param channelOrPattern channel or pattern
    * @param client a client connection
    */
-  public boolean exists(Object channelOrPattern, Client client) {
+  @VisibleForTesting
+  boolean exists(Object channelOrPattern, Client client) {
     return subscriptions.stream()
         .anyMatch(subscription -> subscription.isEqualTo(channelOrPattern, client));
   }
@@ -66,7 +70,8 @@ public class Subscriptions {
   /**
    * Add a new subscription
    */
-  public void add(Subscription subscription) {
+  @VisibleForTesting
+  void add(Subscription subscription) {
     subscriptions.add(subscription);
   }
 
@@ -80,14 +85,38 @@ public class Subscriptions {
   /**
    * Remove a single subscription
    */
-  public void remove(Object channel, Client client) {
-    subscriptions.removeIf(subscription -> subscription.isEqualTo(channel, client));
+  @VisibleForTesting
+  boolean remove(Object channel, Client client) {
+    return subscriptions.removeIf(subscription -> subscription.isEqualTo(channel, client));
   }
 
   /**
    * @return the total number of all local subscriptions
    */
-  public int size() {
+  @VisibleForTesting
+  int size() {
     return subscriptions.size();
   }
+
+  public synchronized long subscribe(String channel, ExecutionHandlerContext context,
+      Client client) {
+    if (!exists(channel, client)) {
+      add(new ChannelSubscription(client, channel, context));
+    }
+    return findSubscriptions(client).size();
+  }
+
+  public synchronized long psubscribe(GlobPattern pattern, ExecutionHandlerContext context,
+      Client client) {
+    if (!exists(pattern, client)) {
+      add(new PatternSubscription(client, pattern, context));
+    }
+    return findSubscriptions(client).size();
+  }
+
+  public synchronized long unsubscribe(Object channelOrPattern, Client client) {
+    remove(channelOrPattern, client);
+    return findSubscriptions(client).size();
+  }
+
 }
diff --git a/geode-redis/src/test/resources/expected-pom.xml b/geode-redis/src/test/resources/expected-pom.xml
index b3f8d0b..c17b109 100644
--- a/geode-redis/src/test/resources/expected-pom.xml
+++ b/geode-redis/src/test/resources/expected-pom.xml
@@ -134,5 +134,16 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <scope>runtime</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>spring-boot-starter-logging</artifactId>
+          <groupId>org.springframework.boot</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 </project>

Reply via email to