This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new ae8d674  GEODE-9518: Implement Radish ZUNIONSTORE command (#6794)
ae8d674 is described below

commit ae8d6745a7a1c4ce3a1c3f80c80ca906f53d5e82
Author: Jens Deppe <[email protected]>
AuthorDate: Wed Sep 1 18:41:52 2021 -0700

    GEODE-9518: Implement Radish ZUNIONSTORE command (#6794)
    
    - Add ability to lock multiple keys in a consistent order
    - Add validation option to ConcurrentLoopingThreads
    - Add CROSSSLOTS error message
---
 .../ZUnionStoreNativeRedisAcceptanceTest.java      |  36 ++
 .../geode/redis/ConcurrentLoopingThreads.java      |  51 ++-
 .../redis/RedisCommandArgumentsTestHelper.java     |   2 +-
 .../server/AbstractHitsMissesIntegrationTest.java  |   7 +
 .../set/AbstractSUnionIntegrationTest.java         |  22 +
 .../AbstractZUnionStoreIntegrationTest.java        | 486 +++++++++++++++++++++
 .../sortedset/ZUnionStoreIntegrationTest.java      |  31 ++
 .../apache/geode/codeAnalysis/excludedClasses.txt  |   1 +
 .../geode/redis/internal/RedisCommandType.java     |   2 +
 .../geode/redis/internal/RedisConstants.java       |   5 +-
 .../geode/redis/internal/RegionProvider.java       |  44 +-
 .../redis/internal/data/NullRedisSortedSet.java    |   1 -
 .../data/RedisDataCommandsFunctionExecutor.java    |   9 +-
 .../data/RedisKeyCommandsFunctionExecutor.java     |   4 +-
 .../geode/redis/internal/data/RedisSortedSet.java  |  58 +++
 .../RedisSortedSetCommandsFunctionExecutor.java    |  23 +
 .../redis/internal/executor/RedisResponse.java     |   4 +
 .../executor/sortedset/RedisSortedSetCommands.java |   3 +
 .../internal/executor/sortedset/ZAggregator.java   |  39 ++
 .../internal/executor/sortedset/ZKeyWeight.java    |  40 ++
 .../executor/sortedset/ZUnionStoreExecutor.java    | 126 ++++++
 .../apache/geode/redis/internal/netty/Coder.java   |   5 +
 .../redis/internal/netty/StringBytesGlossary.java  |  11 +
 .../internal/services/StripedCoordinator.java      |   7 +-
 .../services/SynchronizedStripedCoordinator.java   |  20 +-
 .../SynchronizedStripedCoordinatorTest.java        |   3 +-
 26 files changed, 1005 insertions(+), 35 deletions(-)

diff --git 
a/geode-apis-compatible-with-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreNativeRedisAcceptanceTest.java
 
b/geode-apis-compatible-with-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreNativeRedisAcceptanceTest.java
new file mode 100755
index 0000000..cfef4cc
--- /dev/null
+++ 
b/geode-apis-compatible-with-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreNativeRedisAcceptanceTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import org.junit.ClassRule;
+
+import org.apache.geode.redis.NativeRedisClusterTestRule;
+
+public class ZUnionStoreNativeRedisAcceptanceTest extends 
AbstractZUnionStoreIntegrationTest {
+
+  @ClassRule
+  public static NativeRedisClusterTestRule server = new 
NativeRedisClusterTestRule();
+
+  @Override
+  public int getPort() {
+    return server.getExposedPorts().get(0);
+  }
+
+  @Override
+  public void flushAll() {
+    server.flushAll();
+  }
+
+}
diff --git 
a/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
 
b/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
index f2302f5..56813e0 100644
--- 
a/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
+++ 
b/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
@@ -33,6 +33,7 @@ public class ConcurrentLoopingThreads {
   private final Consumer<Integer>[] functions;
   private ExecutorService executorService = Executors.newCachedThreadPool();
   private List<Future<?>> loopingFutures;
+  private Throwable actionThrowable = null;
 
   @SafeVarargs
   public ConcurrentLoopingThreads(int iterationCount,
@@ -45,15 +46,15 @@ public class ConcurrentLoopingThreads {
    * Start the operations asynchronously. Use {@link #await()} to wait for 
completion.
    */
   public ConcurrentLoopingThreads start() {
-    return start(false);
+    return start(false, null);
   }
 
-  private ConcurrentLoopingThreads start(boolean lockstep) {
-    CyclicBarrier latch = new CyclicBarrier(functions.length);
+  private ConcurrentLoopingThreads start(boolean lockstep, Runnable 
barrierAction) {
+    CyclicBarrier barrier = new CyclicBarrier(functions.length, barrierAction);
 
     loopingFutures = Arrays
         .stream(functions)
-        .map(r -> new LoopingThread(r, iterationCount, latch, lockstep))
+        .map(r -> new LoopingThread(r, iterationCount, barrier, lockstep))
         .map(t -> executorService.submit(t))
         .collect(Collectors.toList());
 
@@ -84,7 +85,7 @@ public class ConcurrentLoopingThreads {
    * Start operations and only return once all are complete.
    */
   public void run() {
-    start(false);
+    start(false, null);
     await();
   }
 
@@ -92,10 +93,41 @@ public class ConcurrentLoopingThreads {
    * Start operations and run each iteration in lockstep
    */
   public void runInLockstep() {
-    start(true);
+    start(true, null);
     await();
   }
 
+  /**
+   * Start operations and provide an action to be performed at the end of 
every iteration. This
+   * implies running in lockstep. This would typically be used to provide some 
form of validation.
+   */
+  public void runWithAction(Runnable action) {
+    Runnable innerRunnable = () -> {
+      try {
+        action.run();
+      } catch (Throwable e) {
+        actionThrowable = e;
+        throw e;
+      }
+    };
+
+    start(true, innerRunnable);
+
+    try {
+      await();
+    } catch (Throwable e) {
+      if (actionThrowable != null) {
+        // This will ensure that AssertionErrors are clearly apparent
+        if (actionThrowable instanceof Error) {
+          throw (Error) actionThrowable;
+        }
+        throw new RuntimeException(actionThrowable);
+      } else {
+        throw e;
+      }
+    }
+  }
+
   private static class LoopingRunnable implements Runnable {
     private final Consumer<Integer> runnable;
     private final int iterationCount;
@@ -112,13 +144,14 @@ public class ConcurrentLoopingThreads {
 
     @Override
     public void run() {
-      waitForBarrier();
+      if (!lockstep) {
+        waitForBarrier();
+      }
       for (int i = 0; i < iterationCount; i++) {
+        runnable.accept(i);
         if (lockstep) {
           waitForBarrier();
         }
-        runnable.accept(i);
-        Thread.yield();
       }
     }
 
diff --git 
a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/RedisCommandArgumentsTestHelper.java
 
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/RedisCommandArgumentsTestHelper.java
index ce69883..4d5924e 100644
--- 
a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/RedisCommandArgumentsTestHelper.java
+++ 
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/RedisCommandArgumentsTestHelper.java
@@ -121,7 +121,7 @@ public class RedisCommandArgumentsTestHelper {
     }
 
     for (int i = 0; i < numArgs; i++) {
-      args[i] = String.valueOf(i).getBytes();
+      args[i] = ("{A}" + i).getBytes();
     }
 
     return args;
diff --git 
a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/server/AbstractHitsMissesIntegrationTest.java
 
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/server/AbstractHitsMissesIntegrationTest.java
index a82fa2e..53cf67c 100644
--- 
a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/server/AbstractHitsMissesIntegrationTest.java
+++ 
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/server/AbstractHitsMissesIntegrationTest.java
@@ -32,6 +32,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 import redis.clients.jedis.BitOP;
 import redis.clients.jedis.Jedis;
+import redis.clients.jedis.ZParams;
 
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.serialization.DataSerializableFixedID;
@@ -332,6 +333,12 @@ public abstract class AbstractHitsMissesIntegrationTest 
implements RedisIntegrat
     runCommandAndAssertHitsAndMisses(SORTED_SET_KEY, k -> 
jedis.zrevrangeByScore(k, 1, 0));
   }
 
+  @Test
+  public void testZUnionStore() {
+    runCommandAndAssertNoStatUpdates(SORTED_SET_KEY,
+        k -> jedis.zunionstore(k, new ZParams().weights(1, 2), k, k));
+  }
+
   /************* Set related commands *************/
   @Test
   public void testSadd() {
diff --git 
a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/set/AbstractSUnionIntegrationTest.java
 
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/set/AbstractSUnionIntegrationTest.java
index 6193449..6b90111 100755
--- 
a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/set/AbstractSUnionIntegrationTest.java
+++ 
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/set/AbstractSUnionIntegrationTest.java
@@ -30,6 +30,7 @@ import redis.clients.jedis.HostAndPort;
 import redis.clients.jedis.JedisCluster;
 import redis.clients.jedis.Protocol;
 
+import org.apache.geode.redis.ConcurrentLoopingThreads;
 import org.apache.geode.redis.RedisIntegrationTest;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
 
@@ -198,4 +199,25 @@ public abstract class AbstractSUnionIntegrationTest 
implements RedisIntegrationT
     assertThat(jedis.smembers("{user1}master").toArray())
         .containsExactlyInAnyOrder(masterSet.toArray());
   }
+
+  @Test
+  public void doesNotThrowExceptions_whenConcurrentSaddAndSunionExecute() {
+    final int ENTRIES = 1000;
+    Set<String> set1 = new HashSet<>();
+    Set<String> set2 = new HashSet<>();
+    for (int i = 0; i < ENTRIES; i++) {
+      set1.add("value-1-" + i);
+      set2.add("value-2-" + i);
+    }
+
+    jedis.sadd("{player1}key1", set1.toArray(new String[] {}));
+    jedis.sadd("{player1}key2", set2.toArray(new String[] {}));
+
+    new ConcurrentLoopingThreads(ENTRIES,
+        i -> jedis.sunion("{player1}key1", "{player1}key2"),
+        i -> jedis.sadd("{player1}key1", "newValue-1-" + i),
+        i -> jedis.sadd("{player1}key2", "newValue-2-" + i))
+            .runInLockstep();
+  }
+
 }
diff --git 
a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZUnionStoreIntegrationTest.java
 
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZUnionStoreIntegrationTest.java
new file mode 100755
index 0000000..f1036b5
--- /dev/null
+++ 
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZUnionStoreIntegrationTest.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static 
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static 
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+import redis.clients.jedis.Tuple;
+import redis.clients.jedis.ZParams;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.internal.RedisConstants;
+
+public abstract class AbstractZUnionStoreIntegrationTest implements 
RedisIntegrationTest {
+
+  private static final String NEW_SET = "{user1}new";
+  private static final String SORTED_SET_KEY1 = "{user1}sset1";
+  private static final String SORTED_SET_KEY2 = "{user1}sset2";
+  private static final String SORTED_SET_KEY3 = "{user1}sset3";
+
+  private JedisCluster jedis;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), 
REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void shouldError_givenWrongKeyType() {
+    final String STRING_KEY = "{user1}stringKey";
+    jedis.set(STRING_KEY, "value");
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "2", STRING_KEY,
+            SORTED_SET_KEY1))
+                .hasMessage("WRONGTYPE " + RedisConstants.ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void shouldError_givenSetsCrossSlots() {
+    final String WRONG_KEY = "{user2}another";
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "2", WRONG_KEY,
+            SORTED_SET_KEY1))
+                .hasMessage("CROSSSLOT " + RedisConstants.ERROR_WRONG_SLOT);
+  }
+
+  @Test
+  public void shouldError_givenTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZUNIONSTORE, 3);
+  }
+
+  @Test
+  public void shouldError_givenNumkeysTooLarge() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "2",
+            SORTED_SET_KEY1))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenNumkeysTooSmall() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "1",
+            SORTED_SET_KEY1, SORTED_SET_KEY2))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenTooManyWeights() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "1",
+            SORTED_SET_KEY1, "WEIGHTS", "2", "3"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenTooFewWeights() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "2",
+            SORTED_SET_KEY1, SORTED_SET_KEY2, "WEIGHTS", "1"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenWeightNotANumber() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "1",
+            SORTED_SET_KEY1, "WEIGHTS", "not-a-number"))
+                .hasMessage("ERR " + RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT);
+  }
+
+  @Test
+  public void shouldError_givenWeightsWithoutAnyValues() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "1",
+            SORTED_SET_KEY1, "WEIGHTS"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenMultipleWeightKeywords() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "1",
+            SORTED_SET_KEY1, "WEIGHT", "1.0", "WEIGHT", "2.0"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenUnknownAggregate() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "1",
+            SORTED_SET_KEY1, "AGGREGATE", "UNKNOWN", "WEIGHTS", "1"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenAggregateKeywordWithoutValue() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "1",
+            SORTED_SET_KEY1, "AGGREGATE"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenMultipleAggregates() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, 
NEW_SET, "1",
+            SORTED_SET_KEY1, "WEIGHTS", "1", "AGGREGATE", "SUM", "MIN"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldUnionize_givenBoundaryScoresAndWeights() {
+    Map<String, Double> scores = new LinkedHashMap<>();
+    scores.put("player1", Double.NEGATIVE_INFINITY);
+    scores.put("player2", 0D);
+    scores.put("player3", 1D);
+    scores.put("player4", Double.POSITIVE_INFINITY);
+    Set<Tuple> expectedResults = convertToTuples(scores, (i, x) -> x);
+    jedis.zadd(SORTED_SET_KEY1, scores);
+
+    jedis.zunionstore(NEW_SET, new ZParams().weights(1), SORTED_SET_KEY1);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+
+    jedis.zunionstore(NEW_SET, new ZParams().weights(0), SORTED_SET_KEY1);
+
+    expectedResults = convertToTuples(scores, (i, x) -> 0D);
+    results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+
+    jedis.zunionstore(NEW_SET, new 
ZParams().weights(Double.POSITIVE_INFINITY), SORTED_SET_KEY1);
+
+    expectedResults = convertToTuples(scores, (i, x) -> x == 1 ? 
Double.POSITIVE_INFINITY : x);
+    results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+
+    jedis.zunionstore(NEW_SET, new 
ZParams().weights(Double.NEGATIVE_INFINITY), SORTED_SET_KEY1);
+
+    expectedResults = new LinkedHashSet<>();
+    expectedResults.add(new Tuple("player3", Double.NEGATIVE_INFINITY));
+    expectedResults.add(new Tuple("player4", Double.NEGATIVE_INFINITY));
+    expectedResults.add(new Tuple("player2", 0D));
+    expectedResults.add(new Tuple("player1", Double.POSITIVE_INFINITY));
+    results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_givenASingleSet() {
+    Map<String, Double> scores = makeScoreMap(10, x -> (double) x);
+    Set<Tuple> expectedResults = convertToTuples(scores, (i, x) -> x);
+    jedis.zadd(SORTED_SET_KEY1, scores);
+
+    assertThat(jedis.zunionstore(NEW_SET, SORTED_SET_KEY1)).isEqualTo(10);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_givenOneSetDoesNotExist() {
+    Map<String, Double> scores = makeScoreMap(10, x -> (double) x);
+    Set<Tuple> expectedResults = convertToTuples(scores, (i, x) -> x);
+    jedis.zadd(SORTED_SET_KEY1, scores);
+
+    jedis.zunionstore(NEW_SET, SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_givenWeight() {
+    Map<String, Double> scores = makeScoreMap(10, x -> (double) x);
+    Set<Tuple> expectedResults = convertToTuples(scores, (i, x) -> x * 1.5);
+    jedis.zadd(SORTED_SET_KEY1, scores);
+
+    jedis.zunionstore(SORTED_SET_KEY1, new ZParams().weights(1.5), 
SORTED_SET_KEY1);
+
+    Set<Tuple> results = jedis.zrangeWithScores(SORTED_SET_KEY1, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void 
shouldUnionizeWithWeightAndDefaultAggregate_givenMultipleSetsWithWeights() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(10, x -> (double) (9 - x));
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> (x * 2.0) 
+ ((9 - x) * 1.5));
+
+    jedis.zunionstore(NEW_SET, new ZParams().weights(2.0, 1.5), 
SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_givenMinAggregate() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(10, x -> 0D);
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Set<Tuple> expectedResults = convertToTuples(scores2, (i, x) -> x);
+
+    jedis.zunionstore(NEW_SET, new ZParams().aggregate(ZParams.Aggregate.MIN),
+        SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_givenMaxAggregate() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) ((x % 2 == 0) 
? 0 : x));
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(10, x -> (double) ((x % 2 == 0) 
? x : 0));
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> (double) 
i);
+
+    jedis.zunionstore(NEW_SET, new ZParams().aggregate(ZParams.Aggregate.MAX),
+        SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void 
shouldUnionizeUsingLastAggregate_givenMultipleAggregateKeywords() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) 0);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(10, x -> (double) 1);
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Set<Tuple> expectedResults = convertToTuples(scores2, (i, x) -> x);
+
+    jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, NEW_SET, "2",
+        SORTED_SET_KEY1, SORTED_SET_KEY2, "AGGREGATE", "MIN", "AGGREGATE", 
"MAX");
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_givenMaxAggregateAndMultipleWeights() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) ((x % 2 == 0) 
? 0 : x));
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(10, x -> (double) ((x % 2 == 0) 
? x : 0));
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> (double) 
(i * 2));
+
+    jedis.zunionstore(NEW_SET, new 
ZParams().aggregate(ZParams.Aggregate.MAX).weights(2, 2),
+        SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_givenSumAggregateAndMultipleSets() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(10, x -> (double) (x * 2));
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Map<String, Double> scores3 = makeScoreMap(10, x -> (double) (x * 3));
+    jedis.zadd(SORTED_SET_KEY3, scores3);
+
+    Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> x * 6);
+
+    jedis.zunionstore(NEW_SET, new ZParams().aggregate(ZParams.Aggregate.SUM),
+        SORTED_SET_KEY1, SORTED_SET_KEY2, SORTED_SET_KEY3);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_givenSetsDoNotOverlap() {
+    Map<String, Double> scores1 = makeScoreMap(0, 2, 10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(1, 2, 10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Set<Tuple> expectedResults =
+        convertToTuples(makeScoreMap(0, 1, 20, x -> (double) x), (i, x) -> x);
+
+    jedis.zunionstore(NEW_SET, SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 20);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_givenSetsPartiallyOverlap() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(5, 1, 10, x -> (double) (x < 10 
? x : x * 2));
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Set<Tuple> expectedResults = convertToTuples(makeScoreMap(0, 1, 15, x -> 
(double) x),
+        (i, x) -> (double) (i < 5 ? i : i * 2));
+
+    jedis.zunionstore(NEW_SET, SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 20);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void ensureWeightsAreAppliedBeforeAggregation() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x * 5);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Map<String, Double> scores2 = makeScoreMap(10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY2, scores2);
+
+    Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> (double) 
(i * 10));
+
+    jedis.zunionstore(SORTED_SET_KEY1,
+        new ZParams().weights(1, 10).aggregate(ZParams.Aggregate.MAX), 
SORTED_SET_KEY1,
+        SORTED_SET_KEY2);
+
+    Set<Tuple> results = jedis.zrangeWithScores(SORTED_SET_KEY1, 0, 20);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldUnionize_whenTargetExistsAndSetsAreDuplicated() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> x * 2);
+
+    // Default aggregation is SUM
+    jedis.zunionstore(SORTED_SET_KEY1, SORTED_SET_KEY1, SORTED_SET_KEY1);
+
+    Set<Tuple> results = jedis.zrangeWithScores(SORTED_SET_KEY1, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldPreserveSet_givenDestinationAndSourceAreTheSame() {
+    Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+    jedis.zadd(SORTED_SET_KEY1, scores1);
+
+    Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> x);
+
+    jedis.zunionstore(SORTED_SET_KEY1, SORTED_SET_KEY1);
+
+    Set<Tuple> results = jedis.zrangeWithScores(SORTED_SET_KEY1, 0, 10);
+
+    assertThat(results).containsExactlyElementsOf(expectedResults);
+  }
+
+  @Test
+  public void ensureSetConsistency_andNoExceptions_whenRunningConcurrently() {
+    int scoreCount = 1000;
+    jedis.zadd("{A}ones", makeScoreMap(scoreCount, x -> 1D));
+
+    Map<String, Double> scores1 = makeScoreMap(scoreCount, x -> 1D);
+    jedis.zadd("{A}scores1", scores1);
+    Map<String, Double> scores2 = makeScoreMap(scoreCount, x -> 2D);
+    jedis.zadd("{A}scores2", scores2);
+
+    new ConcurrentLoopingThreads(1000,
+        i -> jedis.zadd("{A}scores1", (double) i, String.format("member-%05d", 
i)),
+        i -> jedis.zadd("{A}scores2", (double) i, String.format("member-%05d", 
i)),
+        i -> jedis.zunionstore("{A}maxSet", new 
ZParams().aggregate(ZParams.Aggregate.MAX),
+            "{A}scores1", "{A}scores2"),
+        // This ensures that the lock ordering for keys is working
+        i -> jedis.zunionstore("{A}minSet", new 
ZParams().aggregate(ZParams.Aggregate.MIN),
+            "{A}scores2", "{A}scores1"))
+                .runWithAction(() -> 
assertThat(jedis.zrangeWithScores("{A}maxSet", 0, scoreCount))
+                    .hasSize(scoreCount));
+  }
+
+  private Map<String, Double> makeScoreMap(int count, Function<Integer, 
Double> scoreProducer) {
+    return makeScoreMap(0, 1, count, scoreProducer);
+  }
+
+  private Map<String, Double> makeScoreMap(int startIndex, int increment, int 
count,
+      Function<Integer, Double> scoreProducer) {
+    Map<String, Double> map = new LinkedHashMap<>();
+
+    int index = startIndex;
+    for (int i = 0; i < count; i++) {
+      map.put(String.format("member-%05d", index), scoreProducer.apply(index));
+      index += increment;
+    }
+    return map;
+  }
+
+  private Set<Tuple> convertToTuples(Map<String, Double> map,
+      BiFunction<Integer, Double, Double> function) {
+    Set<Tuple> tuples = new LinkedHashSet<>();
+    int x = 0;
+    for (Map.Entry<String, Double> e : map.entrySet()) {
+      tuples.add(new Tuple(e.getKey().getBytes(), function.apply(x++, 
e.getValue())));
+    }
+
+    return tuples;
+  }
+}
diff --git 
a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreIntegrationTest.java
 
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreIntegrationTest.java
new file mode 100755
index 0000000..7930ae3
--- /dev/null
+++ 
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreIntegrationTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import org.junit.ClassRule;
+
+import org.apache.geode.redis.GeodeRedisServerRule;
+
+public class ZUnionStoreIntegrationTest extends 
AbstractZUnionStoreIntegrationTest {
+
+  @ClassRule
+  public static GeodeRedisServerRule server = new GeodeRedisServerRule();
+
+  @Override
+  public int getPort() {
+    return server.getPort();
+  }
+
+}
diff --git 
a/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
 
b/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index 7606ec6..950fa69 100644
--- 
a/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ 
b/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -5,6 +5,7 @@ 
org/apache/geode/redis/internal/collections/SizeableObjectOpenCustomHashSet
 org/apache/geode/redis/internal/collections/IndexibleTreeSet
 org/apache/geode/redis/internal/data/RedisDataMovedException
 org/apache/geode/redis/internal/data/RedisDataTypeMismatchException
+org/apache/geode/redis/internal/executor/sortedset/ZAggregator
 org/apache/geode/redis/internal/RedisException
 org/apache/geode/redis/internal/data/RedisHash$Hash
 org/apache/geode/redis/internal/data/RedisRestoreKeyExistsException
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
index ee7bbe1..dd82e62 100755
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
@@ -104,6 +104,7 @@ import 
org.apache.geode.redis.internal.executor.sortedset.ZRevRangeByScoreExecut
 import org.apache.geode.redis.internal.executor.sortedset.ZRevRangeExecutor;
 import org.apache.geode.redis.internal.executor.sortedset.ZRevRankExecutor;
 import org.apache.geode.redis.internal.executor.sortedset.ZScoreExecutor;
+import org.apache.geode.redis.internal.executor.sortedset.ZUnionStoreExecutor;
 import org.apache.geode.redis.internal.executor.string.AppendExecutor;
 import org.apache.geode.redis.internal.executor.string.BitCountExecutor;
 import org.apache.geode.redis.internal.executor.string.BitOpExecutor;
@@ -238,6 +239,7 @@ public enum RedisCommandType {
   ZREVRANGEBYSCORE(new ZRevRangeByScoreExecutor(), SUPPORTED, new 
MinimumParameterRequirements(4)),
   ZREVRANK(new ZRevRankExecutor(), SUPPORTED, new 
ExactParameterRequirements(3)),
   ZSCORE(new ZScoreExecutor(), SUPPORTED, new ExactParameterRequirements(3)),
+  ZUNIONSTORE(new ZUnionStoreExecutor(), SUPPORTED, new 
MinimumParameterRequirements(4)),
 
   /************* Server *****************/
   SLOWLOG(new SlowlogExecutor(), SUPPORTED, new 
SlowlogParameterRequirements()),
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
index ca49380..542825a 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
@@ -48,7 +48,6 @@ public class RedisConstants {
   public static final String ERROR_MIN_MAX_NOT_A_FLOAT = "min or max is not a 
float";
   public static final String ERROR_OOM_COMMAND_NOT_ALLOWED =
       "command not allowed when used memory > 'maxmemory'";
-
   public static final String ERROR_UNKNOWN_SLOWLOG_SUBCOMMAND =
       "Unknown subcommand or wrong number of arguments for '%s'. Try SLOWLOG 
HELP.";
   public static final String ERROR_UNKNOWN_CLUSTER_SUBCOMMAND =
@@ -64,4 +63,8 @@ public class RedisConstants {
   public static final String ERROR_INVALID_TTL = "Invalid TTL value, must be 
>= 0";
   public static final String ERROR_RESTORE_INVALID_PAYLOAD =
       "DUMP payload version or checksum are wrong";
+  public static final String ERROR_WRONG_SLOT =
+      "Keys in request don't hash to the same slot";
+  public static final String ERROR_WEIGHT_NOT_A_FLOAT =
+      "weight value is not a float";
 }
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
index 9464eb5..77df08b 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
@@ -25,7 +25,6 @@ import static 
org.apache.geode.redis.internal.data.RedisDataType.REDIS_SET;
 import static 
org.apache.geode.redis.internal.data.RedisDataType.REDIS_SORTED_SET;
 import static org.apache.geode.redis.internal.data.RedisDataType.REDIS_STRING;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -150,7 +149,7 @@ public class RegionProvider {
     return redisStats;
   }
 
-  public <T> T execute(Object key, Callable<T> callable) {
+  public <T> T execute(RedisKey key, Callable<T> callable) {
     try {
       return partitionedRegion.computeWithPrimaryLocked(key,
           () -> stripedCoordinator.execute(key, callable));
@@ -163,6 +162,19 @@ public class RegionProvider {
     }
   }
 
+  public <T> T execute(RedisKey key, List<RedisKey> keysToLock, Callable<T> 
callable) {
+    try {
+      return partitionedRegion.computeWithPrimaryLocked(key,
+          () -> stripedCoordinator.execute(keysToLock, callable));
+    } catch (PrimaryBucketLockException | BucketMovedException | 
RegionDestroyedException ex) {
+      throw createRedisDataMovedException((RedisKey) key);
+    } catch (RedisException bex) {
+      throw bex;
+    } catch (Exception ex) {
+      throw new RedisException(ex);
+    }
+  }
+
   public RedisData getRedisData(RedisKey key) {
     return getRedisData(key, NullRedisDataStructures.NULL_REDIS_DATA);
   }
@@ -299,20 +311,22 @@ public class RegionProvider {
   }
 
   /**
-   * A means to consistently order 2 keys for locking to avoid typical 
deadlock situations.
-   *
-   * @return the keys ordered in the sequence in which they should be locked.
+   * A means to consistently order multiple keys for locking to avoid typical 
deadlock situations.
+   * Note that the list of keys is sorted in place.
    */
-  public List<RedisKey> orderForLocking(RedisKey key1, RedisKey key2) {
-    List<RedisKey> orderedKeys = new ArrayList<>();
-    if (stripedCoordinator.compareStripes(key1, key2) > 0) {
-      orderedKeys.add(key1);
-      orderedKeys.add(key2);
-    } else {
-      orderedKeys.add(key2);
-      orderedKeys.add(key1);
-    }
+  public void orderForLocking(List<RedisKey> keys) {
+    keys.sort(stripedCoordinator::compareStripes);
+  }
 
-    return orderedKeys;
+  /**
+   * Check if a key would be stored locally (in a primary bucket on this 
server). Otherwise throw a
+   * {@link RedisDataMovedException}. Note that this will not check for the 
actual existence of the
+   * key.
+   */
+  public void ensureKeyIsLocal(RedisKey key) {
+    if (!slotAdvisor.isLocal(key)) {
+      throw createRedisDataMovedException(key);
+    }
   }
+
 }
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSortedSet.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSortedSet.java
index 56f3e4f..a4bd517 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSortedSet.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSortedSet.java
@@ -17,7 +17,6 @@
 package org.apache.geode.redis.internal.data;
 
 
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataCommandsFunctionExecutor.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataCommandsFunctionExecutor.java
index 757f276..a1a9082 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataCommandsFunctionExecutor.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataCommandsFunctionExecutor.java
@@ -44,15 +44,16 @@ public abstract class RedisDataCommandsFunctionExecutor {
     return regionProvider.getLocalDataRegion();
   }
 
-  protected <T> T stripedExecute(Object key, Callable<T> callable) {
+  protected <T> T stripedExecute(RedisKey key, Callable<T> callable) {
     return regionProvider.execute(key, callable);
   }
 
+  protected <T> T stripedExecute(RedisKey key, List<RedisKey> keysToLock, 
Callable<T> callable) {
+    return regionProvider.execute(key, keysToLock, callable);
+  }
+
   protected RedisData getRedisData(RedisKey key) {
     return regionProvider.getRedisData(key);
   }
 
-  protected List<RedisKey> orderForLocking(RedisKey key1, RedisKey key2) {
-    return regionProvider.orderForLocking(key1, key2);
-  }
 }
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisKeyCommandsFunctionExecutor.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisKeyCommandsFunctionExecutor.java
index 986e1a9..9b67953 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisKeyCommandsFunctionExecutor.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisKeyCommandsFunctionExecutor.java
@@ -17,6 +17,7 @@
 package org.apache.geode.redis.internal.data;
 
 
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.geode.redis.internal.RegionProvider;
@@ -135,7 +136,8 @@ public class RedisKeyCommandsFunctionExecutor extends 
RedisDataCommandsFunctionE
 
   @Override
   public boolean rename(RedisKey oldKey, RedisKey newKey) {
-    List<RedisKey> orderedKeys = orderForLocking(oldKey, newKey);
+    List<RedisKey> orderedKeys = Arrays.asList(oldKey, newKey);
+    getRegionProvider().orderForLocking(orderedKeys);
     return stripedExecute(orderedKeys.get(0), () -> 
rename0(orderedKeys.get(1), oldKey, newKey));
   }
 
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
index 85ec034..854b8cd 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
@@ -19,12 +19,14 @@ package org.apache.geode.redis.internal.data;
 import static java.lang.Double.compare;
 import static org.apache.geode.internal.JvmSizeUtils.memoryOverhead;
 import static 
org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_A_VALID_FLOAT;
+import static 
org.apache.geode.redis.internal.data.NullRedisDataStructures.NULL_REDIS_SORTED_SET;
 import static 
org.apache.geode.redis.internal.data.RedisDataType.REDIS_SORTED_SET;
 import static org.apache.geode.redis.internal.netty.Coder.bytesToDouble;
 import static org.apache.geode.redis.internal.netty.Coder.doubleToBytes;
 import static 
org.apache.geode.redis.internal.netty.Coder.stripTrailingZeroFromDouble;
 import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bGREATEST_MEMBER_NAME;
 import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bLEAST_MEMBER_NAME;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bZERO;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -45,6 +47,7 @@ import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.internal.serialization.SerializationContext;
 import org.apache.geode.internal.size.Sizeable;
 import org.apache.geode.redis.internal.RedisConstants;
+import org.apache.geode.redis.internal.RegionProvider;
 import org.apache.geode.redis.internal.collections.OrderStatisticsTree;
 import 
org.apache.geode.redis.internal.collections.SizeableBytes2ObjectOpenCustomHashMapWithCursor;
 import org.apache.geode.redis.internal.delta.AddsDeltaInfo;
@@ -55,6 +58,9 @@ import 
org.apache.geode.redis.internal.executor.sortedset.SortedSetLexRangeOptio
 import 
org.apache.geode.redis.internal.executor.sortedset.SortedSetRankRangeOptions;
 import 
org.apache.geode.redis.internal.executor.sortedset.SortedSetScoreRangeOptions;
 import org.apache.geode.redis.internal.executor.sortedset.ZAddOptions;
+import org.apache.geode.redis.internal.executor.sortedset.ZAggregator;
+import org.apache.geode.redis.internal.executor.sortedset.ZKeyWeight;
+import org.apache.geode.redis.internal.netty.Coder;
 
 public class RedisSortedSet extends AbstractRedisData {
   protected static final int REDIS_SORTED_SET_OVERHEAD = 
memoryOverhead(RedisSortedSet.class);
@@ -379,6 +385,51 @@ public class RedisSortedSet extends AbstractRedisData {
     return null;
   }
 
+  long zunionstore(RegionProvider regionProvider, RedisKey key, 
List<ZKeyWeight> keyWeights,
+      ZAggregator aggregator) {
+    for (ZKeyWeight keyWeight : keyWeights) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, 
keyWeight.getKey(), false);
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+      double weight = keyWeight.getWeight();
+
+      for (AbstractOrderedSetEntry entry : set.members.values()) {
+        OrderedSetEntry existingValue = members.get(entry.member);
+        if (existingValue == null) {
+          byte[] scoreBytes;
+          // Redis math and Java math are different when handling infinity. 
Specifically:
+          // Java: INFINITY * 0 = NaN
+          // Redis: INFINITY * 0 = 0
+          if (weight == 0) {
+            scoreBytes = bZERO;
+          } else if (weight == 1) {
+            scoreBytes = entry.getScoreBytes();
+          } else {
+            double newScore = entry.score * weight;
+            if (Double.isNaN(newScore)) {
+              scoreBytes = entry.getScoreBytes();
+            } else {
+              scoreBytes = Coder.doubleToBytes(entry.score * weight);
+            }
+          }
+          members.put(entry.member, new OrderedSetEntry(entry.member, 
scoreBytes));
+          continue;
+        }
+
+        
existingValue.updateScore(aggregator.getFunction().apply(existingValue.score,
+            entry.score * weight));
+      }
+    }
+
+    scoreSet.addAll(members.values());
+
+    regionProvider.getLocalDataRegion().put(key, this);
+
+    return getSortedSetSize();
+  }
+
   private List<byte[]> zpop(Iterator<AbstractOrderedSetEntry> scoresIterator,
       Region<RedisKey, RedisData> region, RedisKey key) {
     if (!scoresIterator.hasNext()) {
@@ -663,6 +714,13 @@ public class RedisSortedSet extends AbstractRedisData {
         score = processByteArrayAsDouble(newScore);
       }
     }
+
+    public double updateScore(double newScore) {
+      score = newScore;
+      scoreBytes = Coder.doubleToBytes(newScore);
+
+      return score;
+    }
   }
 
   // Dummy entry used to find the rank of an element with the given score for 
inclusive or
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
index 03c0773..959e803 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
@@ -17,6 +17,8 @@
 package org.apache.geode.redis.internal.data;
 
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.geode.redis.internal.RegionProvider;
@@ -25,6 +27,8 @@ import 
org.apache.geode.redis.internal.executor.sortedset.SortedSetLexRangeOptio
 import 
org.apache.geode.redis.internal.executor.sortedset.SortedSetRankRangeOptions;
 import 
org.apache.geode.redis.internal.executor.sortedset.SortedSetScoreRangeOptions;
 import org.apache.geode.redis.internal.executor.sortedset.ZAddOptions;
+import org.apache.geode.redis.internal.executor.sortedset.ZAggregator;
+import org.apache.geode.redis.internal.executor.sortedset.ZKeyWeight;
 
 public class RedisSortedSetCommandsFunctionExecutor extends 
RedisDataCommandsFunctionExecutor
     implements RedisSortedSetCommands {
@@ -132,4 +136,23 @@ public class RedisSortedSetCommandsFunctionExecutor 
extends RedisDataCommandsFun
   public byte[] zscore(RedisKey key, byte[] member) {
     return stripedExecute(key, () -> getRedisSortedSet(key, 
true).zscore(member));
   }
+
+  @Override
+  public long zunionstore(RedisKey destinationKey, List<ZKeyWeight> keyWeights,
+      ZAggregator aggregator) {
+    List<RedisKey> keysToLock = new ArrayList<>(keyWeights.size());
+    for (ZKeyWeight kw : keyWeights) {
+      getRegionProvider().ensureKeyIsLocal(kw.getKey());
+      keysToLock.add(kw.getKey());
+    }
+    getRegionProvider().ensureKeyIsLocal(destinationKey);
+    keysToLock.add(destinationKey);
+
+    getRegionProvider().orderForLocking(keysToLock);
+
+    return stripedExecute(destinationKey, keysToLock,
+        () -> new 
RedisSortedSet(Collections.emptyList()).zunionstore(getRegionProvider(),
+            destinationKey, keyWeights, aggregator));
+  }
+
 }
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
index 2f0a775..d5156d6 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
@@ -133,6 +133,10 @@ public class RedisResponse {
     return new RedisResponse((bba) -> Coder.getOOMResponse(bba, error));
   }
 
+  public static RedisResponse crossSlot(String error) {
+    return new RedisResponse((bba) -> Coder.getCrossSlotResponse(bba, error));
+  }
+
   public static RedisResponse busykey(String error) {
     return new RedisResponse((bba) -> Coder.getBusyKeyResponse(bba, error));
   }
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java
index e447dbe..14e4419 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java
@@ -56,4 +56,7 @@ public interface RedisSortedSetCommands {
   long zrevrank(RedisKey key, byte[] member);
 
   byte[] zscore(RedisKey key, byte[] member);
+
+  long zunionstore(RedisKey destinationKey, List<ZKeyWeight> keyWeights, 
ZAggregator aggregator);
+
 }
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZAggregator.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZAggregator.java
new file mode 100644
index 0000000..c33e15e
--- /dev/null
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZAggregator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import java.util.function.BiFunction;
+
+/**
+ * Enums representing aggregation functions used in {@link 
ZUnionStoreExecutor} and
+ * {@link ZInterStoreExecutor}.
+ */
+public enum ZAggregator {
+
+  SUM(Double::sum),
+  MIN(Math::min),
+  MAX(Math::max);
+
+  private final BiFunction<Double, Double, Double> function;
+
+  ZAggregator(BiFunction<Double, Double, Double> function) {
+    this.function = function;
+  }
+
+  public BiFunction<Double, Double, Double> getFunction() {
+    return function;
+  }
+}
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZKeyWeight.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZKeyWeight.java
new file mode 100644
index 0000000..7eaf203
--- /dev/null
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZKeyWeight.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+
+/**
+ * Simple class to hold the key and weight associations used in {@link 
ZUnionStoreExecutor} and
+ * {@link ZInterStoreExecutor}.
+ */
+public class ZKeyWeight {
+  private final RedisKey key;
+  private final double weight;
+
+  public ZKeyWeight(RedisKey key, double weight) {
+    this.key = key;
+    this.weight = weight;
+  }
+
+  public RedisKey getKey() {
+    return key;
+  }
+
+  public double getWeight() {
+    return weight;
+  }
+}
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreExecutor.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreExecutor.java
new file mode 100644
index 0000000..fe96aea
--- /dev/null
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreExecutor.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static 
org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZUnionStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext 
context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException nex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array 
sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<RedisKey> sourceKeys = new ArrayList<>((int) numKeys);
+    List<Double> weights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (sourceKeys.size() < numKeys) {
+        sourceKeys.add(new RedisKey(arg));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!weights.isEmpty()) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            weights.add(Coder.bytesToDouble(argIterator.next()));
+          } catch (NumberFormatException nex) {
+            return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+          }
+        }
+        continue;
+      }
+
+      if (Arrays.equals(arg, bAGGREGATE)) {
+        try {
+          aggregator = 
ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));
+        } catch (IllegalArgumentException | NoSuchElementException e) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        continue;
+      }
+
+      // End up here if we have more keys than weights
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    if (sourceKeys.size() != numKeys) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    int bucket = command.getKey().getBucketId();
+    for (RedisKey key : sourceKeys) {
+      if (key.getBucketId() != bucket) {
+        return RedisResponse.crossSlot(ERROR_WRONG_SLOT);
+      }
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>();
+    for (int i = 0; i < sourceKeys.size(); i++) {
+      double weight = weights.isEmpty() ? 1 : weights.get(i);
+      keyWeights.add(new ZKeyWeight(sourceKeys.get(i), weight));
+    }
+
+    long result = context.getSortedSetCommands()
+        .zunionstore(command.getKey(), keyWeights, aggregator);
+
+    return RedisResponse.integer(result);
+  }
+
+}
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
index 0023d77..461b970 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
@@ -26,6 +26,7 @@ import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.NUMBER_0
 import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.SIMPLE_STRING_ID;
 import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bBUSYKEY;
 import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bCRLF;
+import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bCROSSSLOT;
 import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bEMPTY_ARRAY;
 import static 
org.apache.geode.redis.internal.netty.StringBytesGlossary.bEMPTY_STRING;
 import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bERR;
@@ -220,6 +221,10 @@ public class Coder {
     return getErrorResponse0(buffer, bWRONGTYPE, error);
   }
 
+  public static ByteBuf getCrossSlotResponse(ByteBuf buffer, String error) {
+    return getErrorResponse0(buffer, bCROSSSLOT, error);
+  }
+
   public static ByteBuf getBusyKeyResponse(ByteBuf buffer, String error) {
     byte[] errorAr = stringToBytes(error);
     buffer.writeByte(ERROR_ID);
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
index cc4f3f8..7f422d8 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
@@ -97,6 +97,9 @@ public class StringBytesGlossary {
   @MakeImmutable
   public static final byte[] bBUSYKEY = stringToBytes("BUSYKEY ");
 
+  @MakeImmutable
+  public static final byte[] bCROSSSLOT = stringToBytes("CROSSSLOT ");
+
   // ********** Redis Command constants **********
 
   // ClusterExecutor
@@ -163,6 +166,12 @@ public class StringBytesGlossary {
   @MakeImmutable
   public static final byte[] bINCR = stringToBytes("INCR");
 
+  // ZUnionStoreExecutor
+  @MakeImmutable
+  public static final byte[] bWEIGHTS = stringToBytes("WEIGHTS");
+  @MakeImmutable
+  public static final byte[] bAGGREGATE = stringToBytes("AGGREGATE");
+
   // SetExecutor
   @MakeImmutable
   public static final byte[] bEX = stringToBytes("EX");
@@ -183,6 +192,8 @@ public class StringBytesGlossary {
   public static final byte bPLUS = SIMPLE_STRING_ID; // +
   public static final byte bMINUS = ERROR_ID; // -
 
+  public static final byte[] bZERO = stringToBytes("0");
+
   // ********** Constants for Double Infinity comparisons **********
   public static final String P_INF = "+inf";
   public static final String INF = "inf";
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedCoordinator.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedCoordinator.java
index 19efa75..3ceb638 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedCoordinator.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedCoordinator.java
@@ -15,8 +15,11 @@
 
 package org.apache.geode.redis.internal.services;
 
+import java.util.List;
 import java.util.concurrent.Callable;
 
+import org.apache.geode.redis.internal.data.RedisKey;
+
 
 /**
  * Allows users to "stripe" their execution in such a way that all tasks 
belonging to one stripe are
@@ -33,7 +36,9 @@ public interface StripedCoordinator {
    * @param stripeId defines the "stripe"
    * @param callable the unit of work to do sequentially. May be called after 
run returns.
    */
-  <T> T execute(Object stripeId, Callable<T> callable);
+  <T> T execute(RedisKey stripeId, Callable<T> callable);
+
+  <T> T execute(List<RedisKey> stripeIds, Callable<T> callable);
 
   int compareStripes(Object object1, Object object2);
 }
diff --git 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/SynchronizedStripedCoordinator.java
 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/SynchronizedStripedCoordinator.java
index 5cdeb17..7d435b6 100644
--- 
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/SynchronizedStripedCoordinator.java
+++ 
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/SynchronizedStripedCoordinator.java
@@ -15,8 +15,11 @@
 
 package org.apache.geode.redis.internal.services;
 
+import java.util.List;
 import java.util.concurrent.Callable;
 
+import org.apache.geode.redis.internal.data.RedisKey;
+
 /**
  * Implements {@link StripedCoordinator} by using
  * synchronization. The thread that calls execute will also be the thread that 
does the work. But it
@@ -39,7 +42,7 @@ public class SynchronizedStripedCoordinator implements 
StripedCoordinator {
   }
 
   @Override
-  public <T> T execute(Object stripeId, Callable<T> callable) {
+  public <T> T execute(RedisKey stripeId, Callable<T> callable) {
     synchronized (getSync(stripeId)) {
       try {
         return callable.call();
@@ -51,6 +54,21 @@ public class SynchronizedStripedCoordinator implements 
StripedCoordinator {
     }
   }
 
+  @Override
+  public <T> T execute(List<RedisKey> stripeIds, Callable<T> callable) {
+    return execute(stripeIds, 0, callable);
+  }
+
+  private <T> T execute(List<RedisKey> stripeIds, int index, Callable<T> 
callable) {
+    if (index + 1 == stripeIds.size()) {
+      return execute(stripeIds.get(index), callable);
+    }
+
+    synchronized (getSync(stripeIds.get(index))) {
+      return execute(stripeIds, ++index, callable);
+    }
+  }
+
   private Object getSync(Object stripeId) {
     return syncs[getStripeIndex(stripeId)];
   }
diff --git 
a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/executor/SynchronizedStripedCoordinatorTest.java
 
b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/executor/SynchronizedStripedCoordinatorTest.java
index d08b2e9..8b62ac0 100644
--- 
a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/executor/SynchronizedStripedCoordinatorTest.java
+++ 
b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/executor/SynchronizedStripedCoordinatorTest.java
@@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import org.junit.Test;
 
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.services.StripedCoordinator;
 import org.apache.geode.redis.internal.services.SynchronizedStripedCoordinator;
 
@@ -42,7 +43,7 @@ public class SynchronizedStripedCoordinatorTest {
     assertThat(result).isEqualTo("OK");
   }
 
-  private static class Hashy {
+  private static class Hashy extends RedisKey {
     private final int hashcode;
 
     public Hashy(int hashcode) {

Reply via email to