This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f968c81  [BAHIR-262] Add support to redis cluster password (#101)
f968c81 is described below

commit f968c8183fff067569a0924a0a407b0f8c5956ff
Author: ahern88 <[email protected]>
AuthorDate: Thu Mar 11 20:59:12 2021 +0800

    [BAHIR-262] Add support to redis cluster password (#101)
    
    * support set redis cluster password
---
 .../streaming/connectors/redis/RedisTableSinkFactory.java    |  1 +
 .../config/handler/FlinkJedisClusterConfigHandler.java       | 12 ++++++++++--
 .../connectors/redis/descriptor/RedisValidator.java          |  1 +
 .../streaming/connectors/redis/common/RedisHandlerTest.java  | 12 ++++++++++++
 .../redis/common/config/JedisClusterConfigTest.java          |  1 +
 5 files changed, 25 insertions(+), 2 deletions(-)

diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
index 0ddbcea..16dfdbc 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
@@ -57,6 +57,7 @@ public class RedisTableSinkFactory implements 
StreamTableSinkFactory<Tuple2<Bool
         properties.add(REDIS_COMMAND);
         properties.add(REDIS_NODES);
         properties.add(REDIS_MASTER_NAME);
+        properties.add(REDIS_CLUSTER_PASSWORD);
         properties.add(REDIS_SENTINEL);
         properties.add(REDIS_KEY_TTL);
         // schema
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
index 6ac0c0a..9e85182 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
@@ -19,6 +19,7 @@ package 
org.apache.flink.streaming.connectors.redis.common.config.handler;
 import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_CLUSTER;
 import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_MODE;
 import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_NODES;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_CLUSTER_PASSWORD;
 
 import java.net.InetSocketAddress;
 import java.util.Arrays;
@@ -26,6 +27,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
 import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
 import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
 import 
org.apache.flink.streaming.connectors.redis.common.hanlder.FlinkJedisConfigHandler;
@@ -44,8 +47,13 @@ public class FlinkJedisClusterConfigHandler implements 
FlinkJedisConfigHandler {
             String[] arr = r.split(":");
             return new InetSocketAddress(arr[0].trim(), 
Integer.parseInt(arr[1].trim()));
         }).collect(Collectors.toSet());
-        return new FlinkJedisClusterConfig.Builder()
-                .setNodes(nodes).build();
+        String clusterPassword = 
properties.getOrDefault(REDIS_CLUSTER_PASSWORD, null);
+        FlinkJedisClusterConfig.Builder builder = new 
FlinkJedisClusterConfig.Builder();
+        builder.setNodes(nodes);
+        if (StringUtils.isNotBlank(clusterPassword)) {
+            builder.setPassword(clusterPassword);
+        }
+        return builder.build();
     }
 
     @Override
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java
index ac97722..2b4aae2 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java
@@ -30,6 +30,7 @@ public class RedisValidator {
     public static final String REDIS_MASTER_NAME = "master.name";
     public static final String SENTINELS_INFO = "sentinels.info";
     public static final String SENTINELS_PASSWORD = "sentinels.password";
+    public static final String REDIS_CLUSTER_PASSWORD = "cluster.password";
     public static final String REDIS_KEY_TTL = "key.ttl";
 
 }
diff --git 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisHandlerTest.java
 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisHandlerTest.java
index ee1e789..9fe3b0c 100644
--- 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisHandlerTest.java
+++ 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisHandlerTest.java
@@ -22,8 +22,10 @@ import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisValida
 import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_KEY_TTL;
 import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_MODE;
 import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_NODES;
+import static 
org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_CLUSTER_PASSWORD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -48,6 +50,7 @@ public class RedisHandlerTest extends AbstractTestBase {
         properties.put(REDIS_COMMAND, RedisCommand.SETEX.name());
         properties.put(REDIS_NODES, "localhost:8080");
         properties.put(REDIS_KEY_TTL, "1000");
+        properties.put(REDIS_CLUSTER_PASSWORD, "test-pwd");
     }
 
     @Test
@@ -65,4 +68,13 @@ public class RedisHandlerTest extends AbstractTestBase {
                 .createFlinkJedisConfig(properties);
         assertTrue(flinkJedisConfigBase instanceof FlinkJedisClusterConfig);
     }
+
+    @Test
+    public void testFlinkJedisConfigHasPassword() {
+        FlinkJedisConfigBase flinkJedisConfigBase =  RedisHandlerServices
+                .findRedisHandler(FlinkJedisConfigHandler.class, properties)
+                .createFlinkJedisConfig(properties);
+        assertNotNull(flinkJedisConfigBase.getPassword());
+        assertEquals("test-pwd", flinkJedisConfigBase.getPassword());
+    }
 }
diff --git 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
index addb469..d64be84 100644
--- 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
+++ 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -80,4 +80,5 @@ public class JedisClusterConfigTest extends TestLogger {
                 .build();
         assertNull(clusterConfig.getPassword());
     }
+
 }

Reply via email to