This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new cd8f61e [BAHIR-285] Redis: add srem command (#164)
cd8f61e is described below
commit cd8f61e2808d9e62650632e5e1ce5a38843ef346
Author: Hyeonho Kim <[email protected]>
AuthorDate: Wed May 31 01:09:45 2023 +0900
[BAHIR-285] Redis: add srem command (#164)
---
.../flink/streaming/connectors/redis/RedisSink.java | 3 +++
.../redis/common/container/RedisClusterContainer.java | 13 +++++++++++++
.../redis/common/container/RedisCommandsContainer.java | 10 ++++++++++
.../redis/common/container/RedisContainer.java | 17 +++++++++++++++++
.../connectors/redis/common/mapper/RedisCommand.java | 5 +++++
.../streaming/connectors/redis/RedisSinkITCase.java | 13 ++++++++++---
6 files changed, 58 insertions(+), 3 deletions(-)
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index 0101339..a6a8d9b 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -152,6 +152,9 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
case SADD:
this.redisCommandsContainer.sadd(key, value);
break;
+ case SREM:
+ this.redisCommandsContainer.srem(key, value);
+ break;
case SET:
this.redisCommandsContainer.set(key, value);
break;
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index 5c67aae..eb79738 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -133,6 +133,19 @@ public class RedisClusterContainer implements RedisCommandsContainer, Closeable
}
}
+ @Override
+ public void srem(final String setName, final String value) {
+ try {
+ jedisCluster.srem(setName, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command SREM to set {} error message {}",
+ setName, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
@Override
public void publish(final String channelName, final String message) {
try {
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 9fbad93..26007f4 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -82,6 +82,16 @@ public interface RedisCommandsContainer extends Serializable {
*/
void sadd(String setName, String value);
+
+ /**
+ * Remove the specified member from the set stored at key.
+ * Specified members that are not a member of this set are ignored.
+ * If key does not exist, it is treated as an empty set and nothing is removed.
+ * @param setName name of the set to remove the member from
+ * @param value member to remove from the set
+ */
+ void srem(String setName, String value);
+
/**
* Posts a message to the given channel.
*
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index 955fa31..5857d25 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -193,6 +193,23 @@ public class RedisContainer implements RedisCommandsContainer, Closeable {
}
}
+ @Override
+ public void srem(final String setName, final String value) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.srem(setName, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command SREM to set {} error message {}",
+ setName, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
@Override
public void publish(final String channelName, final String message) {
Jedis jedis = null;
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
index 1e48e7f..3bf4991 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -39,6 +39,11 @@ public enum RedisCommand {
*/
SADD(RedisDataType.SET),
+ /**
+ * Remove the specified members from the set stored at key.
+ */
+ SREM(RedisDataType.SET),
+
/**
* Set key to hold the string value. If key already holds a value,
* it is overwritten, regardless of its type.
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
index 72fdf09..70dfe03 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -77,14 +77,21 @@ public class RedisSinkITCase extends RedisITCaseBase {
@Test
public void testRedisSetDataType() throws Exception {
DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
- RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+ RedisSink<Tuple2<String, String>> redisSaddSink = new RedisSink<>(jedisPoolConfig,
new RedisCommandMapper(RedisCommand.SADD));
- source.addSink(redisSink);
- env.execute("Test Redis Set Data Type");
+ source.addSink(redisSaddSink);
+ env.execute("Test SADD");
assertEquals(NUM_ELEMENTS.longValue(), jedis.scard(REDIS_KEY));
+ RedisSink<Tuple2<String, String>> redisSremSink = new RedisSink<>(jedisPoolConfig,
+ new RedisCommandMapper(RedisCommand.SREM));
+ source.addSink(redisSremSink);
+ env.execute("Test SREM");
+
+ assertEquals(ZERO.longValue(), jedis.scard(REDIS_KEY));
+
jedis.del(REDIS_KEY);
}