This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f92704  [BAHIR-85] Enable changing additional key without restarting (#60)
4f92704 is described below

commit 4f92704328f5e4be4a6493193c132461cf9b2cc3
Author: Ton van Bart <[email protected]>
AuthorDate: Thu Jul 4 00:11:15 2019 +0200

    [BAHIR-85] Enable changing additional key without restarting (#60)
    
    We have a use case where we want to sink data to Redis as hashes but have the hashes stored under different keys which are also extracted from the data. This change makes this possible in a backward-compatible manner.
---
 .../apache/flink/streaming/connectors/redis/RedisSink.java   |  8 +++++---
 .../connectors/redis/common/mapper/RedisMapper.java          | 12 ++++++++++++
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index 6a03f11..e468772 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * A sink that delivers data to a Redis channel using the Jedis client.
@@ -128,6 +129,7 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
     public void invoke(IN input, Context context) throws Exception {
         String key = redisSinkMapper.getKeyFromData(input);
         String value = redisSinkMapper.getValueFromData(input);
+        Optional<String> optAdditionalKey = 
redisSinkMapper.getAdditionalKey(input);
 
         switch (redisCommand) {
             case RPUSH:
@@ -149,13 +151,13 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
                 this.redisCommandsContainer.publish(key, value);
                 break;
             case ZADD:
-                this.redisCommandsContainer.zadd(this.additionalKey, value, 
key);
+                
this.redisCommandsContainer.zadd(optAdditionalKey.orElse(this.additionalKey), 
value, key);
                 break;
             case ZREM:
-                this.redisCommandsContainer.zrem(this.additionalKey, key);
+                
this.redisCommandsContainer.zrem(optAdditionalKey.orElse(this.additionalKey), 
key);
                 break;
             case HSET:
-                this.redisCommandsContainer.hset(this.additionalKey, key, 
value);
+                
this.redisCommandsContainer.hset(optAdditionalKey.orElse(this.additionalKey), 
key, value);
                 break;
             default:
                 throw new IllegalArgumentException("Cannot process such data 
type: " + redisCommand);
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
index b2580a7..96df75e 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.redis.common.mapper;
 import org.apache.flink.api.common.functions.Function;
 
 import java.io.Serializable;
+import java.util.Optional;
 
 /**
  * Function that creates the description how the input data should be mapped to redis type.
@@ -63,4 +64,15 @@ public interface RedisMapper<T> extends Function, Serializable {
      * @return value
      */
     String getValueFromData(T data);
+
+    /**
+     * Extracts the additional key from data as an {@code Optional<String>}.
+     * The default implementation returns an empty Optional.
+     *
+     * @param data source data
+     * @return Optional holding the additional key; empty by default
+     */
+    default Optional<String> getAdditionalKey(T data) {
+        return Optional.empty();
+    }
 }

Reply via email to