This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git

commit 510a2b18ea8ad42932a37604b57ad5d47fb62a03
Author: Sebastian Ramirez <[email protected]>
AuthorDate: Thu Oct 20 16:49:51 2022 -0700

    [BAHIR-315] Move SSL config to FlinkJedisConfigBase
    
    This allows us to support SSL connections in non-cluster configurations.
    
    JedisSentinelPool currently doesn't support SSL connections, so
    FlinkJedisSentinelConfig always passes useSsl=false to the base class.
---
 .../common/config/FlinkJedisClusterConfig.java     | 30 +++++++---------------
 .../redis/common/config/FlinkJedisConfigBase.java  | 13 +++++++++-
 .../redis/common/config/FlinkJedisPoolConfig.java  | 22 +++++++++++++---
 .../common/config/FlinkJedisSentinelConfig.java    |  2 +-
 .../container/RedisCommandsContainerBuilder.java   | 13 +++++++---
 .../common/config/FlinkJedisConfigBaseTest.java    |  2 +-
 .../common/config/JedisClusterConfigTest.java      |  6 ++---
 7 files changed, 54 insertions(+), 34 deletions(-)

diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
index cc7762a..2995572 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -35,8 +35,6 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
 
     private final Set<InetSocketAddress> nodes;
     private final int maxRedirections;
-    private final boolean ssl;
-
 
     /**
      * Jedis cluster configuration.
@@ -49,22 +47,21 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
      * @param maxIdle the cap on the number of "idle" instances in the pool
      * @param minIdle the minimum number of idle objects to maintain in the 
pool
      * @param password the password of redis cluster
-     * @param ssl Whether SSL connection should be established, default value 
is false
+     * @param useSsl Whether SSL connection should be established, default 
value is false
      * @param testOnBorrow Whether objects borrowed from the pool will be 
validated before being returned, default value is false
      * @param testOnReturn Whether objects borrowed from the pool will be 
validated when they are returned to the pool, default value is false
      * @param testWhileIdle Whether objects sitting idle in the pool will be 
validated by the idle object evictor, default value is false
      * @throws NullPointerException if parameter {@code nodes} is {@code null}
      */
     private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int 
connectionTimeout, int maxRedirections,
-                                    int maxTotal, int maxIdle, int minIdle, 
String password, boolean ssl,
+                                    int maxTotal, int maxIdle, int minIdle, 
String password, boolean useSsl,
                                     boolean testOnBorrow, boolean 
testOnReturn, boolean testWhileIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, 
testOnBorrow, testOnReturn, testWhileIdle);
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, useSsl, 
testOnBorrow, testOnReturn, testWhileIdle);
 
         Objects.requireNonNull(nodes, "Node information should be presented");
         Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not 
be empty");
         this.nodes = new HashSet<>(nodes);
         this.maxRedirections = maxRedirections;
-        this.ssl = ssl;
     }
 
 
@@ -91,15 +88,6 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
         return maxRedirections;
     }
 
-    /**
-     * Returns ssl.
-     *
-     * @return ssl
-     */
-    public boolean getSsl() {
-        return ssl;
-    }
-
     /**
      * Builder for initializing  {@link FlinkJedisClusterConfig}.
      */
@@ -114,7 +102,7 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
         private boolean testOnReturn = 
GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
         private boolean testWhileIdle = 
GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
         private String password;
-        private boolean ssl = false;
+        private boolean useSsl = false;
 
         /**
          * Sets list of node.
@@ -200,11 +188,11 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
         /**
          * Sets value for the {@code ssl} configuration attribute.
          *
-         * @param ssl flag if an SSL connection should be established
+         * @param useSsl flag if an SSL connection should be established
          * @return Builder itself
          */
-        public Builder setSsl(boolean ssl){
-            this.ssl = ssl;
+        public Builder setUseSsl(boolean useSsl){
+            this.useSsl = useSsl;
             return this;
         }
 
@@ -253,7 +241,7 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
          * @return JedisClusterConfig
          */
         public FlinkJedisClusterConfig build() {
-            return new FlinkJedisClusterConfig(nodes, timeout, 
maxRedirections, maxTotal, maxIdle, minIdle, password, ssl, testOnBorrow, 
testOnReturn, testWhileIdle);
+            return new FlinkJedisClusterConfig(nodes, timeout, 
maxRedirections, maxTotal, maxIdle, minIdle, password, useSsl, testOnBorrow, 
testOnReturn, testWhileIdle);
         }
     }
 
@@ -267,7 +255,7 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
           ", minIdle=" + minIdle +
           ", connectionTimeout=" + connectionTimeout +
           ", password=" + password +
-          ", ssl=" + ssl +
+          ", useSsl=" + useSsl +
           ", testOnBorrow=" + testOnBorrow +
           ", testOnReturn=" + testOnReturn +
           ", testWhileIdle=" + testWhileIdle +
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
index a41b0e0..4e68091 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
@@ -32,12 +32,13 @@ public abstract class FlinkJedisConfigBase implements 
Serializable {
     protected final int minIdle;
     protected final int connectionTimeout;
     protected final String password;
+    protected final boolean useSsl;
 
     protected final boolean testOnBorrow;
     protected final boolean testOnReturn;
     protected final boolean testWhileIdle;
 
-    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int 
maxIdle, int minIdle, String password, boolean testOnBorrow, boolean 
testOnReturn, boolean testWhileIdle) {
+    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int 
maxIdle, int minIdle, String password, boolean useSsl, boolean testOnBorrow, 
boolean testOnReturn, boolean testWhileIdle) {
 
         Util.checkArgument(connectionTimeout >= 0, "connection timeout can not 
be negative");
         Util.checkArgument(maxTotal >= 0, "maxTotal value can not be 
negative");
@@ -52,6 +53,7 @@ public abstract class FlinkJedisConfigBase implements 
Serializable {
         this.testOnReturn = testOnReturn;
         this.testWhileIdle = testWhileIdle;
         this.password = password;
+        this.useSsl = useSsl;
     }
 
     /**
@@ -108,6 +110,15 @@ public abstract class FlinkJedisConfigBase implements 
Serializable {
         return password;
     }
 
+
+    /**
+     * Returns whether the connection to Redis should use SSL.
+     * @return true if the connection to Redis uses SSL, false otherwise
+     */
+    public boolean getUseSsl() {
+        return useSsl;
+    }
+
     /**
      * Get the value for the {@code testOnBorrow} configuration attribute
      * for pools to be created with this configuration instance.
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
index 5012da1..86c717b 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
@@ -32,7 +32,6 @@ public class FlinkJedisPoolConfig extends 
FlinkJedisConfigBase {
     private final int port;
     private final int database;
 
-
     /**
      * Jedis pool configuration.
      * The host is mandatory, and when host is not set, it throws 
NullPointerException.
@@ -41,6 +40,7 @@ public class FlinkJedisPoolConfig extends 
FlinkJedisConfigBase {
      * @param port port, default value is 6379
      * @param connectionTimeout socket / connection timeout, default value is 
2000 milli second
      * @param password password, if any
+     * @param useSsl Whether SSL connection should be established, default 
value is false
      * @param database database index
      * @param maxTotal the maximum number of objects that can be allocated by 
the pool, default value is 8
      * @param maxIdle the cap on the number of "idle" instances in the pool, 
default value is 8
@@ -50,10 +50,10 @@ public class FlinkJedisPoolConfig extends 
FlinkJedisConfigBase {
      * @param testWhileIdle Whether objects sitting idle in the pool will be 
validated by the idle object evictor, default value is false
      * @throws NullPointerException if parameter {@code host} is {@code null}
      */
-    private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, 
String password, int database,
+    private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, 
String password, boolean useSsl, int database,
                                  int maxTotal, int maxIdle, int minIdle,
                                  boolean testOnBorrow, boolean testOnReturn, 
boolean testWhileIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, 
testOnBorrow, testOnReturn, testWhileIdle);
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, useSsl, 
testOnBorrow, testOnReturn, testWhileIdle);
 
         Objects.requireNonNull(host, "Host information should be presented");
         this.host = host;
@@ -104,6 +104,7 @@ public class FlinkJedisPoolConfig extends 
FlinkJedisConfigBase {
         private boolean testOnBorrow = 
GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
         private boolean testOnReturn = 
GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
         private boolean testWhileIdle = 
GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
+        private boolean useSsl = false;
 
         /**
          * Sets value for the {@code maxTotal} configuration attribute
@@ -235,13 +236,25 @@ public class FlinkJedisPoolConfig extends 
FlinkJedisConfigBase {
             return this;
         }
 
+
+        /**
+         * Sets value for the {@code useSsl} configuration attribute.
+         *
+         * @param useSsl flag if an SSL connection should be established
+         * @return Builder itself
+         */
+        public Builder setUseSsl(boolean useSsl) {
+            this.useSsl = useSsl;
+            return this;
+        }
+
         /**
          * Builds JedisPoolConfig.
          *
          * @return JedisPoolConfig
          */
         public FlinkJedisPoolConfig build() {
-            return new FlinkJedisPoolConfig(host, port, timeout, password, 
database, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, 
testWhileIdle);
+            return new FlinkJedisPoolConfig(host, port, timeout, password, 
useSsl, database, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, 
testWhileIdle);
         }
     }
 
@@ -250,6 +263,7 @@ public class FlinkJedisPoolConfig extends 
FlinkJedisConfigBase {
         return "FlinkJedisPoolConfig{" +
           "host=" + host +
           ", port=" + port +
+          ", useSsl=" + useSsl +
           ", database=" + database +
           ", maxTotal=" + maxTotal +
           ", maxIdle=" + maxIdle +
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
index 340eb4e..a6a29ff 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
@@ -63,7 +63,7 @@ public class FlinkJedisSentinelConfig extends 
FlinkJedisConfigBase {
                                      String password, int database,
                                      int maxTotal, int maxIdle, int minIdle,
                                      boolean testOnBorrow, boolean 
testOnReturn, boolean testWhileIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, 
testOnBorrow, testOnReturn, testWhileIdle);
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, false, 
testOnBorrow, testOnReturn, testWhileIdle);
 
         Objects.requireNonNull(masterName, "Master name should be presented");
         Objects.requireNonNull(sentinels, "Sentinels information should be 
presented");
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index 7f5af3d..6dd43a9 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -69,8 +69,11 @@ public class RedisCommandsContainerBuilder {
         GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = 
getGenericObjectPoolConfig(jedisPoolConfig);
 
         JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, 
jedisPoolConfig.getHost(),
-          jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), 
jedisPoolConfig.getPassword(),
-          jedisPoolConfig.getDatabase());
+          jedisPoolConfig.getPort(),
+          jedisPoolConfig.getConnectionTimeout(),
+          jedisPoolConfig.getPassword(),
+          jedisPoolConfig.getDatabase(),
+          jedisPoolConfig.getUseSsl());
         return new RedisContainer(jedisPool);
     }
 
@@ -93,7 +96,7 @@ public class RedisCommandsContainerBuilder {
           jedisClusterConfig.getPassword(),
           DEFAULT_CLIENT_NAME,
           genericObjectPoolConfig,
-          jedisClusterConfig.getSsl());
+          jedisClusterConfig.getUseSsl());
         return new RedisClusterContainer(jedisCluster);
     }
 
@@ -109,6 +112,10 @@ public class RedisCommandsContainerBuilder {
 
         GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = 
getGenericObjectPoolConfig(jedisSentinelConfig);
 
+        if (jedisSentinelConfig.getUseSsl()) {
+            throw new RuntimeException("JedisSentinelPool does not support SSL 
connections yet.");
+        }
+
         JedisSentinelPool jedisSentinelPool = new 
JedisSentinelPool(jedisSentinelConfig.getMasterName(),
           jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
           jedisSentinelConfig.getConnectionTimeout(), 
jedisSentinelConfig.getSoTimeout(),
diff --git 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
index 80189df..54984d5 100644
--- 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
+++ 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
@@ -44,7 +44,7 @@ public class FlinkJedisConfigBaseTest extends TestLogger {
     private class TestConfig extends FlinkJedisConfigBase {
         protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, 
int minIdle,
                              boolean testOnBorrow, boolean testOnReturn, 
boolean testWhileIdle) {
-            super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy", 
testOnBorrow, testOnReturn, testWhileIdle);
+            super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy", 
false, testOnBorrow, testOnReturn, testWhileIdle);
         }
     }
 }
diff --git 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
index 76208b9..7cefc1f 100644
--- 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
+++ 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -94,9 +94,9 @@ public class JedisClusterConfigTest extends TestLogger {
                 .setMaxTotal(0)
                 .setTimeout(0)
                 .setNodes(set)
-                .setSsl(true)
+                .setUseSsl(true)
                 .build();
-        assertTrue(clusterConfig.getSsl());
+        assertTrue(clusterConfig.getUseSsl());
     }
 
     @Test
@@ -111,7 +111,7 @@ public class JedisClusterConfigTest extends TestLogger {
                 .setTimeout(0)
                 .setNodes(set)
                 .build();
-        assertFalse(clusterConfig.getSsl());
+        assertFalse(clusterConfig.getUseSsl());
     }
 
 }

Reply via email to