Repository: bahir-flink Updated Branches: refs/heads/master 3f180342c -> 130475839
[BAHIR-95] Add ZREM to Redis commands This closes #13 Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/13047583 Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/13047583 Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/13047583 Branch: refs/heads/master Commit: 13047583906338d8b1428f3ae1b8c8dd8a8a1089 Parents: 3f18034 Author: ariskk <ak.koliopou...@gmail.com> Authored: Tue Mar 14 10:13:32 2017 +0000 Committer: Robert Metzger <rmetz...@apache.org> Committed: Tue Mar 14 14:14:31 2017 +0100 ---------------------------------------------------------------------- flink-connector-redis/README.md | 5 ++++- .../streaming/connectors/redis/RedisSink.java | 3 +++ .../common/container/RedisClusterContainer.java | 12 ++++++++++++ .../common/container/RedisCommandsContainer.java | 8 ++++++++ .../redis/common/container/RedisContainer.java | 17 +++++++++++++++++ .../redis/common/mapper/RedisCommand.java | 5 +++++ .../connectors/redis/RedisSinkITCase.java | 15 ++++++++++++--- 7 files changed, 61 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/README.md ---------------------------------------------------------------------- diff --git a/flink-connector-redis/README.md b/flink-connector-redis/README.md index 0748a92..87da6d3 100644 --- a/flink-connector-redis/README.md +++ b/flink-connector-redis/README.md @@ -141,6 +141,9 @@ This section gives a description of all the available data types and what Redis </tr> <tr> <td>SORTED_SET</td><td><a href="http://redis.io/commands/zadd">ZADD</a></td> - </tr> + </tr> + <tr> + <td>SORTED_SET</td><td><a href="http://redis.io/commands/zrem">ZREM</a></td> + </tr> </tbody> </table> http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java index 688f94a..9138862 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java @@ -151,6 +151,9 @@ public class RedisSink<IN> extends RichSinkFunction<IN> { case ZADD: this.redisCommandsContainer.zadd(this.additionalKey, value, key); break; + case ZREM: + this.redisCommandsContainer.zrem(this.additionalKey, key); + break; case HSET: this.redisCommandsContainer.hset(this.additionalKey, key, value); break; http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java index cc1d626..ba733f7 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java @@ -160,6 +160,18 @@ public class RedisClusterContainer implements RedisCommandsContainer, Closeable } } + @Override + public void zrem(final String key, final String element) { + try { + jedisCluster.zrem(key, element); + } catch (Exception e) { + if (LOG.isDebugEnabled()) { + LOG.error("Cannot send Redis message with command ZREM to set {} error message {}", + key, e.getMessage()); + } + } + } + /** * Closes the {@link JedisCluster}. */ http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java index 78771f1..5d7993c 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java @@ -107,6 +107,14 @@ public interface RedisCommandsContainer extends Serializable { void zadd(String key, String score, String element); /** + * Removes the specified member from the sorted set stored at key. + * + * @param key The name of the Sorted Set + * @param element element to be removed + */ + void zrem(String key, String element); + + /** * Close the Jedis container. * * @throws IOException if the instance can not be closed properly http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java index fb73a27..b862ea4 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java @@ -221,6 +221,23 @@ public class RedisContainer implements RedisCommandsContainer, Closeable { } } + @Override + public void zrem(final String key, final String element) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.zrem(key, element); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command ZREM to set {} error message {}", + key, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + /** * Returns Jedis instance from the pool. * http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java index cf9842c..019ad46 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java @@ -61,6 +61,11 @@ public enum RedisCommand { ZADD(RedisDataType.SORTED_SET), /** + * Removes the specified members from the sorted set stored at key. + */ + ZREM(RedisDataType.SORTED_SET), + + /** * Sets field in the hash stored at key to value. If key does not exist, * a new key holding a hash is created. If field already exists in the hash, it is overwritten. */ http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java index e071894..47544f7 100644 --- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java +++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java @@ -35,6 +35,7 @@ public class RedisSinkITCase extends RedisITCaseBase { private FlinkJedisPoolConfig jedisPoolConfig; private static final Long NUM_ELEMENTS = 20L; + private static final Long ZERO = 0L; private static final String REDIS_KEY = "TEST_KEY"; private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY"; @@ -97,14 +98,22 @@ public class RedisSinkITCase extends RedisITCaseBase { @Test public void testRedisSortedSetDataType() throws Exception { DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionSortedSet()); - RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, + RedisSink<Tuple2<String, String>> redisZaddSink = new RedisSink<>(jedisPoolConfig, new RedisAdditionalDataMapper(RedisCommand.ZADD)); - source.addSink(redisSink); - env.execute("Test Redis Sorted Set Data Type"); + source.addSink(redisZaddSink); + env.execute("Test ZADD"); assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY)); + RedisSink<Tuple2<String, String>> redisZremSink = new RedisSink<>(jedisPoolConfig, + new RedisAdditionalDataMapper(RedisCommand.ZREM)); + + source.addSink(redisZremSink); + env.execute("Test ZREM"); + + assertEquals(ZERO, jedis.zcard(REDIS_ADDITIONAL_KEY)); + jedis.del(REDIS_ADDITIONAL_KEY); }