Repository: ignite Updated Branches: refs/heads/ignite-2788 99115ee0f -> a450ae187
IGNITE-2788: Basic Redis protocol implementation. GET command to see how cache commands work and more Redis return types. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a450ae18 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a450ae18 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a450ae18 Branch: refs/heads/ignite-2788 Commit: a450ae1872ce7cac9205a1614c74acc8edf04f5e Parents: 99115ee Author: shtykh_roman <[email protected]> Authored: Wed Apr 13 18:43:30 2016 +0900 Committer: shtykh_roman <[email protected]> Committed: Wed Apr 13 18:43:30 2016 +0900 ---------------------------------------------------------------------- .../processors/redis/RedisProtocolSelfTest.java | 4 +- .../protocols/tcp/redis/GridRedisCommand.java | 5 +- .../protocols/tcp/redis/GridRedisMessage.java | 27 ++++- .../tcp/redis/GridRedisNioListener.java | 46 +++++-- .../tcp/redis/GridRedisProtocolParser.java | 120 ++++++++++++++++++- 5 files changed, 182 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a450ae18/modules/clients/src/test/java/org/apache/ignite/internal/processors/redis/RedisProtocolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/redis/RedisProtocolSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/redis/RedisProtocolSelfTest.java index 2ad86e8..4899b72 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/redis/RedisProtocolSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/redis/RedisProtocolSelfTest.java @@ -153,10 +153,10 @@ public class RedisProtocolSelfTest extends GridCommonAbstractTest { public void testGet() throws Exception { try (Jedis jedis = pool.getResource()) { jcache().put("getKey1", "getVal1"); - jcache().put("getKey2", "getVal2"); + jcache().put("getKey2", 0); Assert.assertEquals("getVal1", jedis.get("getKey1")); - Assert.assertEquals("getVal2", jedis.get("getKey2")); + Assert.assertEquals("0", jedis.get("getKey2")); Assert.assertNull(jedis.get("wrongKey")); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a450ae18/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisCommand.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisCommand.java index 520fd98..e7e4dac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisCommand.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisCommand.java @@ -29,7 +29,10 @@ public enum GridRedisCommand { /** Connection close. */ QUIT("QUIT"), /** Echo. */ - ECHO("ECHO"); + ECHO("ECHO"), + + /** GET. */ + GET("GET"); /** String for command. */ private final String cmd; http://git-wip-us.apache.org/repos/asf/ignite/blob/a450ae18/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisMessage.java index ca16a7c..a22c349 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisMessage.java @@ -21,7 +21,9 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.processors.rest.client.message.GridClientMessage; +import org.jetbrains.annotations.Nullable; /** * Message to communicate with Redis client. Contains command, its attributes and response. @@ -30,6 +32,9 @@ public class GridRedisMessage implements GridClientMessage { /** */ private static final long serialVersionUID = 0L; + /** Random UUID used for RESP clients authentication. */ + private static final UUID RESP_ID = UUID.randomUUID(); + /** Request byte. */ public static final byte RESP_REQ_FLAG = GridRedisProtocolParser.ARRAY; @@ -42,6 +47,8 @@ public class GridRedisMessage implements GridClientMessage { private ByteBuffer response; + private String cacheName; + public GridRedisMessage(int msgLen) { msgParts = new ArrayList<>(msgLen); } @@ -79,6 +86,22 @@ public class GridRedisMessage implements GridClientMessage { return "GridRedisMessage [msg: " + msgParts + "]"; } + /** + * @return Cache name. + */ + @Nullable public String cacheName() { + return cacheName; + } + + /** + * @param cacheName Cache name. + */ + public void cacheName(String cacheName) { + assert cacheName != null; + + this.cacheName = cacheName; + } + /** {@inheritDoc} */ @Override public long requestId() { return 0; @@ -91,12 +114,12 @@ public class GridRedisMessage implements GridClientMessage { /** {@inheritDoc} */ @Override public UUID clientId() { - return null; + return RESP_ID; } /** {@inheritDoc} */ @Override public void clientId(UUID id) { - + throw new IgniteException("Setting client id is not expected!"); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a450ae18/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisNioListener.java index 2d6ae7a..a180be3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisNioListener.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.rest.protocols.tcp.redis; +import java.nio.ByteBuffer; import java.util.EnumMap; import java.util.Map; import org.apache.ignite.IgniteCheckedException; @@ -35,7 +36,8 @@ import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter; import org.apache.ignite.internal.util.nio.GridNioSession; import org.apache.ignite.internal.util.typedef.CIX1; import org.jetbrains.annotations.Nullable; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_GET; /** * Listener for Redis protocol requests. @@ -115,14 +117,27 @@ public class GridRedisNioListener extends GridNioServerListenerAdapter<GridRedis GridRestResponse restRes = f.get(); GridRedisMessage res = msg; + ByteBuffer resp; + + if (restRes.getSuccessStatus() == GridRestResponse.STATUS_SUCCESS) { + switch (res.command()) { + case GET: + resp = (restRes.getResponse() == null ? GridRedisProtocolParser.nil() + : GridRedisProtocolParser.toBulkString(restRes.getResponse())); + + break; + default: + resp = GridRedisProtocolParser.toGenericError("Unsupported operation!"); + } + res.setResponse(resp); + } + else + res.setResponse(GridRedisProtocolParser.toGenericError("Operation error!")); - // TODO: set response. - - sendResponse(ses, msg); + sendResponse(ses, res); } }); } - } /** @@ -136,21 +151,34 @@ public class GridRedisNioListener extends GridNioServerListenerAdapter<GridRedis return ses.send(res); } + /** + * @param msg {@link GridRedisMessage} + * @return {@link GridRestRequest} + */ private GridRestRequest toRestRequest(GridRedisMessage msg) { assert msg != null; GridRestCacheRequest restReq = new GridRestCacheRequest(); restReq.command(redisToRestCommand(msg.command())); -// restReq.clientId(req.clientId()); -// restReq.ttl(req.expiration()); -// restReq.cacheName(req.cacheName()); + restReq.clientId(msg.clientId()); restReq.key(msg.key()); return restReq; } private GridRestCommand redisToRestCommand(GridRedisCommand cmd) { - throw new NotImplementedException(); + GridRestCommand restCmd; + + switch (cmd) { + case GET: + restCmd = CACHE_GET; + + break; + default: + return null; + } + + return restCmd; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a450ae18/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisProtocolParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisProtocolParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisProtocolParser.java index 6167bd2..b3e5b5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisProtocolParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/redis/GridRedisProtocolParser.java @@ -21,7 +21,7 @@ import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; /** - * Parser to decode/encode Redis protocol requests. + * Parser to decode/encode Redis protocol (RESP) requests. */ public class GridRedisProtocolParser { /** + prefix. */ @@ -48,6 +48,18 @@ public class GridRedisProtocolParser { /** CRLF. */ private static final byte[] CRLF = new byte[] {13, 10}; + /** Generic error prefix. */ + private static final byte[] ERR_GENERIC = "ERR ".getBytes(); + + /** Prefix for errors on operations with the wrong type. */ + private static final byte[] ERR_TYPE = "WRONGTYPE ".getBytes(); + + /** Prefix for errors on authentication. */ + private static final byte[] ERR_AUTH = "NOAUTH ".getBytes(); + + /** Null bulk string for nil response. */ + private static final byte[] NIL = "$-1\r\n".getBytes(); + /** * Reads array. * @@ -75,7 +87,7 @@ public class GridRedisProtocolParser { * Reads a bulk string. * * @param buf - * @return + * @return Bulk string. * @throws IgniteCheckedException */ public static String readBulkStr(ByteBuffer buf) throws IgniteCheckedException { @@ -120,11 +132,11 @@ public class GridRedisProtocolParser { /** * Converts a simple string data to a {@link ByteBuffer}. * - * @param str - * @return + * @param val String to be converted to a simple string. + * @return Redis simple string. */ - public static ByteBuffer toSimpleString(String str) { - byte[] b = str.getBytes(); + public static ByteBuffer toSimpleString(String val) { + byte[] b = val.getBytes(); ByteBuffer buf = ByteBuffer.allocate(b.length + 3); buf.put(SIMPLE_STRING); @@ -135,4 +147,100 @@ public class GridRedisProtocolParser { return buf; } + + /** + * Creates a generic error response. + * + * @param errMsg Error message. + * @return Error response. + */ + public static ByteBuffer toGenericError(String errMsg) { + return toError(errMsg, ERR_GENERIC); + } + + /** + * Creates an error response on operation against the wrong data type. + * + * @param errMsg Error message. + * @return Error response. + */ + public static ByteBuffer toTypeError(String errMsg) { + return toError(errMsg, ERR_TYPE); + } + + /** + * Creates an error response. + * + * @param errMsg Error message. + * @param errPrefix Error prefix. + * @return Error response. + */ + private static ByteBuffer toError(String errMsg, byte[] errPrefix) { + byte[] b = errMsg.getBytes(); + + ByteBuffer buf = ByteBuffer.allocate(b.length + errPrefix.length + 3); + buf.put(ERROR); + buf.put(errPrefix); + buf.put(b); + buf.put(CRLF); + + buf.flip(); + + return buf; + } + + /** + * Converts an integer result to a RESP integer. + * + * @param integer Integer result. + * @return RESP integer. + */ + public static ByteBuffer toInteger(int integer) { + byte[] b = String.valueOf(integer).getBytes(); + + ByteBuffer buf = ByteBuffer.allocate(b.length + 3); + buf.put(INTEGER); + buf.put(b); + buf.put(CRLF); + + buf.flip(); + + return buf; + } + + /** + * @return Nil response. + */ + public static ByteBuffer nil() { + ByteBuffer buf = ByteBuffer.allocate(NIL.length); + buf.put(NIL); + + buf.flip(); + + return buf; + } + + /** + * Converts a resultant object to a bulk string. + * + * @param val Object. + * @return Bulk string. + */ + public static ByteBuffer toBulkString(Object val) { + assert val != null; + + byte[] b = String.valueOf(val).getBytes(); + byte[] l = String.valueOf(b.length).getBytes(); + + ByteBuffer buf = ByteBuffer.allocate(b.length + l.length + 5); + buf.put(BULK_STRING); + buf.put(l); + buf.put(CRLF); + buf.put(b); + buf.put(CRLF); + + buf.flip(); + + return buf; + } }
