This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new ae8d674 GEODE-9518: Implement Radish ZUNIONSTORE command (#6794)
ae8d674 is described below
commit ae8d6745a7a1c4ce3a1c3f80c80ca906f53d5e82
Author: Jens Deppe <[email protected]>
AuthorDate: Wed Sep 1 18:41:52 2021 -0700
GEODE-9518: Implement Radish ZUNIONSTORE command (#6794)
- Add ability to lock multiple keys in a consistent order
- Add validation option to ConcurrentLoopingThreads
- Add CROSSSLOTS error message
---
.../ZUnionStoreNativeRedisAcceptanceTest.java | 36 ++
.../geode/redis/ConcurrentLoopingThreads.java | 51 ++-
.../redis/RedisCommandArgumentsTestHelper.java | 2 +-
.../server/AbstractHitsMissesIntegrationTest.java | 7 +
.../set/AbstractSUnionIntegrationTest.java | 22 +
.../AbstractZUnionStoreIntegrationTest.java | 486 +++++++++++++++++++++
.../sortedset/ZUnionStoreIntegrationTest.java | 31 ++
.../apache/geode/codeAnalysis/excludedClasses.txt | 1 +
.../geode/redis/internal/RedisCommandType.java | 2 +
.../geode/redis/internal/RedisConstants.java | 5 +-
.../geode/redis/internal/RegionProvider.java | 44 +-
.../redis/internal/data/NullRedisSortedSet.java | 1 -
.../data/RedisDataCommandsFunctionExecutor.java | 9 +-
.../data/RedisKeyCommandsFunctionExecutor.java | 4 +-
.../geode/redis/internal/data/RedisSortedSet.java | 58 +++
.../RedisSortedSetCommandsFunctionExecutor.java | 23 +
.../redis/internal/executor/RedisResponse.java | 4 +
.../executor/sortedset/RedisSortedSetCommands.java | 3 +
.../internal/executor/sortedset/ZAggregator.java | 39 ++
.../internal/executor/sortedset/ZKeyWeight.java | 40 ++
.../executor/sortedset/ZUnionStoreExecutor.java | 126 ++++++
.../apache/geode/redis/internal/netty/Coder.java | 5 +
.../redis/internal/netty/StringBytesGlossary.java | 11 +
.../internal/services/StripedCoordinator.java | 7 +-
.../services/SynchronizedStripedCoordinator.java | 20 +-
.../SynchronizedStripedCoordinatorTest.java | 3 +-
26 files changed, 1005 insertions(+), 35 deletions(-)
diff --git
a/geode-apis-compatible-with-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreNativeRedisAcceptanceTest.java
b/geode-apis-compatible-with-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreNativeRedisAcceptanceTest.java
new file mode 100755
index 0000000..cfef4cc
--- /dev/null
+++
b/geode-apis-compatible-with-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreNativeRedisAcceptanceTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import org.junit.ClassRule;
+
+import org.apache.geode.redis.NativeRedisClusterTestRule;
+
+public class ZUnionStoreNativeRedisAcceptanceTest extends
AbstractZUnionStoreIntegrationTest {
+
+ @ClassRule
+ public static NativeRedisClusterTestRule server = new
NativeRedisClusterTestRule();
+
+ @Override
+ public int getPort() {
+ return server.getExposedPorts().get(0);
+ }
+
+ @Override
+ public void flushAll() {
+ server.flushAll();
+ }
+
+}
diff --git
a/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
b/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
index f2302f5..56813e0 100644
---
a/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
+++
b/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
@@ -33,6 +33,7 @@ public class ConcurrentLoopingThreads {
private final Consumer<Integer>[] functions;
private ExecutorService executorService = Executors.newCachedThreadPool();
private List<Future<?>> loopingFutures;
+ private Throwable actionThrowable = null;
@SafeVarargs
public ConcurrentLoopingThreads(int iterationCount,
@@ -45,15 +46,15 @@ public class ConcurrentLoopingThreads {
* Start the operations asynchronously. Use {@link #await()} to wait for
completion.
*/
public ConcurrentLoopingThreads start() {
- return start(false);
+ return start(false, null);
}
- private ConcurrentLoopingThreads start(boolean lockstep) {
- CyclicBarrier latch = new CyclicBarrier(functions.length);
+ private ConcurrentLoopingThreads start(boolean lockstep, Runnable
barrierAction) {
+ CyclicBarrier barrier = new CyclicBarrier(functions.length, barrierAction);
loopingFutures = Arrays
.stream(functions)
- .map(r -> new LoopingThread(r, iterationCount, latch, lockstep))
+ .map(r -> new LoopingThread(r, iterationCount, barrier, lockstep))
.map(t -> executorService.submit(t))
.collect(Collectors.toList());
@@ -84,7 +85,7 @@ public class ConcurrentLoopingThreads {
* Start operations and only return once all are complete.
*/
public void run() {
- start(false);
+ start(false, null);
await();
}
@@ -92,10 +93,41 @@ public class ConcurrentLoopingThreads {
* Start operations and run each iteration in lockstep
*/
public void runInLockstep() {
- start(true);
+ start(true, null);
await();
}
+ /**
+ * Start operations and provide an action to be performed at the end of
every iteration. This
+ * implies running in lockstep. This would typically be used to provide some
form of validation.
+ */
+ public void runWithAction(Runnable action) {
+ Runnable innerRunnable = () -> {
+ try {
+ action.run();
+ } catch (Throwable e) {
+ actionThrowable = e;
+ throw e;
+ }
+ };
+
+ start(true, innerRunnable);
+
+ try {
+ await();
+ } catch (Throwable e) {
+ if (actionThrowable != null) {
+ // This will ensure that AssertionErrors are clearly apparent
+ if (actionThrowable instanceof Error) {
+ throw (Error) actionThrowable;
+ }
+ throw new RuntimeException(actionThrowable);
+ } else {
+ throw e;
+ }
+ }
+ }
+
private static class LoopingRunnable implements Runnable {
private final Consumer<Integer> runnable;
private final int iterationCount;
@@ -112,13 +144,14 @@ public class ConcurrentLoopingThreads {
@Override
public void run() {
- waitForBarrier();
+ if (!lockstep) {
+ waitForBarrier();
+ }
for (int i = 0; i < iterationCount; i++) {
+ runnable.accept(i);
if (lockstep) {
waitForBarrier();
}
- runnable.accept(i);
- Thread.yield();
}
}
diff --git
a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/RedisCommandArgumentsTestHelper.java
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/RedisCommandArgumentsTestHelper.java
index ce69883..4d5924e 100644
---
a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/RedisCommandArgumentsTestHelper.java
+++
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/RedisCommandArgumentsTestHelper.java
@@ -121,7 +121,7 @@ public class RedisCommandArgumentsTestHelper {
}
for (int i = 0; i < numArgs; i++) {
- args[i] = String.valueOf(i).getBytes();
+ args[i] = ("{A}" + i).getBytes();
}
return args;
diff --git
a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/server/AbstractHitsMissesIntegrationTest.java
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/server/AbstractHitsMissesIntegrationTest.java
index a82fa2e..53cf67c 100644
---
a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/server/AbstractHitsMissesIntegrationTest.java
+++
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/server/AbstractHitsMissesIntegrationTest.java
@@ -32,6 +32,7 @@ import org.junit.Ignore;
import org.junit.Test;
import redis.clients.jedis.BitOP;
import redis.clients.jedis.Jedis;
+import redis.clients.jedis.ZParams;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.serialization.DataSerializableFixedID;
@@ -332,6 +333,12 @@ public abstract class AbstractHitsMissesIntegrationTest
implements RedisIntegrat
runCommandAndAssertHitsAndMisses(SORTED_SET_KEY, k ->
jedis.zrevrangeByScore(k, 1, 0));
}
+ @Test
+ public void testZUnionStore() {
+ runCommandAndAssertNoStatUpdates(SORTED_SET_KEY,
+ k -> jedis.zunionstore(k, new ZParams().weights(1, 2), k, k));
+ }
+
/************* Set related commands *************/
@Test
public void testSadd() {
diff --git
a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/set/AbstractSUnionIntegrationTest.java
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/set/AbstractSUnionIntegrationTest.java
index 6193449..6b90111 100755
---
a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/set/AbstractSUnionIntegrationTest.java
+++
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/set/AbstractSUnionIntegrationTest.java
@@ -30,6 +30,7 @@ import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.Protocol;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
import org.apache.geode.redis.RedisIntegrationTest;
import org.apache.geode.test.awaitility.GeodeAwaitility;
@@ -198,4 +199,25 @@ public abstract class AbstractSUnionIntegrationTest
implements RedisIntegrationT
assertThat(jedis.smembers("{user1}master").toArray())
.containsExactlyInAnyOrder(masterSet.toArray());
}
+
+ @Test
+ public void doesNotThrowExceptions_whenConcurrentSaddAndSunionExecute() {
+ final int ENTRIES = 1000;
+ Set<String> set1 = new HashSet<>();
+ Set<String> set2 = new HashSet<>();
+ for (int i = 0; i < ENTRIES; i++) {
+ set1.add("value-1-" + i);
+ set2.add("value-2-" + i);
+ }
+
+ jedis.sadd("{player1}key1", set1.toArray(new String[] {}));
+ jedis.sadd("{player1}key2", set2.toArray(new String[] {}));
+
+ new ConcurrentLoopingThreads(ENTRIES,
+ i -> jedis.sunion("{player1}key1", "{player1}key2"),
+ i -> jedis.sadd("{player1}key1", "newValue-1-" + i),
+ i -> jedis.sadd("{player1}key2", "newValue-2-" + i))
+ .runInLockstep();
+ }
+
}
diff --git
a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZUnionStoreIntegrationTest.java
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZUnionStoreIntegrationTest.java
new file mode 100755
index 0000000..f1036b5
--- /dev/null
+++
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZUnionStoreIntegrationTest.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static
org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static
org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+import redis.clients.jedis.Tuple;
+import redis.clients.jedis.ZParams;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.internal.RedisConstants;
+
+public abstract class AbstractZUnionStoreIntegrationTest implements
RedisIntegrationTest {
+
+ private static final String NEW_SET = "{user1}new";
+ private static final String SORTED_SET_KEY1 = "{user1}sset1";
+ private static final String SORTED_SET_KEY2 = "{user1}sset2";
+ private static final String SORTED_SET_KEY3 = "{user1}sset3";
+
+ private JedisCluster jedis;
+
+ @Before
+ public void setUp() {
+ jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()),
REDIS_CLIENT_TIMEOUT);
+ }
+
+ @After
+ public void tearDown() {
+ flushAll();
+ jedis.close();
+ }
+
+ @Test
+ public void shouldError_givenWrongKeyType() {
+ final String STRING_KEY = "{user1}stringKey";
+ jedis.set(STRING_KEY, "value");
+ assertThatThrownBy(
+ () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE,
NEW_SET, "2", STRING_KEY,
+ SORTED_SET_KEY1))
+ .hasMessage("WRONGTYPE " + RedisConstants.ERROR_WRONG_TYPE);
+ }
+
+ @Test
+ public void shouldError_givenSetsCrossSlots() {
+ final String WRONG_KEY = "{user2}another";
+ assertThatThrownBy(
+ () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE,
NEW_SET, "2", WRONG_KEY,
+ SORTED_SET_KEY1))
+ .hasMessage("CROSSSLOT " + RedisConstants.ERROR_WRONG_SLOT);
+ }
+
+ @Test
+ public void shouldError_givenTooFewArguments() {
+ assertAtLeastNArgs(jedis, Protocol.Command.ZUNIONSTORE, 3);
+ }
+
+ @Test
+ public void shouldError_givenNumkeysTooLarge() {
+ assertThatThrownBy(
+ () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE,
NEW_SET, "2",
+ SORTED_SET_KEY1))
+ .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+ }
+
+ @Test
+ public void shouldError_givenNumkeysTooSmall() {
+ assertThatThrownBy(
+ () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE,
NEW_SET, "1",
+ SORTED_SET_KEY1, SORTED_SET_KEY2))
+ .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+ }
+
+ @Test
+ public void shouldError_givenTooManyWeights() {
+ assertThatThrownBy(
+ () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE,
NEW_SET, "1",
+ SORTED_SET_KEY1, "WEIGHTS", "2", "3"))
+ .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+ }
+
+ @Test
+ public void shouldError_givenTooFewWeights() {
+ assertThatThrownBy(
+ () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE,
NEW_SET, "2",
+ SORTED_SET_KEY1, SORTED_SET_KEY2, "WEIGHTS", "1"))
+ .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+ }
+
+ @Test
+ public void shouldError_givenWeightNotANumber() {
+ assertThatThrownBy(
+ () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE,
NEW_SET, "1",
+ SORTED_SET_KEY1, "WEIGHTS", "not-a-number"))
+ .hasMessage("ERR " + RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT);
+ }
+
+ @Test
+ public void shouldError_givenWeightsWithoutAnyValues() {
+ assertThatThrownBy(
+ () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE,
NEW_SET, "1",
+ SORTED_SET_KEY1, "WEIGHTS"))
+ .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+ }
+
+ @Test
+ public void shouldError_givenMultipleWeightKeywords() {
+ assertThatThrownBy(
+ () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE,
NEW_SET, "1",
+ SORTED_SET_KEY1, "WEIGHT", "1.0", "WEIGHT", "2.0"))
+ .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+ }
+
+ @Test
+ public void shouldError_givenUnknownAggregate() {
+ assertThatThrownBy(
+ () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE,
NEW_SET, "1",
+ SORTED_SET_KEY1, "AGGREGATE", "UNKNOWN", "WEIGHTS", "1"))
+ .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+ }
+
+ @Test
+ public void shouldError_givenAggregateKeywordWithoutValue() {
+ assertThatThrownBy(
+ () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE,
NEW_SET, "1",
+ SORTED_SET_KEY1, "AGGREGATE"))
+ .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+ }
+
+ @Test
+ public void shouldError_givenMultipleAggregates() {
+ assertThatThrownBy(
+ () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE,
NEW_SET, "1",
+ SORTED_SET_KEY1, "WEIGHTS", "1", "AGGREGATE", "SUM", "MIN"))
+ .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+ }
+
+ @Test
+ public void shouldUnionize_givenBoundaryScoresAndWeights() {
+ Map<String, Double> scores = new LinkedHashMap<>();
+ scores.put("player1", Double.NEGATIVE_INFINITY);
+ scores.put("player2", 0D);
+ scores.put("player3", 1D);
+ scores.put("player4", Double.POSITIVE_INFINITY);
+ Set<Tuple> expectedResults = convertToTuples(scores, (i, x) -> x);
+ jedis.zadd(SORTED_SET_KEY1, scores);
+
+ jedis.zunionstore(NEW_SET, new ZParams().weights(1), SORTED_SET_KEY1);
+
+ Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+
+ jedis.zunionstore(NEW_SET, new ZParams().weights(0), SORTED_SET_KEY1);
+
+ expectedResults = convertToTuples(scores, (i, x) -> 0D);
+ results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+
+ jedis.zunionstore(NEW_SET, new
ZParams().weights(Double.POSITIVE_INFINITY), SORTED_SET_KEY1);
+
+ expectedResults = convertToTuples(scores, (i, x) -> x == 1 ?
Double.POSITIVE_INFINITY : x);
+ results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+
+ jedis.zunionstore(NEW_SET, new
ZParams().weights(Double.NEGATIVE_INFINITY), SORTED_SET_KEY1);
+
+ expectedResults = new LinkedHashSet<>();
+ expectedResults.add(new Tuple("player3", Double.NEGATIVE_INFINITY));
+ expectedResults.add(new Tuple("player4", Double.NEGATIVE_INFINITY));
+ expectedResults.add(new Tuple("player2", 0D));
+ expectedResults.add(new Tuple("player1", Double.POSITIVE_INFINITY));
+ results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+ }
+
+ @Test
+ public void shouldUnionize_givenASingleSet() {
+ Map<String, Double> scores = makeScoreMap(10, x -> (double) x);
+ Set<Tuple> expectedResults = convertToTuples(scores, (i, x) -> x);
+ jedis.zadd(SORTED_SET_KEY1, scores);
+
+ assertThat(jedis.zunionstore(NEW_SET, SORTED_SET_KEY1)).isEqualTo(10);
+
+ Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+ }
+
+ @Test
+ public void shouldUnionize_givenOneSetDoesNotExist() {
+ Map<String, Double> scores = makeScoreMap(10, x -> (double) x);
+ Set<Tuple> expectedResults = convertToTuples(scores, (i, x) -> x);
+ jedis.zadd(SORTED_SET_KEY1, scores);
+
+ jedis.zunionstore(NEW_SET, SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+ Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+ }
+
+ @Test
+ public void shouldUnionize_givenWeight() {
+ Map<String, Double> scores = makeScoreMap(10, x -> (double) x);
+ Set<Tuple> expectedResults = convertToTuples(scores, (i, x) -> x * 1.5);
+ jedis.zadd(SORTED_SET_KEY1, scores);
+
+ jedis.zunionstore(SORTED_SET_KEY1, new ZParams().weights(1.5),
SORTED_SET_KEY1);
+
+ Set<Tuple> results = jedis.zrangeWithScores(SORTED_SET_KEY1, 0, 10);
+
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+ }
+
+ @Test
+ public void
shouldUnionizeWithWeightAndDefaultAggregate_givenMultipleSetsWithWeights() {
+ Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+ jedis.zadd(SORTED_SET_KEY1, scores1);
+
+ Map<String, Double> scores2 = makeScoreMap(10, x -> (double) (9 - x));
+ jedis.zadd(SORTED_SET_KEY2, scores2);
+
+ Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> (x * 2.0)
+ ((9 - x) * 1.5));
+
+ jedis.zunionstore(NEW_SET, new ZParams().weights(2.0, 1.5),
SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+ Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+ }
+
+ @Test
+ public void shouldUnionize_givenMinAggregate() {
+ Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+ jedis.zadd(SORTED_SET_KEY1, scores1);
+
+ Map<String, Double> scores2 = makeScoreMap(10, x -> 0D);
+ jedis.zadd(SORTED_SET_KEY2, scores2);
+
+ Set<Tuple> expectedResults = convertToTuples(scores2, (i, x) -> x);
+
+ jedis.zunionstore(NEW_SET, new ZParams().aggregate(ZParams.Aggregate.MIN),
+ SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+ Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+ }
+
+ @Test
+ public void shouldUnionize_givenMaxAggregate() {
+ Map<String, Double> scores1 = makeScoreMap(10, x -> (double) ((x % 2 == 0)
? 0 : x));
+ jedis.zadd(SORTED_SET_KEY1, scores1);
+
+ Map<String, Double> scores2 = makeScoreMap(10, x -> (double) ((x % 2 == 0)
? x : 0));
+ jedis.zadd(SORTED_SET_KEY2, scores2);
+
+ Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> (double)
i);
+
+ jedis.zunionstore(NEW_SET, new ZParams().aggregate(ZParams.Aggregate.MAX),
+ SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+ Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+ }
+
+ @Test
+ public void
shouldUnionizeUsingLastAggregate_givenMultipleAggregateKeywords() {
+ Map<String, Double> scores1 = makeScoreMap(10, x -> (double) 0);
+ jedis.zadd(SORTED_SET_KEY1, scores1);
+
+ Map<String, Double> scores2 = makeScoreMap(10, x -> (double) 1);
+ jedis.zadd(SORTED_SET_KEY2, scores2);
+
+ Set<Tuple> expectedResults = convertToTuples(scores2, (i, x) -> x);
+
+ jedis.sendCommand(NEW_SET, Protocol.Command.ZUNIONSTORE, NEW_SET, "2",
+ SORTED_SET_KEY1, SORTED_SET_KEY2, "AGGREGATE", "MIN", "AGGREGATE",
"MAX");
+
+ Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+ }
+
+ @Test
+ public void shouldUnionize_givenMaxAggregateAndMultipleWeights() {
+ Map<String, Double> scores1 = makeScoreMap(10, x -> (double) ((x % 2 == 0)
? 0 : x));
+ jedis.zadd(SORTED_SET_KEY1, scores1);
+
+ Map<String, Double> scores2 = makeScoreMap(10, x -> (double) ((x % 2 == 0)
? x : 0));
+ jedis.zadd(SORTED_SET_KEY2, scores2);
+
+ Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> (double)
(i * 2));
+
+ jedis.zunionstore(NEW_SET, new
ZParams().aggregate(ZParams.Aggregate.MAX).weights(2, 2),
+ SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+ Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+ }
+
+ @Test
+ public void shouldUnionize_givenSumAggregateAndMultipleSets() {
+ Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+ jedis.zadd(SORTED_SET_KEY1, scores1);
+
+ Map<String, Double> scores2 = makeScoreMap(10, x -> (double) (x * 2));
+ jedis.zadd(SORTED_SET_KEY2, scores2);
+
+ Map<String, Double> scores3 = makeScoreMap(10, x -> (double) (x * 3));
+ jedis.zadd(SORTED_SET_KEY3, scores3);
+
+ Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> x * 6);
+
+ jedis.zunionstore(NEW_SET, new ZParams().aggregate(ZParams.Aggregate.SUM),
+ SORTED_SET_KEY1, SORTED_SET_KEY2, SORTED_SET_KEY3);
+
+ Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+ }
+
+ @Test
+ public void shouldUnionize_givenSetsDoNotOverlap() {
+ Map<String, Double> scores1 = makeScoreMap(0, 2, 10, x -> (double) x);
+ jedis.zadd(SORTED_SET_KEY1, scores1);
+
+ Map<String, Double> scores2 = makeScoreMap(1, 2, 10, x -> (double) x);
+ jedis.zadd(SORTED_SET_KEY2, scores2);
+
+ Set<Tuple> expectedResults =
+ convertToTuples(makeScoreMap(0, 1, 20, x -> (double) x), (i, x) -> x);
+
+ jedis.zunionstore(NEW_SET, SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+ Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 20);
+
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+ }
+
+ @Test
+ public void shouldUnionize_givenSetsPartiallyOverlap() {
+ Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+ jedis.zadd(SORTED_SET_KEY1, scores1);
+
+ Map<String, Double> scores2 = makeScoreMap(5, 1, 10, x -> (double) (x < 10
? x : x * 2));
+ jedis.zadd(SORTED_SET_KEY2, scores2);
+
+ Set<Tuple> expectedResults = convertToTuples(makeScoreMap(0, 1, 15, x ->
(double) x),
+ (i, x) -> (double) (i < 5 ? i : i * 2));
+
+ jedis.zunionstore(NEW_SET, SORTED_SET_KEY1, SORTED_SET_KEY2);
+
+ Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 20);
+
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+ }
+
+ @Test
+ public void ensureWeightsAreAppliedBeforeAggregation() {
+ Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x * 5);
+ jedis.zadd(SORTED_SET_KEY1, scores1);
+
+ Map<String, Double> scores2 = makeScoreMap(10, x -> (double) x);
+ jedis.zadd(SORTED_SET_KEY2, scores2);
+
+ Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> (double)
(i * 10));
+
+ jedis.zunionstore(SORTED_SET_KEY1,
+ new ZParams().weights(1, 10).aggregate(ZParams.Aggregate.MAX),
SORTED_SET_KEY1,
+ SORTED_SET_KEY2);
+
+ Set<Tuple> results = jedis.zrangeWithScores(SORTED_SET_KEY1, 0, 20);
+
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+ }
+
+ @Test
+ public void shouldUnionize_whenTargetExistsAndSetsAreDuplicated() {
+ Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+ jedis.zadd(SORTED_SET_KEY1, scores1);
+
+ Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> x * 2);
+
+ // Default aggregation is SUM
+ jedis.zunionstore(SORTED_SET_KEY1, SORTED_SET_KEY1, SORTED_SET_KEY1);
+
+ Set<Tuple> results = jedis.zrangeWithScores(SORTED_SET_KEY1, 0, 10);
+
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+ }
+
+ @Test
+ public void shouldPreserveSet_givenDestinationAndSourceAreTheSame() {
+ Map<String, Double> scores1 = makeScoreMap(10, x -> (double) x);
+ jedis.zadd(SORTED_SET_KEY1, scores1);
+
+ Set<Tuple> expectedResults = convertToTuples(scores1, (i, x) -> x);
+
+ jedis.zunionstore(SORTED_SET_KEY1, SORTED_SET_KEY1);
+
+ Set<Tuple> results = jedis.zrangeWithScores(SORTED_SET_KEY1, 0, 10);
+
+ assertThat(results).containsExactlyElementsOf(expectedResults);
+ }
+
+ @Test
+ public void ensureSetConsistency_andNoExceptions_whenRunningConcurrently() {
+ int scoreCount = 1000;
+ jedis.zadd("{A}ones", makeScoreMap(scoreCount, x -> 1D));
+
+ Map<String, Double> scores1 = makeScoreMap(scoreCount, x -> 1D);
+ jedis.zadd("{A}scores1", scores1);
+ Map<String, Double> scores2 = makeScoreMap(scoreCount, x -> 2D);
+ jedis.zadd("{A}scores2", scores2);
+
+ new ConcurrentLoopingThreads(1000,
+ i -> jedis.zadd("{A}scores1", (double) i, String.format("member-%05d",
i)),
+ i -> jedis.zadd("{A}scores2", (double) i, String.format("member-%05d",
i)),
+ i -> jedis.zunionstore("{A}maxSet", new
ZParams().aggregate(ZParams.Aggregate.MAX),
+ "{A}scores1", "{A}scores2"),
+ // This ensures that the lock ordering for keys is working
+ i -> jedis.zunionstore("{A}minSet", new
ZParams().aggregate(ZParams.Aggregate.MIN),
+ "{A}scores2", "{A}scores1"))
+ .runWithAction(() ->
assertThat(jedis.zrangeWithScores("{A}maxSet", 0, scoreCount))
+ .hasSize(scoreCount));
+ }
+
+ private Map<String, Double> makeScoreMap(int count, Function<Integer,
Double> scoreProducer) {
+ return makeScoreMap(0, 1, count, scoreProducer);
+ }
+
+ private Map<String, Double> makeScoreMap(int startIndex, int increment, int
count,
+ Function<Integer, Double> scoreProducer) {
+ Map<String, Double> map = new LinkedHashMap<>();
+
+ int index = startIndex;
+ for (int i = 0; i < count; i++) {
+ map.put(String.format("member-%05d", index), scoreProducer.apply(index));
+ index += increment;
+ }
+ return map;
+ }
+
+ private Set<Tuple> convertToTuples(Map<String, Double> map,
+ BiFunction<Integer, Double, Double> function) {
+ Set<Tuple> tuples = new LinkedHashSet<>();
+ int x = 0;
+ for (Map.Entry<String, Double> e : map.entrySet()) {
+ tuples.add(new Tuple(e.getKey().getBytes(), function.apply(x++,
e.getValue())));
+ }
+
+ return tuples;
+ }
+}
diff --git
a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreIntegrationTest.java
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreIntegrationTest.java
new file mode 100755
index 0000000..7930ae3
--- /dev/null
+++
b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreIntegrationTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import org.junit.ClassRule;
+
+import org.apache.geode.redis.GeodeRedisServerRule;
+
+public class ZUnionStoreIntegrationTest extends
AbstractZUnionStoreIntegrationTest {
+
+ @ClassRule
+ public static GeodeRedisServerRule server = new GeodeRedisServerRule();
+
+ @Override
+ public int getPort() {
+ return server.getPort();
+ }
+
+}
diff --git
a/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
b/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index 7606ec6..950fa69 100644
---
a/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++
b/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -5,6 +5,7 @@
org/apache/geode/redis/internal/collections/SizeableObjectOpenCustomHashSet
org/apache/geode/redis/internal/collections/IndexibleTreeSet
org/apache/geode/redis/internal/data/RedisDataMovedException
org/apache/geode/redis/internal/data/RedisDataTypeMismatchException
+org/apache/geode/redis/internal/executor/sortedset/ZAggregator
org/apache/geode/redis/internal/RedisException
org/apache/geode/redis/internal/data/RedisHash$Hash
org/apache/geode/redis/internal/data/RedisRestoreKeyExistsException
diff --git
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
index ee7bbe1..dd82e62 100755
---
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
+++
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
@@ -104,6 +104,7 @@ import
org.apache.geode.redis.internal.executor.sortedset.ZRevRangeByScoreExecut
import org.apache.geode.redis.internal.executor.sortedset.ZRevRangeExecutor;
import org.apache.geode.redis.internal.executor.sortedset.ZRevRankExecutor;
import org.apache.geode.redis.internal.executor.sortedset.ZScoreExecutor;
+import org.apache.geode.redis.internal.executor.sortedset.ZUnionStoreExecutor;
import org.apache.geode.redis.internal.executor.string.AppendExecutor;
import org.apache.geode.redis.internal.executor.string.BitCountExecutor;
import org.apache.geode.redis.internal.executor.string.BitOpExecutor;
@@ -238,6 +239,7 @@ public enum RedisCommandType {
ZREVRANGEBYSCORE(new ZRevRangeByScoreExecutor(), SUPPORTED, new
MinimumParameterRequirements(4)),
ZREVRANK(new ZRevRankExecutor(), SUPPORTED, new
ExactParameterRequirements(3)),
ZSCORE(new ZScoreExecutor(), SUPPORTED, new ExactParameterRequirements(3)),
+ ZUNIONSTORE(new ZUnionStoreExecutor(), SUPPORTED, new
MinimumParameterRequirements(4)),
/************* Server *****************/
SLOWLOG(new SlowlogExecutor(), SUPPORTED, new
SlowlogParameterRequirements()),
diff --git
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
index ca49380..542825a 100644
---
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
+++
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java
@@ -48,7 +48,6 @@ public class RedisConstants {
public static final String ERROR_MIN_MAX_NOT_A_FLOAT = "min or max is not a
float";
public static final String ERROR_OOM_COMMAND_NOT_ALLOWED =
"command not allowed when used memory > 'maxmemory'";
-
public static final String ERROR_UNKNOWN_SLOWLOG_SUBCOMMAND =
"Unknown subcommand or wrong number of arguments for '%s'. Try SLOWLOG
HELP.";
public static final String ERROR_UNKNOWN_CLUSTER_SUBCOMMAND =
@@ -64,4 +63,8 @@ public class RedisConstants {
public static final String ERROR_INVALID_TTL = "Invalid TTL value, must be
>= 0";
public static final String ERROR_RESTORE_INVALID_PAYLOAD =
"DUMP payload version or checksum are wrong";
+ public static final String ERROR_WRONG_SLOT =
+ "Keys in request don't hash to the same slot";
+ public static final String ERROR_WEIGHT_NOT_A_FLOAT =
+ "weight value is not a float";
}
diff --git
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
index 9464eb5..77df08b 100644
---
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
+++
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
@@ -25,7 +25,6 @@ import static
org.apache.geode.redis.internal.data.RedisDataType.REDIS_SET;
import static
org.apache.geode.redis.internal.data.RedisDataType.REDIS_SORTED_SET;
import static org.apache.geode.redis.internal.data.RedisDataType.REDIS_STRING;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -150,7 +149,7 @@ public class RegionProvider {
return redisStats;
}
- public <T> T execute(Object key, Callable<T> callable) {
+ public <T> T execute(RedisKey key, Callable<T> callable) {
try {
return partitionedRegion.computeWithPrimaryLocked(key,
() -> stripedCoordinator.execute(key, callable));
@@ -163,6 +162,19 @@ public class RegionProvider {
}
}
+ public <T> T execute(RedisKey key, List<RedisKey> keysToLock, Callable<T>
callable) {
+ try {
+ return partitionedRegion.computeWithPrimaryLocked(key,
+ () -> stripedCoordinator.execute(keysToLock, callable));
+ } catch (PrimaryBucketLockException | BucketMovedException |
RegionDestroyedException ex) {
+ throw createRedisDataMovedException((RedisKey) key);
+ } catch (RedisException bex) {
+ throw bex;
+ } catch (Exception ex) {
+ throw new RedisException(ex);
+ }
+ }
+
public RedisData getRedisData(RedisKey key) {
return getRedisData(key, NullRedisDataStructures.NULL_REDIS_DATA);
}
@@ -299,20 +311,22 @@ public class RegionProvider {
}
/**
- * A means to consistently order 2 keys for locking to avoid typical
deadlock situations.
- *
- * @return the keys ordered in the sequence in which they should be locked.
+ * A means to consistently order multiple keys for locking to avoid typical
deadlock situations.
+ * Note that the list of keys is sorted in place.
*/
- public List<RedisKey> orderForLocking(RedisKey key1, RedisKey key2) {
- List<RedisKey> orderedKeys = new ArrayList<>();
- if (stripedCoordinator.compareStripes(key1, key2) > 0) {
- orderedKeys.add(key1);
- orderedKeys.add(key2);
- } else {
- orderedKeys.add(key2);
- orderedKeys.add(key1);
- }
+ public void orderForLocking(List<RedisKey> keys) {
+ keys.sort(stripedCoordinator::compareStripes);
+ }
- return orderedKeys;
+ /**
+ * Check if a key would be stored locally (in a primary bucket on this
server). Otherwise throw a
+ * {@link RedisDataMovedException}. Note that this will not check for the
actual existence of the
+ * key.
+ */
+ public void ensureKeyIsLocal(RedisKey key) {
+ if (!slotAdvisor.isLocal(key)) {
+ throw createRedisDataMovedException(key);
+ }
}
+
}
diff --git
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSortedSet.java
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSortedSet.java
index 56f3e4f..a4bd517 100644
---
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSortedSet.java
+++
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSortedSet.java
@@ -17,7 +17,6 @@
package org.apache.geode.redis.internal.data;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
diff --git
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataCommandsFunctionExecutor.java
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataCommandsFunctionExecutor.java
index 757f276..a1a9082 100644
---
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataCommandsFunctionExecutor.java
+++
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataCommandsFunctionExecutor.java
@@ -44,15 +44,16 @@ public abstract class RedisDataCommandsFunctionExecutor {
return regionProvider.getLocalDataRegion();
}
- protected <T> T stripedExecute(Object key, Callable<T> callable) {
+ protected <T> T stripedExecute(RedisKey key, Callable<T> callable) {
return regionProvider.execute(key, callable);
}
+ protected <T> T stripedExecute(RedisKey key, List<RedisKey> keysToLock,
Callable<T> callable) {
+ return regionProvider.execute(key, keysToLock, callable);
+ }
+
protected RedisData getRedisData(RedisKey key) {
return regionProvider.getRedisData(key);
}
- protected List<RedisKey> orderForLocking(RedisKey key1, RedisKey key2) {
- return regionProvider.orderForLocking(key1, key2);
- }
}
diff --git
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisKeyCommandsFunctionExecutor.java
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisKeyCommandsFunctionExecutor.java
index 986e1a9..9b67953 100644
---
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisKeyCommandsFunctionExecutor.java
+++
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisKeyCommandsFunctionExecutor.java
@@ -17,6 +17,7 @@
package org.apache.geode.redis.internal.data;
+import java.util.Arrays;
import java.util.List;
import org.apache.geode.redis.internal.RegionProvider;
@@ -135,7 +136,8 @@ public class RedisKeyCommandsFunctionExecutor extends
RedisDataCommandsFunctionE
@Override
public boolean rename(RedisKey oldKey, RedisKey newKey) {
- List<RedisKey> orderedKeys = orderForLocking(oldKey, newKey);
+ List<RedisKey> orderedKeys = Arrays.asList(oldKey, newKey);
+ getRegionProvider().orderForLocking(orderedKeys);
return stripedExecute(orderedKeys.get(0), () ->
rename0(orderedKeys.get(1), oldKey, newKey));
}
diff --git
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
index 85ec034..854b8cd 100644
---
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
+++
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
@@ -19,12 +19,14 @@ package org.apache.geode.redis.internal.data;
import static java.lang.Double.compare;
import static org.apache.geode.internal.JvmSizeUtils.memoryOverhead;
import static
org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_A_VALID_FLOAT;
+import static
org.apache.geode.redis.internal.data.NullRedisDataStructures.NULL_REDIS_SORTED_SET;
import static
org.apache.geode.redis.internal.data.RedisDataType.REDIS_SORTED_SET;
import static org.apache.geode.redis.internal.netty.Coder.bytesToDouble;
import static org.apache.geode.redis.internal.netty.Coder.doubleToBytes;
import static
org.apache.geode.redis.internal.netty.Coder.stripTrailingZeroFromDouble;
import static
org.apache.geode.redis.internal.netty.StringBytesGlossary.bGREATEST_MEMBER_NAME;
import static
org.apache.geode.redis.internal.netty.StringBytesGlossary.bLEAST_MEMBER_NAME;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bZERO;
import java.io.DataInput;
import java.io.DataOutput;
@@ -45,6 +47,7 @@ import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.size.Sizeable;
import org.apache.geode.redis.internal.RedisConstants;
+import org.apache.geode.redis.internal.RegionProvider;
import org.apache.geode.redis.internal.collections.OrderStatisticsTree;
import
org.apache.geode.redis.internal.collections.SizeableBytes2ObjectOpenCustomHashMapWithCursor;
import org.apache.geode.redis.internal.delta.AddsDeltaInfo;
@@ -55,6 +58,9 @@ import
org.apache.geode.redis.internal.executor.sortedset.SortedSetLexRangeOptio
import
org.apache.geode.redis.internal.executor.sortedset.SortedSetRankRangeOptions;
import
org.apache.geode.redis.internal.executor.sortedset.SortedSetScoreRangeOptions;
import org.apache.geode.redis.internal.executor.sortedset.ZAddOptions;
+import org.apache.geode.redis.internal.executor.sortedset.ZAggregator;
+import org.apache.geode.redis.internal.executor.sortedset.ZKeyWeight;
+import org.apache.geode.redis.internal.netty.Coder;
public class RedisSortedSet extends AbstractRedisData {
protected static final int REDIS_SORTED_SET_OVERHEAD =
memoryOverhead(RedisSortedSet.class);
@@ -379,6 +385,51 @@ public class RedisSortedSet extends AbstractRedisData {
return null;
}
+ long zunionstore(RegionProvider regionProvider, RedisKey key,
List<ZKeyWeight> keyWeights,
+ ZAggregator aggregator) {
+ for (ZKeyWeight keyWeight : keyWeights) {
+ RedisSortedSet set =
+ regionProvider.getTypedRedisData(REDIS_SORTED_SET,
keyWeight.getKey(), false);
+ if (set == NULL_REDIS_SORTED_SET) {
+ continue;
+ }
+ double weight = keyWeight.getWeight();
+
+ for (AbstractOrderedSetEntry entry : set.members.values()) {
+ OrderedSetEntry existingValue = members.get(entry.member);
+ if (existingValue == null) {
+ byte[] scoreBytes;
+ // Redis math and Java math are different when handling infinity.
Specifically:
+ // Java: INFINITY * 0 = NaN
+ // Redis: INFINITY * 0 = 0
+ if (weight == 0) {
+ scoreBytes = bZERO;
+ } else if (weight == 1) {
+ scoreBytes = entry.getScoreBytes();
+ } else {
+ double newScore = entry.score * weight;
+ if (Double.isNaN(newScore)) {
+ scoreBytes = entry.getScoreBytes();
+ } else {
+ scoreBytes = Coder.doubleToBytes(entry.score * weight);
+ }
+ }
+ members.put(entry.member, new OrderedSetEntry(entry.member,
scoreBytes));
+ continue;
+ }
+
+
existingValue.updateScore(aggregator.getFunction().apply(existingValue.score,
+ entry.score * weight));
+ }
+ }
+
+ scoreSet.addAll(members.values());
+
+ regionProvider.getLocalDataRegion().put(key, this);
+
+ return getSortedSetSize();
+ }
+
private List<byte[]> zpop(Iterator<AbstractOrderedSetEntry> scoresIterator,
Region<RedisKey, RedisData> region, RedisKey key) {
if (!scoresIterator.hasNext()) {
@@ -663,6 +714,13 @@ public class RedisSortedSet extends AbstractRedisData {
score = processByteArrayAsDouble(newScore);
}
}
+
+ public double updateScore(double newScore) {
+ score = newScore;
+ scoreBytes = Coder.doubleToBytes(newScore);
+
+ return score;
+ }
}
// Dummy entry used to find the rank of an element with the given score for
inclusive or
diff --git
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
index 03c0773..959e803 100644
---
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
+++
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
@@ -17,6 +17,8 @@
package org.apache.geode.redis.internal.data;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.geode.redis.internal.RegionProvider;
@@ -25,6 +27,8 @@ import
org.apache.geode.redis.internal.executor.sortedset.SortedSetLexRangeOptio
import
org.apache.geode.redis.internal.executor.sortedset.SortedSetRankRangeOptions;
import
org.apache.geode.redis.internal.executor.sortedset.SortedSetScoreRangeOptions;
import org.apache.geode.redis.internal.executor.sortedset.ZAddOptions;
+import org.apache.geode.redis.internal.executor.sortedset.ZAggregator;
+import org.apache.geode.redis.internal.executor.sortedset.ZKeyWeight;
public class RedisSortedSetCommandsFunctionExecutor extends
RedisDataCommandsFunctionExecutor
implements RedisSortedSetCommands {
@@ -132,4 +136,23 @@ public class RedisSortedSetCommandsFunctionExecutor
extends RedisDataCommandsFun
public byte[] zscore(RedisKey key, byte[] member) {
return stripedExecute(key, () -> getRedisSortedSet(key,
true).zscore(member));
}
+
+ @Override
+ public long zunionstore(RedisKey destinationKey, List<ZKeyWeight> keyWeights,
+ ZAggregator aggregator) {
+ List<RedisKey> keysToLock = new ArrayList<>(keyWeights.size());
+ for (ZKeyWeight kw : keyWeights) {
+ getRegionProvider().ensureKeyIsLocal(kw.getKey());
+ keysToLock.add(kw.getKey());
+ }
+ getRegionProvider().ensureKeyIsLocal(destinationKey);
+ keysToLock.add(destinationKey);
+
+ getRegionProvider().orderForLocking(keysToLock);
+
+ return stripedExecute(destinationKey, keysToLock,
+ () -> new
RedisSortedSet(Collections.emptyList()).zunionstore(getRegionProvider(),
+ destinationKey, keyWeights, aggregator));
+ }
+
}
diff --git
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
index 2f0a775..d5156d6 100644
---
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
+++
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
@@ -133,6 +133,10 @@ public class RedisResponse {
return new RedisResponse((bba) -> Coder.getOOMResponse(bba, error));
}
+ public static RedisResponse crossSlot(String error) {
+ return new RedisResponse((bba) -> Coder.getCrossSlotResponse(bba, error));
+ }
+
public static RedisResponse busykey(String error) {
return new RedisResponse((bba) -> Coder.getBusyKeyResponse(bba, error));
}
diff --git
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java
index e447dbe..14e4419 100644
---
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java
+++
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/RedisSortedSetCommands.java
@@ -56,4 +56,7 @@ public interface RedisSortedSetCommands {
long zrevrank(RedisKey key, byte[] member);
byte[] zscore(RedisKey key, byte[] member);
+
+ long zunionstore(RedisKey destinationKey, List<ZKeyWeight> keyWeights,
ZAggregator aggregator);
+
}
diff --git
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZAggregator.java
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZAggregator.java
new file mode 100644
index 0000000..c33e15e
--- /dev/null
+++
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZAggregator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import java.util.function.BiFunction;
+
+/**
+ * Enums representing aggregation functions used in {@link
ZUnionStoreExecutor} and
+ * {@link ZInterStoreExecutor}.
+ */
+public enum ZAggregator {
+
+ SUM(Double::sum),
+ MIN(Math::min),
+ MAX(Math::max);
+
+ private final BiFunction<Double, Double, Double> function;
+
+ ZAggregator(BiFunction<Double, Double, Double> function) {
+ this.function = function;
+ }
+
+ public BiFunction<Double, Double, Double> getFunction() {
+ return function;
+ }
+}
diff --git
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZKeyWeight.java
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZKeyWeight.java
new file mode 100644
index 0000000..7eaf203
--- /dev/null
+++
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZKeyWeight.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+
+/**
+ * Simple class to hold the key and weight associations used in {@link
ZUnionStoreExecutor} and
+ * {@link ZInterStoreExecutor}.
+ */
+public class ZKeyWeight {
+ private final RedisKey key;
+ private final double weight;
+
+ public ZKeyWeight(RedisKey key, double weight) {
+ this.key = key;
+ this.weight = weight;
+ }
+
+ public RedisKey getKey() {
+ return key;
+ }
+
+ public double getWeight() {
+ return weight;
+ }
+}
diff --git
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreExecutor.java
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreExecutor.java
new file mode 100644
index 0000000..fe96aea
--- /dev/null
+++
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZUnionStoreExecutor.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static
org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static
org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static
org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZUnionStoreExecutor extends AbstractExecutor {
+
+ @Override
+ public RedisResponse executeCommand(Command command, ExecutionHandlerContext
context) {
+ List<byte[]> commandElements = command.getProcessedCommand();
+
+ Iterator<byte[]> argIterator = commandElements.iterator();
+ // Skip command and destination key
+ argIterator.next();
+ argIterator.next();
+
+ long numKeys;
+ try {
+ numKeys = Coder.bytesToLong(argIterator.next());
+ } catch (NumberFormatException nex) {
+ return RedisResponse.error(ERROR_SYNTAX);
+ }
+
+ // Rough validation so that we can use numKeys to initialize the array
sizes below.
+ if (numKeys > commandElements.size()) {
+ return RedisResponse.error(ERROR_SYNTAX);
+ }
+
+ List<RedisKey> sourceKeys = new ArrayList<>((int) numKeys);
+ List<Double> weights = new ArrayList<>((int) numKeys);
+ ZAggregator aggregator = ZAggregator.SUM;
+
+ while (argIterator.hasNext()) {
+ byte[] arg = argIterator.next();
+
+ if (sourceKeys.size() < numKeys) {
+ sourceKeys.add(new RedisKey(arg));
+ continue;
+ }
+
+ arg = toUpperCaseBytes(arg);
+ if (Arrays.equals(arg, bWEIGHTS)) {
+ if (!weights.isEmpty()) {
+ return RedisResponse.error(ERROR_SYNTAX);
+ }
+ for (int i = 0; i < numKeys; i++) {
+ if (!argIterator.hasNext()) {
+ return RedisResponse.error(ERROR_SYNTAX);
+ }
+ try {
+ weights.add(Coder.bytesToDouble(argIterator.next()));
+ } catch (NumberFormatException nex) {
+ return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+ }
+ }
+ continue;
+ }
+
+ if (Arrays.equals(arg, bAGGREGATE)) {
+ try {
+ aggregator =
ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));
+ } catch (IllegalArgumentException | NoSuchElementException e) {
+ return RedisResponse.error(ERROR_SYNTAX);
+ }
+ continue;
+ }
+
+ // End up here if we have more keys than weights
+ return RedisResponse.error(ERROR_SYNTAX);
+ }
+
+ if (sourceKeys.size() != numKeys) {
+ return RedisResponse.error(ERROR_SYNTAX);
+ }
+
+ int bucket = command.getKey().getBucketId();
+ for (RedisKey key : sourceKeys) {
+ if (key.getBucketId() != bucket) {
+ return RedisResponse.crossSlot(ERROR_WRONG_SLOT);
+ }
+ }
+
+ List<ZKeyWeight> keyWeights = new ArrayList<>();
+ for (int i = 0; i < sourceKeys.size(); i++) {
+ double weight = weights.isEmpty() ? 1 : weights.get(i);
+ keyWeights.add(new ZKeyWeight(sourceKeys.get(i), weight));
+ }
+
+ long result = context.getSortedSetCommands()
+ .zunionstore(command.getKey(), keyWeights, aggregator);
+
+ return RedisResponse.integer(result);
+ }
+
+}
diff --git
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
index 0023d77..461b970 100644
---
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
+++
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
@@ -26,6 +26,7 @@ import static
org.apache.geode.redis.internal.netty.StringBytesGlossary.NUMBER_0
import static
org.apache.geode.redis.internal.netty.StringBytesGlossary.SIMPLE_STRING_ID;
import static
org.apache.geode.redis.internal.netty.StringBytesGlossary.bBUSYKEY;
import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bCRLF;
+import static
org.apache.geode.redis.internal.netty.StringBytesGlossary.bCROSSSLOT;
import static
org.apache.geode.redis.internal.netty.StringBytesGlossary.bEMPTY_ARRAY;
import static
org.apache.geode.redis.internal.netty.StringBytesGlossary.bEMPTY_STRING;
import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bERR;
@@ -220,6 +221,10 @@ public class Coder {
return getErrorResponse0(buffer, bWRONGTYPE, error);
}
+ public static ByteBuf getCrossSlotResponse(ByteBuf buffer, String error) {
+ return getErrorResponse0(buffer, bCROSSSLOT, error);
+ }
+
public static ByteBuf getBusyKeyResponse(ByteBuf buffer, String error) {
byte[] errorAr = stringToBytes(error);
buffer.writeByte(ERROR_ID);
diff --git
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
index cc4f3f8..7f422d8 100644
---
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
+++
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
@@ -97,6 +97,9 @@ public class StringBytesGlossary {
@MakeImmutable
public static final byte[] bBUSYKEY = stringToBytes("BUSYKEY ");
+ @MakeImmutable
+ public static final byte[] bCROSSSLOT = stringToBytes("CROSSSLOT ");
+
// ********** Redis Command constants **********
// ClusterExecutor
@@ -163,6 +166,12 @@ public class StringBytesGlossary {
@MakeImmutable
public static final byte[] bINCR = stringToBytes("INCR");
+ // ZUnionStoreExecutor
+ @MakeImmutable
+ public static final byte[] bWEIGHTS = stringToBytes("WEIGHTS");
+ @MakeImmutable
+ public static final byte[] bAGGREGATE = stringToBytes("AGGREGATE");
+
// SetExecutor
@MakeImmutable
public static final byte[] bEX = stringToBytes("EX");
@@ -183,6 +192,8 @@ public class StringBytesGlossary {
public static final byte bPLUS = SIMPLE_STRING_ID; // +
public static final byte bMINUS = ERROR_ID; // -
+ public static final byte[] bZERO = stringToBytes("0");
+
// ********** Constants for Double Infinity comparisons **********
public static final String P_INF = "+inf";
public static final String INF = "inf";
diff --git
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedCoordinator.java
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedCoordinator.java
index 19efa75..3ceb638 100644
---
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedCoordinator.java
+++
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedCoordinator.java
@@ -15,8 +15,11 @@
package org.apache.geode.redis.internal.services;
+import java.util.List;
import java.util.concurrent.Callable;
+import org.apache.geode.redis.internal.data.RedisKey;
+
/**
* Allows users to "stripe" their execution in such a way that all tasks
belonging to one stripe are
@@ -33,7 +36,9 @@ public interface StripedCoordinator {
* @param stripeId defines the "stripe"
* @param callable the unit of work to do sequentially. May be called after
run returns.
*/
- <T> T execute(Object stripeId, Callable<T> callable);
+ <T> T execute(RedisKey stripeId, Callable<T> callable);
+
+ <T> T execute(List<RedisKey> stripeIds, Callable<T> callable);
int compareStripes(Object object1, Object object2);
}
diff --git
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/SynchronizedStripedCoordinator.java
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/SynchronizedStripedCoordinator.java
index 5cdeb17..7d435b6 100644
---
a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/SynchronizedStripedCoordinator.java
+++
b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/SynchronizedStripedCoordinator.java
@@ -15,8 +15,11 @@
package org.apache.geode.redis.internal.services;
+import java.util.List;
import java.util.concurrent.Callable;
+import org.apache.geode.redis.internal.data.RedisKey;
+
/**
* Implements {@link StripedCoordinator} by using
* synchronization. The thread that calls execute will also be the thread that
does the work. But it
@@ -39,7 +42,7 @@ public class SynchronizedStripedCoordinator implements
StripedCoordinator {
}
@Override
- public <T> T execute(Object stripeId, Callable<T> callable) {
+ public <T> T execute(RedisKey stripeId, Callable<T> callable) {
synchronized (getSync(stripeId)) {
try {
return callable.call();
@@ -51,6 +54,21 @@ public class SynchronizedStripedCoordinator implements
StripedCoordinator {
}
}
+ @Override
+ public <T> T execute(List<RedisKey> stripeIds, Callable<T> callable) {
+ return execute(stripeIds, 0, callable);
+ }
+
+ private <T> T execute(List<RedisKey> stripeIds, int index, Callable<T>
callable) {
+ if (index + 1 == stripeIds.size()) {
+ return execute(stripeIds.get(index), callable);
+ }
+
+ synchronized (getSync(stripeIds.get(index))) {
+ return execute(stripeIds, ++index, callable);
+ }
+ }
+
private Object getSync(Object stripeId) {
return syncs[getStripeIndex(stripeId)];
}
diff --git
a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/executor/SynchronizedStripedCoordinatorTest.java
b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/executor/SynchronizedStripedCoordinatorTest.java
index d08b2e9..8b62ac0 100644
---
a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/executor/SynchronizedStripedCoordinatorTest.java
+++
b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/executor/SynchronizedStripedCoordinatorTest.java
@@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test;
+import org.apache.geode.redis.internal.data.RedisKey;
import org.apache.geode.redis.internal.services.StripedCoordinator;
import org.apache.geode.redis.internal.services.SynchronizedStripedCoordinator;
@@ -42,7 +43,7 @@ public class SynchronizedStripedCoordinatorTest {
assertThat(result).isEqualTo("OK");
}
- private static class Hashy {
+ private static class Hashy extends RedisKey {
private final int hashcode;
public Hashy(int hashcode) {