[
https://issues.apache.org/jira/browse/GEODE-8864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17294212#comment-17294212
]
ASF GitHub Bot commented on GEODE-8864:
---------------------------------------
sabbey37 commented on a change in pull request #5954:
URL: https://github.com/apache/geode/pull/5954#discussion_r585937669
##########
File path:
geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
##########
@@ -44,26 +45,29 @@
@Override
public RedisResponse executeCommand(Command command,
ExecutionHandlerContext context) {
+
+ final UUID CLIENT_ID = context.getClientUUID();
+
List<byte[]> commandElems = command.getProcessedCommand();
String cursorString = Coder.bytesToString(commandElems.get(2));
- BigInteger cursor;
+ int cursor;
Pattern matchPattern;
String globPattern = null;
int count = DEFAULT_COUNT;
try {
- cursor = new BigInteger(cursorString).abs();
+ cursor = Integer.valueOf(cursorString);
} catch (NumberFormatException e) {
return RedisResponse.error(ERROR_CURSOR);
}
- if (cursor.compareTo(UNSIGNED_LONG_CAPACITY) > 0) {
+ if (cursor > Integer.MAX_VALUE) {
return RedisResponse.error(ERROR_CURSOR);
}
Review comment:
You can actually remove this conditional block now. We needed it before
because the capacity of `BigInteger` is greater than the unsigned long capacity.
##########
File path:
geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.hash;
+
+import static
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.MapScanCursor;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisCommandExecutionException;
+import io.lettuce.core.RedisException;
+import io.lettuce.core.ScanCursor;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.resource.ClientResources;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.control.RebalanceFactory;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.internal.AvailablePortHelper;
+import
org.apache.geode.redis.session.springRedisTestApplication.config.DUnitSocketAddressResolver;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+public class HScanDunitTest {
+
+ @ClassRule
+ public static RedisClusterStartupRule redisClusterStartupRule = new
RedisClusterStartupRule(4);
+
+ @Rule
+ public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+ @ClassRule
+ public static GfshCommandRule gfsh = new GfshCommandRule();
+
+ private static RedisCommands<String, String> commands;
+ private RedisClient redisClient;
+ private StatefulRedisConnection<String, String> connection;
+ private static Properties locatorProperties;
+
+ private static MemberVM locator;
+ private static MemberVM server1;
+ private static MemberVM server2;
+ private static MemberVM server3;
+
+ static final String HASH_KEY = "key";
+ static final String BASE_FIELD = "baseField_";
+ static final Map<String, String> INITIAL_DATA_SET = makeEntrySet(1000);
+
+ static int[] redisPorts;
+
+ @BeforeClass
+ public static void classSetup() throws Exception {
+ int locatorPort;
+ locatorProperties = new Properties();
+ locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, "15000");
+
+ locator = redisClusterStartupRule.startLocatorVM(0, locatorProperties);
+ locatorPort = locator.getPort();
+ redisPorts = AvailablePortHelper.getRandomAvailableTCPPorts(3);
+
+ // note: due to rules around member weighting in split-brain scenarios,
+ // vm1 (server1) should not be crashed or it will cause additional
(unrelated) failures
+ String redisPort1 = redisPorts[0] + "";
+ server1 = redisClusterStartupRule.startRedisVM(1, redisPort1, locatorPort);
+
+ String redisPort2 = redisPorts[1] + "";
+ server2 = redisClusterStartupRule.startServerVM(2, redisPort2,
locatorPort);
+
+ String redisPort3 = redisPorts[2] + "";
+ server3 = redisClusterStartupRule.startServerVM(3, redisPort3,
locatorPort);
+
+ gfsh.connectAndVerify(locator);
+ }
+
+ @Before
+ public void testSetup() {
+ addIgnoredException(FunctionException.class);
+ String[] redisPortsAsStrings = new String[redisPorts.length];
+
+ for (int i = 0; i < redisPorts.length; i++) {
+ redisPortsAsStrings[i] = String.valueOf(redisPorts[i]);
+ }
+
+ DUnitSocketAddressResolver dnsResolver =
+ new DUnitSocketAddressResolver(redisPortsAsStrings);
+
+ ClientResources resources = ClientResources.builder()
+ .socketAddressResolver(dnsResolver)
+ .build();
+
+ redisClient = RedisClient.create(resources, "redis://localhost");
+ redisClient.setOptions(ClientOptions.builder()
+ .autoReconnect(true)
+ .build());
+
+ connection = redisClient.connect();
+ commands = connection.sync();
+ commands.hset(HASH_KEY, INITIAL_DATA_SET);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ commands.quit();
+
+ server1.stop();
+ server2.stop();
+ server3.stop();
+ }
+
+ @Test
+ public void
should_allow_hscan_iteration_to_complete_successfully_given_server_crashes_during_iteration()
Review comment:
If you end up removing the gfsh rule above, you could also change this
test name to be more consistent with the others.
##########
File path:
geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.hash;
+
+import static
org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.MapScanCursor;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisCommandExecutionException;
+import io.lettuce.core.RedisException;
+import io.lettuce.core.ScanCursor;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.resource.ClientResources;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.control.RebalanceFactory;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.internal.AvailablePortHelper;
+import
org.apache.geode.redis.session.springRedisTestApplication.config.DUnitSocketAddressResolver;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+public class HScanDunitTest {
+
+ @ClassRule
+ public static RedisClusterStartupRule redisClusterStartupRule = new
RedisClusterStartupRule(4);
+
+ @Rule
+ public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+ @ClassRule
+ public static GfshCommandRule gfsh = new GfshCommandRule();
Review comment:
I apologize for not noticing this before, but the `GfshCommandRule()`
could be removed altogether since it's not used at all (we connect it to the
locator, but never use it afterwards).
##########
File path:
geode-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java
##########
@@ -257,11 +258,13 @@ protected Object compute(ByteArrayWrapper key, Object[]
args) {
case HSCAN: {
Pattern pattern = (Pattern) args[1];
int count = (int) args[2];
- BigInteger cursor = (BigInteger) args[3];
- return hashCommands.hscan(key, pattern, count, cursor);
+ int cursor = Integer.valueOf(args[3].toString());
Review comment:
You could cast this to an `int` since at this point we have already
verified the argument in `HScanExecutor`.
##########
File path:
geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/HScanIntegrationTest.java
##########
@@ -37,22 +36,28 @@ public int getPort() {
return server.getPort();
}
+
+ // Note: these tests will not pass native redis, so included here in
concrete test class
@Test
- public void
givenDifferentCursorThanSpecifiedByPreviousHscan_returnsAllEntries() {
- Map<String, String> entryMap = new HashMap<>();
- for (int i = 0; i < 10; i++) {
- entryMap.put(String.valueOf(i), String.valueOf(i));
- }
- jedis.hmset("a", entryMap);
-
- ScanParams scanParams = new ScanParams();
- scanParams.count(5);
- ScanResult<Map.Entry<String, String>> result = jedis.hscan("a", "0",
scanParams);
- assertThat(result.isCompleteIteration()).isFalse();
-
- result = jedis.hscan("a", "100");
-
- assertThat(result.getResult()).hasSize(10);
- assertThat(new
HashSet<>(result.getResult())).isEqualTo(entryMap.entrySet());
+ public void givenCursorGreaterThanIntMaxValue_returnsCursorError() {
+ int largestCursorValue = Integer.MAX_VALUE;
+
+ BigInteger tooBigCursor =
+ new
BigInteger(String.valueOf(largestCursorValue)).add(BigInteger.valueOf(1));
+
+ assertThatThrownBy(() -> jedis.hscan("a", tooBigCursor.toString()))
+ .hasMessageContaining(ERROR_CURSOR);
}
+
+ @Test
+ public void givenCursorLessThanIntMinValue_returnsCursorError() {
+ int smallestCursorValue = Integer.MIN_VALUE;
+
+ BigInteger tooSmallCursor =
+ new
BigInteger(String.valueOf(smallestCursorValue)).subtract(BigInteger.valueOf(1));
+
+ assertThatThrownBy(() -> jedis.hscan("a", tooSmallCursor.toString()))
+ .hasMessageContaining(ERROR_CURSOR);
+ }
Review comment:
Thanks for adding these tests!
##########
File path:
geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
##########
@@ -219,13 +298,13 @@ public void
givenCount_returnsAllEntriesWithoutDuplicates() {
cursor = result.getCursor();
} while (!result.isCompleteIteration());
- assertThat(allEntries).hasSize(3);
- assertThat(new HashSet<>(allEntries)).isEqualTo(entryMap.entrySet());
+ assertThat(allEntries.size()).isCloseTo(3, Offset.offset(1));
Review comment:
A few thoughts:
- The test name is kinda misleading, since we say
`returnsAllEntriesWithoutDuplicates`, but we are admitting the possibility of
duplicates.
- You could use lambdas (like you did in another part of the code) to remove
any duplicates before the assertion, something like:
```
List<Map.Entry<String, String>> allEntriesWithoutDuplicates =
allEntries.stream().distinct().collect(Collectors.toList());
```
- But really none of the above matters because, thinking about it more, I
realize that for most of our tests, the data is so small that native Redis is
always going to return the entire structure at once (per the `Why SCAN may
return all the items of an aggregate data type in a single call?` section of
the SCAN documentation as well as the Redis code (see `db.c` file,
`scanGenericCommand` method)). So we can actually check that the size and
entries match exactly what we expect.
##########
File path:
geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
##########
@@ -46,19 +53,88 @@
public class RedisHash extends AbstractRedisData {
public static final RedisHash NULL_REDIS_HASH = new NullRedisHash();
private HashMap<ByteArrayWrapper, ByteArrayWrapper> hash;
+ private ConcurrentHashMap<UUID, List<ByteArrayWrapper>> hScanSnapShots;
+ private ConcurrentHashMap<UUID, Long> hScanSnapShotCreationTimes;
+ private ScheduledExecutorService HSCANSnapshotExpirationExecutor = null;
+
+ private static int default_hscan_snapshots_expire_check_frequency =
+ Integer.getInteger("redis.hscan-snapshot-cleanup-interval", 30000);
+
+ private static int default_hscan_snapshots_milliseconds_to_live =
+ Integer.getInteger("redis.hscan-snapshot-expiry", 30000);
+
+ private int HSCAN_SNAPSHOTS_EXPIRE_CHECK_FREQUENCY_MILLISECONDS;
+ private int MINIMUM_MILLISECONDS_FOR_HSCAN_SNAPSHOTS_TO_LIVE;
+
+ @VisibleForTesting
+ public RedisHash(List<ByteArrayWrapper> fieldsToSet, int
hscanSnapShotExpirationCheckFrequency,
+ int minimumLifeForHscanSnaphot) {
+ this();
+
+ this.HSCAN_SNAPSHOTS_EXPIRE_CHECK_FREQUENCY_MILLISECONDS =
+ hscanSnapShotExpirationCheckFrequency;
+ this.MINIMUM_MILLISECONDS_FOR_HSCAN_SNAPSHOTS_TO_LIVE =
minimumLifeForHscanSnaphot;
- public RedisHash(List<ByteArrayWrapper> fieldsToSet) {
- hash = new HashMap<>();
Iterator<ByteArrayWrapper> iterator = fieldsToSet.iterator();
while (iterator.hasNext()) {
hashPut(iterator.next(), iterator.next());
}
}
+ public RedisHash(List<ByteArrayWrapper> fieldsToSet) {
+ this(fieldsToSet,
+ default_hscan_snapshots_expire_check_frequency,
+ default_hscan_snapshots_milliseconds_to_live);
+ }
+
+ // for serialization
public RedisHash() {
- // for serialization
+ this.hash = new HashMap<>();
+ this.hScanSnapShots = new ConcurrentHashMap<>();
+ this.hScanSnapShotCreationTimes = new ConcurrentHashMap<>();
+
+ this.HSCAN_SNAPSHOTS_EXPIRE_CHECK_FREQUENCY_MILLISECONDS =
+ this.default_hscan_snapshots_expire_check_frequency;
+
+ this.MINIMUM_MILLISECONDS_FOR_HSCAN_SNAPSHOTS_TO_LIVE =
+ this.default_hscan_snapshots_milliseconds_to_live;
}
+
+ private void expireHScanSnapshots() {
+
+ this.hScanSnapShotCreationTimes.entrySet().forEach(entry -> {
Review comment:
I was confused for a second, but then realized the above comment went
with the comment that was in the `createKeySnapshot` method. Those changes look
good!
##########
File path:
geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
##########
@@ -44,26 +45,29 @@
@Override
public RedisResponse executeCommand(Command command,
ExecutionHandlerContext context) {
+
+ final UUID CLIENT_ID = context.getClientUUID();
+
List<byte[]> commandElems = command.getProcessedCommand();
String cursorString = Coder.bytesToString(commandElems.get(2));
- BigInteger cursor;
+ int cursor;
Pattern matchPattern;
String globPattern = null;
int count = DEFAULT_COUNT;
try {
- cursor = new BigInteger(cursorString).abs();
+ cursor = Integer.valueOf(cursorString);
Review comment:
This can be changed to `Integer.parseInt(cursorString)` ..apparently,
according to the interwebz, this is the preferred way since we are using the
primitive `int` and have no need for the `Integer` object.
##########
File path:
geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
##########
@@ -203,47 +280,118 @@ public int hstrlen(ByteArrayWrapper field) {
return new ArrayList<>(hash.keySet());
}
- public Pair<BigInteger, List<Object>> hscan(Pattern matchPattern, int count,
BigInteger cursor) {
- List<Object> returnList = new ArrayList<Object>();
- int size = hash.size();
- BigInteger beforeCursor = new BigInteger("0");
- int numElements = 0;
- int i = -1;
- for (Map.Entry<ByteArrayWrapper, ByteArrayWrapper> entry :
hash.entrySet()) {
- ByteArrayWrapper key = entry.getKey();
- ByteArrayWrapper value = entry.getValue();
- i++;
- if (beforeCursor.compareTo(cursor) < 0) {
- beforeCursor = beforeCursor.add(new BigInteger("1"));
+ public ImmutablePair<Integer, List<Object>> hscan(UUID clientID, Pattern
matchPattern,
+ int count,
+ int cursorParameter) {
+
+ int startCursor = cursorParameter;
Review comment:
The `startCursor` variable is unnecessary here.
##########
File path:
geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java
##########
@@ -161,4 +163,109 @@ public void
setExpirationTimestamp_stores_delta_that_is_stable() throws IOExcept
o2.fromDelta(in);
assertThat(o2).isEqualTo(o1);
}
+
+ @Test
+ public void hscanSnaphots_shouldBeEmpty_givenHscanHasNotBeenCalled() {
+ RedisHash subject = createRedisHash(100);
+ assertThat(subject.getHscanSnapShots()).isEmpty();
+ }
+
+ @Test
+ public void hscanSnaphots_shouldContainSnapshot_givenHscanHasBeenCalled() {
+
+ final List<ByteArrayWrapper> FIELDS_AND_VALUES_FOR_HASH =
createListOfDataElements(100);
+ RedisHash subject = new RedisHash(FIELDS_AND_VALUES_FOR_HASH);
+ UUID clientID = UUID.randomUUID();
+
+ subject.hscan(clientID, null, 10, BigInteger.valueOf(0));
+
+ ConcurrentHashMap<UUID, List<ByteArrayWrapper>> hscanSnapShotMap =
subject.getHscanSnapShots();
+
+ assertThat(hscanSnapShotMap.containsKey(clientID)).isTrue();
+
+ List<ByteArrayWrapper> keyList = hscanSnapShotMap.get(clientID);
+ assertThat(keyList).isNotEmpty();
+
+ FIELDS_AND_VALUES_FOR_HASH.forEach((entry) -> {
+ if (entry.toString().contains("field")) {
+ assertThat(keyList.contains(entry)).isTrue();
+ } else if (entry.toString().contains("value")) {
+ assertThat(keyList.contains(entry)).isFalse();
Review comment:
I think this got updated for the test below it, but not this one.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> finish implementation of Redis HScan Command
> --------------------------------------------
>
> Key: GEODE-8864
> URL: https://issues.apache.org/jira/browse/GEODE-8864
> Project: Geode
> Issue Type: New Feature
> Components: redis
> Reporter: John Hutchison
> Priority: Major
> Labels: blocks-1.14.0, pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)