This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 9f8a2ff GEODE-7800: Add Redis PSUBSCRIBE and PUNSUBSCRIBE commands
(#4705)
9f8a2ff is described below
commit 9f8a2ff2b43c183b4824dd5ab764ecd2243cb2e1
Author: Sarah Abbey <[email protected]>
AuthorDate: Sat Feb 15 21:59:42 2020 -0500
GEODE-7800: Add Redis PSUBSCRIBE and PUNSUBSCRIBE commands (#4705)
* GEODE-7800: Add Redis PSUBSCRIBE and PUNSUBSCRIBE commands
Similar to `SUBSCRIBE` and `UNSUBSCRIBE`, `PSUBSCRIBE` allows a client
to subscribe to a pattern. For example: `PSUBSCRIBE sal*s`
The subscription pattern is in the form of a glob supporting `*`, `?`
and ranges. https://redis.io/commands/psubscribe
Pattern subscriptions must be unsubscribed verbatim; i.e., the above
subscription would not be unsubscribed using the pattern `s*`, but must
be unsubscribed using the complete subscribed pattern, namely `sal*s`.
When clients subscribe to overlapping patterns (or channels) they will
receive a message for every matched subscription. Matches for a single
client are not conflated.
Co-authored-by: Sarah Abbey <[email protected]>
Co-authored-by: John Hutchison <[email protected]>
Co-authored-by: Jens Deppe <[email protected]>
* Fixes class names in sanctioned-geode-redis-serializables.txt
* Fixes flaky test
* Adds license to PublishResult
* Clean code using spotlessApply (spA) and make test reliable;
stop propagating exceptions that get thrown when disconnecting Jedis
* Cleans up subscribers and publishers after each test or after the whole
class as needed
* re-add ignoreExceptions to waitFor test helper method
Co-authored-by: Jens Deppe <[email protected]>
Co-authored-by: Venkateswara Prasath Durairaj
<[email protected]>
---
.../java/org/apache/geode/redis/PubSubTest.java | 216 +++++++++++++--------
.../apache/geode/redis/mocks/MockSubscriber.java | 14 +-
.../org/apache/geode/redis/GeodeRedisServer.java | 3 +-
.../{Subscriber.java => AbstractSubscription.java} | 46 +++--
.../{Client.java => ChannelSubscription.java} | 38 ++--
.../org/apache/geode/redis/internal/Client.java | 2 +-
.../{Client.java => PatternSubscription.java} | 38 ++--
.../org/apache/geode/redis/internal/PubSub.java | 21 ++
.../apache/geode/redis/internal/PubSubImpl.java | 91 ++++-----
.../internal/{Client.java => PublishResult.java} | 39 ++--
.../geode/redis/internal/RedisCommandType.java | 56 ++++++
.../internal/{Client.java => Subscription.java} | 56 +++---
.../apache/geode/redis/internal/Subscriptions.java | 84 ++++++++
.../executor/pubsub/PsubscribeExecutor.java | 70 +++++++
.../internal/executor/pubsub/PublishExecutor.java | 4 +-
...lishExecutor.java => PunsubscribeExecutor.java} | 45 +++--
.../internal/org/apache/hadoop/fs/GlobPattern.java | 18 ++
.../sanctioned-geode-redis-serializables.txt | 2 +
.../geode/redis/internal/PubSubImplJUnitTest.java | 52 +++++
.../geode/redis/internal/SubscriptionsTest.java | 170 ++++++++++++++++
20 files changed, 780 insertions(+), 285 deletions(-)
diff --git
a/geode-redis/src/integrationTest/java/org/apache/geode/redis/PubSubTest.java
b/geode-redis/src/integrationTest/java/org/apache/geode/redis/PubSubTest.java
index 66f67a6..7897e36 100644
---
a/geode-redis/src/integrationTest/java/org/apache/geode/redis/PubSubTest.java
+++
b/geode-redis/src/integrationTest/java/org/apache/geode/redis/PubSubTest.java
@@ -18,13 +18,14 @@ package org.apache.geode.redis;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
@@ -41,10 +42,11 @@ import org.apache.geode.test.junit.categories.RedisTest;
@Category({RedisTest.class})
public class PubSubTest {
- private static Jedis jedis;
private static GeodeRedisServer server;
private static GemFireCache cache;
private static Random rand;
+ private static Jedis publisher;
+ private static Jedis subscriber;
private static int port = 6379;
@BeforeClass
@@ -57,26 +59,25 @@ public class PubSubTest {
cache = cf.create();
port = AvailablePortHelper.getRandomAvailableTCPPort();
server = new GeodeRedisServer("localhost", port);
+ subscriber = new Jedis("localhost", port);
+ publisher = new Jedis("localhost", port);
server.start();
- jedis = new Jedis("localhost", port, 10000000);
}
@AfterClass
public static void tearDown() {
- jedis.close();
+ subscriber.close();
+ publisher.close();
cache.close();
server.shutdown();
}
@Test
- public void testOneSubscriberOneChannel() throws InterruptedException {
- Jedis subscriber = new Jedis("localhost", port);
- Jedis publisher = new Jedis("localhost", port);
+ public void testOneSubscriberOneChannel() {
List<String> expectedMessages = Arrays.asList("hello");
- CountDownLatch latch = new CountDownLatch(1);
- MockSubscriber mockSubscriber = new MockSubscriber(latch);
+ MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> {
subscriber.subscribe(mockSubscriber, "salutations");
@@ -84,41 +85,29 @@ public class PubSubTest {
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
-
- assertThat(latch.await(2, TimeUnit.SECONDS))
- .as("channel subscriptions were not received")
- .isTrue();
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
Long result = publisher.publish("salutations", "hello");
assertThat(result).isEqualTo(1);
mockSubscriber.unsubscribe("salutations");
-
- subscriberThread.join(2000);
-
- assertThat(subscriberThread.isAlive())
- .as("subscriber thread should not be alive")
- .isFalse();
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+ waitFor(() -> !subscriberThread.isAlive());
assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(expectedMessages);
}
@Test
- public void testOneSubscriberSubscribingToTwoChannels() throws Exception {
- Jedis subscriber = new Jedis("localhost", port);
- Jedis publisher = new Jedis("localhost", port);
+ public void testOneSubscriberSubscribingToTwoChannels() {
List<String> expectedMessages = Arrays.asList("hello", "howdy");
- CountDownLatch latch = new CountDownLatch(2);
- MockSubscriber mockSubscriber = new MockSubscriber(latch);
+ MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> subscriber.subscribe(mockSubscriber,
"salutations", "yuletide");
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
- assertThat(latch.await(2, TimeUnit.SECONDS))
- .as("channel subscriptions were not received")
- .isTrue();
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
Long result = publisher.publish("salutations", "hello");
assertThat(result).isEqualTo(1);
@@ -126,28 +115,21 @@ public class PubSubTest {
result = publisher.publish("yuletide", "howdy");
assertThat(result).isEqualTo(1);
mockSubscriber.unsubscribe("salutations");
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
mockSubscriber.unsubscribe("yuletide");
-
- subscriberThread.join(2000);
-
- assertThat(subscriberThread.isAlive())
- .as("subscriber thread should not be alive")
- .isFalse();
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+ waitFor(() -> !subscriberThread.isAlive());
assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(expectedMessages);
}
@Test
- public void testTwoSubscribersOneChannel() throws Exception {
- Jedis subscriber1 = new Jedis("localhost", port);
+ public void testTwoSubscribersOneChannel() {
Jedis subscriber2 = new Jedis("localhost", port);
- Jedis publisher = new Jedis("localhost", port);
- CountDownLatch latch = new CountDownLatch(2);
- MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
- MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
-
- Runnable runnable1 = () -> subscriber1.subscribe(mockSubscriber1,
"salutations");
+ MockSubscriber mockSubscriber1 = new MockSubscriber();
+ MockSubscriber mockSubscriber2 = new MockSubscriber();
+ Runnable runnable1 = () -> subscriber.subscribe(mockSubscriber1,
"salutations");
Runnable runnable2 = () -> subscriber2.subscribe(mockSubscriber2,
"salutations");
Thread subscriber1Thread = new Thread(runnable1);
@@ -155,78 +137,104 @@ public class PubSubTest {
Thread subscriber2Thread = new Thread(runnable2);
subscriber2Thread.start();
- assertThat(latch.await(2, TimeUnit.SECONDS))
- .as("channel subscriptions were not received")
- .isTrue();
+ waitFor(() -> mockSubscriber1.getSubscribedChannels() == 1);
+ waitFor(() -> mockSubscriber2.getSubscribedChannels() == 1);
Long result = publisher.publish("salutations", "hello");
assertThat(result).isEqualTo(2);
mockSubscriber1.unsubscribe("salutations");
-
- subscriber1Thread.join(2000);
-
- assertThat(subscriber1Thread.isAlive())
- .as("subscriber1 thread should not be alive")
- .isFalse();
+ waitFor(() -> mockSubscriber1.getSubscribedChannels() == 0);
+ waitFor(() -> !subscriber1Thread.isAlive());
result = publisher.publish("salutations", "goodbye");
assertThat(result).isEqualTo(1);
mockSubscriber2.unsubscribe("salutations");
-
- subscriber2Thread.join(2000);
- assertThat(subscriber2Thread.isAlive())
- .as("subscriber2 thread should not be alive")
- .isFalse();
+ waitFor(() -> mockSubscriber2.getSubscribedChannels() == 0);
+ waitFor(() -> !subscriber2Thread.isAlive());
assertThat(mockSubscriber1.getReceivedMessages()).isEqualTo(Collections.singletonList("hello"));
assertThat(mockSubscriber2.getReceivedMessages()).isEqualTo(Arrays.asList("hello",
"goodbye"));
+
+ subscriber2.close();
}
@Test
- public void testPublishToNonexistentChannel() throws Exception {
- Jedis publisher = new Jedis("localhost", port);
+ public void testPublishToNonexistentChannel() {
Long result = publisher.publish("thisChannelDoesn'tExist", "hello");
assertThat(result).isEqualTo(0);
}
@Test
- public void testOneSubscriberOneChannelTwoTimes() throws Exception {
- Jedis subscriber = new Jedis("localhost", port);
- Jedis publisher = new Jedis("localhost", port);
- CountDownLatch latch = new CountDownLatch(2);
- MockSubscriber mockSubscriber = new MockSubscriber(latch);
+ public void testOneSubscriberOneChannelTwoTimes() {
+ MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable1 = () -> subscriber.subscribe(mockSubscriber,
"salutations", "salutations");
Thread subscriberThread = new Thread(runnable1);
subscriberThread.start();
- assertThat(latch.await(2, TimeUnit.SECONDS))
- .as("channel subscriptions were not received")
- .isTrue();
- assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1);
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
Long result = publisher.publish("salutations", "hello");
assertThat(result).isEqualTo(1);
mockSubscriber.unsubscribe("salutations");
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+
+ waitFor(() -> !subscriberThread.isAlive());
+
assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(Collections.singletonList("hello"));
+ }
- subscriberThread.join(2000);
+ @Test
+ public void testDeadSubscriber() {
+ Jedis deadSubscriber = new Jedis("localhost", port);
- assertThat(subscriberThread.isAlive())
- .as("subscriber1 thread should not be alive")
- .isFalse();
+ MockSubscriber mockSubscriber = new MockSubscriber();
-
assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(Collections.singletonList("hello"));
+ Runnable runnable = () -> {
+ deadSubscriber.subscribe(mockSubscriber, "salutations");
+ };
+
+ Thread subscriberThread = new Thread(runnable);
+ subscriberThread.start();
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+
+ deadSubscriber.close();
+
+ waitFor(() -> !deadSubscriber.isConnected());
+ Long result = publisher.publish("salutations", "hello");
+
+ assertThat(result).isEqualTo(0);
+ assertThat(mockSubscriber.getReceivedMessages()).isEmpty();
}
@Test
- public void testDeadSubscriber() throws InterruptedException {
- Jedis subscriber = new Jedis("localhost", port);
- Jedis publisher = new Jedis("localhost", port);
+ public void testPatternSubscribe() {
+ MockSubscriber mockSubscriber = new MockSubscriber();
+
+ Runnable runnable = () -> {
+ subscriber.psubscribe(mockSubscriber, "sal*s");
+ };
+
+ Thread subscriberThread = new Thread(runnable);
+ subscriberThread.start();
+
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
- CountDownLatch latch = new CountDownLatch(1);
- MockSubscriber mockSubscriber = new MockSubscriber(latch);
+ Long result = publisher.publish("salutations", "hello");
+ assertThat(result).isEqualTo(1);
+
+ assertThat(mockSubscriber.getReceivedMessages()).hasSize(1);
+ assertThat(mockSubscriber.getReceivedMessages()).contains("hello");
+
+ mockSubscriber.punsubscribe("sal*s");
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+ waitFor(() -> !subscriberThread.isAlive());
+ }
+
+ @Test
+ public void testPatternAndRegularSubscribe() {
+ MockSubscriber mockSubscriber = new MockSubscriber();
Runnable runnable = () -> {
subscriber.subscribe(mockSubscriber, "salutations");
@@ -234,21 +242,57 @@ public class PubSubTest {
Thread subscriberThread = new Thread(runnable);
subscriberThread.start();
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
- assertThat(latch.await(2, TimeUnit.SECONDS))
- .as("channel subscriptions were not received")
- .isTrue();
+ mockSubscriber.psubscribe("sal*s");
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
- subscriber.close();
- subscriberThread.join(2000);
+ Long result = publisher.publish("salutations", "hello");
+ assertThat(result).isEqualTo(2);
+
+ mockSubscriber.punsubscribe("sal*s");
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+
+ mockSubscriber.unsubscribe("salutations");
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+
+ waitFor(() -> !subscriberThread.isAlive());
+
+ assertThat(mockSubscriber.getReceivedMessages()).containsExactly("hello",
"hello");
+ }
+
+ @Test
+ public void testPatternWithoutAGlob() {
+ MockSubscriber mockSubscriber = new MockSubscriber();
- assertThat(subscriberThread.isAlive())
- .as("subscriber thread should not be alive")
- .isFalse();
+ Runnable runnable = () -> {
+ subscriber.subscribe(mockSubscriber, "salutations");
+ };
+
+ Thread subscriberThread = new Thread(runnable);
+ subscriberThread.start();
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+
+ mockSubscriber.psubscribe("salutations");
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
Long result = publisher.publish("salutations", "hello");
- assertThat(result).isEqualTo(0);
+ assertThat(result).isEqualTo(2);
- assertThat(mockSubscriber.getReceivedMessages()).isEmpty();
+ mockSubscriber.punsubscribe("salutations");
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+
+ mockSubscriber.unsubscribe("salutations");
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+
+ waitFor(() -> !subscriberThread.isAlive());
+
+ assertThat(mockSubscriber.getReceivedMessages()).containsExactly("hello",
"hello");
+ }
+
+ private void waitFor(Callable<Boolean> booleanCallable) {
+ await().atMost(1, TimeUnit.SECONDS)
+ .ignoreExceptions() // ignoring socket closed exceptions
+ .until(booleanCallable);
}
}
diff --git
a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
index 74eda78..a1e9d41 100644
---
a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
+++
b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
@@ -18,29 +18,23 @@ package org.apache.geode.redis.mocks;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
import redis.clients.jedis.JedisPubSub;
public class MockSubscriber extends JedisPubSub {
- private CountDownLatch latch;
- private List<String> receivedMessages = new ArrayList<String>();
-
- public MockSubscriber(CountDownLatch latch) {
- this.latch = latch;
- }
+ private List<String> receivedMessages = new ArrayList<>();
public List<String> getReceivedMessages() {
return receivedMessages;
}
@Override
- public void onSubscribe(String channel, int subscribedChannels) {
- latch.countDown();
+ public void onMessage(String channel, String message) {
+ receivedMessages.add(message);
}
@Override
- public void onMessage(String channel, String message) {
+ public void onPMessage(String pattern, String channel, String message) {
receivedMessages.add(message);
}
}
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServer.java
b/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServer.java
index 75a7e6a..0c639e5 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServer.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/GeodeRedisServer.java
@@ -75,6 +75,7 @@ import org.apache.geode.redis.internal.PubSub;
import org.apache.geode.redis.internal.PubSubImpl;
import org.apache.geode.redis.internal.RedisDataType;
import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.Subscriptions;
/**
* The GeodeRedisServer is a server that understands the Redis protocol. As
commands are sent to the
@@ -450,7 +451,7 @@ public class GeodeRedisServer {
throw assErr;
}
this.keyRegistrar = new KeyRegistrar(redisMetaData);
- this.pubSub = new PubSubImpl();
+ this.pubSub = new PubSubImpl(new Subscriptions());
this.regionCache = new RegionProvider(stringsRegion, hLLRegion,
this.keyRegistrar,
expirationFutures, expirationExecutor, this.DEFAULT_REGION_TYPE);
redisMetaData.put(REDIS_META_DATA_REGION, RedisDataType.REDIS_PROTECTED);
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriber.java
b/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java
similarity index 68%
rename from
geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriber.java
rename to
geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java
index 07bcfc0..c8514d5 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriber.java
+++
b/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java
@@ -25,32 +25,39 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.logging.internal.log4j.api.LogService;
-class Subscriber {
+public abstract class AbstractSubscription implements Subscription {
private static final Logger logger = LogService.getLogger();
- public final Client client;
- public final String channel;
- private ExecutionHandlerContext context;
+ private final Client client;
+ private final ExecutionHandlerContext context;
- public Subscriber(Client client, String channel, ExecutionHandlerContext
context) {
+ AbstractSubscription(Client client, ExecutionHandlerContext context) {
+ if (client == null) {
+ throw new IllegalArgumentException("client cannot be null");
+ }
+ if (context == null) {
+ throw new IllegalArgumentException("context cannot be null");
+ }
this.client = client;
- this.channel = channel;
this.context = context;
}
- public boolean isEqualTo(String channel, Client client) {
- if (channel == null || client == null) {
- return false;
- }
- return channel.equals(this.channel) && client.equals(this.client);
- }
-
- public boolean publishMessage(String channel, String message) {
+ @Override
+ public PublishResult publishMessage(String channel, String message) {
ByteBuf messageByteBuffer = constructResponse(channel, message);
if (messageByteBuffer == null) {
- return false;
+ return new PublishResult(client, false);
}
- return writeToChannelSynchronously(messageByteBuffer);
+ return new PublishResult(client,
writeToChannelSynchronously(messageByteBuffer));
+ }
+
+ Client getClient() {
+ return client;
+ }
+
+ @Override
+ public boolean matchesClient(Client client) {
+ return this.client.equals(client);
}
private ByteBuf constructResponse(String channel, String message) {
@@ -66,9 +73,9 @@ class Subscriber {
}
/**
- * This method turns the response into a synchronous call. We want to
- * determine if the response, to the client, resulted in an error - for
example if the client has
- * disconnected and the write fails. In such cases we need to be able to
notify the caller.
+ * This method turns the response into a synchronous call. We want to
determine if the response,
+ * to the client, resulted in an error - for example if the client has
disconnected and the write
+ * fails. In such cases we need to be able to notify the caller.
*/
private boolean writeToChannelSynchronously(ByteBuf messageByteBuffer) {
ChannelFuture channelFuture = context.writeToChannel(messageByteBuffer);
@@ -78,6 +85,7 @@ class Subscriber {
} catch (InterruptedException | ExecutionException e) {
return false;
}
+
return channelFuture.cause() == null;
}
}
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
b/geode-redis/src/main/java/org/apache/geode/redis/internal/ChannelSubscription.java
similarity index 55%
copy from geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
copy to
geode-redis/src/main/java/org/apache/geode/redis/internal/ChannelSubscription.java
index 494bce3..b64f132 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
+++
b/geode-redis/src/main/java/org/apache/geode/redis/internal/ChannelSubscription.java
@@ -11,39 +11,35 @@
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
+ *
*/
package org.apache.geode.redis.internal;
-import java.util.Objects;
-
-import io.netty.channel.Channel;
+/**
+ * This class represents a single channel subscription as created by the
SUBSCRIBE command
+ */
+class ChannelSubscription extends AbstractSubscription {
+ private String channel;
-class Client {
- private Channel channel;
+ public ChannelSubscription(Client client, String channel,
ExecutionHandlerContext context) {
+ super(client, context);
- public Client(Channel remoteAddress) {
- this.channel = remoteAddress;
+ if (channel == null) {
+ throw new IllegalArgumentException("channel cannot be null");
+ }
+ this.channel = channel;
}
@Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Client client = (Client) o;
- return Objects.equals(channel, client.channel);
+ public boolean isEqualTo(Object channelOrPattern, Client client) {
+ return this.channel != null && this.channel.equals(channelOrPattern)
+ && this.getClient().equals(client);
}
@Override
- public int hashCode() {
- return Objects.hash(channel);
+ public boolean matches(String channel) {
+ return this.channel.equals(channel);
}
- public boolean isDead() {
- return !this.channel.isOpen();
- }
}
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
b/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
index 494bce3..44a31a7 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
@@ -19,7 +19,7 @@ import java.util.Objects;
import io.netty.channel.Channel;
-class Client {
+public class Client {
private Channel channel;
public Client(Channel remoteAddress) {
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
b/geode-redis/src/main/java/org/apache/geode/redis/internal/PatternSubscription.java
similarity index 53%
copy from geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
copy to
geode-redis/src/main/java/org/apache/geode/redis/internal/PatternSubscription.java
index 494bce3..212eeaa 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
+++
b/geode-redis/src/main/java/org/apache/geode/redis/internal/PatternSubscription.java
@@ -11,39 +11,37 @@
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
+ *
*/
package org.apache.geode.redis.internal;
-import java.util.Objects;
+import org.apache.geode.redis.internal.org.apache.hadoop.fs.GlobPattern;
-import io.netty.channel.Channel;
+/**
+ * This class represents a pattern subscription as created by the PSUBSCRIBE
command
+ */
+class PatternSubscription extends AbstractSubscription {
+ final GlobPattern pattern;
-class Client {
- private Channel channel;
+ public PatternSubscription(Client client, GlobPattern pattern,
ExecutionHandlerContext context) {
+ super(client, context);
- public Client(Channel remoteAddress) {
- this.channel = remoteAddress;
+ if (pattern == null) {
+ throw new IllegalArgumentException("pattern cannot be null");
+ }
+ this.pattern = pattern;
}
@Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Client client = (Client) o;
- return Objects.equals(channel, client.channel);
+ public boolean isEqualTo(Object channelOrPattern, Client client) {
+ return this.pattern != null && this.pattern.equals(channelOrPattern)
+ && this.getClient().equals(client);
}
@Override
- public int hashCode() {
- return Objects.hash(channel);
+ public boolean matches(String channel) {
+ return pattern.matches(channel);
}
- public boolean isDead() {
- return !this.channel.isOpen();
- }
}
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/internal/PubSub.java
b/geode-redis/src/main/java/org/apache/geode/redis/internal/PubSub.java
index 78a695e..4462d83 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/PubSub.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/PubSub.java
@@ -16,6 +16,8 @@
package org.apache.geode.redis.internal;
+import org.apache.geode.redis.internal.org.apache.hadoop.fs.GlobPattern;
+
/**
* Interface that represents the ability to Publish, Subscribe and Unsubscribe
from channels.
*/
@@ -41,6 +43,16 @@ public interface PubSub {
long subscribe(String channel, ExecutionHandlerContext context, Client
client);
/**
+ * Subscribe to a pattern
+ *
+ * @param pattern glob pattern to subscribe to
+ * @param context ExecutionHandlerContext which will handle the client
response
+ * @param client a Client instance making the request
+ * @return the number of channels subscribed to
+ */
+ long psubscribe(GlobPattern pattern, ExecutionHandlerContext context, Client
client);
+
+ /**
* Unsubscribe a client from a channel
*
* @param channel the channel to unsubscribe from
@@ -48,4 +60,13 @@ public interface PubSub {
* @return the number of channels still subscribed to by the client
*/
long unsubscribe(String channel, Client client);
+
+ /**
+ * Unsubscribe from a previously subscribed pattern
+ *
+ * @param pattern the pattern to unsubscribe from
+ * @param client the Client which is to be unsubscribed
+ * @return the number of channels still subscribed to by the client
+ */
+ long punsubscribe(GlobPattern pattern, Client client);
}
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/internal/PubSubImpl.java
b/geode-redis/src/main/java/org/apache/geode/redis/internal/PubSubImpl.java
index 46cee7b..5582cdf 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/PubSubImpl.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/PubSubImpl.java
@@ -16,15 +16,16 @@
package org.apache.geode.redis.internal;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.redis.internal.org.apache.hadoop.fs.GlobPattern;
/**
* Concrete class that manages publish and subscribe functionality. Since
Redis subscriptions
@@ -34,9 +35,11 @@ import org.apache.geode.cache.execute.ResultCollector;
public class PubSubImpl implements PubSub {
public static final String REDIS_PUB_SUB_FUNCTION_ID =
"redisPubSubFunctionID";
- private Subscribers subscribers = new Subscribers();
+ private final Subscriptions subscriptions;
+
+ public PubSubImpl(Subscriptions subscriptions) {
+ this.subscriptions = subscriptions;
- public PubSubImpl() {
registerPublishFunction();
}
@@ -54,12 +57,23 @@ public class PubSubImpl implements PubSub {
@Override
public long subscribe(String channel, ExecutionHandlerContext context,
Client client) {
- if (subscribers.exists(channel, client)) {
- return subscribers.findSubscribers(client).size();
+ if (subscriptions.exists(channel, client)) {
+ return subscriptions.findSubscriptions(client).size();
+ }
+ Subscription subscription = new ChannelSubscription(client, channel,
context);
+ subscriptions.add(subscription);
+ return subscriptions.findSubscriptions(client).size();
+ }
+
+ @Override
+ public long psubscribe(GlobPattern pattern, ExecutionHandlerContext context,
Client client) {
+ if (subscriptions.exists(pattern, client)) {
+ return subscriptions.findSubscriptions(client).size();
}
- Subscriber subscriber = new Subscriber(client, channel, context);
- subscribers.add(subscriber);
- return subscribers.findSubscribers(client).size();
+ Subscription subscription = new PatternSubscription(client, pattern,
context);
+ subscriptions.add(subscription);
+
+ return subscriptions.findSubscriptions(client).size();
}
private void registerPublishFunction() {
@@ -80,57 +94,36 @@ public class PubSubImpl implements PubSub {
@Override
public long unsubscribe(String channel, Client client) {
- this.subscribers.remove(channel, client);
- return this.subscribers.findSubscribers(client).size();
+ this.subscriptions.remove(channel, client);
+ return this.subscriptions.findSubscriptions(client).size();
}
- private long publishMessageToSubscribers(String channel, String message) {
- Map<Boolean, List<Subscriber>> results = this.subscribers
- .findSubscribers(channel)
+ @Override
+ public long punsubscribe(GlobPattern pattern, Client client) {
+ this.subscriptions.remove(pattern, client);
+ return this.subscriptions.findSubscriptions(client).size();
+ }
+
+ @VisibleForTesting
+ long publishMessageToSubscribers(String channel, String message) {
+
+ Map<Boolean, List<PublishResult>> results = this.subscriptions
+ .findSubscriptions(channel)
.stream()
- .collect(
- Collectors.partitioningBy(subscriber ->
subscriber.publishMessage(channel, message)));
+ .map(subscription -> subscription.publishMessage(channel, message))
+ .collect(Collectors.partitioningBy(PublishResult::isSuccessful));
prune(results.get(false));
return results.get(true).size();
}
- private void prune(List<Subscriber> failedSubscribers) {
- failedSubscribers.forEach(subscriber -> {
- if (subscriber.client.isDead()) {
- subscribers.remove(subscriber.client);
+ private void prune(List<PublishResult> failedSubscriptions) {
+ failedSubscriptions.forEach(publishResult -> {
+ Client client = publishResult.getClient();
+ if (client.isDead()) {
+ subscriptions.remove(client);
}
});
}
-
- private class Subscribers {
- List<Subscriber> subscribers = new ArrayList<>();
-
- private boolean exists(String channel, Client client) {
- return subscribers.stream().anyMatch(subscriber ->
subscriber.isEqualTo(channel, client));
- }
-
- private List<Subscriber> findSubscribers(Client client) {
- return subscribers.stream().filter(subscriber ->
subscriber.client.equals(client))
- .collect(Collectors.toList());
- }
-
- private List<Subscriber> findSubscribers(String channel) {
- return subscribers.stream().filter(subscriber ->
subscriber.channel.equals(channel))
- .collect(Collectors.toList());
- }
-
- public void add(Subscriber subscriber) {
- this.subscribers.add(subscriber);
- }
-
- public void remove(String channel, Client client) {
- this.subscribers.removeIf(subscriber -> subscriber.isEqualTo(channel,
client));
- }
-
- public void remove(Client client) {
- this.subscribers.removeIf(subscriber ->
subscriber.client.equals(client));
- }
- }
}
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
b/geode-redis/src/main/java/org/apache/geode/redis/internal/PublishResult.java
similarity index 59%
copy from geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
copy to
geode-redis/src/main/java/org/apache/geode/redis/internal/PublishResult.java
index 494bce3..d4aa15b 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
+++
b/geode-redis/src/main/java/org/apache/geode/redis/internal/PublishResult.java
@@ -15,35 +15,24 @@
package org.apache.geode.redis.internal;
-import java.util.Objects;
-
-import io.netty.channel.Channel;
-
-class Client {
- private Channel channel;
-
- public Client(Channel remoteAddress) {
- this.channel = remoteAddress;
- }
+/**
+ * Represents the results of publishing a message to a subscription. Contains
the client the message
+ * was published to as well as whether or not the message was published
successfully.
+ */
+public class PublishResult {
+ private final Client client;
+ private final boolean result;
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Client client = (Client) o;
- return Objects.equals(channel, client.channel);
+ /**
+ * @param client the client the message was published to
+ * @param result true when the message was published successfully
+ */
+ public PublishResult(Client client, boolean result) {
+ this.client = client;
+ this.result = result;
}
- @Override
- public int hashCode() {
- return Objects.hash(channel);
+ /**
+ * @return the client this result belongs to
+ */
+ public Client getClient() {
+ return client;
}
- public boolean isDead() {
- return !this.channel.isOpen();
+ /**
+ * @return the success flag supplied at construction
+ */
+ public boolean isSuccessful() {
+ return result;
}
}
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
index fc7a0ab..aa8bb2a 100755
---
a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
+++
b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
@@ -66,7 +66,9 @@ import
org.apache.geode.redis.internal.executor.list.LTrimExecutor;
import org.apache.geode.redis.internal.executor.list.RPopExecutor;
import org.apache.geode.redis.internal.executor.list.RPushExecutor;
import org.apache.geode.redis.internal.executor.list.RPushXExecutor;
+import org.apache.geode.redis.internal.executor.pubsub.PsubscribeExecutor;
import org.apache.geode.redis.internal.executor.pubsub.PublishExecutor;
+import org.apache.geode.redis.internal.executor.pubsub.PunsubscribeExecutor;
import org.apache.geode.redis.internal.executor.pubsub.SubscribeExecutor;
import org.apache.geode.redis.internal.executor.pubsub.UnsubscribeExecutor;
import org.apache.geode.redis.internal.executor.set.SAddExecutor;
@@ -2624,6 +2626,10 @@ public enum RedisCommandType {
return this.dataType;
}
},
+
+ /**
+ * PUBLISH channel message
+ */
PUBLISH {
private Executor executor;
@@ -2642,6 +2648,10 @@ public enum RedisCommandType {
return this.dataType;
}
},
+
+ /**
+ * UNSUBSCRIBE channel...
+ */
UNSUBSCRIBE {
private Executor executor;
@@ -2661,6 +2671,52 @@ public enum RedisCommandType {
}
},
+ /**
+ * PSUBSCRIBE pattern [pattern ...]
+ * <p>
+ * Subscribes the client to the given glob pattern(s); handled by {@link PsubscribeExecutor}.
+ */
+ PSUBSCRIBE {
+ private Executor executor;
+
+ @Override
+ public Executor getExecutor() {
+ // The executor is created on the first call and reused afterwards.
+ if (executor == null) {
+ executor = new PsubscribeExecutor();
+ }
+ return executor;
+ }
+
+ private final RedisDataType dataType = RedisDataType.REDIS_PUBSUB;
+
+ @Override
+ public RedisDataType getDataType() {
+ return this.dataType;
+ }
+ },
+
+ /**
+ * PUNSUBSCRIBE pattern...
+ * <p>
+ * Removes a pattern subscription; handled by {@link PunsubscribeExecutor}.
+ */
+ PUNSUBSCRIBE {
+ private Executor executor;
+
+ @Override
+ public Executor getExecutor() {
+ // The executor is created on the first call and reused afterwards.
+ if (executor == null) {
+ executor = new PunsubscribeExecutor();
+ }
+ return executor;
+ }
+
+ private final RedisDataType dataType = RedisDataType.REDIS_PUBSUB;
+
+ @Override
+ public RedisDataType getDataType() {
+ return this.dataType;
+ }
+ },
+
/**************************************
* Geospatial commands ****************
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
b/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscription.java
similarity index 54%
copy from geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
copy to
geode-redis/src/main/java/org/apache/geode/redis/internal/Subscription.java
index 494bce3..46be25a 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Client.java
+++
b/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscription.java
@@ -11,39 +11,33 @@
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
+ *
*/
package org.apache.geode.redis.internal;
-import java.util.Objects;
-
-import io.netty.channel.Channel;
-
-class Client {
- private Channel channel;
-
- public Client(Channel remoteAddress) {
- this.channel = remoteAddress;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Client client = (Client) o;
- return Objects.equals(channel, client.channel);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(channel);
- }
-
- public boolean isDead() {
- return !this.channel.isOpen();
- }
+/**
+ * Interface that represents the relationship between a channel or pattern and a client.
+ */
+public interface Subscription {
+ /**
+ * Identity check for a subscription: it is the combination of the client and either a channel
+ * or a pattern.
+ *
+ * @param channelOrPattern the channel or pattern to compare against
+ * @param client the client to compare against
+ * @return true when this subscription is for the given client and channel or pattern
+ */
+ boolean isEqualTo(Object channelOrPattern, Client client);
+
+ /**
+ * Will publish a message to the designated channel.
+ *
+ * @param channel the channel the message is published on
+ * @param message the message to publish
+ * @return the outcome of the publish for this subscription's client
+ */
+ PublishResult publishMessage(String channel, String message);
+
+ /**
+ * Verifies that the subscription is established with the designated client.
+ *
+ * @param client the client to check
+ * @return true when this subscription belongs to the given client
+ */
+ boolean matchesClient(Client client);
+
+ /**
+ * Verifies that the subscription channel or pattern matches the designated channel.
+ *
+ * @param channel the channel name to test
+ * @return true when this subscription's channel or pattern matches the given channel
+ */
+ boolean matches(String channel);
}
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriptions.java
b/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriptions.java
new file mode 100644
index 0000000..b66770a
--- /dev/null
+++
b/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriptions.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Class that manages both channel and pattern subscriptions.
+ * <p>
+ * NOTE(review): the registry is a plain {@link ArrayList} with no synchronization of its own;
+ * confirm that callers serialize access if several client connections can reach it at once.
+ */
+public class Subscriptions {
+  // The list reference is never reassigned; only its contents change.
+  private final List<Subscription> subscriptions = new ArrayList<>();
+
+  /**
+   * Check whether a given client has already subscribed to a channel or pattern
+   *
+   * @param channelOrPattern channel or pattern
+   * @param client a client connection
+   * @return true when a subscription for exactly this channel or pattern and client is registered
+   */
+  public boolean exists(Object channelOrPattern, Client client) {
+    return subscriptions.stream()
+        .anyMatch(subscription -> subscription.isEqualTo(channelOrPattern, client));
+  }
+
+  /**
+   * Return all subscriptions for a given client
+   *
+   * @param client the subscribed client
+   * @return a list of subscriptions
+   */
+  public List<Subscription> findSubscriptions(Client client) {
+    return subscriptions.stream()
+        .filter(subscription -> subscription.matchesClient(client))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Return all subscriptions, channel or pattern, that match a given channel name. The argument
+   * is handed to {@link Subscription#matches(String)} as the channel to test.
+   *
+   * @param channelOrPattern the channel name to match subscriptions against
+   * @return a list of subscriptions
+   */
+  public List<Subscription> findSubscriptions(String channelOrPattern) {
+    return subscriptions.stream()
+        .filter(subscription -> subscription.matches(channelOrPattern))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Add a new subscription
+   *
+   * @param subscription the subscription to register
+   */
+  public void add(Subscription subscription) {
+    subscriptions.add(subscription);
+  }
+
+  /**
+   * Remove all subscriptions for a given client
+   *
+   * @param client the client whose subscriptions are dropped
+   */
+  public void remove(Client client) {
+    subscriptions.removeIf(subscription -> subscription.matchesClient(client));
+  }
+
+  /**
+   * Remove a single subscription
+   *
+   * @param channel the channel or pattern of the subscription to drop
+   * @param client the client of the subscription to drop
+   */
+  public void remove(Object channel, Client client) {
+    subscriptions.removeIf(subscription -> subscription.isEqualTo(channel, client));
+  }
+}
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
new file mode 100644
index 0000000..5a91146
--- /dev/null
+++
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal.executor.pubsub;
+
+import java.util.ArrayList;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.Coder;
+import org.apache.geode.redis.internal.CoderException;
+import org.apache.geode.redis.internal.Command;
+import org.apache.geode.redis.internal.ExecutionHandlerContext;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.org.apache.hadoop.fs.GlobPattern;
+
+/**
+ * Executor for the PSUBSCRIBE command: registers one pattern subscription per argument and
+ * answers with one reply array per pattern.
+ */
+public class PsubscribeExecutor extends AbstractExecutor {
+  private static final Logger logger = LogService.getLogger();
+
+  /**
+   * Subscribes the calling client to every pattern argument of the command. For each pattern a
+   * reply of the form {@code ["psubscribe", pattern, count]} is queued, where count is the value
+   * returned by the pub/sub layer for that subscription.
+   * <p>
+   * NOTE(review): when no pattern is supplied the loop does not run and an empty response is
+   * set; confirm whether an arity error should be returned instead.
+   *
+   * @param command the received command; element 0 is the command name, the rest are patterns
+   * @param context the execution context of the calling client
+   */
+  @Override
+  public void executeCommand(Command command, ExecutionHandlerContext context) {
+    ArrayList<ArrayList<Object>> items = new ArrayList<>();
+    for (int i = 1; i < command.getProcessedCommand().size(); i++) {
+      ArrayList<Object> item = new ArrayList<>();
+      byte[] pattern = command.getProcessedCommand().get(i);
+      long subscribedChannels =
+          context.getPubSub().psubscribe(
+              new GlobPattern(new String(pattern)), context, context.getClient());
+
+      item.add("psubscribe");
+      item.add(pattern);
+      item.add(subscribedChannels);
+
+      items.add(item);
+    }
+
+    writeResponse(command, context, items);
+  }
+
+  /**
+   * Encodes every reply item and concatenates the encodings into a single response buffer.
+   */
+  private void writeResponse(Command command, ExecutionHandlerContext context,
+      ArrayList<ArrayList<Object>> items) {
+    ByteBuf aggregatedResponse = context.getByteBufAllocator().buffer();
+    items.forEach(item -> {
+      ByteBuf response;
+      try {
+        response = Coder.getArrayResponse(context.getByteBufAllocator(), item);
+      } catch (CoderException e) {
+        logger.warn("Error encoding subscribe response", e);
+        // Nothing was encoded for this item; passing null to writeBytes would throw, so skip it.
+        return;
+      }
+      try {
+        aggregatedResponse.writeBytes(response);
+      } finally {
+        // writeBytes copies the content, so the per-item buffer is no longer needed.
+        // Assumes getArrayResponse returns a freshly allocated buffer -- TODO confirm.
+        response.release();
+      }
+    });
+    command.setResponse(aggregatedResponse);
+  }
+
+}
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
index 0f8b585..75b6b26 100644
---
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
+++
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
@@ -22,10 +22,10 @@ import io.netty.buffer.ByteBuf;
import org.apache.geode.redis.internal.Coder;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
-import org.apache.geode.redis.internal.Executor;
import org.apache.geode.redis.internal.RedisConstants.ArityDef;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
-public class PublishExecutor implements Executor {
+public class PublishExecutor extends AbstractExecutor {
@Override
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java
similarity index 53%
copy from
geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
copy to
geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java
index 0f8b585..0f1c1a3 100644
---
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
+++
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java
@@ -11,41 +11,46 @@
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
+ *
*/
package org.apache.geode.redis.internal.executor.pubsub;
-import java.util.List;
+import java.util.ArrayList;
import io.netty.buffer.ByteBuf;
+import org.apache.logging.log4j.Logger;
+import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.redis.internal.Coder;
+import org.apache.geode.redis.internal.CoderException;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
-import org.apache.geode.redis.internal.Executor;
-import org.apache.geode.redis.internal.RedisConstants.ArityDef;
-
-public class PublishExecutor implements Executor {
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.org.apache.hadoop.fs.GlobPattern;
+/**
+ * Executor for the PUNSUBSCRIBE command: removes a pattern subscription of the calling client
+ * and answers with a {@code ["punsubscribe", pattern, count]} reply.
+ */
+public class PunsubscribeExecutor extends AbstractExecutor {
+ private static final Logger logger = LogService.getLogger();
+ /**
+ * Unsubscribes the calling client from the pattern given as the first argument.
+ *
+ * @param command the received command; element 1 is read as the pattern
+ * @param context the execution context of the calling client
+ */
@Override
public void executeCommand(Command command, ExecutionHandlerContext context)
{
- List<byte[]> args = command.getProcessedCommand();
- if (args.size() != 3) {
-
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
ArityDef.PUBLISH));
- return;
+ // NOTE(review): index 1 is read without checking the argument count, so a bare PUNSUBSCRIBE
+ // would fail here -- confirm arity is validated before this executor runs.
+ // NOTE(review): only the first pattern argument is used; any further ones are ignored.
+ byte[] pattern = command.getProcessedCommand().get(1);
+ long subscriptionCount =
+ context
+ .getPubSub()
+ .punsubscribe(new GlobPattern(new String(pattern)),
context.getClient());
+
+ ArrayList<Object> items = new ArrayList<>();
+ items.add("punsubscribe");
+ items.add(pattern);
+ items.add(subscriptionCount);
+
+ // NOTE(review): if encoding fails, response stays null and is still passed to setResponse
+ // below -- confirm a null response is tolerated.
+ ByteBuf response = null;
+ try {
+ response = Coder.getArrayResponse(context.getByteBufAllocator(), items);
+ } catch (CoderException e) {
+ logger.warn("Error encoding unsubscribe response", e);
}
-
- String channelName = new String(args.get(1));
- String message = new String(args.get(2));
- long publishCount = context.getPubSub().publish(channelName, message);
-
- writeResponse(command, context, publishCount);
- }
-
- private void writeResponse(Command command, ExecutionHandlerContext context,
- long publishCount) {
- ByteBuf response = Coder.getIntegerResponse(context.getByteBufAllocator(),
publishCount);
command.setResponse(response);
}
}
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/internal/org/apache/hadoop/fs/GlobPattern.java
b/geode-redis/src/main/java/org/apache/geode/redis/internal/org/apache/hadoop/fs/GlobPattern.java
index 94bc21b..7cedb5c 100644
---
a/geode-redis/src/main/java/org/apache/geode/redis/internal/org/apache/hadoop/fs/GlobPattern.java
+++
b/geode-redis/src/main/java/org/apache/geode/redis/internal/org/apache/hadoop/fs/GlobPattern.java
@@ -15,6 +15,7 @@
package org.apache.geode.redis.internal.org.apache.hadoop.fs;
+import java.util.Objects;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@@ -159,6 +160,23 @@ public class GlobPattern {
return hasWildcard;
}
+ /**
+ * Two glob patterns are equal when their compiled regular expressions have the same pattern
+ * string.
+ *
+ * @param o the object to compare with
+ * @return true when {@code o} is a GlobPattern whose compiled pattern string equals this one's
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof GlobPattern)) {
+ return false;
+ }
+ GlobPattern that = (GlobPattern) o;
+ // Compare the pattern text rather than the compiled Pattern objects themselves.
+ return this.compiled.pattern().equals(that.compiled.pattern());
+ }
+
+  /**
+   * Hashes exactly what {@link #equals(Object)} compares: the text of the compiled pattern.
+   * {@link Pattern} does not override {@code hashCode()}, so hashing the {@code compiled} object
+   * itself would give equal glob patterns different hash codes.
+   *
+   * @return a hash code consistent with {@link #equals(Object)}
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hash(compiled.pattern());
+  }
+
private static void error(String message, String pattern, int pos) {
throw new PatternSyntaxException(message, pattern, pos);
}
diff --git
a/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt
b/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt
index c98a91f..1883d15 100755
---
a/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt
+++
b/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt
@@ -31,6 +31,8 @@
org/apache/geode/redis/internal/RedisCommandType$12,false,dataType:org/apache/ge
org/apache/geode/redis/internal/RedisCommandType$120,false,dataType:org/apache/geode/redis/internal/RedisDataType,executor:org/apache/geode/redis/internal/Executor
org/apache/geode/redis/internal/RedisCommandType$121,false,dataType:org/apache/geode/redis/internal/RedisDataType,executor:org/apache/geode/redis/internal/Executor
org/apache/geode/redis/internal/RedisCommandType$122,false,dataType:org/apache/geode/redis/internal/RedisDataType,executor:org/apache/geode/redis/internal/Executor
+org/apache/geode/redis/internal/RedisCommandType$123,false,dataType:org/apache/geode/redis/internal/RedisDataType,executor:org/apache/geode/redis/internal/Executor
+org/apache/geode/redis/internal/RedisCommandType$124,false,dataType:org/apache/geode/redis/internal/RedisDataType,executor:org/apache/geode/redis/internal/Executor
org/apache/geode/redis/internal/RedisCommandType$13,false,dataType:org/apache/geode/redis/internal/RedisDataType,executor:org/apache/geode/redis/internal/Executor
org/apache/geode/redis/internal/RedisCommandType$14,false,dataType:org/apache/geode/redis/internal/RedisDataType,executor:org/apache/geode/redis/internal/Executor
org/apache/geode/redis/internal/RedisCommandType$15,false,dataType:org/apache/geode/redis/internal/RedisDataType,executor:org/apache/geode/redis/internal/Executor
diff --git
a/geode-redis/src/test/java/org/apache/geode/redis/internal/PubSubImplJUnitTest.java
b/geode-redis/src/test/java/org/apache/geode/redis/internal/PubSubImplJUnitTest.java
new file mode 100644
index 0000000..e667da0
--- /dev/null
+++
b/geode-redis/src/test/java/org/apache/geode/redis/internal/PubSubImplJUnitTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+
+
+// Unit test for PubSubImpl's handling of subscriptions whose client is dead.
+public class PubSubImplJUnitTest {
+
+ // A failed publish to a client that reports itself dead must not be counted and must leave
+ // no subscription behind for that client.
+ @Test
+ public void testSubscriptionWithDeadClientIsPruned() {
+ Subscriptions subscriptions = new Subscriptions();
+ ExecutionHandlerContext mockExecutionHandlerContext =
mock(ExecutionHandlerContext.class);
+
+ Client deadClient = mock(Client.class);
+ when(deadClient.isDead()).thenReturn(true);
+
+ ChannelSubscription subscription =
+ spy(new ChannelSubscription(deadClient,
+ "sally", mockExecutionHandlerContext));
+
+ // Force the publish to fail without going through the real publishMessage.
+ doReturn(new PublishResult(deadClient,
false)).when(subscription).publishMessage(any(), any());
+ subscriptions.add(subscription);
+
+ PubSubImpl subject = new PubSubImpl(subscriptions);
+
+ Long numberOfSubscriptions = subject.publishMessageToSubscribers("sally",
"message");
+
+ assertThat(numberOfSubscriptions).isEqualTo(0);
+ assertThat(subscriptions.findSubscriptions(deadClient)).isEmpty();
+ }
+}
diff --git
a/geode-redis/src/test/java/org/apache/geode/redis/internal/SubscriptionsTest.java
b/geode-redis/src/test/java/org/apache/geode/redis/internal/SubscriptionsTest.java
new file mode 100644
index 0000000..f216a04
--- /dev/null
+++
b/geode-redis/src/test/java/org/apache/geode/redis/internal/SubscriptionsTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+import io.netty.channel.Channel;
+import org.junit.Test;
+
+import org.apache.geode.redis.internal.org.apache.hadoop.fs.GlobPattern;
+
+// Unit tests for the Subscriptions registry: lookup and removal of channel and pattern
+// subscriptions.
+public class SubscriptionsTest {
+
+ // exists() is true only for the channel name the client actually subscribed to.
+ @Test
+ public void correctlyIdentifiesChannelSubscriber() {
+ Subscriptions subscriptions = new Subscriptions();
+
+ Channel channel = mock(Channel.class);
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+ Client client = new Client(channel);
+
+ subscriptions.add(new ChannelSubscription(client, "subscriptions",
context));
+
+ assertThat(subscriptions.exists("subscriptions", client)).isTrue();
+ assertThat(subscriptions.exists("unknown", client)).isFalse();
+ }
+
+ // A pattern subscription is found when queried with the same GlobPattern.
+ @Test
+ public void correctlyIdentifiesPatternSubscriber() {
+ Subscriptions subscriptions = new Subscriptions();
+
+ Channel channel = mock(Channel.class);
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+ Client client = new Client(channel);
+ GlobPattern pattern = new GlobPattern("sub*s");
+
+ subscriptions.add(new PatternSubscription(client, pattern, context));
+
+ assertThat(subscriptions.exists(pattern, client)).isTrue();
+ }
+
+ // A channel subscription is never reported for a GlobPattern query, not even for a pattern
+ // whose text is identical to the channel name.
+ @Test
+ public void doesNotMisidentifyChannelAsPattern() {
+ Subscriptions subscriptions = new Subscriptions();
+
+ Channel channel = mock(Channel.class);
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+ Client client = new Client(channel);
+ GlobPattern globPattern1 = new GlobPattern("sub*s");
+ GlobPattern globPattern2 = new GlobPattern("subscriptions");
+
+ subscriptions.add(new ChannelSubscription(client, "subscriptions",
context));
+
+ assertThat(subscriptions.exists(globPattern1, client)).isFalse();
+ assertThat(subscriptions.exists(globPattern2, client)).isFalse();
+ }
+
+ // With one channel and one pattern subscription registered, each is found by its own key.
+ @Test
+ public void doesNotMisidentifyWhenBothTypesArePresent() {
+ Subscriptions subscriptions = new Subscriptions();
+
+ Channel channel = mock(Channel.class);
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+ Client client = new Client(channel);
+ GlobPattern globby = new GlobPattern("sub*s");
+
+ subscriptions.add(new ChannelSubscription(client, "subscriptions",
context));
+ subscriptions.add(new PatternSubscription(client, globby, context));
+
+ assertThat(subscriptions.exists(globby, client)).isTrue();
+ assertThat(subscriptions.exists("subscriptions", client)).isTrue();
+ }
+
+
+ // Sanity check on the test fixtures: two separate Channel mocks do not compare equal, so
+ // clients built on them can be told apart in the tests below.
+ @Test
+ public void verifyDifferentMockChannelsNotEqual() {
+ Channel mockChannelOne = mock(Channel.class);
+ Channel mockChannelTwo = mock(Channel.class);
+
+ assertThat(mockChannelOne).isNotEqualTo(mockChannelTwo);
+ assertThat(mockChannelOne.equals(mockChannelTwo)).isFalse();
+ }
+
+ // findSubscriptions(client) returns only the subscriptions of that client.
+ @Test
+ public void findSubscribers() {
+ Subscriptions subscriptions = new Subscriptions();
+
+ Channel mockChannelOne = mock(Channel.class);
+ Channel mockChannelTwo = mock(Channel.class);
+
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+ Client clientOne = new Client(mockChannelOne);
+ Client clientTwo = new Client(mockChannelTwo);
+
+ ChannelSubscription subscriptionOne =
+ new ChannelSubscription(clientOne, "subscriptions", context);
+ ChannelSubscription subscriptionTwo = new ChannelSubscription(clientTwo,
"monkeys", context);
+
+ subscriptions.add(subscriptionOne);
+ subscriptions.add(subscriptionTwo);
+
+
assertThat(subscriptions.findSubscriptions(clientOne)).containsExactly(subscriptionOne);
+ }
+
+ // remove(client) drops that client's subscriptions and leaves other clients untouched.
+ @Test
+ public void removeByClient() {
+ Subscriptions subscriptions = new Subscriptions();
+ Channel mockChannelOne = mock(Channel.class);
+ Channel mockChannelTwo = mock(Channel.class);
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+ Client clientOne = new Client(mockChannelOne);
+ Client clientTwo = new Client(mockChannelTwo);
+
+ ChannelSubscription subscriptionOne =
+ new ChannelSubscription(clientOne, "subscriptions", context);
+ ChannelSubscription subscriptionTwo = new ChannelSubscription(clientTwo,
"monkeys", context);
+
+ subscriptions.add(subscriptionOne);
+ subscriptions.add(subscriptionTwo);
+
+ subscriptions.remove(clientOne);
+
+ assertThat(subscriptions.findSubscriptions(clientOne)).isEmpty();
+
assertThat(subscriptions.findSubscriptions(clientTwo)).containsExactly(subscriptionTwo);
+ }
+
+ // remove(pattern, client) drops only the pattern subscription; the channel subscription with
+ // the same text ("monkeys") stays registered.
+ @Test
+ public void removeByClientAndPattern() {
+
+ Subscriptions subscriptions = new Subscriptions();
+ Channel mockChannelOne = mock(Channel.class);
+
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+ Client client = new Client(mockChannelOne);
+
+ ChannelSubscription channelSubscriberOne =
+ new ChannelSubscription(client, "subscriptions", context);
+ GlobPattern pattern = new GlobPattern("monkeys");
+ PatternSubscription patternSubscriber = new PatternSubscription(client,
+ pattern, context);
+ ChannelSubscription channelSubscriberTwo = new ChannelSubscription(client,
"monkeys", context);
+
+ subscriptions.add(channelSubscriberOne);
+ subscriptions.add(patternSubscriber);
+ subscriptions.add(channelSubscriberTwo);
+
+ subscriptions.remove(pattern, client);
+
+ assertThat(subscriptions
+ .findSubscriptions(client))
+ .containsExactlyInAnyOrder(
+ channelSubscriberOne,
+ channelSubscriberTwo);
+ }
+}