This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d14557f  [BAHIR-205] Support password configuration for redis cluster
d14557f is described below

commit d14557f38959c66be7f5c5e77d5ea3f95c46fd81
Author: Like <[email protected]>
AuthorDate: Sat Jun 8 16:10:18 2019 +0800

    [BAHIR-205] Support password configuration for redis cluster
    
    Upgrade jedis to 2.9.0 and add password for redis cluster sink.
    
    Closes #57
---
 flink-connector-redis/pom.xml                      |  2 +-
 .../common/config/FlinkJedisClusterConfig.java     | 21 +++++++++++--
 .../redis/common/config/FlinkJedisConfigBase.java  | 13 ++++++++-
 .../redis/common/config/FlinkJedisPoolConfig.java  | 13 +--------
 .../common/config/FlinkJedisSentinelConfig.java    | 13 +--------
 .../container/RedisCommandsContainerBuilder.java   |  8 +++--
 .../common/config/FlinkJedisConfigBaseTest.java    |  2 +-
 .../common/config/JedisClusterConfigTest.java      | 34 ++++++++++++++++++++++
 8 files changed, 74 insertions(+), 32 deletions(-)

diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
index 3ed9fe8..c9b7a63 100644
--- a/flink-connector-redis/pom.xml
+++ b/flink-connector-redis/pom.xml
@@ -34,7 +34,7 @@ under the License.
     <packaging>jar</packaging>
 
     <properties>
-        <jedis.version>2.8.0</jedis.version>
+        <jedis.version>2.9.0</jedis.version>
     </properties>
 
     <dependencies>
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
index 119ade3..f05dfd8 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -47,11 +47,13 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
      * @param maxTotal the maximum number of objects that can be allocated by 
the pool
      * @param maxIdle the cap on the number of "idle" instances in the pool
      * @param minIdle the minimum number of idle objects to maintain in the 
pool
+     * @param password the password of redis cluster
      * @throws NullPointerException if parameter {@code nodes} is {@code null}
      */
     private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int 
connectionTimeout, int maxRedirections,
-                                    int maxTotal, int maxIdle, int minIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle);
+                                    int maxTotal, int maxIdle, int minIdle,
+                                    String password) {
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
 
         Objects.requireNonNull(nodes, "Node information should be presented");
         Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not 
be empty");
@@ -94,6 +96,7 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
         private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
         private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
         private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+        private String password;
 
         /**
          * Sets list of node.
@@ -165,12 +168,24 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
         }
 
         /**
+         * Sets value for the {@code password} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param password the password for accessing redis cluster
+         * @return Builder itself
+         */
+        public Builder setPassword(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /**
          * Builds JedisClusterConfig.
          *
          * @return JedisClusterConfig
          */
         public FlinkJedisClusterConfig build() {
-            return new FlinkJedisClusterConfig(nodes, timeout, 
maxRedirections, maxTotal, maxIdle, minIdle);
+            return new FlinkJedisClusterConfig(nodes, timeout, 
maxRedirections, maxTotal, maxIdle, minIdle, password);
         }
     }
 
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
index 0d821ed..84b1bf2 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
@@ -31,8 +31,9 @@ public abstract class FlinkJedisConfigBase implements 
Serializable {
     protected final int maxIdle;
     protected final int minIdle;
     protected final int connectionTimeout;
+    protected final String password;
 
-    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int 
maxIdle, int minIdle){
+    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int 
maxIdle, int minIdle, String password) {
         Util.checkArgument(connectionTimeout >= 0, "connection timeout can not 
be negative");
         Util.checkArgument(maxTotal >= 0, "maxTotal value can not be 
negative");
         Util.checkArgument(maxIdle >= 0, "maxIdle value can not be negative");
@@ -42,6 +43,7 @@ public abstract class FlinkJedisConfigBase implements 
Serializable {
         this.maxTotal = maxTotal;
         this.maxIdle = maxIdle;
         this.minIdle = minIdle;
+        this.password = password;
     }
 
     /**
@@ -88,4 +90,13 @@ public abstract class FlinkJedisConfigBase implements 
Serializable {
     public int getMinIdle() {
         return minIdle;
     }
+
+    /**
+     * Returns password.
+     *
+     * @return password
+     */
+    public String getPassword() {
+        return password;
+    }
 }
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
index d4c30ff..7c37ecb 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
@@ -31,7 +31,6 @@ public class FlinkJedisPoolConfig extends 
FlinkJedisConfigBase {
     private final String host;
     private final int port;
     private final int database;
-    private final String password;
 
 
     /**
@@ -50,12 +49,11 @@ public class FlinkJedisPoolConfig extends 
FlinkJedisConfigBase {
      */
     private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, 
String password, int database,
                                 int maxTotal, int maxIdle, int minIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle);
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
         Objects.requireNonNull(host, "Host information should be presented");
         this.host = host;
         this.port = port;
         this.database = database;
-        this.password = password;
     }
 
     /**
@@ -87,15 +85,6 @@ public class FlinkJedisPoolConfig extends 
FlinkJedisConfigBase {
     }
 
     /**
-     * Returns password.
-     *
-     * @return password
-     */
-    public String getPassword() {
-        return password;
-    }
-
-    /**
      * Builder for initializing  {@link FlinkJedisPoolConfig}.
      */
     public static class Builder {
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
index 6058a53..2fb87b9 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
@@ -37,7 +37,6 @@ public class FlinkJedisSentinelConfig extends 
FlinkJedisConfigBase {
     private final String masterName;
     private final Set<String> sentinels;
     private final int soTimeout;
-    private final String password;
     private final int database;
 
     /**
@@ -61,7 +60,7 @@ public class FlinkJedisSentinelConfig extends 
FlinkJedisConfigBase {
                                     int connectionTimeout, int soTimeout,
                                     String password, int database,
                                     int maxTotal, int maxIdle, int minIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle);
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
         Objects.requireNonNull(masterName, "Master name should be presented");
         Objects.requireNonNull(sentinels, "Sentinels information should be 
presented");
         Util.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be 
empty");
@@ -69,7 +68,6 @@ public class FlinkJedisSentinelConfig extends 
FlinkJedisConfigBase {
         this.masterName = masterName;
         this.sentinels = new HashSet<>(sentinels);
         this.soTimeout = soTimeout;
-        this.password = password;
         this.database = database;
     }
 
@@ -101,15 +99,6 @@ public class FlinkJedisSentinelConfig extends 
FlinkJedisConfigBase {
     }
 
     /**
-     * Returns password.
-     *
-     * @return password
-     */
-    public String getPassword() {
-        return password;
-    }
-
-    /**
      * Returns database index.
      *
      * @return database index
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index 0db5b05..3e80d57 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -88,8 +88,12 @@ public class RedisCommandsContainerBuilder {
         genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal());
         genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle());
 
-        JedisCluster jedisCluster = new 
JedisCluster(jedisClusterConfig.getNodes(), 
jedisClusterConfig.getConnectionTimeout(),
-            jedisClusterConfig.getMaxRedirections(), genericObjectPoolConfig);
+        JedisCluster jedisCluster = new 
JedisCluster(jedisClusterConfig.getNodes(),
+                jedisClusterConfig.getConnectionTimeout(),
+                jedisClusterConfig.getConnectionTimeout(),
+                jedisClusterConfig.getMaxRedirections(),
+                jedisClusterConfig.getPassword(),
+                genericObjectPoolConfig);
         return new RedisClusterContainer(jedisCluster);
     }
 
diff --git 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
index 2601e40..6f519ed 100644
--- 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
+++ 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
@@ -44,7 +44,7 @@ public class FlinkJedisConfigBaseTest extends TestLogger {
     private class TestConfig extends FlinkJedisConfigBase {
 
         protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, 
int minIdle) {
-            super(connectionTimeout, maxTotal, maxIdle, minIdle);
+            super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy");
         }
     }
 }
diff --git 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
index 6d0e787..addb469 100644
--- 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
+++ 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -23,6 +23,9 @@ import java.net.InetSocketAddress;
 import java.util.HashSet;
 import java.util.Set;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
 public class JedisClusterConfigTest extends TestLogger {
 
     @Test(expected = NullPointerException.class)
@@ -46,4 +49,35 @@ public class JedisClusterConfigTest extends TestLogger {
             .setNodes(set)
             .build();
     }
+
+    @Test
+    public void shouldSetPasswordSuccessfully() {
+        Set<InetSocketAddress> set = new HashSet<>();
+        InetSocketAddress address = 
InetSocketAddress.createUnresolved("localhost", 8080);
+        set.add(address);
+        FlinkJedisClusterConfig.Builder builder = new 
FlinkJedisClusterConfig.Builder();
+        FlinkJedisClusterConfig clusterConfig = builder.setMinIdle(0)
+                .setMaxIdle(0)
+                .setMaxTotal(0)
+                .setTimeout(0)
+                .setNodes(set)
+                .setPassword("test-pwd")
+                .build();
+        assertEquals("test-pwd", clusterConfig.getPassword());
+    }
+
+    @Test
+    public void shouldPasswordNotBeenSet() {
+        Set<InetSocketAddress> set = new HashSet<>();
+        InetSocketAddress address = 
InetSocketAddress.createUnresolved("localhost", 8080);
+        set.add(address);
+        FlinkJedisClusterConfig.Builder builder = new 
FlinkJedisClusterConfig.Builder();
+        FlinkJedisClusterConfig clusterConfig = builder.setMinIdle(0)
+                .setMaxIdle(0)
+                .setMaxTotal(0)
+                .setTimeout(0)
+                .setNodes(set)
+                .build();
+        assertNull(clusterConfig.getPassword());
+    }
 }

Reply via email to