Repository: ignite Updated Branches: refs/heads/ignite-5578 cee2ac459 -> 57579bb15
IGNITE-5706: Redis FLUSHDB and FLUSHALL command support. - Fixes #2250. Signed-off-by: shroman <rsht...@yahoo.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1be77384 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1be77384 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1be77384 Branch: refs/heads/ignite-5578 Commit: 1be77384122e41ce69db828e4432e56824c76f79 Parents: 96b43e5 Author: shroman <rsht...@yahoo.com> Authored: Mon Jul 31 15:35:14 2017 +0900 Committer: shroman <rsht...@yahoo.com> Committed: Mon Jul 31 15:35:14 2017 +0900 ---------------------------------------------------------------------- .../client/suite/IgniteClientTestSuite.java | 2 + .../tcp/redis/RedisProtocolServerSelfTest.java | 110 +++++++++++++++++ .../tcp/redis/RedisProtocolStringSelfTest.java | 19 --- .../processors/rest/GridRestCommand.java | 3 + .../handlers/cache/GridCacheCommandHandler.java | 41 +++++++ .../server/GridRedisFlushCommandHandler.java | 117 +++++++++++++++++++ .../protocols/tcp/redis/GridRedisCommand.java | 6 +- .../tcp/redis/GridRedisNioListener.java | 2 + 8 files changed, 280 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1be77384/modules/clients/src/test/java/org/apache/ignite/internal/client/suite/IgniteClientTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/suite/IgniteClientTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/suite/IgniteClientTestSuite.java index ff7e7e7..223268c 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/suite/IgniteClientTestSuite.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/suite/IgniteClientTestSuite.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.rest.RestProcessorStartSelfTest; import org.apache.ignite.internal.processors.rest.TaskCommandHandlerSelfTest; import org.apache.ignite.internal.processors.rest.protocols.tcp.TcpRestParserSelfTest; import org.apache.ignite.internal.processors.rest.protocols.tcp.redis.RedisProtocolConnectSelfTest; +import org.apache.ignite.internal.processors.rest.protocols.tcp.redis.RedisProtocolServerSelfTest; import org.apache.ignite.internal.processors.rest.protocols.tcp.redis.RedisProtocolStringSelfTest; import org.apache.ignite.testframework.IgniteTestSuite; @@ -92,6 +93,7 @@ public class IgniteClientTestSuite extends TestSuite { // Test TCP rest processor with original REDIS client. suite.addTestSuite(RedisProtocolStringSelfTest.class); suite.addTestSuite(RedisProtocolConnectSelfTest.class); + suite.addTestSuite(RedisProtocolServerSelfTest.class); suite.addTestSuite(RestProcessorStartSelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/1be77384/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/RedisProtocolServerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/RedisProtocolServerSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/RedisProtocolServerSelfTest.java new file mode 100644 index 0000000..a424d77 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/RedisProtocolServerSelfTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.protocols.tcp.redis; + +import java.util.HashMap; +import org.junit.Assert; +import redis.clients.jedis.Jedis; + +/** + * Tests for Server commands of Redis protocol. + */ +public class RedisProtocolServerSelfTest extends RedisCommonAbstractTest { + /** + * @throws Exception If failed. + */ + public void testDbSize() throws Exception { + try (Jedis jedis = pool.getResource()) { + Assert.assertEquals(0, (long)jedis.dbSize()); + + jcache().putAll(new HashMap<Integer, Integer>() { + { + for (int i = 0; i < 100; i++) + put(i, i); + } + }); + + Assert.assertEquals(100, (long)jedis.dbSize()); + } + } + + /** + * @throws Exception If failed. + */ + public void testFlushDb() throws Exception { + try (Jedis jedis = pool.getResource()) { + Assert.assertEquals(0, (long)jedis.dbSize()); + + jcache().putAll(new HashMap<Integer, Integer>() { + { + for (int i = 0; i < 100; i++) + put(i, i); + } + }); + + Assert.assertEquals(100, (long)jedis.dbSize()); + + jedis.select(1); + + jcache().putAll(new HashMap<Integer, Integer>() { + { + for (int i = 0; i < 100; i++) + put(i, i); + } + }); + + // flush database 1. + jedis.flushDB(); + + Assert.assertEquals(0, (long)jedis.dbSize()); + + jedis.select(0); + + Assert.assertEquals(100, (long)jedis.dbSize()); + } + } + + /** + * @throws Exception If failed. + */ + public void testFlushAll() throws Exception { + try (Jedis jedis = pool.getResource()) { + Assert.assertEquals(0, (long)jedis.dbSize()); + + for (int i = 0; i < 100; i++) + jedis.set(String.valueOf(i), String.valueOf(i)); + + Assert.assertEquals(100, (long)jedis.dbSize()); + + jedis.select(1); + + for (int i = 0; i < 100; i++) + jedis.set(String.valueOf(i), String.valueOf(i)); + + Assert.assertEquals(100, (long)jedis.dbSize()); + + jedis.flushAll(); + + Assert.assertEquals(0, (long)jedis.dbSize()); + + jedis.select(0); + + Assert.assertEquals(0, (long)jedis.dbSize()); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1be77384/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/RedisProtocolStringSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/RedisProtocolStringSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/RedisProtocolStringSelfTest.java index dff346e..68b42c4 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/RedisProtocolStringSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/RedisProtocolStringSelfTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.rest.protocols.tcp.redis; import java.util.Arrays; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import org.junit.Assert; @@ -424,22 +423,4 @@ public class RedisProtocolStringSelfTest extends RedisCommonAbstractTest { Assert.assertEquals(2, (long)jedis.exists("existsKey1", "existsKey2")); } } - - /** - * @throws Exception If failed. - */ - public void testDbSize() throws Exception { - try (Jedis jedis = pool.getResource()) { - Assert.assertEquals(0, (long)jedis.dbSize()); - - jcache().putAll(new HashMap<Integer, Integer>() { - { - for (int i = 0; i < 100; i++) - put(i, i); - } - }); - - Assert.assertEquals(100, (long)jedis.dbSize()); - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1be77384/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java index 226db43..2ed370d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java @@ -72,6 +72,9 @@ public enum GridRestCommand { /** Remove several values from cache. */ CACHE_REMOVE_ALL("rmvall"), + /** Clear the specified cache, or all caches if none is specified. */ + CACHE_CLEAR("clear"), + /** Replace cache value only if there is currently a mapping for it. */ CACHE_REPLACE("rep"), http://git-wip-us.apache.org/repos/asf/ignite/blob/1be77384/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java index 0006f4b..003fcdb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java @@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandle import org.apache.ignite.internal.processors.rest.request.GridRestCacheRequest; import org.apache.ignite.internal.processors.rest.request.GridRestRequest; import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.lang.IgniteClosure2X; import org.apache.ignite.internal.util.typedef.CX1; @@ -79,6 +80,7 @@ import static org.apache.ignite.internal.processors.rest.GridRestCommand.ATOMIC_ import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_ADD; import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_APPEND; import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_CAS; +import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_CLEAR; import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_CONTAINS_KEY; import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_CONTAINS_KEYS; import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_GET; @@ -127,6 +129,7 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { CACHE_REPLACE_VALUE, CACHE_GET_AND_REMOVE, CACHE_REMOVE_ALL, + CACHE_CLEAR, CACHE_REPLACE, CACHE_CAS, CACHE_APPEND, @@ -545,6 +548,44 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { break; } + case CACHE_CLEAR: { + Map<Object, Object> map = req0.values(); + + // HashSet wrapping for correct serialization + Set<Object> cacheNames = map == null ? + new HashSet<>(ctx.cache().publicCaches()) : new HashSet<>(map.keySet()); + + GridCompoundFuture compFut = new GridCompoundFuture(); + + for (Object cName : cacheNames) + compFut.add(executeCommand(req.destinationId(), req.clientId(), (String)cName, skipStore, key, + new RemoveAllCommand(null))); + + compFut.markInitialized(); + + fut = compFut.chain(new CX1<GridCompoundFuture<GridCacheRestResponse, ?>, GridRestResponse>() { + @Override public GridRestResponse applyx( + GridCompoundFuture<GridCacheRestResponse, ?> cf) throws IgniteCheckedException { + boolean success = true; + + for (IgniteInternalFuture<GridCacheRestResponse> f : cf.futures()) + if ((Boolean)f.get().getResponse() != true) + success = false; + + GridCacheRestResponse resp = new GridCacheRestResponse(); + + if (success) + resp.setResponse(true); + else + resp.setResponse(false); + + return resp; + } + }); + + break; + } + case CACHE_REPLACE: { final Object val = req0.value(); http://git-wip-us.apache.org/repos/asf/ignite/blob/1be77384/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/redis/server/GridRedisFlushCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/redis/server/GridRedisFlushCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/redis/server/GridRedisFlushCommandHandler.java new file mode 100644 index 0000000..9e48b6b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/redis/server/GridRedisFlushCommandHandler.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.handlers.redis.server; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.processors.rest.GridRestProtocolHandler; +import org.apache.ignite.internal.processors.rest.GridRestResponse; +import org.apache.ignite.internal.processors.rest.handlers.redis.GridRedisRestCommandHandler; +import org.apache.ignite.internal.processors.rest.handlers.redis.exception.GridRedisGenericException; +import org.apache.ignite.internal.processors.rest.protocols.tcp.redis.GridRedisCommand; +import org.apache.ignite.internal.processors.rest.protocols.tcp.redis.GridRedisMessage; +import org.apache.ignite.internal.processors.rest.protocols.tcp.redis.GridRedisProtocolParser; +import org.apache.ignite.internal.processors.rest.request.GridRestCacheRequest; +import org.apache.ignite.internal.processors.rest.request.GridRestRequest; +import org.apache.ignite.internal.util.typedef.internal.U; + +import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_CLEAR; +import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_REMOVE_ALL; +import static org.apache.ignite.internal.processors.rest.protocols.tcp.redis.GridRedisCommand.FLUSHALL; +import static org.apache.ignite.internal.processors.rest.protocols.tcp.redis.GridRedisCommand.FLUSHDB; +import static org.apache.ignite.internal.processors.rest.protocols.tcp.redis.GridRedisMessage.CACHE_NAME_PREFIX; + +/** + * Redis FLUSHDB/FLUSHALL command handler. + */ +public class GridRedisFlushCommandHandler extends GridRedisRestCommandHandler { + /** Supported commands. */ + private static final Collection<GridRedisCommand> SUPPORTED_COMMANDS = U.sealList( + FLUSHDB, + FLUSHALL + ); + + /** Grid context. */ + private final GridKernalContext ctx; + + /** + * Handler constructor. + * + * @param log Logger to use. + * @param hnd Rest handler. + * @param ctx Context. + */ + public GridRedisFlushCommandHandler(final IgniteLogger log, final GridRestProtocolHandler hnd, + GridKernalContext ctx) { + super(log, hnd); + + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public Collection<GridRedisCommand> supportedCommands() { + return SUPPORTED_COMMANDS; + } + + /** {@inheritDoc} */ + @Override public GridRestRequest asRestRequest(GridRedisMessage msg) throws IgniteCheckedException { + assert msg != null; + + GridRestCacheRequest restReq = new GridRestCacheRequest(); + + restReq.clientId(msg.clientId()); + + switch (msg.command()) { + case FLUSHDB: + restReq.command(CACHE_REMOVE_ALL); + restReq.cacheName(msg.cacheName()); + + break; + default: + // CACHE_CLEAR + Map<Object, Object> redisCaches = new HashMap<>(); + + for (IgniteCacheProxy<?, ?> cache : ctx.cache().publicCaches()) { + if (cache.getName().startsWith(CACHE_NAME_PREFIX)) { + redisCaches.put(cache.getName(), null); + } + } + + if (redisCaches.isEmpty()) + throw new GridRedisGenericException("No Redis caches found"); + + restReq.command(CACHE_CLEAR); + restReq.values(redisCaches); + } + + return restReq; + } + + /** {@inheritDoc} */ + @Override public ByteBuffer makeResponse(final GridRestResponse restRes, List<String> params) { + return ((Boolean)restRes.getResponse() == true ? GridRedisProtocolParser.oKString() + : GridRedisProtocolParser.toGenericError("Failed to flush")); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1be77384/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisCommand.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisCommand.java index fd04d6a..bc3b9a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisCommand.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisCommand.java @@ -70,7 +70,11 @@ public enum GridRedisCommand { // Server commands. /** DBSIZE. */ - DBSIZE("DBSIZE"); + DBSIZE("DBSIZE"), + /** FLUSHDB. */ + FLUSHDB("FLUSHDB"), + /** FLUSHALL. */ + FLUSHALL("FLUSHALL"); /** String for command. */ private final String cmd; http://git-wip-us.apache.org/repos/asf/ignite/blob/1be77384/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisNioListener.java index 6775562..9436369 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisNioListener.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.rest.handlers.redis.GridRedisConnec import org.apache.ignite.internal.processors.rest.handlers.redis.key.GridRedisDelCommandHandler; import org.apache.ignite.internal.processors.rest.handlers.redis.key.GridRedisExistsCommandHandler; import org.apache.ignite.internal.processors.rest.handlers.redis.server.GridRedisDbSizeCommandHandler; +import org.apache.ignite.internal.processors.rest.handlers.redis.server.GridRedisFlushCommandHandler; import org.apache.ignite.internal.processors.rest.handlers.redis.string.GridRedisAppendCommandHandler; import org.apache.ignite.internal.processors.rest.handlers.redis.string.GridRedisGetCommandHandler; import org.apache.ignite.internal.processors.rest.handlers.redis.string.GridRedisGetRangeCommandHandler; @@ -89,6 +90,7 @@ public class GridRedisNioListener extends GridNioServerListenerAdapter<GridRedis // server commands. addCommandHandler(new GridRedisDbSizeCommandHandler(log, hnd)); + addCommandHandler(new GridRedisFlushCommandHandler(log, hnd, ctx)); } /**