This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new f968c81 [BAHIR-262] Add support to redis cluster password (#101)
f968c81 is described below
commit f968c8183fff067569a0924a0a407b0f8c5956ff
Author: ahern88 <[email protected]>
AuthorDate: Thu Mar 11 20:59:12 2021 +0800
[BAHIR-262] Add support to redis cluster password (#101)
* support set redis cluster password
---
.../streaming/connectors/redis/RedisTableSinkFactory.java | 1 +
.../config/handler/FlinkJedisClusterConfigHandler.java | 12 ++++++++++--
.../connectors/redis/descriptor/RedisValidator.java | 1 +
.../streaming/connectors/redis/common/RedisHandlerTest.java | 12 ++++++++++++
.../redis/common/config/JedisClusterConfigTest.java | 1 +
5 files changed, 25 insertions(+), 2 deletions(-)
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
index 0ddbcea..16dfdbc 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java
@@ -57,6 +57,7 @@ public class RedisTableSinkFactory implements StreamTableSinkFactory<Tuple2<Bool
properties.add(REDIS_COMMAND);
properties.add(REDIS_NODES);
properties.add(REDIS_MASTER_NAME);
+ properties.add(REDIS_CLUSTER_PASSWORD);
properties.add(REDIS_SENTINEL);
properties.add(REDIS_KEY_TTL);
// schema
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisClusterConfigHandler.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
index 6ac0c0a..9e85182 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/handler/FlinkJedisClusterConfigHandler.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.redis.common.config.handler;
import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_CLUSTER;
import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_MODE;
import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_NODES;
+import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_CLUSTER_PASSWORD;
import java.net.InetSocketAddress;
import java.util.Arrays;
@@ -26,6 +27,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.hanlder.FlinkJedisConfigHandler;
@@ -44,8 +47,13 @@ public class FlinkJedisClusterConfigHandler implements FlinkJedisConfigHandler {
String[] arr = r.split(":");
return new InetSocketAddress(arr[0].trim(), Integer.parseInt(arr[1].trim()));
}).collect(Collectors.toSet());
- return new FlinkJedisClusterConfig.Builder()
- .setNodes(nodes).build();
+ String clusterPassword = properties.getOrDefault(REDIS_CLUSTER_PASSWORD, null);
+ FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
+ builder.setNodes(nodes);
+ if (StringUtils.isNotBlank(clusterPassword)) {
+ builder.setPassword(clusterPassword);
+ }
+ return builder.build();
}
@Override
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java
index ac97722..2b4aae2 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java
@@ -30,6 +30,7 @@ public class RedisValidator {
public static final String REDIS_MASTER_NAME = "master.name";
public static final String SENTINELS_INFO = "sentinels.info";
public static final String SENTINELS_PASSWORD = "sentinels.password";
+ public static final String REDIS_CLUSTER_PASSWORD = "cluster.password";
public static final String REDIS_KEY_TTL = "key.ttl";
}
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisHandlerTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisHandlerTest.java
index ee1e789..9fe3b0c 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisHandlerTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/RedisHandlerTest.java
@@ -22,8 +22,10 @@ import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValida
import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_KEY_TTL;
import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_MODE;
import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_NODES;
+import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_CLUSTER_PASSWORD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
import java.util.HashMap;
import java.util.Map;
@@ -48,6 +50,7 @@ public class RedisHandlerTest extends AbstractTestBase {
properties.put(REDIS_COMMAND, RedisCommand.SETEX.name());
properties.put(REDIS_NODES, "localhost:8080");
properties.put(REDIS_KEY_TTL, "1000");
+ properties.put(REDIS_CLUSTER_PASSWORD, "test-pwd");
}
@Test
@@ -65,4 +68,13 @@ public class RedisHandlerTest extends AbstractTestBase {
.createFlinkJedisConfig(properties);
assertTrue(flinkJedisConfigBase instanceof FlinkJedisClusterConfig);
}
+
+ @Test
+ public void testFlinkJedisConfigHasPassword() {
+ FlinkJedisConfigBase flinkJedisConfigBase = RedisHandlerServices
+ .findRedisHandler(FlinkJedisConfigHandler.class, properties)
+ .createFlinkJedisConfig(properties);
+ assertNotNull(flinkJedisConfigBase.getPassword());
+ assertEquals("test-pwd", flinkJedisConfigBase.getPassword());
+ }
}
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
index addb469..d64be84 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -80,4 +80,5 @@ public class JedisClusterConfigTest extends TestLogger {
.build();
assertNull(clusterConfig.getPassword());
}
+
}