This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 6aa2420 Introduce RedisResponse to SET executors (#5114)
6aa2420 is described below
commit 6aa2420842249184cc94915e1b207c44b130e2bb
Author: Jens Deppe <[email protected]>
AuthorDate: Tue May 19 13:06:44 2020 -0700
Introduce RedisResponse to SET executors (#5114)
---
.../org/apache/geode/redis/internal/Command.java | 4 +-
.../redis/internal/ExecutionHandlerContext.java | 97 +++++++++++-----------
.../org/apache/geode/redis/internal/Executor.java | 15 +++-
.../geode/redis/internal/RedisCommandType.java | 10 ++-
.../apache/geode/redis/internal/RedisResponse.java | 89 ++++++++++++++++++++
.../redis/internal/executor/AbstractExecutor.java | 9 ++
.../redis/internal/executor/AuthExecutor.java | 18 ++--
.../redis/internal/executor/QuitExecutor.java | 8 +-
.../redis/internal/executor/set/SAddExecutor.java | 7 +-
.../redis/internal/executor/set/SCardExecutor.java | 9 +-
.../internal/executor/set/SIsMemberExecutor.java | 13 ++-
.../internal/executor/set/SMembersExecutor.java | 14 +---
.../redis/internal/executor/set/SMoveExecutor.java | 65 ++++++---------
.../redis/internal/executor/set/SPopExecutor.java | 25 ++----
.../internal/executor/set/SRandMemberExecutor.java | 35 ++++----
.../redis/internal/executor/set/SRemExecutor.java | 11 ++-
.../redis/internal/executor/set/SScanExecutor.java | 28 +++----
.../redis/internal/executor/set/SetOpExecutor.java | 43 +++++-----
.../geode/redis/internal/PubSubImplJUnitTest.java | 3 +-
19 files changed, 295 insertions(+), 208 deletions(-)
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/Command.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/Command.java
index 5dd8fa4..89a6137 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Command.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/Command.java
@@ -171,9 +171,9 @@ public class Command {
return builder.toString();
}
- public void execute(ExecutionHandlerContext executionHandlerContext) {
+ public RedisResponse execute(ExecutionHandlerContext
executionHandlerContext) {
RedisCommandType type = getCommandType();
- type.executeCommand(this, executionHandlerContext);
+ return type.executeCommand(this, executionHandlerContext);
}
boolean isOfType(RedisCommandType type) {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java
index 1998371..471dde2 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java
@@ -14,6 +14,8 @@
*/
package org.apache.geode.redis.internal;
+import static org.apache.geode.redis.internal.RedisCommandType.PUBLISH;
+
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -55,8 +57,8 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogService.getLogger();
private static final int WAIT_REGION_DSTRYD_MILLIS = 100;
- private static final int MAXIMUM_NUM_RETRIES = (1000 * 60) /
WAIT_REGION_DSTRYD_MILLIS; // 60
- // seconds
+ private static final int MAXIMUM_NUM_RETRIES = (1000 * 60) /
WAIT_REGION_DSTRYD_MILLIS;
+
private final RedisLockService lockService;
private final Cache cache;
@@ -136,6 +138,10 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
return channel.writeAndFlush(message, channel.newPromise());
}
+ public ChannelFuture writeToChannel(RedisResponse response) {
+ return channel.writeAndFlush(response.encode(byteBufAllocator),
channel.newPromise());
+ }
+
/**
* This will handle the execution of received commands
*/
@@ -164,12 +170,12 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
channelInactive(ctx);
return;
}
- ByteBuf response = getExceptionResponse(ctx, cause);
- writeToChannel(response);
+ writeToChannel(getExceptionResponse(ctx, cause));
}
- private ByteBuf getExceptionResponse(ChannelHandlerContext ctx, Throwable
cause) {
- ByteBuf response;
+ private RedisResponse getExceptionResponse(ChannelHandlerContext ctx,
Throwable cause) {
+ RedisResponse response;
+
if (cause instanceof FunctionException) {
Throwable th = cause.getCause();
if (th == null) {
@@ -184,28 +190,26 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
}
if (cause instanceof RedisDataTypeMismatchException) {
- response = Coder.getWrongTypeResponse(this.byteBufAllocator,
cause.getMessage());
+ response = RedisResponse.wrongType(cause.getMessage());
} else if (cause instanceof DecoderException
&& cause.getCause() instanceof RedisCommandParserException) {
- response =
- Coder.getErrorResponse(this.byteBufAllocator,
RedisConstants.PARSING_EXCEPTION_MESSAGE);
+ response = RedisResponse.error(RedisConstants.PARSING_EXCEPTION_MESSAGE);
} else if (cause instanceof RegionCreationException) {
- this.logger.error(cause);
- response =
- Coder.getErrorResponse(this.byteBufAllocator,
RedisConstants.ERROR_REGION_CREATION);
+ logger.error(cause);
+ response = RedisResponse.error(RedisConstants.ERROR_REGION_CREATION);
} else if (cause instanceof InterruptedException || cause instanceof
CacheClosedException) {
- response =
- Coder.getErrorResponse(this.byteBufAllocator,
RedisConstants.SERVER_ERROR_SHUTDOWN);
+ response = RedisResponse.error(RedisConstants.SERVER_ERROR_SHUTDOWN);
} else if (cause instanceof IllegalStateException
|| cause instanceof RedisParametersMismatchException) {
- response = Coder.getErrorResponse(this.byteBufAllocator,
cause.getMessage());
+ response = RedisResponse.error(cause.getMessage());
} else {
if (logger.isErrorEnabled()) {
logger.error("GeodeRedisServer-Unexpected error handler for " +
ctx.channel(), cause);
}
- response = Coder.getErrorResponse(this.byteBufAllocator,
RedisConstants.SERVER_ERROR_MESSAGE);
+ response = RedisResponse.error(RedisConstants.SERVER_ERROR_MESSAGE);
}
+
return response;
}
@@ -219,6 +223,7 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
}
private void executeCommand(ChannelHandlerContext ctx, Command command)
throws Exception {
+ RedisResponse response;
if (isAuthenticated) {
if (command.isOfType(RedisCommandType.SHUTDOWN)) {
this.server.shutdown();
@@ -226,35 +231,32 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
}
if (hasTransaction() && !command.isTransactional()) {
- executeWithTransaction(ctx, command);
+ response = executeWithTransaction(ctx, command);
} else {
- executeWithoutTransaction(command);
+ response = executeWithoutTransaction(command);
}
logResponse(command);
if (hasTransaction() && command.isOfType(RedisCommandType.MULTI)) {
- writeToChannel(
- Coder.getSimpleStringResponse(this.byteBufAllocator,
RedisConstants.COMMAND_QUEUED));
- } else {
- // PUBLISH responses are always deferred
- if (command.getCommandType() != RedisCommandType.PUBLISH) {
- ByteBuf response = command.getResponse();
- writeToChannel(response);
- }
+ response = RedisResponse.string(RedisConstants.COMMAND_QUEUED);
}
- } else if (command.isOfType(RedisCommandType.QUIT)) {
- command.execute(this);
- ByteBuf response = command.getResponse();
- writeToChannel(response);
- channelInactive(ctx);
} else if (command.isOfType(RedisCommandType.AUTH)) {
- command.execute(this);
- ByteBuf response = command.getResponse();
- writeToChannel(response);
+ response = command.execute(this);
} else {
- ByteBuf r = Coder.getNoAuthResponse(this.byteBufAllocator,
RedisConstants.ERROR_NOT_AUTH);
- writeToChannel(r);
+ response = RedisResponse.error(RedisConstants.ERROR_NOT_AUTH);
+ }
+
+ // PUBLISH responses are always deferred
+ // TODO: Clean this up once all Executors are using RedisResponse
+ if (response == null && !command.isOfType(PUBLISH)) {
+ writeToChannel(command.getResponse());
+ } else if (response != null) {
+ writeToChannel(response);
+ }
+
+ if (command.isOfType(RedisCommandType.QUIT)) {
+ channelInactive(ctx);
}
}
@@ -295,12 +297,11 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
* @param command Command to execute
* @throws Exception Throws exception if exception is from within execution
and not to be handled
*/
- private void executeWithoutTransaction(Command command) throws Exception {
+ private RedisResponse executeWithoutTransaction(Command command) throws
Exception {
Exception cause = null;
for (int i = 0; i < MAXIMUM_NUM_RETRIES; i++) {
try {
- command.execute(this);
- return;
+ return command.execute(this);
} catch (Exception e) {
logger.error(e);
@@ -314,26 +315,28 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
throw cause;
}
- private void executeWithTransaction(ChannelHandlerContext ctx,
+ private RedisResponse executeWithTransaction(ChannelHandlerContext ctx,
Command command) throws Exception {
CacheTransactionManager txm = cache.getCacheTransactionManager();
TransactionId transactionId = getTransactionID();
txm.resume(transactionId);
+
+ RedisResponse response;
try {
- command.execute(this);
+ response = command.execute(this);
} catch (UnsupportedOperationInTransactionException e) {
- command.setResponse(Coder.getErrorResponse(this.byteBufAllocator,
- RedisConstants.ERROR_UNSUPPORTED_OPERATION_IN_TRANSACTION));
+ response =
RedisResponse.error(RedisConstants.ERROR_UNSUPPORTED_OPERATION_IN_TRANSACTION);
} catch (TransactionException e) {
- command.setResponse(Coder.getErrorResponse(this.byteBufAllocator,
- RedisConstants.ERROR_TRANSACTION_EXCEPTION));
+ response =
RedisResponse.error(RedisConstants.ERROR_TRANSACTION_EXCEPTION);
} catch (Exception e) {
- ByteBuf response = getExceptionResponse(ctx, e);
- command.setResponse(response);
+ response = getExceptionResponse(ctx, e);
}
+
getTransactionQueue().add(command);
transactionId = txm.suspend();
setTransactionID(transactionId);
+
+ return response;
}
/**
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/Executor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/Executor.java
index b8a4542..8a2fe84 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Executor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/Executor.java
@@ -16,7 +16,7 @@ package org.apache.geode.redis.internal;
/**
- * Interface for executors of a {@link Command}
+ * Interface for executors of a {@link Command}.
*/
public interface Executor {
@@ -27,9 +27,18 @@ public interface Executor {
* @param command The command to be executed
* @param context The execution context by which this command is to be
executed
*/
- void executeCommand(Command command, ExecutionHandlerContext context);
+ default void executeCommand(Command command, ExecutionHandlerContext
context) {
+ executeCommandWithResponse(command, context);
+ }
- default void execute(Command command, ExecutionHandlerContext context) {
+ /**
+ * Interim method to transition to returning a {@link RedisResponse}
+ *
+ * TODO: Once all commands are transitioned, one of these methods needs to
be removed.
+ */
+ default RedisResponse executeCommandWithResponse(Command command,
+ ExecutionHandlerContext context) {
executeCommand(command, context);
+ return null;
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
index a505888..49eca2d 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
@@ -340,18 +340,20 @@ public enum RedisCommandType {
private final Executor executor;
private final ParameterRequirements parameterRequirements;
- private RedisCommandType(Executor executor) {
+ RedisCommandType(Executor executor) {
this(executor, new UnspecifiedParameterRequirements());
}
- private RedisCommandType(Executor executor, ParameterRequirements
parameterRequirements) {
+ RedisCommandType(Executor executor, ParameterRequirements
parameterRequirements) {
this.executor = executor;
this.parameterRequirements = parameterRequirements;
}
- public void executeCommand(Command command, ExecutionHandlerContext
executionHandlerContext) {
+ public RedisResponse executeCommand(Command command,
+ ExecutionHandlerContext executionHandlerContext) {
parameterRequirements.checkParameters(command, executionHandlerContext);
- executor.executeCommand(command, executionHandlerContext);
+
+ return executor.executeCommandWithResponse(command,
executionHandlerContext);
}
public boolean isTransactional() {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisResponse.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisResponse.java
new file mode 100644
index 0000000..fd31f00
--- /dev/null
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisResponse.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
+public class RedisResponse {
+
+ private final Function<ByteBufAllocator, ByteBuf> coderCallback;
+
+ private RedisResponse(Function<ByteBufAllocator, ByteBuf> coderCallback) {
+ this.coderCallback = coderCallback;
+ }
+
+ public ByteBuf encode(ByteBufAllocator allocator) {
+ return coderCallback.apply(allocator);
+ }
+
+ public static RedisResponse integer(long numericValue) {
+ return new RedisResponse((bba) -> Coder.getIntegerResponse(bba,
numericValue));
+ }
+
+ public static RedisResponse string(String stringValue) {
+ return new RedisResponse((bba) -> Coder.getSimpleStringResponse(bba,
stringValue));
+ }
+
+ public static RedisResponse bulkString(Object value) {
+ return new RedisResponse((bba) -> {
+ try {
+ return Coder.getBulkStringResponse(bba, value);
+ } catch (CoderException e) {
+ return Coder.getErrorResponse(bba, "Internal server error: " +
e.getMessage());
+ }
+ });
+ }
+
+ public static RedisResponse ok() {
+ return new RedisResponse((bba) -> Coder.getSimpleStringResponse(bba,
"OK"));
+ }
+
+ public static RedisResponse nil() {
+ return new RedisResponse(Coder::getNilResponse);
+ }
+
+ public static RedisResponse array(Collection<?> collection) {
+ return new RedisResponse((bba) -> {
+ try {
+ return Coder.getArrayResponse(bba, collection);
+ } catch (CoderException e) {
+ return Coder.getErrorResponse(bba, "Internal server error: " +
e.getMessage());
+ }
+ });
+ }
+
+ public static RedisResponse emptyArray() {
+ return new RedisResponse(Coder::getEmptyArrayResponse);
+ }
+
+ public static RedisResponse error(String error) {
+ return new RedisResponse((bba) -> Coder.getErrorResponse(bba, error));
+ }
+
+ public static RedisResponse wrongType(String error) {
+ return new RedisResponse((bba) -> Coder.getWrongTypeResponse(bba, error));
+ }
+
+ public static RedisResponse scan(List<?> items) {
+ return new RedisResponse((bba) -> Coder.getScanResponse(bba, items));
+ }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AbstractExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AbstractExecutor.java
index adc4369..56425a6 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AbstractExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AbstractExecutor.java
@@ -29,6 +29,7 @@ import org.apache.geode.redis.internal.Executor;
import org.apache.geode.redis.internal.GeodeRedisServer;
import org.apache.geode.redis.internal.RedisConstants;
import org.apache.geode.redis.internal.RedisDataType;
+import org.apache.geode.redis.internal.RedisResponse;
import org.apache.geode.redis.internal.RegionProvider;
/**
@@ -159,4 +160,12 @@ public abstract class AbstractExecutor implements Executor {
command.setResponse(rsp);
}
+
+ protected RedisResponse respondBulkStrings(Object message) {
+ if (message instanceof Collection) {
+ return RedisResponse.array((Collection<?>) message);
+ } else {
+ return RedisResponse.string((String) message);
+ }
+ }
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AuthExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AuthExecutor.java
index d4e3436..a2b3637 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AuthExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AuthExecutor.java
@@ -17,37 +17,35 @@ package org.apache.geode.redis.internal.executor;
import java.util.Arrays;
import java.util.List;
-import org.apache.geode.redis.internal.Coder;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
import org.apache.geode.redis.internal.Executor;
import org.apache.geode.redis.internal.RedisConstants;
import org.apache.geode.redis.internal.RedisConstants.ArityDef;
+import org.apache.geode.redis.internal.RedisResponse;
public class AuthExecutor implements Executor {
@Override
- public void executeCommand(Command command, ExecutionHandlerContext context)
{
+ public RedisResponse executeCommandWithResponse(Command command,
+ ExecutionHandlerContext context) {
List<byte[]> commandElems = command.getProcessedCommand();
if (commandElems.size() < 2) {
-
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
ArityDef.AUTH));
- return;
+ return RedisResponse.error(ArityDef.AUTH);
}
byte[] password = context.getAuthPassword();
if (password == null) {
- command.setResponse(
- Coder.getErrorResponse(context.getByteBufAllocator(),
RedisConstants.ERROR_NO_PASS));
- return;
+ return RedisResponse.error(RedisConstants.ERROR_NO_PASS);
}
+
boolean correct = Arrays.equals(commandElems.get(1), password);
if (correct) {
context.setAuthenticationVerified();
-
command.setResponse(Coder.getSimpleStringResponse(context.getByteBufAllocator(),
"OK"));
+ return RedisResponse.ok();
} else {
- command.setResponse(
- Coder.getErrorResponse(context.getByteBufAllocator(),
RedisConstants.ERROR_INVALID_PWD));
+ return RedisResponse.error(RedisConstants.ERROR_INVALID_PWD);
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/QuitExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/QuitExecutor.java
index 84ad241..5b677dc 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/QuitExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/QuitExecutor.java
@@ -14,17 +14,17 @@
*/
package org.apache.geode.redis.internal.executor;
-import org.apache.geode.redis.internal.Coder;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
import org.apache.geode.redis.internal.RedisConstants;
+import org.apache.geode.redis.internal.RedisResponse;
public class QuitExecutor extends AbstractExecutor {
@Override
- public void executeCommand(Command command, ExecutionHandlerContext context)
{
- command.setResponse(
- Coder.getSimpleStringResponse(context.getByteBufAllocator(),
RedisConstants.QUIT_RESPONSE));
+ public RedisResponse executeCommandWithResponse(Command command,
+ ExecutionHandlerContext context) {
+ return RedisResponse.string(RedisConstants.QUIT_RESPONSE);
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SAddExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SAddExecutor.java
index 0b86071..1a50768 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SAddExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SAddExecutor.java
@@ -18,14 +18,15 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.geode.redis.internal.ByteArrayWrapper;
-import org.apache.geode.redis.internal.Coder;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
+import org.apache.geode.redis.internal.RedisResponse;
public class SAddExecutor extends SetExecutor {
@Override
- public void executeCommand(Command command, ExecutionHandlerContext context)
{
+ public RedisResponse executeCommandWithResponse(Command command,
+ ExecutionHandlerContext context) {
List<ByteArrayWrapper> commandElements =
command.getProcessedCommandWrappers();
@@ -37,6 +38,6 @@ public class SAddExecutor extends SetExecutor {
long entriesAdded = redisSetCommands.sadd(command.getKey(), membersToAdd);
-
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(),
entriesAdded));
+ return RedisResponse.integer(entriesAdded);
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SCardExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SCardExecutor.java
index f5c313b..f9517b8 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SCardExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SCardExecutor.java
@@ -16,20 +16,21 @@ package org.apache.geode.redis.internal.executor.set;
import org.apache.geode.redis.internal.ByteArrayWrapper;
-import org.apache.geode.redis.internal.Coder;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
import org.apache.geode.redis.internal.RedisDataType;
+import org.apache.geode.redis.internal.RedisResponse;
public class SCardExecutor extends SetExecutor {
@Override
- public void executeCommand(Command command, ExecutionHandlerContext context)
{
+ public RedisResponse executeCommandWithResponse(Command command,
+ ExecutionHandlerContext context) {
ByteArrayWrapper key = command.getKey();
checkDataType(key, RedisDataType.REDIS_SET, context);
RedisSetCommands redisSetCommands =
new
RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion());
- int size = redisSetCommands.scard(key);
-
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(),
size));
+
+ return RedisResponse.integer(redisSetCommands.scard(key));
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SIsMemberExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SIsMemberExecutor.java
index 85819be..a5f38aa 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SIsMemberExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SIsMemberExecutor.java
@@ -17,24 +17,31 @@ package org.apache.geode.redis.internal.executor.set;
import java.util.List;
import org.apache.geode.redis.internal.ByteArrayWrapper;
-import org.apache.geode.redis.internal.Coder;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
+import org.apache.geode.redis.internal.RedisResponse;
public class SIsMemberExecutor extends SetExecutor {
private static final int EXISTS = 1;
+
private static final int NOT_EXISTS = 0;
@Override
- public void executeCommand(Command command, ExecutionHandlerContext context)
{
+ public RedisResponse executeCommandWithResponse(Command command,
+ ExecutionHandlerContext context) {
List<byte[]> commandElems = command.getProcessedCommand();
ByteArrayWrapper key = command.getKey();
+ if (!context.getKeyRegistrar().isRegistered(key)) {
+ return RedisResponse.integer(NOT_EXISTS);
+ }
+
ByteArrayWrapper member = new ByteArrayWrapper(commandElems.get(2));
RedisSetCommands redisSetCommands =
new
RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion());
int result = redisSetCommands.sismember(key, member) ? EXISTS : NOT_EXISTS;
-
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(),
result));
+
+ return RedisResponse.integer(result);
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMembersExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMembersExecutor.java
index 10b909b..1c367a0 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMembersExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMembersExecutor.java
@@ -17,26 +17,20 @@ package org.apache.geode.redis.internal.executor.set;
import java.util.Set;
import org.apache.geode.redis.internal.ByteArrayWrapper;
-import org.apache.geode.redis.internal.Coder;
-import org.apache.geode.redis.internal.CoderException;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
-import org.apache.geode.redis.internal.RedisConstants;
+import org.apache.geode.redis.internal.RedisResponse;
public class SMembersExecutor extends SetExecutor {
@Override
- public void executeCommand(Command command, ExecutionHandlerContext context)
{
+ public RedisResponse executeCommandWithResponse(Command command,
+ ExecutionHandlerContext context) {
ByteArrayWrapper key = command.getKey();
RedisSetCommands redisSetCommands =
new
RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion());
Set<ByteArrayWrapper> members = redisSetCommands.smembers(key);
- try {
-
command.setResponse(Coder.getArrayResponse(context.getByteBufAllocator(),
members));
- } catch (CoderException e) {
- command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
- RedisConstants.SERVER_ERROR_MESSAGE));
- }
+ return RedisResponse.array(members);
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMoveExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMoveExecutor.java
index 4da5317..78db927 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMoveExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMoveExecutor.java
@@ -22,11 +22,11 @@ import org.apache.geode.cache.Region;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.redis.internal.AutoCloseableLock;
import org.apache.geode.redis.internal.ByteArrayWrapper;
-import org.apache.geode.redis.internal.Coder;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
import org.apache.geode.redis.internal.RedisData;
import org.apache.geode.redis.internal.RedisDataType;
+import org.apache.geode.redis.internal.RedisResponse;
public class SMoveExecutor extends SetExecutor {
@@ -35,7 +35,8 @@ public class SMoveExecutor extends SetExecutor {
private static final int NOT_MOVED = 0;
@Override
- public void executeCommand(Command command, ExecutionHandlerContext context)
{
+ public RedisResponse executeCommandWithResponse(Command command,
+ ExecutionHandlerContext context) {
List<byte[]> commandElems = command.getProcessedCommand();
ByteArrayWrapper source = command.getKey();
@@ -47,12 +48,12 @@ public class SMoveExecutor extends SetExecutor {
Region<ByteArrayWrapper, RedisData> region = getRegion(context);
+ RedisResponse response;
try (AutoCloseableLock regionLock = withRegionLock(context, source)) {
RedisData sourceSet = region.get(source);
if (sourceSet == null) {
-
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(),
NOT_MOVED));
- return;
+ return RedisResponse.integer(NOT_MOVED);
}
boolean removed =
@@ -60,46 +61,32 @@ public class SMoveExecutor extends SetExecutor {
new ArrayList<>(Collections.singletonList(member))) == 1;
if (!removed) {
-
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(),
NOT_MOVED));
- } else {
- try (AutoCloseableLock destinationLock = withRegionLock(context,
destination)) {
- // TODO: this should invoke a function in case the primary for
destination is remote
- new RedisSetInRegion(region).sadd(destination,
- new ArrayList<>(Collections.singletonList(member)));
-
-
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(),
MOVED));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- System.out.println("Interrupt exception!!");
- command.setResponse(
- Coder.getErrorResponse(context.getByteBufAllocator(), "Thread
interrupted."));
- return;
- } catch (TimeoutException e) {
- System.out.println("Timeout exception!!");
-
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
- "Timeout acquiring lock. Please try again."));
- return;
- } catch (Exception e) {
- System.out.println("Unexpected exception: " + e);
-
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
- "Unexpected exception."));
- }
+ return RedisResponse.integer(NOT_MOVED);
+ }
+
+ try (AutoCloseableLock destinationLock = withRegionLock(context,
destination)) {
+ // TODO: this should invoke a function in case the primary for
destination is remote
+ new RedisSetInRegion(region).sadd(destination,
+ new ArrayList<>(Collections.singletonList(member)));
+
+ response = RedisResponse.integer(MOVED);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ response = RedisResponse.error("Thread interrupted");
+ } catch (TimeoutException e) {
+ response = RedisResponse.error("Timeout acquiring lock. Please try
again.");
+ } catch (Exception e) {
+ response = RedisResponse.error("Unexpected exception.");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- System.out.println("Interrupt exception!!");
- command.setResponse(
- Coder.getErrorResponse(context.getByteBufAllocator(), "Thread
interrupted."));
- return;
+ response = RedisResponse.error("Thread interrupted.");
} catch (TimeoutException e) {
- System.out.println("Timeout exception!!");
- command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
- "Timeout acquiring lock. Please try again."));
- return;
+ response = RedisResponse.error("Timeout acquiring lock. Please try
again.");
} catch (Exception e) {
- System.out.println("Unexpected exception: " + e);
- command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
- "Unexpected exception."));
+ response = RedisResponse.error("Unexpected exception.");
}
+
+ return response;
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SPopExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SPopExecutor.java
index a05777d..0fd6cdb 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SPopExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SPopExecutor.java
@@ -18,16 +18,15 @@ import java.util.Collection;
import java.util.List;
import org.apache.geode.redis.internal.ByteArrayWrapper;
-import org.apache.geode.redis.internal.Coder;
-import org.apache.geode.redis.internal.CoderException;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
-import org.apache.geode.redis.internal.RedisConstants;
+import org.apache.geode.redis.internal.RedisResponse;
public class SPopExecutor extends SetExecutor {
@Override
- public void executeCommand(Command command, ExecutionHandlerContext context)
{
+ public RedisResponse executeCommandWithResponse(Command command,
+ ExecutionHandlerContext context) {
List<byte[]> commandElems = command.getProcessedCommand();
int popCount = 1;
if (commandElems.size() == 3) {
@@ -39,19 +38,13 @@ public class SPopExecutor extends SetExecutor {
new
RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion());
Collection<ByteArrayWrapper> popped = redisSetCommands.spop(key, popCount);
if (popped.isEmpty()) {
- command.setResponse(Coder.getNilResponse(context.getByteBufAllocator()));
- return;
+ return RedisResponse.nil();
}
- try {
- if (popCount == 1) {
- command.setResponse(
- Coder.getBulkStringResponse(context.getByteBufAllocator(),
popped.iterator().next()));
- } else {
-
command.setResponse(Coder.getArrayResponse(context.getByteBufAllocator(),
popped));
- }
- } catch (CoderException e) {
- command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
- RedisConstants.SERVER_ERROR_MESSAGE));
+
+ if (popCount == 1) {
+ return RedisResponse.bulkString(popped.iterator().next());
+ } else {
+ return RedisResponse.array(popped);
}
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRandMemberExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRandMemberExecutor.java
index bc4fbbd..2b17553 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRandMemberExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRandMemberExecutor.java
@@ -19,17 +19,17 @@ import java.util.List;
import org.apache.geode.redis.internal.ByteArrayWrapper;
import org.apache.geode.redis.internal.Coder;
-import org.apache.geode.redis.internal.CoderException;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
-import org.apache.geode.redis.internal.RedisConstants;
+import org.apache.geode.redis.internal.RedisResponse;
public class SRandMemberExecutor extends SetExecutor {
private static final String ERROR_NOT_NUMERIC = "The count provided must be
numeric";
@Override
- public void executeCommand(Command command, ExecutionHandlerContext context)
{
+ public RedisResponse executeCommandWithResponse(Command command,
+ ExecutionHandlerContext context) {
List<byte[]> commandElems = command.getProcessedCommand();
ByteArrayWrapper key = command.getKey();
@@ -40,15 +40,14 @@ public class SRandMemberExecutor extends SetExecutor {
try {
count = Coder.bytesToInt(commandElems.get(2));
} catch (NumberFormatException e) {
- command.setResponse(
- Coder.getErrorResponse(context.getByteBufAllocator(),
ERROR_NOT_NUMERIC));
- return;
+ return RedisResponse.error(ERROR_NOT_NUMERIC);
}
}
+
if (count == 0) {
- command.setResponse(Coder.getNilResponse(context.getByteBufAllocator()));
- return;
+ return RedisResponse.nil();
}
+
if (count < 0) {
count = -count;
}
@@ -56,19 +55,13 @@ public class SRandMemberExecutor extends SetExecutor {
RedisSetCommands redisSetCommands =
new
RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion());
Collection<ByteArrayWrapper> results = redisSetCommands.srandmember(key,
count);
- try {
- if (results.isEmpty()) {
-
command.setResponse(Coder.getNilResponse(context.getByteBufAllocator()));
- } else if (count == 1) {
- command.setResponse(
- Coder.getBulkStringResponse(context.getByteBufAllocator(),
- results.iterator().next().toBytes()));
- } else {
-
command.setResponse(Coder.getArrayResponse(context.getByteBufAllocator(),
results));
- }
- } catch (CoderException e) {
- command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
- RedisConstants.SERVER_ERROR_MESSAGE));
+
+ if (results.isEmpty()) {
+ return RedisResponse.nil();
+ } else if (count == 1) {
+ return RedisResponse.bulkString(results.iterator().next());
+ } else {
+ return RedisResponse.array(results);
}
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRemExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRemExecutor.java
index de7882f..6233be3 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRemExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRemExecutor.java
@@ -18,13 +18,14 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.geode.redis.internal.ByteArrayWrapper;
-import org.apache.geode.redis.internal.Coder;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
+import org.apache.geode.redis.internal.RedisResponse;
public class SRemExecutor extends SetExecutor {
@Override
- public void executeCommand(Command command, ExecutionHandlerContext context)
{
+ public RedisResponse executeCommandWithResponse(Command command,
+ ExecutionHandlerContext context) {
List<ByteArrayWrapper> commandElements =
command.getProcessedCommandWrappers();
ByteArrayWrapper key = command.getKey();
@@ -33,15 +34,13 @@ public class SRemExecutor extends SetExecutor {
new
RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion());
ArrayList<ByteArrayWrapper> membersToRemove =
- new ArrayList<>(
- commandElements
- .subList(2, commandElements.size()));
+ new ArrayList<>(commandElements.subList(2, commandElements.size()));
long membersRemoved =
redisSetCommands.srem(
key,
membersToRemove);
-
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(),
membersRemoved));
+ return RedisResponse.integer(membersRemoved);
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java
index ef2a7e8..66af52a 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java
@@ -25,12 +25,14 @@ import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
import org.apache.geode.redis.internal.RedisConstants;
import org.apache.geode.redis.internal.RedisDataType;
+import org.apache.geode.redis.internal.RedisResponse;
import org.apache.geode.redis.internal.executor.AbstractScanExecutor;
public class SScanExecutor extends AbstractScanExecutor {
@Override
- public void executeCommand(Command command, ExecutionHandlerContext context)
{
+ public RedisResponse executeCommandWithResponse(Command command,
+ ExecutionHandlerContext context) {
List<byte[]> commandElems = command.getProcessedCommand();
ByteArrayWrapper key = command.getKey();
@@ -42,15 +44,15 @@ public class SScanExecutor extends AbstractScanExecutor {
Pattern matchPattern = null;
String globMatchPattern = null;
int count = DEFAULT_COUNT;
+
try {
cursor = Integer.parseInt(cursorString);
} catch (NumberFormatException e) {
-
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
ERROR_CURSOR));
- return;
+ return RedisResponse.error(ERROR_CURSOR);
}
+
if (cursor < 0) {
-
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
ERROR_CURSOR));
- return;
+ return RedisResponse.error(ERROR_CURSOR);
}
if (commandElems.size() > 4) {
@@ -65,8 +67,7 @@ public class SScanExecutor extends AbstractScanExecutor {
count = Coder.bytesToInt(bytes);
}
} catch (NumberFormatException e) {
-
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
ERROR_COUNT));
- return;
+ return RedisResponse.error(ERROR_COUNT);
}
}
@@ -79,27 +80,24 @@ public class SScanExecutor extends AbstractScanExecutor {
count = Coder.bytesToInt(bytes);
}
} catch (NumberFormatException e) {
-
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
ERROR_COUNT));
- return;
+ return RedisResponse.error(ERROR_COUNT);
}
}
if (count < 0) {
-
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
ERROR_COUNT));
- return;
+ return RedisResponse.error(ERROR_COUNT);
}
try {
matchPattern = convertGlobToRegex(globMatchPattern);
} catch (PatternSyntaxException e) {
- command.setResponse(
- Coder.getErrorResponse(context.getByteBufAllocator(),
RedisConstants.ERROR_ILLEGAL_GLOB));
- return;
+ return RedisResponse.error(RedisConstants.ERROR_ILLEGAL_GLOB);
}
RedisSetCommands redisSetCommands =
new
RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion());
List<Object> returnList = redisSetCommands.sscan(key, matchPattern, count,
cursor);
- command.setResponse(Coder.getScanResponse(context.getByteBufAllocator(),
returnList));
+
+ return RedisResponse.scan(returnList);
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetOpExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetOpExecutor.java
index b6928fd..4e91040 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetOpExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetOpExecutor.java
@@ -23,16 +23,17 @@ import org.apache.geode.cache.Region;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.redis.internal.AutoCloseableLock;
import org.apache.geode.redis.internal.ByteArrayWrapper;
-import org.apache.geode.redis.internal.Coder;
import org.apache.geode.redis.internal.Command;
import org.apache.geode.redis.internal.ExecutionHandlerContext;
import org.apache.geode.redis.internal.RedisData;
+import org.apache.geode.redis.internal.RedisResponse;
import org.apache.geode.redis.internal.RegionProvider;
public abstract class SetOpExecutor extends SetExecutor {
@Override
- public void executeCommand(Command command, ExecutionHandlerContext context)
{
+ public RedisResponse executeCommandWithResponse(Command command,
+ ExecutionHandlerContext context) {
List<byte[]> commandElems = command.getProcessedCommand();
int setsStartIndex = isStorage() ? 2 : 1;
@@ -43,25 +44,27 @@ public abstract class SetOpExecutor extends SetExecutor {
}
ByteArrayWrapper firstSetKey = new
ByteArrayWrapper(commandElems.get(setsStartIndex++));
+ RedisResponse response;
+
if (destination != null) {
try (AutoCloseableLock regionLock = withRegionLock(context,
destination)) {
- doActualSetOperation(command, context, commandElems, setsStartIndex,
regionProvider,
- destination, firstSetKey);
+ response = doActualSetOperation(command, context, commandElems,
setsStartIndex,
+ regionProvider, destination, firstSetKey);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- command.setResponse(
- Coder.getErrorResponse(context.getByteBufAllocator(), "Thread
interrupted."));
+ response = RedisResponse.error("Thread interrupted");
} catch (TimeoutException e) {
-
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
- "Timeout acquiring lock. Please try again."));
+ return RedisResponse.error("Timeout acquiring lock. Please try again");
}
} else {
- doActualSetOperation(command, context, commandElems, setsStartIndex,
regionProvider,
- destination, firstSetKey);
+ response = doActualSetOperation(command, context, commandElems,
setsStartIndex,
+ regionProvider, destination, firstSetKey);
}
+
+ return response;
}
- private boolean doActualSetOperation(Command command,
ExecutionHandlerContext context,
+ private RedisResponse doActualSetOperation(Command command,
ExecutionHandlerContext context,
List<byte[]> commandElems, int setsStartIndex,
RegionProvider regionProvider, ByteArrayWrapper destination,
ByteArrayWrapper firstSetKey) {
@@ -81,10 +84,11 @@ public abstract class SetOpExecutor extends SetExecutor {
}
if (setList.isEmpty() && !isStorage()) {
- respondBulkStrings(command, context, firstSet);
- return true;
+ return respondBulkStrings(firstSet);
}
+ RedisResponse response;
+
Set<ByteArrayWrapper> resultSet = setOp(firstSet, setList);
if (isStorage()) {
regionProvider.removeKey(destination);
@@ -92,20 +96,19 @@ public abstract class SetOpExecutor extends SetExecutor {
if (!resultSet.isEmpty()) {
region.put(destination, new RedisSet(resultSet));
}
- command
- .setResponse(
- Coder.getIntegerResponse(context.getByteBufAllocator(),
resultSet.size()));
+ response = RedisResponse.integer(resultSet.size());
} else {
-
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), 0));
+ response = RedisResponse.integer(0);
}
} else {
if (resultSet == null || resultSet.isEmpty()) {
-
command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator()));
+ response = RedisResponse.emptyArray();
} else {
- respondBulkStrings(command, context, resultSet);
+ response = respondBulkStrings(resultSet);
}
}
- return false;
+
+ return response;
}
protected abstract boolean isStorage();
diff --git a/geode-redis/src/test/java/org/apache/geode/redis/internal/PubSubImplJUnitTest.java b/geode-redis/src/test/java/org/apache/geode/redis/internal/PubSubImplJUnitTest.java
index 424b5b6..d051910 100644
--- a/geode-redis/src/test/java/org/apache/geode/redis/internal/PubSubImplJUnitTest.java
+++ b/geode-redis/src/test/java/org/apache/geode/redis/internal/PubSubImplJUnitTest.java
@@ -21,6 +21,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
@@ -38,7 +39,7 @@ public class PubSubImplJUnitTest {
ExecutionHandlerContext mockContext = mock(ExecutionHandlerContext.class);
FailingChannelFuture mockFuture = new FailingChannelFuture();
- when(mockContext.writeToChannel(any())).thenReturn(mockFuture);
+ when(mockContext.writeToChannel((ByteBuf) any())).thenReturn(mockFuture);
when(mockContext.getByteBufAllocator()).thenReturn(new
PooledByteBufAllocator());
Client deadClient = mock(Client.class);