Repository: incubator-gearpump Updated Branches: refs/heads/master 080bdca62 -> 0c3ff4edd
[GEARPUMP-319] Support Sorted Set in Redis Sorted Set is similar to Set in Redis, with unique values. Each element stored in a Sorted Set is associated with a score that is used to sort the elements in order. Author: darionyaphet <[email protected]> Closes #191 from darionyaphet/GEARPUMP-319. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/0c3ff4ed Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/0c3ff4ed Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/0c3ff4ed Branch: refs/heads/master Commit: 0c3ff4edd4ae751758426d6448953574f40a7369 Parents: 080bdca Author: darionyaphet <[email protected]> Authored: Tue Jun 27 11:58:44 2017 +0800 Committer: manuzhang <[email protected]> Committed: Tue Jun 27 11:58:44 2017 +0800 ---------------------------------------------------------------------- .../apache/gearpump/redis/RedisMessage.scala | 93 ++++++++++++++++++++ .../org/apache/gearpump/redis/RedisSink.scala | 10 +++ 2 files changed, 103 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0c3ff4ed/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala ---------------------------------------------------------------------- diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala index ea738d6..cacf774 100644 --- a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala +++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala @@ -371,6 +371,99 @@ object RedisMessage { } + object SortedSets { + /** + * Adds all the specified members with the specified scores to the sorted set stored at key. 
+ * + * @param key + * @param score + * @param member + */ + case class ZADD(key: Array[Byte], score: Double, member: Array[Byte]) { + def this(key: String, score: Double, member: String) = { + this(toBytes(key), score, toBytes(member)) + } + } + + /** + * Increments the score of member in the sorted set stored at key by increment. + * + * @param key + * @param score + * @param member + */ + case class ZINCRBY(key: Array[Byte], score: Double, member: Array[Byte]) { + def this(key: String, score: Double, member: String) = { + this(toBytes(key), score, toBytes(member)) + } + } + + /** + * Removes the specified members from the sorted set stored at key. + * + * @param key + * @param member + */ + case class ZREM(key: Array[Byte], member: Array[Byte]) { + def this(key: String, member: String) = { + this(toBytes(key), toBytes(member)) + } + } + + /** + * When all the elements in a sorted set are inserted with the same score,in order to + * force lexicographical ordering, this command removes all elements in the sorted set + * stored at key between the lexicographical range specified by min and max. + * + * @param key + * @param min + * @param max + */ + case class ZREMRANGEBYLEX(key: Array[Byte], min: Array[Byte], max: Array[Byte]) { + def this(key: String, min: String, max: String) = { + this(toBytes(key), toBytes(min), toBytes(max)) + } + } + + /** + * Removes all elements in the sorted set stored at key with rank between start and stop. + * + * @param key + * @param start + * @param stop + */ + case class ZREMRANGEBYRANK(key: Array[Byte], start: Long, stop: Long) { + def this(key: String, start: Long, stop: Long) = { + this(toBytes(key), start, stop) + } + } + + /** + * Removes all elements in the sorted set stored at key with a score between min and max. 
+ * + * @param key + * @param min + * @param max + */ + case class ZREMRANGEBYSCORE(key: Array[Byte], min: Double, max: Double) { + def this(key: String, min: Double, max: Double) = { + this(toBytes(key), min, max) + } + } + + /** + * Get the score associated with the given member in a sorted set + * + * @param key + * @param member + */ + case class ZSCORE(key: Array[Byte], member: Array[Byte]) { + def this(key: String, member: String) = { + this(toBytes(key), toBytes(member)) + } + } + } + object String { /** http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0c3ff4ed/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala ---------------------------------------------------------------------- diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala index 9afb1fe..36babe4 100644 --- a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala +++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala @@ -24,6 +24,7 @@ import org.apache.gearpump.redis.RedisMessage.HyperLogLog._ import org.apache.gearpump.redis.RedisMessage.Keys._ import org.apache.gearpump.redis.RedisMessage.Lists._ import org.apache.gearpump.redis.RedisMessage.Sets._ +import org.apache.gearpump.redis.RedisMessage.SortedSets._ import org.apache.gearpump.redis.RedisMessage.String._ import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.task.TaskContext @@ -109,6 +110,15 @@ class RedisSink( case msg: SETEX => client.setex(msg.key, msg.seconds, msg.value) case msg: SETNX => client.setnx(msg.key, msg.value) case msg: SETRANGE => client.setrange(msg.key, msg.offset, msg.value) + + // Sorted Set + case msg: ZADD => client.zadd(msg.key, msg.score, msg.member) + case msg: ZINCRBY => client.zincrby(msg.key, msg.score, msg.member) + case msg: ZREM => client.zrem(msg.key, msg.member) + case msg: 
ZREMRANGEBYLEX => client.zremrangeByLex(msg.key, msg.min, msg.max) + case msg: ZREMRANGEBYRANK => client.zremrangeByRank(msg.key, msg.start, msg.stop) + case msg: ZREMRANGEBYSCORE => client.zremrangeByScore(msg.key, msg.min, msg.max) + case msg: ZSCORE => client.zscore(msg.key, msg.member) } }
