This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6aa2420  Introduce RedisResponse to SET executors (#5114)
6aa2420 is described below

commit 6aa2420842249184cc94915e1b207c44b130e2bb
Author: Jens Deppe <[email protected]>
AuthorDate: Tue May 19 13:06:44 2020 -0700

    Introduce RedisResponse to SET executors (#5114)
---
 .../org/apache/geode/redis/internal/Command.java   |  4 +-
 .../redis/internal/ExecutionHandlerContext.java    | 97 +++++++++++-----------
 .../org/apache/geode/redis/internal/Executor.java  | 15 +++-
 .../geode/redis/internal/RedisCommandType.java     | 10 ++-
 .../apache/geode/redis/internal/RedisResponse.java | 89 ++++++++++++++++++++
 .../redis/internal/executor/AbstractExecutor.java  |  9 ++
 .../redis/internal/executor/AuthExecutor.java      | 18 ++--
 .../redis/internal/executor/QuitExecutor.java      |  8 +-
 .../redis/internal/executor/set/SAddExecutor.java  |  7 +-
 .../redis/internal/executor/set/SCardExecutor.java |  9 +-
 .../internal/executor/set/SIsMemberExecutor.java   | 13 ++-
 .../internal/executor/set/SMembersExecutor.java    | 14 +---
 .../redis/internal/executor/set/SMoveExecutor.java | 65 ++++++---------
 .../redis/internal/executor/set/SPopExecutor.java  | 25 ++----
 .../internal/executor/set/SRandMemberExecutor.java | 35 ++++----
 .../redis/internal/executor/set/SRemExecutor.java  | 11 ++-
 .../redis/internal/executor/set/SScanExecutor.java | 28 +++----
 .../redis/internal/executor/set/SetOpExecutor.java | 43 +++++-----
 .../geode/redis/internal/PubSubImplJUnitTest.java  |  3 +-
 19 files changed, 295 insertions(+), 208 deletions(-)

diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/Command.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/Command.java
index 5dd8fa4..89a6137 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Command.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/Command.java
@@ -171,9 +171,9 @@ public class Command {
     return builder.toString();
   }
 
-  public void execute(ExecutionHandlerContext executionHandlerContext) {
+  public RedisResponse execute(ExecutionHandlerContext executionHandlerContext) {
     RedisCommandType type = getCommandType();
-    type.executeCommand(this, executionHandlerContext);
+    return type.executeCommand(this, executionHandlerContext);
   }
 
   boolean isOfType(RedisCommandType type) {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java
index 1998371..471dde2 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.redis.internal;
 
+import static org.apache.geode.redis.internal.RedisCommandType.PUBLISH;
+
 import java.io.IOException;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -55,8 +57,8 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
 
   private static final Logger logger = LogService.getLogger();
   private static final int WAIT_REGION_DSTRYD_MILLIS = 100;
-  private static final int MAXIMUM_NUM_RETRIES = (1000 * 60) / WAIT_REGION_DSTRYD_MILLIS; // 60
-  // seconds
+  private static final int MAXIMUM_NUM_RETRIES = (1000 * 60) / WAIT_REGION_DSTRYD_MILLIS;
+
   private final RedisLockService lockService;
 
   private final Cache cache;
@@ -136,6 +138,10 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
     return channel.writeAndFlush(message, channel.newPromise());
   }
 
+  public ChannelFuture writeToChannel(RedisResponse response) {
+    return channel.writeAndFlush(response.encode(byteBufAllocator), 
channel.newPromise());
+  }
+
   /**
    * This will handle the execution of received commands
    */
@@ -164,12 +170,12 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
       channelInactive(ctx);
       return;
     }
-    ByteBuf response = getExceptionResponse(ctx, cause);
-    writeToChannel(response);
+    writeToChannel(getExceptionResponse(ctx, cause));
   }
 
-  private ByteBuf getExceptionResponse(ChannelHandlerContext ctx, Throwable 
cause) {
-    ByteBuf response;
+  private RedisResponse getExceptionResponse(ChannelHandlerContext ctx, 
Throwable cause) {
+    RedisResponse response;
+
     if (cause instanceof FunctionException) {
       Throwable th = cause.getCause();
       if (th == null) {
@@ -184,28 +190,26 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
     }
 
     if (cause instanceof RedisDataTypeMismatchException) {
-      response = Coder.getWrongTypeResponse(this.byteBufAllocator, 
cause.getMessage());
+      response = RedisResponse.wrongType(cause.getMessage());
     } else if (cause instanceof DecoderException
         && cause.getCause() instanceof RedisCommandParserException) {
-      response =
-          Coder.getErrorResponse(this.byteBufAllocator, 
RedisConstants.PARSING_EXCEPTION_MESSAGE);
+      response = RedisResponse.error(RedisConstants.PARSING_EXCEPTION_MESSAGE);
 
     } else if (cause instanceof RegionCreationException) {
-      this.logger.error(cause);
-      response =
-          Coder.getErrorResponse(this.byteBufAllocator, 
RedisConstants.ERROR_REGION_CREATION);
+      logger.error(cause);
+      response = RedisResponse.error(RedisConstants.ERROR_REGION_CREATION);
     } else if (cause instanceof InterruptedException || cause instanceof 
CacheClosedException) {
-      response =
-          Coder.getErrorResponse(this.byteBufAllocator, 
RedisConstants.SERVER_ERROR_SHUTDOWN);
+      response = RedisResponse.error(RedisConstants.SERVER_ERROR_SHUTDOWN);
     } else if (cause instanceof IllegalStateException
         || cause instanceof RedisParametersMismatchException) {
-      response = Coder.getErrorResponse(this.byteBufAllocator, 
cause.getMessage());
+      response = RedisResponse.error(cause.getMessage());
     } else {
       if (logger.isErrorEnabled()) {
         logger.error("GeodeRedisServer-Unexpected error handler for " + 
ctx.channel(), cause);
       }
-      response = Coder.getErrorResponse(this.byteBufAllocator, 
RedisConstants.SERVER_ERROR_MESSAGE);
+      response = RedisResponse.error(RedisConstants.SERVER_ERROR_MESSAGE);
     }
+
     return response;
   }
 
@@ -219,6 +223,7 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
   }
 
   private void executeCommand(ChannelHandlerContext ctx, Command command) 
throws Exception {
+    RedisResponse response;
     if (isAuthenticated) {
       if (command.isOfType(RedisCommandType.SHUTDOWN)) {
         this.server.shutdown();
@@ -226,35 +231,32 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
       }
 
       if (hasTransaction() && !command.isTransactional()) {
-        executeWithTransaction(ctx, command);
+        response = executeWithTransaction(ctx, command);
       } else {
-        executeWithoutTransaction(command);
+        response = executeWithoutTransaction(command);
       }
 
       logResponse(command);
 
       if (hasTransaction() && command.isOfType(RedisCommandType.MULTI)) {
-        writeToChannel(
-            Coder.getSimpleStringResponse(this.byteBufAllocator, 
RedisConstants.COMMAND_QUEUED));
-      } else {
-        // PUBLISH responses are always deferred
-        if (command.getCommandType() != RedisCommandType.PUBLISH) {
-          ByteBuf response = command.getResponse();
-          writeToChannel(response);
-        }
+        response = RedisResponse.string(RedisConstants.COMMAND_QUEUED);
       }
-    } else if (command.isOfType(RedisCommandType.QUIT)) {
-      command.execute(this);
-      ByteBuf response = command.getResponse();
-      writeToChannel(response);
-      channelInactive(ctx);
     } else if (command.isOfType(RedisCommandType.AUTH)) {
-      command.execute(this);
-      ByteBuf response = command.getResponse();
-      writeToChannel(response);
+      response = command.execute(this);
     } else {
-      ByteBuf r = Coder.getNoAuthResponse(this.byteBufAllocator, 
RedisConstants.ERROR_NOT_AUTH);
-      writeToChannel(r);
+      response = RedisResponse.error(RedisConstants.ERROR_NOT_AUTH);
+    }
+
+    // PUBLISH responses are always deferred
+    // TODO: Clean this up once all Executors are using RedisResponse
+    if (response == null && !command.isOfType(PUBLISH)) {
+      writeToChannel(command.getResponse());
+    } else if (response != null) {
+      writeToChannel(response);
+    }
+
+    if (command.isOfType(RedisCommandType.QUIT)) {
+      channelInactive(ctx);
     }
   }
 
@@ -295,12 +297,11 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
    * @param command Command to execute
    * @throws Exception Throws exception if exception is from within execution 
and not to be handled
    */
-  private void executeWithoutTransaction(Command command) throws Exception {
+  private RedisResponse executeWithoutTransaction(Command command) throws 
Exception {
     Exception cause = null;
     for (int i = 0; i < MAXIMUM_NUM_RETRIES; i++) {
       try {
-        command.execute(this);
-        return;
+        return command.execute(this);
       } catch (Exception e) {
         logger.error(e);
 
@@ -314,26 +315,28 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
     throw cause;
   }
 
-  private void executeWithTransaction(ChannelHandlerContext ctx,
+  private RedisResponse executeWithTransaction(ChannelHandlerContext ctx,
       Command command) throws Exception {
     CacheTransactionManager txm = cache.getCacheTransactionManager();
     TransactionId transactionId = getTransactionID();
     txm.resume(transactionId);
+
+    RedisResponse response;
     try {
-      command.execute(this);
+      response = command.execute(this);
     } catch (UnsupportedOperationInTransactionException e) {
-      command.setResponse(Coder.getErrorResponse(this.byteBufAllocator,
-          RedisConstants.ERROR_UNSUPPORTED_OPERATION_IN_TRANSACTION));
+      response = RedisResponse.error(RedisConstants.ERROR_UNSUPPORTED_OPERATION_IN_TRANSACTION);
     } catch (TransactionException e) {
-      command.setResponse(Coder.getErrorResponse(this.byteBufAllocator,
-          RedisConstants.ERROR_TRANSACTION_EXCEPTION));
+      response = RedisResponse.error(RedisConstants.ERROR_TRANSACTION_EXCEPTION);
     } catch (Exception e) {
-      ByteBuf response = getExceptionResponse(ctx, e);
-      command.setResponse(response);
+      response = getExceptionResponse(ctx, e);
     }
+
     getTransactionQueue().add(command);
     transactionId = txm.suspend();
     setTransactionID(transactionId);
+
+    return response;
   }
 
   /**
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/Executor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/Executor.java
index b8a4542..8a2fe84 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Executor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/Executor.java
@@ -16,7 +16,7 @@ package org.apache.geode.redis.internal;
 
 
 /**
- * Interface for executors of a {@link Command}
+ * Interface for executors of a {@link Command}.
  */
 public interface Executor {
 
@@ -27,9 +27,18 @@ public interface Executor {
    * @param command The command to be executed
    * @param context The execution context by which this command is to be 
executed
    */
-  void executeCommand(Command command, ExecutionHandlerContext context);
+  default void executeCommand(Command command, ExecutionHandlerContext 
context) {
+    executeCommandWithResponse(command, context);
+  }
 
-  default void execute(Command command, ExecutionHandlerContext context) {
+  /**
+   * Interim method to transition to returning a {@link RedisResponse}
+   *
+   * TODO: Once all commands are transitioned, one of these methods needs to 
be removed.
+   */
+  default RedisResponse executeCommandWithResponse(Command command,
+      ExecutionHandlerContext context) {
     executeCommand(command, context);
+    return null;
   }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
index a505888..49eca2d 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
@@ -340,18 +340,20 @@ public enum RedisCommandType {
   private final Executor executor;
   private final ParameterRequirements parameterRequirements;
 
-  private RedisCommandType(Executor executor) {
+  RedisCommandType(Executor executor) {
     this(executor, new UnspecifiedParameterRequirements());
   }
 
-  private RedisCommandType(Executor executor, ParameterRequirements 
parameterRequirements) {
+  RedisCommandType(Executor executor, ParameterRequirements 
parameterRequirements) {
     this.executor = executor;
     this.parameterRequirements = parameterRequirements;
   }
 
-  public void executeCommand(Command command, ExecutionHandlerContext 
executionHandlerContext) {
+  public RedisResponse executeCommand(Command command,
+      ExecutionHandlerContext executionHandlerContext) {
     parameterRequirements.checkParameters(command, executionHandlerContext);
-    executor.executeCommand(command, executionHandlerContext);
+
+    return executor.executeCommandWithResponse(command, 
executionHandlerContext);
   }
 
   public boolean isTransactional() {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisResponse.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisResponse.java
new file mode 100644
index 0000000..fd31f00
--- /dev/null
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisResponse.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
+public class RedisResponse {
+
+  private final Function<ByteBufAllocator, ByteBuf> coderCallback;
+
+  private RedisResponse(Function<ByteBufAllocator, ByteBuf> coderCallback) {
+    this.coderCallback = coderCallback;
+  }
+
+  public ByteBuf encode(ByteBufAllocator allocator) {
+    return coderCallback.apply(allocator);
+  }
+
+  public static RedisResponse integer(long numericValue) {
+    return new RedisResponse((bba) -> Coder.getIntegerResponse(bba, 
numericValue));
+  }
+
+  public static RedisResponse string(String stringValue) {
+    return new RedisResponse((bba) -> Coder.getSimpleStringResponse(bba, 
stringValue));
+  }
+
+  public static RedisResponse bulkString(Object value) {
+    return new RedisResponse((bba) -> {
+      try {
+        return Coder.getBulkStringResponse(bba, value);
+      } catch (CoderException e) {
+        return Coder.getErrorResponse(bba, "Internal server error: " + 
e.getMessage());
+      }
+    });
+  }
+
+  public static RedisResponse ok() {
+    return new RedisResponse((bba) -> Coder.getSimpleStringResponse(bba, 
"OK"));
+  }
+
+  public static RedisResponse nil() {
+    return new RedisResponse(Coder::getNilResponse);
+  }
+
+  public static RedisResponse array(Collection<?> collection) {
+    return new RedisResponse((bba) -> {
+      try {
+        return Coder.getArrayResponse(bba, collection);
+      } catch (CoderException e) {
+        return Coder.getErrorResponse(bba, "Internal server error: " + 
e.getMessage());
+      }
+    });
+  }
+
+  public static RedisResponse emptyArray() {
+    return new RedisResponse(Coder::getEmptyArrayResponse);
+  }
+
+  public static RedisResponse error(String error) {
+    return new RedisResponse((bba) -> Coder.getErrorResponse(bba, error));
+  }
+
+  public static RedisResponse wrongType(String error) {
+    return new RedisResponse((bba) -> Coder.getWrongTypeResponse(bba, error));
+  }
+
+  public static RedisResponse scan(List<?> items) {
+    return new RedisResponse((bba) -> Coder.getScanResponse(bba, items));
+  }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AbstractExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AbstractExecutor.java
index adc4369..56425a6 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AbstractExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AbstractExecutor.java
@@ -29,6 +29,7 @@ import org.apache.geode.redis.internal.Executor;
 import org.apache.geode.redis.internal.GeodeRedisServer;
 import org.apache.geode.redis.internal.RedisConstants;
 import org.apache.geode.redis.internal.RedisDataType;
+import org.apache.geode.redis.internal.RedisResponse;
 import org.apache.geode.redis.internal.RegionProvider;
 
 /**
@@ -159,4 +160,12 @@ public abstract class AbstractExecutor implements Executor {
 
     command.setResponse(rsp);
   }
+
+  protected RedisResponse respondBulkStrings(Object message) {
+    if (message instanceof Collection) {
+      return RedisResponse.array((Collection<?>) message);
+    } else {
+      return RedisResponse.string((String) message);
+    }
+  }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AuthExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AuthExecutor.java
index d4e3436..a2b3637 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AuthExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AuthExecutor.java
@@ -17,37 +17,35 @@ package org.apache.geode.redis.internal.executor;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.geode.redis.internal.Coder;
 import org.apache.geode.redis.internal.Command;
 import org.apache.geode.redis.internal.ExecutionHandlerContext;
 import org.apache.geode.redis.internal.Executor;
 import org.apache.geode.redis.internal.RedisConstants;
 import org.apache.geode.redis.internal.RedisConstants.ArityDef;
+import org.apache.geode.redis.internal.RedisResponse;
 
 public class AuthExecutor implements Executor {
 
   @Override
-  public void executeCommand(Command command, ExecutionHandlerContext context) 
{
+  public RedisResponse executeCommandWithResponse(Command command,
+      ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
 
     if (commandElems.size() < 2) {
-      command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), ArityDef.AUTH));
-      return;
+      return RedisResponse.error(ArityDef.AUTH);
     }
     byte[] password = context.getAuthPassword();
     if (password == null) {
-      command.setResponse(
-          Coder.getErrorResponse(context.getByteBufAllocator(), 
RedisConstants.ERROR_NO_PASS));
-      return;
+      return RedisResponse.error(RedisConstants.ERROR_NO_PASS);
     }
+
     boolean correct = Arrays.equals(commandElems.get(1), password);
 
     if (correct) {
       context.setAuthenticationVerified();
-      command.setResponse(Coder.getSimpleStringResponse(context.getByteBufAllocator(), "OK"));
+      return RedisResponse.ok();
     } else {
-      command.setResponse(
-          Coder.getErrorResponse(context.getByteBufAllocator(), 
RedisConstants.ERROR_INVALID_PWD));
+      return RedisResponse.error(RedisConstants.ERROR_INVALID_PWD);
     }
   }
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/QuitExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/QuitExecutor.java
index 84ad241..5b677dc 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/QuitExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/QuitExecutor.java
@@ -14,17 +14,17 @@
  */
 package org.apache.geode.redis.internal.executor;
 
-import org.apache.geode.redis.internal.Coder;
 import org.apache.geode.redis.internal.Command;
 import org.apache.geode.redis.internal.ExecutionHandlerContext;
 import org.apache.geode.redis.internal.RedisConstants;
+import org.apache.geode.redis.internal.RedisResponse;
 
 public class QuitExecutor extends AbstractExecutor {
 
   @Override
-  public void executeCommand(Command command, ExecutionHandlerContext context) 
{
-    command.setResponse(
-        Coder.getSimpleStringResponse(context.getByteBufAllocator(), 
RedisConstants.QUIT_RESPONSE));
+  public RedisResponse executeCommandWithResponse(Command command,
+      ExecutionHandlerContext context) {
+    return RedisResponse.string(RedisConstants.QUIT_RESPONSE);
   }
 
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SAddExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SAddExecutor.java
index 0b86071..1a50768 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SAddExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SAddExecutor.java
@@ -18,14 +18,15 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.geode.redis.internal.ByteArrayWrapper;
-import org.apache.geode.redis.internal.Coder;
 import org.apache.geode.redis.internal.Command;
 import org.apache.geode.redis.internal.ExecutionHandlerContext;
+import org.apache.geode.redis.internal.RedisResponse;
 
 public class SAddExecutor extends SetExecutor {
 
   @Override
-  public void executeCommand(Command command, ExecutionHandlerContext context) 
{
+  public RedisResponse executeCommandWithResponse(Command command,
+      ExecutionHandlerContext context) {
 
     List<ByteArrayWrapper> commandElements = 
command.getProcessedCommandWrappers();
 
@@ -37,6 +38,6 @@ public class SAddExecutor extends SetExecutor {
 
     long entriesAdded = redisSetCommands.sadd(command.getKey(), membersToAdd);
 
-    command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), entriesAdded));
+    return RedisResponse.integer(entriesAdded);
   }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SCardExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SCardExecutor.java
index f5c313b..f9517b8 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SCardExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SCardExecutor.java
@@ -16,20 +16,21 @@ package org.apache.geode.redis.internal.executor.set;
 
 
 import org.apache.geode.redis.internal.ByteArrayWrapper;
-import org.apache.geode.redis.internal.Coder;
 import org.apache.geode.redis.internal.Command;
 import org.apache.geode.redis.internal.ExecutionHandlerContext;
 import org.apache.geode.redis.internal.RedisDataType;
+import org.apache.geode.redis.internal.RedisResponse;
 
 public class SCardExecutor extends SetExecutor {
 
   @Override
-  public void executeCommand(Command command, ExecutionHandlerContext context) 
{
+  public RedisResponse executeCommandWithResponse(Command command,
+      ExecutionHandlerContext context) {
     ByteArrayWrapper key = command.getKey();
     checkDataType(key, RedisDataType.REDIS_SET, context);
     RedisSetCommands redisSetCommands =
         new 
RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion());
-    int size = redisSetCommands.scard(key);
-    command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), size));
+
+    return RedisResponse.integer(redisSetCommands.scard(key));
   }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SIsMemberExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SIsMemberExecutor.java
index 85819be..a5f38aa 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SIsMemberExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SIsMemberExecutor.java
@@ -17,24 +17,31 @@ package org.apache.geode.redis.internal.executor.set;
 import java.util.List;
 
 import org.apache.geode.redis.internal.ByteArrayWrapper;
-import org.apache.geode.redis.internal.Coder;
 import org.apache.geode.redis.internal.Command;
 import org.apache.geode.redis.internal.ExecutionHandlerContext;
+import org.apache.geode.redis.internal.RedisResponse;
 
 public class SIsMemberExecutor extends SetExecutor {
 
   private static final int EXISTS = 1;
+
   private static final int NOT_EXISTS = 0;
 
   @Override
-  public void executeCommand(Command command, ExecutionHandlerContext context) 
{
+  public RedisResponse executeCommandWithResponse(Command command,
+      ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
 
     ByteArrayWrapper key = command.getKey();
+    if (!context.getKeyRegistrar().isRegistered(key)) {
+      return RedisResponse.integer(NOT_EXISTS);
+    }
+
     ByteArrayWrapper member = new ByteArrayWrapper(commandElems.get(2));
     RedisSetCommands redisSetCommands =
         new 
RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion());
     int result = redisSetCommands.sismember(key, member) ? EXISTS : NOT_EXISTS;
-    command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), result));
+
+    return RedisResponse.integer(result);
   }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMembersExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMembersExecutor.java
index 10b909b..1c367a0 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMembersExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMembersExecutor.java
@@ -17,26 +17,20 @@ package org.apache.geode.redis.internal.executor.set;
 import java.util.Set;
 
 import org.apache.geode.redis.internal.ByteArrayWrapper;
-import org.apache.geode.redis.internal.Coder;
-import org.apache.geode.redis.internal.CoderException;
 import org.apache.geode.redis.internal.Command;
 import org.apache.geode.redis.internal.ExecutionHandlerContext;
-import org.apache.geode.redis.internal.RedisConstants;
+import org.apache.geode.redis.internal.RedisResponse;
 
 public class SMembersExecutor extends SetExecutor {
 
   @Override
-  public void executeCommand(Command command, ExecutionHandlerContext context) 
{
+  public RedisResponse executeCommandWithResponse(Command command,
+      ExecutionHandlerContext context) {
     ByteArrayWrapper key = command.getKey();
     RedisSetCommands redisSetCommands =
         new 
RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion());
     Set<ByteArrayWrapper> members = redisSetCommands.smembers(key);
 
-    try {
-      command.setResponse(Coder.getArrayResponse(context.getByteBufAllocator(), members));
-    } catch (CoderException e) {
-      command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
-          RedisConstants.SERVER_ERROR_MESSAGE));
-    }
+    return RedisResponse.array(members);
   }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMoveExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMoveExecutor.java
index 4da5317..78db927 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMoveExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMoveExecutor.java
@@ -22,11 +22,11 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.TimeoutException;
 import org.apache.geode.redis.internal.AutoCloseableLock;
 import org.apache.geode.redis.internal.ByteArrayWrapper;
-import org.apache.geode.redis.internal.Coder;
 import org.apache.geode.redis.internal.Command;
 import org.apache.geode.redis.internal.ExecutionHandlerContext;
 import org.apache.geode.redis.internal.RedisData;
 import org.apache.geode.redis.internal.RedisDataType;
+import org.apache.geode.redis.internal.RedisResponse;
 
 public class SMoveExecutor extends SetExecutor {
 
@@ -35,7 +35,8 @@ public class SMoveExecutor extends SetExecutor {
   private static final int NOT_MOVED = 0;
 
   @Override
-  public void executeCommand(Command command, ExecutionHandlerContext context) 
{
+  public RedisResponse executeCommandWithResponse(Command command,
+      ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
 
     ByteArrayWrapper source = command.getKey();
@@ -47,12 +48,12 @@ public class SMoveExecutor extends SetExecutor {
 
     Region<ByteArrayWrapper, RedisData> region = getRegion(context);
 
+    RedisResponse response;
     try (AutoCloseableLock regionLock = withRegionLock(context, source)) {
       RedisData sourceSet = region.get(source);
 
       if (sourceSet == null) {
-        command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), NOT_MOVED));
-        return;
+        return RedisResponse.integer(NOT_MOVED);
       }
 
       boolean removed =
@@ -60,46 +61,32 @@ public class SMoveExecutor extends SetExecutor {
               new ArrayList<>(Collections.singletonList(member))) == 1;
 
       if (!removed) {
-        
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), 
NOT_MOVED));
-      } else {
-        try (AutoCloseableLock destinationLock = withRegionLock(context, 
destination)) {
-          // TODO: this should invoke a function in case the primary for 
destination is remote
-          new RedisSetInRegion(region).sadd(destination,
-              new ArrayList<>(Collections.singletonList(member)));
-
-          
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), 
MOVED));
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          System.out.println("Interrupt exception!!");
-          command.setResponse(
-              Coder.getErrorResponse(context.getByteBufAllocator(), "Thread 
interrupted."));
-          return;
-        } catch (TimeoutException e) {
-          System.out.println("Timeout exception!!");
-          
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
-              "Timeout acquiring lock. Please try again."));
-          return;
-        } catch (Exception e) {
-          System.out.println("Unexpected exception: " + e);
-          
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
-              "Unexpected exception."));
-        }
+        return RedisResponse.integer(NOT_MOVED);
+      }
+
+      try (AutoCloseableLock destinationLock = withRegionLock(context, 
destination)) {
+        // TODO: this should invoke a function in case the primary for 
destination is remote
+        new RedisSetInRegion(region).sadd(destination,
+            new ArrayList<>(Collections.singletonList(member)));
+
+        response = RedisResponse.integer(MOVED);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        response = RedisResponse.error("Thread interrupted");
+      } catch (TimeoutException e) {
+        response = RedisResponse.error("Timeout acquiring lock. Please try 
again.");
+      } catch (Exception e) {
+        response = RedisResponse.error("Unexpected exception.");
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      System.out.println("Interrupt exception!!");
-      command.setResponse(
-          Coder.getErrorResponse(context.getByteBufAllocator(), "Thread 
interrupted."));
-      return;
+      response = RedisResponse.error("Thread interrupted.");
     } catch (TimeoutException e) {
-      System.out.println("Timeout exception!!");
-      command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
-          "Timeout acquiring lock. Please try again."));
-      return;
+      response = RedisResponse.error("Timeout acquiring lock. Please try 
again.");
     } catch (Exception e) {
-      System.out.println("Unexpected exception: " + e);
-      command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
-          "Unexpected exception."));
+      response = RedisResponse.error("Unexpected exception.");
     }
+
+    return response;
   }
 }
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SPopExecutor.java
 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SPopExecutor.java
index a05777d..0fd6cdb 100755
--- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SPopExecutor.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SPopExecutor.java
@@ -18,16 +18,15 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.geode.redis.internal.ByteArrayWrapper;
-import org.apache.geode.redis.internal.Coder;
-import org.apache.geode.redis.internal.CoderException;
 import org.apache.geode.redis.internal.Command;
 import org.apache.geode.redis.internal.ExecutionHandlerContext;
-import org.apache.geode.redis.internal.RedisConstants;
+import org.apache.geode.redis.internal.RedisResponse;
 
 public class SPopExecutor extends SetExecutor {
 
   @Override
-  public void executeCommand(Command command, ExecutionHandlerContext context) 
{
+  public RedisResponse executeCommandWithResponse(Command command,
+      ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
     int popCount = 1;
     if (commandElems.size() == 3) {
@@ -39,19 +38,13 @@ public class SPopExecutor extends SetExecutor {
         new 
RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion());
     Collection<ByteArrayWrapper> popped = redisSetCommands.spop(key, popCount);
     if (popped.isEmpty()) {
-      command.setResponse(Coder.getNilResponse(context.getByteBufAllocator()));
-      return;
+      return RedisResponse.nil();
     }
-    try {
-      if (popCount == 1) {
-        command.setResponse(
-            Coder.getBulkStringResponse(context.getByteBufAllocator(), 
popped.iterator().next()));
-      } else {
-        
command.setResponse(Coder.getArrayResponse(context.getByteBufAllocator(), 
popped));
-      }
-    } catch (CoderException e) {
-      command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
-          RedisConstants.SERVER_ERROR_MESSAGE));
+
+    if (popCount == 1) {
+      return RedisResponse.bulkString(popped.iterator().next());
+    } else {
+      return RedisResponse.array(popped);
     }
   }
 }
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRandMemberExecutor.java
 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRandMemberExecutor.java
index bc4fbbd..2b17553 100755
--- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRandMemberExecutor.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRandMemberExecutor.java
@@ -19,17 +19,17 @@ import java.util.List;
 
 import org.apache.geode.redis.internal.ByteArrayWrapper;
 import org.apache.geode.redis.internal.Coder;
-import org.apache.geode.redis.internal.CoderException;
 import org.apache.geode.redis.internal.Command;
 import org.apache.geode.redis.internal.ExecutionHandlerContext;
-import org.apache.geode.redis.internal.RedisConstants;
+import org.apache.geode.redis.internal.RedisResponse;
 
 public class SRandMemberExecutor extends SetExecutor {
 
   private static final String ERROR_NOT_NUMERIC = "The count provided must be 
numeric";
 
   @Override
-  public void executeCommand(Command command, ExecutionHandlerContext context) 
{
+  public RedisResponse executeCommandWithResponse(Command command,
+      ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
 
     ByteArrayWrapper key = command.getKey();
@@ -40,15 +40,14 @@ public class SRandMemberExecutor extends SetExecutor {
       try {
         count = Coder.bytesToInt(commandElems.get(2));
       } catch (NumberFormatException e) {
-        command.setResponse(
-            Coder.getErrorResponse(context.getByteBufAllocator(), 
ERROR_NOT_NUMERIC));
-        return;
+        return RedisResponse.error(ERROR_NOT_NUMERIC);
       }
     }
+
     if (count == 0) {
-      command.setResponse(Coder.getNilResponse(context.getByteBufAllocator()));
-      return;
+      return RedisResponse.nil();
     }
+
     if (count < 0) {
       count = -count;
     }
@@ -56,19 +55,13 @@ public class SRandMemberExecutor extends SetExecutor {
     RedisSetCommands redisSetCommands =
         new 
RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion());
     Collection<ByteArrayWrapper> results = redisSetCommands.srandmember(key, 
count);
-    try {
-      if (results.isEmpty()) {
-        
command.setResponse(Coder.getNilResponse(context.getByteBufAllocator()));
-      } else if (count == 1) {
-        command.setResponse(
-            Coder.getBulkStringResponse(context.getByteBufAllocator(),
-                results.iterator().next().toBytes()));
-      } else {
-        
command.setResponse(Coder.getArrayResponse(context.getByteBufAllocator(), 
results));
-      }
-    } catch (CoderException e) {
-      command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
-          RedisConstants.SERVER_ERROR_MESSAGE));
+
+    if (results.isEmpty()) {
+      return RedisResponse.nil();
+    } else if (count == 1) {
+      return RedisResponse.bulkString(results.iterator().next());
+    } else {
+      return RedisResponse.array(results);
     }
   }
 }
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRemExecutor.java
 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRemExecutor.java
index de7882f..6233be3 100755
--- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRemExecutor.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRemExecutor.java
@@ -18,13 +18,14 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.geode.redis.internal.ByteArrayWrapper;
-import org.apache.geode.redis.internal.Coder;
 import org.apache.geode.redis.internal.Command;
 import org.apache.geode.redis.internal.ExecutionHandlerContext;
+import org.apache.geode.redis.internal.RedisResponse;
 
 public class SRemExecutor extends SetExecutor {
   @Override
-  public void executeCommand(Command command, ExecutionHandlerContext context) 
{
+  public RedisResponse executeCommandWithResponse(Command command,
+      ExecutionHandlerContext context) {
     List<ByteArrayWrapper> commandElements = 
command.getProcessedCommandWrappers();
 
     ByteArrayWrapper key = command.getKey();
@@ -33,15 +34,13 @@ public class SRemExecutor extends SetExecutor {
         new 
RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion());
 
     ArrayList<ByteArrayWrapper> membersToRemove =
-        new ArrayList<>(
-            commandElements
-                .subList(2, commandElements.size()));
+        new ArrayList<>(commandElements.subList(2, commandElements.size()));
 
     long membersRemoved =
         redisSetCommands.srem(
             key,
             membersToRemove);
 
-    
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), 
membersRemoved));
+    return RedisResponse.integer(membersRemoved);
   }
 }
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java
 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java
index ef2a7e8..66af52a 100755
--- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java
@@ -25,12 +25,14 @@ import org.apache.geode.redis.internal.Command;
 import org.apache.geode.redis.internal.ExecutionHandlerContext;
 import org.apache.geode.redis.internal.RedisConstants;
 import org.apache.geode.redis.internal.RedisDataType;
+import org.apache.geode.redis.internal.RedisResponse;
 import org.apache.geode.redis.internal.executor.AbstractScanExecutor;
 
 public class SScanExecutor extends AbstractScanExecutor {
 
   @Override
-  public void executeCommand(Command command, ExecutionHandlerContext context) 
{
+  public RedisResponse executeCommandWithResponse(Command command,
+      ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
 
     ByteArrayWrapper key = command.getKey();
@@ -42,15 +44,15 @@ public class SScanExecutor extends AbstractScanExecutor {
     Pattern matchPattern = null;
     String globMatchPattern = null;
     int count = DEFAULT_COUNT;
+
     try {
       cursor = Integer.parseInt(cursorString);
     } catch (NumberFormatException e) {
-      
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), 
ERROR_CURSOR));
-      return;
+      return RedisResponse.error(ERROR_CURSOR);
     }
+
     if (cursor < 0) {
-      
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), 
ERROR_CURSOR));
-      return;
+      return RedisResponse.error(ERROR_CURSOR);
     }
 
     if (commandElems.size() > 4) {
@@ -65,8 +67,7 @@ public class SScanExecutor extends AbstractScanExecutor {
           count = Coder.bytesToInt(bytes);
         }
       } catch (NumberFormatException e) {
-        
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), 
ERROR_COUNT));
-        return;
+        return RedisResponse.error(ERROR_COUNT);
       }
     }
 
@@ -79,27 +80,24 @@ public class SScanExecutor extends AbstractScanExecutor {
           count = Coder.bytesToInt(bytes);
         }
       } catch (NumberFormatException e) {
-        
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), 
ERROR_COUNT));
-        return;
+        return RedisResponse.error(ERROR_COUNT);
       }
     }
 
     if (count < 0) {
-      
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), 
ERROR_COUNT));
-      return;
+      return RedisResponse.error(ERROR_COUNT);
     }
 
     try {
       matchPattern = convertGlobToRegex(globMatchPattern);
     } catch (PatternSyntaxException e) {
-      command.setResponse(
-          Coder.getErrorResponse(context.getByteBufAllocator(), 
RedisConstants.ERROR_ILLEGAL_GLOB));
-      return;
+      return RedisResponse.error(RedisConstants.ERROR_ILLEGAL_GLOB);
     }
 
     RedisSetCommands redisSetCommands =
         new 
RedisSetCommandsFunctionExecutor(context.getRegionProvider().getDataRegion());
     List<Object> returnList = redisSetCommands.sscan(key, matchPattern, count, 
cursor);
-    command.setResponse(Coder.getScanResponse(context.getByteBufAllocator(), 
returnList));
+
+    return RedisResponse.scan(returnList);
   }
 }
diff --git 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetOpExecutor.java
 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetOpExecutor.java
index b6928fd..4e91040 100755
--- 
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetOpExecutor.java
+++ 
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetOpExecutor.java
@@ -23,16 +23,17 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.TimeoutException;
 import org.apache.geode.redis.internal.AutoCloseableLock;
 import org.apache.geode.redis.internal.ByteArrayWrapper;
-import org.apache.geode.redis.internal.Coder;
 import org.apache.geode.redis.internal.Command;
 import org.apache.geode.redis.internal.ExecutionHandlerContext;
 import org.apache.geode.redis.internal.RedisData;
+import org.apache.geode.redis.internal.RedisResponse;
 import org.apache.geode.redis.internal.RegionProvider;
 
 public abstract class SetOpExecutor extends SetExecutor {
 
   @Override
-  public void executeCommand(Command command, ExecutionHandlerContext context) 
{
+  public RedisResponse executeCommandWithResponse(Command command,
+      ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
     int setsStartIndex = isStorage() ? 2 : 1;
 
@@ -43,25 +44,27 @@ public abstract class SetOpExecutor extends SetExecutor {
     }
 
     ByteArrayWrapper firstSetKey = new 
ByteArrayWrapper(commandElems.get(setsStartIndex++));
+    RedisResponse response;
+
     if (destination != null) {
       try (AutoCloseableLock regionLock = withRegionLock(context, 
destination)) {
-        doActualSetOperation(command, context, commandElems, setsStartIndex, 
regionProvider,
-            destination, firstSetKey);
+        response = doActualSetOperation(command, context, commandElems, 
setsStartIndex,
+            regionProvider, destination, firstSetKey);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        command.setResponse(
-            Coder.getErrorResponse(context.getByteBufAllocator(), "Thread 
interrupted."));
+        response = RedisResponse.error("Thread interrupted");
       } catch (TimeoutException e) {
-        
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(),
-            "Timeout acquiring lock. Please try again."));
+        return RedisResponse.error("Timeout acquiring lock. Please try again");
       }
     } else {
-      doActualSetOperation(command, context, commandElems, setsStartIndex, 
regionProvider,
-          destination, firstSetKey);
+      response = doActualSetOperation(command, context, commandElems, 
setsStartIndex,
+          regionProvider, destination, firstSetKey);
     }
+
+    return response;
   }
 
-  private boolean doActualSetOperation(Command command, 
ExecutionHandlerContext context,
+  private RedisResponse doActualSetOperation(Command command, 
ExecutionHandlerContext context,
       List<byte[]> commandElems, int setsStartIndex,
       RegionProvider regionProvider, ByteArrayWrapper destination,
       ByteArrayWrapper firstSetKey) {
@@ -81,10 +84,11 @@ public abstract class SetOpExecutor extends SetExecutor {
     }
 
     if (setList.isEmpty() && !isStorage()) {
-      respondBulkStrings(command, context, firstSet);
-      return true;
+      return respondBulkStrings(firstSet);
     }
 
+    RedisResponse response;
+
     Set<ByteArrayWrapper> resultSet = setOp(firstSet, setList);
     if (isStorage()) {
       regionProvider.removeKey(destination);
@@ -92,20 +96,19 @@ public abstract class SetOpExecutor extends SetExecutor {
         if (!resultSet.isEmpty()) {
           region.put(destination, new RedisSet(resultSet));
         }
-        command
-            .setResponse(
-                Coder.getIntegerResponse(context.getByteBufAllocator(), 
resultSet.size()));
+        response = RedisResponse.integer(resultSet.size());
       } else {
-        
command.setResponse(Coder.getIntegerResponse(context.getByteBufAllocator(), 0));
+        response = RedisResponse.integer(0);
       }
     } else {
       if (resultSet == null || resultSet.isEmpty()) {
-        
command.setResponse(Coder.getEmptyArrayResponse(context.getByteBufAllocator()));
+        response = RedisResponse.emptyArray();
       } else {
-        respondBulkStrings(command, context, resultSet);
+        response = respondBulkStrings(resultSet);
       }
     }
-    return false;
+
+    return response;
   }
 
   protected abstract boolean isStorage();
diff --git 
a/geode-redis/src/test/java/org/apache/geode/redis/internal/PubSubImplJUnitTest.java
 
b/geode-redis/src/test/java/org/apache/geode/redis/internal/PubSubImplJUnitTest.java
index 424b5b6..d051910 100644
--- 
a/geode-redis/src/test/java/org/apache/geode/redis/internal/PubSubImplJUnitTest.java
+++ 
b/geode-redis/src/test/java/org/apache/geode/redis/internal/PubSubImplJUnitTest.java
@@ -21,6 +21,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelPromise;
@@ -38,7 +39,7 @@ public class PubSubImplJUnitTest {
     ExecutionHandlerContext mockContext = mock(ExecutionHandlerContext.class);
 
     FailingChannelFuture mockFuture = new FailingChannelFuture();
-    when(mockContext.writeToChannel(any())).thenReturn(mockFuture);
+    when(mockContext.writeToChannel((ByteBuf) any())).thenReturn(mockFuture);
     when(mockContext.getByteBufAllocator()).thenReturn(new 
PooledByteBufAllocator());
 
     Client deadClient = mock(Client.class);

Reply via email to