This is an automated email from the ASF dual-hosted git repository. eskabetxe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
commit 510a2b18ea8ad42932a37604b57ad5d47fb62a03 Author: Sebastian Ramirez <[email protected]> AuthorDate: Thu Oct 20 16:49:51 2022 -0700 [BAHIR-315] Move SSL config to FlinkJedisConfigBase This allows us to support SSL connections in non-cluster configurations. JedisSentinelPool currently doesn't support SSL connections. --- .../common/config/FlinkJedisClusterConfig.java | 30 +++++++--------------- .../redis/common/config/FlinkJedisConfigBase.java | 13 +++++++++- .../redis/common/config/FlinkJedisPoolConfig.java | 22 +++++++++++++--- .../common/config/FlinkJedisSentinelConfig.java | 2 +- .../container/RedisCommandsContainerBuilder.java | 13 +++++++--- .../common/config/FlinkJedisConfigBaseTest.java | 2 +- .../common/config/JedisClusterConfigTest.java | 6 ++--- 7 files changed, 54 insertions(+), 34 deletions(-) diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java index cc7762a..2995572 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java @@ -35,8 +35,6 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { private final Set<InetSocketAddress> nodes; private final int maxRedirections; - private final boolean ssl; - /** * Jedis cluster configuration. @@ -49,22 +47,21 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { * @param maxIdle the cap on the number of "idle" instances in the pool * @param minIdle the minimum number of idle objects to maintain in the pool * @param password the password of redis cluster - * @param ssl Whether SSL connection should be established, default value is false + * @param useSsl Whether SSL connection should be established, default value is false * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned, default value is false * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool, default value is false * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false * @throws NullPointerException if parameter {@code nodes} is {@code null} */ private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections, - int maxTotal, int maxIdle, int minIdle, String password, boolean ssl, + int maxTotal, int maxIdle, int minIdle, String password, boolean useSsl, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) { - super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle); + super(connectionTimeout, maxTotal, maxIdle, minIdle, password, useSsl, testOnBorrow, testOnReturn, testWhileIdle); Objects.requireNonNull(nodes, "Node information should be presented"); Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty"); this.nodes = new HashSet<>(nodes); this.maxRedirections = maxRedirections; - this.ssl = ssl; } @@ -91,15 +88,6 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { return maxRedirections; } - /** - * Returns ssl. - * - * @return ssl - */ - public boolean getSsl() { - return ssl; - } - /** * Builder for initializing {@link FlinkJedisClusterConfig}. */ @@ -114,7 +102,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN; private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE; private String password; - private boolean ssl = false; + private boolean useSsl = false; /** * Sets list of node. @@ -200,11 +188,11 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { /** * Sets value for the {@code ssl} configuration attribute. * - * @param ssl flag if an SSL connection should be established + * @param useSsl flag if an SSL connection should be established * @return Builder itself */ - public Builder setSsl(boolean ssl){ - this.ssl = ssl; + public Builder setUseSsl(boolean useSsl){ + this.useSsl = useSsl; return this; } @@ -253,7 +241,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { * @return JedisClusterConfig */ public FlinkJedisClusterConfig build() { - return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password, ssl, testOnBorrow, testOnReturn, testWhileIdle); + return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password, useSsl, testOnBorrow, testOnReturn, testWhileIdle); } } @@ -267,7 +255,7 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { ", minIdle=" + minIdle + ", connectionTimeout=" + connectionTimeout + ", password=" + password + - ", ssl=" + ssl + + ", useSsl=" + useSsl + ", testOnBorrow=" + testOnBorrow + ", testOnReturn=" + testOnReturn + ", testWhileIdle=" + testWhileIdle + diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java index a41b0e0..4e68091 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java @@ -32,12 +32,13 @@ public abstract class FlinkJedisConfigBase implements Serializable { protected final int minIdle; protected final int connectionTimeout; protected final String password; + protected final boolean useSsl; protected final boolean testOnBorrow; protected final boolean testOnReturn; protected final boolean testWhileIdle; - protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle, String password, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) { + protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle, String password, boolean useSsl, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) { Util.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative"); Util.checkArgument(maxTotal >= 0, "maxTotal value can not be negative"); @@ -52,6 +53,7 @@ public abstract class FlinkJedisConfigBase implements Serializable { this.testOnReturn = testOnReturn; this.testWhileIdle = testWhileIdle; this.password = password; + this.useSsl = useSsl; } /** @@ -108,6 +110,15 @@ public abstract class FlinkJedisConfigBase implements Serializable { return password; } + + /** + * Whether connection to Redis should use SSL + * @return true if connection to Redis uses SSL, false otherwise + */ + public boolean getUseSsl() { + return useSsl; + } + /** * Get the value for the {@code testOnBorrow} configuration attribute * for pools to be created with this configuration instance. diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java index 5012da1..86c717b 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java @@ -32,7 +32,6 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase { private final int port; private final int database; - /** * Jedis pool configuration. * The host is mandatory, and when host is not set, it throws NullPointerException. @@ -41,6 +40,7 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase { * @param port port, default value is 6379 * @param connectionTimeout socket / connection timeout, default value is 2000 milli second * @param password password, if any + * @param useSsl Whether SSL connection should be established, default value is false * @param database database index * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8 * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8 @@ -50,10 +50,10 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase { * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false * @throws NullPointerException if parameter {@code host} is {@code null} */ - private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, int database, + private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, boolean useSsl, int database, int maxTotal, int maxIdle, int minIdle, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) { - super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle); + super(connectionTimeout, maxTotal, maxIdle, minIdle, password, useSsl, testOnBorrow, testOnReturn, testWhileIdle); Objects.requireNonNull(host, "Host information should be presented"); this.host = host; @@ -104,6 +104,7 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase { private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW; private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN; private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE; + private boolean useSsl = false; /** * Sets value for the {@code maxTotal} configuration attribute @@ -235,13 +236,25 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase { return this; } + + /** + * Sets value for the {@code ssl} configuration attribute. + * + * @param useSsl flag if an SSL connection should be established + * @return Builder itself + */ + public Builder setUseSsl(boolean useSsl) { + this.useSsl = useSsl; + return this; + } + /** * Builds JedisPoolConfig. * * @return JedisPoolConfig */ public FlinkJedisPoolConfig build() { - return new FlinkJedisPoolConfig(host, port, timeout, password, database, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, testWhileIdle); + return new FlinkJedisPoolConfig(host, port, timeout, password, useSsl, database, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, testWhileIdle); } } @@ -250,6 +263,7 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase { return "FlinkJedisPoolConfig{" + "host=" + host + ", port=" + port + + ", useSsl=" + useSsl + ", database=" + database + ", maxTotal=" + maxTotal + ", maxIdle=" + maxIdle + diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java index 340eb4e..a6a29ff 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java @@ -63,7 +63,7 @@ public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase { String password, int database, int maxTotal, int maxIdle, int minIdle, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) { - super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle); + super(connectionTimeout, maxTotal, maxIdle, minIdle, password, false, testOnBorrow, testOnReturn, testWhileIdle); Objects.requireNonNull(masterName, "Master name should be presented"); Objects.requireNonNull(sentinels, "Sentinels information should be presented"); diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java index 7f5af3d..6dd43a9 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java @@ -69,8 +69,11 @@ public class RedisCommandsContainerBuilder { GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisPoolConfig); JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(), - jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(), - jedisPoolConfig.getDatabase()); + jedisPoolConfig.getPort(), + jedisPoolConfig.getConnectionTimeout(), + jedisPoolConfig.getPassword(), + jedisPoolConfig.getDatabase(), + jedisPoolConfig.getUseSsl()); return new RedisContainer(jedisPool); } @@ -93,7 +96,7 @@ public class RedisCommandsContainerBuilder { jedisClusterConfig.getPassword(), DEFAULT_CLIENT_NAME, genericObjectPoolConfig, - jedisClusterConfig.getSsl()); + jedisClusterConfig.getUseSsl()); return new RedisClusterContainer(jedisCluster); } @@ -109,6 +112,10 @@ public class RedisCommandsContainerBuilder { GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisSentinelConfig); + if (jedisSentinelConfig.getUseSsl()) { + throw new RuntimeException("JedisSentinelPool does not support SSL connections yet."); + } + JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(), jedisSentinelConfig.getSentinels(), genericObjectPoolConfig, jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(), diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java index 80189df..54984d5 100644 --- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java +++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java @@ -44,7 +44,7 @@ public class FlinkJedisConfigBaseTest extends TestLogger { private class TestConfig extends FlinkJedisConfigBase { protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) { - super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy", testOnBorrow, testOnReturn, testWhileIdle); + super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy", false, testOnBorrow, testOnReturn, testWhileIdle); } } } diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java index 76208b9..7cefc1f 100644 --- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java +++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java @@ -94,9 +94,9 @@ public class JedisClusterConfigTest extends TestLogger { .setMaxTotal(0) .setTimeout(0) .setNodes(set) - .setSsl(true) + .setUseSsl(true) .build(); - assertTrue(clusterConfig.getSsl()); + assertTrue(clusterConfig.getUseSsl()); } @Test @@ -111,7 +111,7 @@ public class JedisClusterConfigTest extends TestLogger { .setTimeout(0) .setNodes(set) .build(); - assertFalse(clusterConfig.getSsl()); + assertFalse(clusterConfig.getUseSsl()); } }
