This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git

commit 522624f481eaf8fdaced084bf166b0e84c664b68
Author: netsi <ne...@users.noreply.github.com>
AuthorDate: Tue Oct 11 17:46:07 2022 +0200

    [BAHIR-315] add support for SSL connection
---
 .../common/config/FlinkJedisClusterConfig.java     | 28 ++++++++++++++++--
 .../container/RedisCommandsContainerBuilder.java   |  3 +-
 .../common/config/JedisClusterConfigTest.java      | 33 ++++++++++++++++++++++
 3 files changed, 61 insertions(+), 3 deletions(-)

diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
index 0840deb..cc7762a 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -35,6 +35,7 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
 
     private final Set<InetSocketAddress> nodes;
     private final int maxRedirections;
+    private final boolean ssl;
 
 
     /**
@@ -48,13 +49,14 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
      * @param maxIdle the cap on the number of "idle" instances in the pool
      * @param minIdle the minimum number of idle objects to maintain in the 
pool
      * @param password the password of redis cluster
+     * @param ssl Whether SSL connection should be established, default value 
is false
      * @param testOnBorrow Whether objects borrowed from the pool will be 
validated before being returned, default value is false
      * @param testOnReturn Whether objects borrowed from the pool will be 
validated when they are returned to the pool, default value is false
      * @param testWhileIdle Whether objects sitting idle in the pool will be 
validated by the idle object evictor, default value is false
      * @throws NullPointerException if parameter {@code nodes} is {@code null}
      */
     private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int 
connectionTimeout, int maxRedirections,
-                                    int maxTotal, int maxIdle, int minIdle, 
String password,
+                                    int maxTotal, int maxIdle, int minIdle, 
String password, boolean ssl,
                                     boolean testOnBorrow, boolean 
testOnReturn, boolean testWhileIdle) {
         super(connectionTimeout, maxTotal, maxIdle, minIdle, password, 
testOnBorrow, testOnReturn, testWhileIdle);
 
@@ -62,6 +64,7 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
         Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not 
be empty");
         this.nodes = new HashSet<>(nodes);
         this.maxRedirections = maxRedirections;
+        this.ssl = ssl;
     }
 
 
@@ -88,6 +91,14 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
         return maxRedirections;
     }
 
+    /**
+     * Returns ssl.
+     *
+     * @return ssl
+     */
+    public boolean getSsl() {
+        return ssl;
+    }
 
     /**
      * Builder for initializing  {@link FlinkJedisClusterConfig}.
@@ -103,6 +114,7 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
         private boolean testOnReturn = 
GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
         private boolean testWhileIdle = 
GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
         private String password;
+        private boolean ssl = false;
 
         /**
          * Sets list of node.
@@ -185,6 +197,17 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
             return this;
         }
 
+        /**
+         * Sets value for the {@code ssl} configuration attribute.
+         *
+         * @param ssl flag if an SSL connection should be established
+         * @return Builder itself
+         */
+        public Builder setSsl(boolean ssl){
+            this.ssl = ssl;
+            return this;
+        }
+
         /**
          * Sets value for the {@code testOnBorrow} configuration attribute
          * for pools to be created with this configuration instance.
@@ -230,7 +253,7 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
          * @return JedisClusterConfig
          */
         public FlinkJedisClusterConfig build() {
-            return new FlinkJedisClusterConfig(nodes, timeout, 
maxRedirections, maxTotal, maxIdle, minIdle, password, testOnBorrow, 
testOnReturn, testWhileIdle);
+            return new FlinkJedisClusterConfig(nodes, timeout, 
maxRedirections, maxTotal, maxIdle, minIdle, password, ssl, testOnBorrow, 
testOnReturn, testWhileIdle);
         }
     }
 
@@ -244,6 +267,7 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
           ", minIdle=" + minIdle +
           ", connectionTimeout=" + connectionTimeout +
           ", password=" + password +
+          ", ssl=" + ssl +
           ", testOnBorrow=" + testOnBorrow +
           ", testOnReturn=" + testOnReturn +
           ", testWhileIdle=" + testWhileIdle +
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index 0ffbbe0..7f5af3d 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -92,7 +92,8 @@ public class RedisCommandsContainerBuilder {
           jedisClusterConfig.getMaxRedirections(),
           jedisClusterConfig.getPassword(),
           DEFAULT_CLIENT_NAME,
-          genericObjectPoolConfig);
+          genericObjectPoolConfig,
+          jedisClusterConfig.getSsl());
         return new RedisClusterContainer(jedisCluster);
     }
 
diff --git 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
index d64be84..76208b9 100644
--- 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
+++ 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -25,6 +25,8 @@ import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 public class JedisClusterConfigTest extends TestLogger {
 
@@ -81,4 +83,35 @@ public class JedisClusterConfigTest extends TestLogger {
         assertNull(clusterConfig.getPassword());
     }
 
+    @Test
+    public void shouldSetSslSuccessfully() {
+        Set<InetSocketAddress> set = new HashSet<>();
+        InetSocketAddress address = 
InetSocketAddress.createUnresolved("localhost", 8080);
+        set.add(address);
+        FlinkJedisClusterConfig.Builder builder = new 
FlinkJedisClusterConfig.Builder();
+        FlinkJedisClusterConfig clusterConfig = builder.setMinIdle(0)
+                .setMaxIdle(0)
+                .setMaxTotal(0)
+                .setTimeout(0)
+                .setNodes(set)
+                .setSsl(true)
+                .build();
+        assertTrue(clusterConfig.getSsl());
+    }
+
+    @Test
+    public void shouldSslNotBeenSet() {
+        Set<InetSocketAddress> set = new HashSet<>();
+        InetSocketAddress address = 
InetSocketAddress.createUnresolved("localhost", 8080);
+        set.add(address);
+        FlinkJedisClusterConfig.Builder builder = new 
FlinkJedisClusterConfig.Builder();
+        FlinkJedisClusterConfig clusterConfig = builder.setMinIdle(0)
+                .setMaxIdle(0)
+                .setMaxTotal(0)
+                .setTimeout(0)
+                .setNodes(set)
+                .build();
+        assertFalse(clusterConfig.getSsl());
+    }
+
 }

Reply via email to