This is an automated email from the ASF dual-hosted git repository. eskabetxe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
commit 522624f481eaf8fdaced084bf166b0e84c664b68 Author: netsi <ne...@users.noreply.github.com> AuthorDate: Tue Oct 11 17:46:07 2022 +0200 [BAHIR-315] add support for SSL connection --- .../common/config/FlinkJedisClusterConfig.java | 28 ++++++++++++++++-- .../container/RedisCommandsContainerBuilder.java | 3 +- .../common/config/JedisClusterConfigTest.java | 33 ++++++++++++++++++++++ 3 files changed, 61 insertions(+), 3 deletions(-) diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java index 0840deb..cc7762a 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java @@ -35,6 +35,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { private final Set<InetSocketAddress> nodes; private final int maxRedirections; + private final boolean ssl; /** @@ -48,13 +49,14 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { * @param maxIdle the cap on the number of "idle" instances in the pool * @param minIdle the minimum number of idle objects to maintain in the pool * @param password the password of redis cluster + * @param ssl Whether SSL connection should be established, default value is false * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned, default value is false * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool, default value is false * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false * @throws NullPointerException if parameter {@code nodes} is {@code null} */ private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections, - int maxTotal, int maxIdle, int minIdle, String password, + int maxTotal, int maxIdle, int minIdle, String password, boolean ssl, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) { super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle); @@ -62,6 +64,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty"); this.nodes = new HashSet<>(nodes); this.maxRedirections = maxRedirections; + this.ssl = ssl; } @@ -88,6 +91,14 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { return maxRedirections; } + /** + * Returns ssl. + * + * @return ssl + */ + public boolean getSsl() { + return ssl; + } /** * Builder for initializing {@link FlinkJedisClusterConfig}. @@ -103,6 +114,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN; private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE; private String password; + private boolean ssl = false; /** * Sets list of node. @@ -185,6 +197,17 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { return this; } + /** + * Sets value for the {@code ssl} configuration attribute. + * + * @param ssl flag if an SSL connection should be established + * @return Builder itself + */ + public Builder setSsl(boolean ssl){ + this.ssl = ssl; + return this; + } + /** * Sets value for the {@code testOnBorrow} configuration attribute * for pools to be created with this configuration instance. @@ -230,7 +253,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { * @return JedisClusterConfig */ public FlinkJedisClusterConfig build() { - return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle); + return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password, ssl, testOnBorrow, testOnReturn, testWhileIdle); } } @@ -244,6 +267,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { ", minIdle=" + minIdle + ", connectionTimeout=" + connectionTimeout + ", password=" + password + + ", ssl=" + ssl + ", testOnBorrow=" + testOnBorrow + ", testOnReturn=" + testOnReturn + ", testWhileIdle=" + testWhileIdle + diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java index 0ffbbe0..7f5af3d 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java @@ -92,7 +92,8 @@ public class RedisCommandsContainerBuilder { jedisClusterConfig.getMaxRedirections(), jedisClusterConfig.getPassword(), DEFAULT_CLIENT_NAME, - genericObjectPoolConfig); + genericObjectPoolConfig, + jedisClusterConfig.getSsl()); return new RedisClusterContainer(jedisCluster); } diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java index d64be84..76208b9 100644 --- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java +++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java @@ -25,6 +25,8 @@ import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; public class JedisClusterConfigTest extends TestLogger { @@ -81,4 +83,35 @@ public class JedisClusterConfigTest extends TestLogger { assertNull(clusterConfig.getPassword()); } + @Test + public void shouldSetSslSuccessfully() { + Set<InetSocketAddress> set = new HashSet<>(); + InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 8080); + set.add(address); + FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder(); + FlinkJedisClusterConfig clusterConfig = builder.setMinIdle(0) + .setMaxIdle(0) + .setMaxTotal(0) + .setTimeout(0) + .setNodes(set) + .setSsl(true) + .build(); + assertTrue(clusterConfig.getSsl()); + } + + @Test + public void shouldSslNotBeenSet() { + Set<InetSocketAddress> set = new HashSet<>(); + InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 8080); + set.add(address); + FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder(); + FlinkJedisClusterConfig clusterConfig = builder.setMinIdle(0) + .setMaxIdle(0) + .setMaxTotal(0) + .setTimeout(0) + .setNodes(set) + .build(); + assertFalse(clusterConfig.getSsl()); + } + }