This is an automated email from the ASF dual-hosted git repository.
lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new d14557f [BAHIR-205] Support password configuration for redis cluster
d14557f is described below
commit d14557f38959c66be7f5c5e77d5ea3f95c46fd81
Author: Like <[email protected]>
AuthorDate: Sat Jun 8 16:10:18 2019 +0800
[BAHIR-205] Support password configuration for redis cluster
Upgrade jedis to 2.9.0 and add password for redis cluster sink.
Closes #57
---
flink-connector-redis/pom.xml | 2 +-
.../common/config/FlinkJedisClusterConfig.java | 21 +++++++++++--
.../redis/common/config/FlinkJedisConfigBase.java | 13 ++++++++-
.../redis/common/config/FlinkJedisPoolConfig.java | 13 +--------
.../common/config/FlinkJedisSentinelConfig.java | 13 +--------
.../container/RedisCommandsContainerBuilder.java | 8 +++--
.../common/config/FlinkJedisConfigBaseTest.java | 2 +-
.../common/config/JedisClusterConfigTest.java | 34 ++++++++++++++++++++++
8 files changed, 74 insertions(+), 32 deletions(-)
diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
index 3ed9fe8..c9b7a63 100644
--- a/flink-connector-redis/pom.xml
+++ b/flink-connector-redis/pom.xml
@@ -34,7 +34,7 @@ under the License.
<packaging>jar</packaging>
<properties>
- <jedis.version>2.8.0</jedis.version>
+ <jedis.version>2.9.0</jedis.version>
</properties>
<dependencies>
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
index 119ade3..f05dfd8 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -47,11 +47,13 @@ public class FlinkJedisClusterConfig extends
FlinkJedisConfigBase {
* @param maxTotal the maximum number of objects that can be allocated by
the pool
* @param maxIdle the cap on the number of "idle" instances in the pool
* @param minIdle the minimum number of idle objects to maintain in the
pool
+ * @param password the password of redis cluster
* @throws NullPointerException if parameter {@code nodes} is {@code null}
*/
private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int
connectionTimeout, int maxRedirections,
- int maxTotal, int maxIdle, int minIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle);
+ int maxTotal, int maxIdle, int minIdle,
+ String password) {
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
Objects.requireNonNull(nodes, "Node information should be presented");
Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not
be empty");
@@ -94,6 +96,7 @@ public class FlinkJedisClusterConfig extends
FlinkJedisConfigBase {
private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+ private String password;
/**
* Sets list of node.
@@ -165,12 +168,24 @@ public class FlinkJedisClusterConfig extends
FlinkJedisConfigBase {
}
/**
+ * Sets value for the {@code password} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param password the password for accessing redis cluster
+ * @return Builder itself
+ */
+ public Builder setPassword(String password) {
+ this.password = password;
+ return this;
+ }
+
+ /**
* Builds JedisClusterConfig.
*
* @return JedisClusterConfig
*/
public FlinkJedisClusterConfig build() {
- return new FlinkJedisClusterConfig(nodes, timeout,
maxRedirections, maxTotal, maxIdle, minIdle);
+ return new FlinkJedisClusterConfig(nodes, timeout,
maxRedirections, maxTotal, maxIdle, minIdle, password);
}
}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
index 0d821ed..84b1bf2 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
@@ -31,8 +31,9 @@ public abstract class FlinkJedisConfigBase implements
Serializable {
protected final int maxIdle;
protected final int minIdle;
protected final int connectionTimeout;
+ protected final String password;
- protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int
maxIdle, int minIdle){
+ protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int
maxIdle, int minIdle, String password) {
Util.checkArgument(connectionTimeout >= 0, "connection timeout can not
be negative");
Util.checkArgument(maxTotal >= 0, "maxTotal value can not be
negative");
Util.checkArgument(maxIdle >= 0, "maxIdle value can not be negative");
@@ -42,6 +43,7 @@ public abstract class FlinkJedisConfigBase implements
Serializable {
this.maxTotal = maxTotal;
this.maxIdle = maxIdle;
this.minIdle = minIdle;
+ this.password = password;
}
/**
@@ -88,4 +90,13 @@ public abstract class FlinkJedisConfigBase implements
Serializable {
public int getMinIdle() {
return minIdle;
}
+
+ /**
+ * Returns password.
+ *
+ * @return password
+ */
+ public String getPassword() {
+ return password;
+ }
}
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
index d4c30ff..7c37ecb 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
@@ -31,7 +31,6 @@ public class FlinkJedisPoolConfig extends
FlinkJedisConfigBase {
private final String host;
private final int port;
private final int database;
- private final String password;
/**
@@ -50,12 +49,11 @@ public class FlinkJedisPoolConfig extends
FlinkJedisConfigBase {
*/
private FlinkJedisPoolConfig(String host, int port, int connectionTimeout,
String password, int database,
int maxTotal, int maxIdle, int minIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle);
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
Objects.requireNonNull(host, "Host information should be presented");
this.host = host;
this.port = port;
this.database = database;
- this.password = password;
}
/**
@@ -87,15 +85,6 @@ public class FlinkJedisPoolConfig extends
FlinkJedisConfigBase {
}
/**
- * Returns password.
- *
- * @return password
- */
- public String getPassword() {
- return password;
- }
-
- /**
* Builder for initializing {@link FlinkJedisPoolConfig}.
*/
public static class Builder {
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
index 6058a53..2fb87b9 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
@@ -37,7 +37,6 @@ public class FlinkJedisSentinelConfig extends
FlinkJedisConfigBase {
private final String masterName;
private final Set<String> sentinels;
private final int soTimeout;
- private final String password;
private final int database;
/**
@@ -61,7 +60,7 @@ public class FlinkJedisSentinelConfig extends
FlinkJedisConfigBase {
int connectionTimeout, int soTimeout,
String password, int database,
int maxTotal, int maxIdle, int minIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle);
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
Objects.requireNonNull(masterName, "Master name should be presented");
Objects.requireNonNull(sentinels, "Sentinels information should be
presented");
Util.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be
empty");
@@ -69,7 +68,6 @@ public class FlinkJedisSentinelConfig extends
FlinkJedisConfigBase {
this.masterName = masterName;
this.sentinels = new HashSet<>(sentinels);
this.soTimeout = soTimeout;
- this.password = password;
this.database = database;
}
@@ -101,15 +99,6 @@ public class FlinkJedisSentinelConfig extends
FlinkJedisConfigBase {
}
/**
- * Returns password.
- *
- * @return password
- */
- public String getPassword() {
- return password;
- }
-
- /**
* Returns database index.
*
* @return database index
diff --git
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index 0db5b05..3e80d57 100644
---
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -88,8 +88,12 @@ public class RedisCommandsContainerBuilder {
genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal());
genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle());
- JedisCluster jedisCluster = new
JedisCluster(jedisClusterConfig.getNodes(),
jedisClusterConfig.getConnectionTimeout(),
- jedisClusterConfig.getMaxRedirections(), genericObjectPoolConfig);
+ JedisCluster jedisCluster = new
JedisCluster(jedisClusterConfig.getNodes(),
+ jedisClusterConfig.getConnectionTimeout(),
+ jedisClusterConfig.getConnectionTimeout(),
+ jedisClusterConfig.getMaxRedirections(),
+ jedisClusterConfig.getPassword(),
+ genericObjectPoolConfig);
return new RedisClusterContainer(jedisCluster);
}
diff --git
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
index 2601e40..6f519ed 100644
---
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
+++
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
@@ -44,7 +44,7 @@ public class FlinkJedisConfigBaseTest extends TestLogger {
private class TestConfig extends FlinkJedisConfigBase {
protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle,
int minIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle);
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy");
}
}
}
diff --git
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
index 6d0e787..addb469 100644
---
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
+++
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -23,6 +23,9 @@ import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Set;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
public class JedisClusterConfigTest extends TestLogger {
@Test(expected = NullPointerException.class)
@@ -46,4 +49,35 @@ public class JedisClusterConfigTest extends TestLogger {
.setNodes(set)
.build();
}
+
+ @Test
+ public void shouldSetPasswordSuccessfully() {
+ Set<InetSocketAddress> set = new HashSet<>();
+ InetSocketAddress address =
InetSocketAddress.createUnresolved("localhost", 8080);
+ set.add(address);
+ FlinkJedisClusterConfig.Builder builder = new
FlinkJedisClusterConfig.Builder();
+ FlinkJedisClusterConfig clusterConfig = builder.setMinIdle(0)
+ .setMaxIdle(0)
+ .setMaxTotal(0)
+ .setTimeout(0)
+ .setNodes(set)
+ .setPassword("test-pwd")
+ .build();
+ assertEquals("test-pwd", clusterConfig.getPassword());
+ }
+
+ @Test
+ public void shouldPasswordNotBeenSet() {
+ Set<InetSocketAddress> set = new HashSet<>();
+ InetSocketAddress address =
InetSocketAddress.createUnresolved("localhost", 8080);
+ set.add(address);
+ FlinkJedisClusterConfig.Builder builder = new
FlinkJedisClusterConfig.Builder();
+ FlinkJedisClusterConfig clusterConfig = builder.setMinIdle(0)
+ .setMaxIdle(0)
+ .setMaxTotal(0)
+ .setTimeout(0)
+ .setNodes(set)
+ .build();
+ assertNull(clusterConfig.getPassword());
+ }
}