This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b5c7e5d  [BAHIR-247] Provide connection validation/idle testing for 
Flink-Redis Connector (#121)
b5c7e5d is described below

commit b5c7e5d5061726df7667f29764ed01dfeda99861
Author: yiksanchan <[email protected]>
AuthorDate: Wed Apr 7 04:24:01 2021 -0700

    [BAHIR-247] Provide connection validation/idle testing for Flink-Redis 
Connector (#121)
---
 .../common/config/FlinkJedisClusterConfig.java     | 71 ++++++++++++++++---
 .../redis/common/config/FlinkJedisConfigBase.java  | 46 ++++++++++++-
 .../redis/common/config/FlinkJedisPoolConfig.java  | 72 +++++++++++++++++---
 .../common/config/FlinkJedisSentinelConfig.java    | 79 ++++++++++++++++++----
 .../container/RedisCommandsContainerBuilder.java   | 28 ++++----
 .../common/config/FlinkJedisConfigBaseTest.java    | 14 ++--
 .../RedisCommandsContainerBuilderTest.java         | 60 ++++++++++++++++
 7 files changed, 314 insertions(+), 56 deletions(-)

diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
index c56ac14..0840deb 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -48,12 +48,15 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
      * @param maxIdle the cap on the number of "idle" instances in the pool
      * @param minIdle the minimum number of idle objects to maintain in the 
pool
      * @param password the password of redis cluster
+     * @param testOnBorrow Whether objects borrowed from the pool will be 
validated before being returned, default value is false
+     * @param testOnReturn Whether objects borrowed from the pool will be 
validated when they are returned to the pool, default value is false
+     * @param testWhileIdle Whether objects sitting idle in the pool will be 
validated by the idle object evictor, default value is false
      * @throws NullPointerException if parameter {@code nodes} is {@code null}
      */
     private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int 
connectionTimeout, int maxRedirections,
-                                    int maxTotal, int maxIdle, int minIdle,
-                                    String password) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
+                                    int maxTotal, int maxIdle, int minIdle, 
String password,
+                                    boolean testOnBorrow, boolean 
testOnReturn, boolean testWhileIdle) {
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, 
testOnBorrow, testOnReturn, testWhileIdle);
 
         Objects.requireNonNull(nodes, "Node information should be presented");
         Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not 
be empty");
@@ -96,6 +99,9 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
         private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
         private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
         private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+        private boolean testOnBorrow = 
GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
+        private boolean testOnReturn = 
GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
+        private boolean testWhileIdle = 
GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
         private String password;
 
         /**
@@ -180,24 +186,67 @@ public class FlinkJedisClusterConfig extends 
FlinkJedisConfigBase {
         }
 
         /**
+         * Sets value for the {@code testOnBorrow} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param testOnBorrow Whether objects borrowed from the pool will be 
validated before being returned
+         * @return Builder itself
+         */
+        public Builder setTestOnBorrow(boolean testOnBorrow) {
+            this.testOnBorrow = testOnBorrow;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code testOnReturn} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param testOnReturn Whether objects borrowed from the pool will be 
validated when they are returned to the pool
+         * @return Builder itself
+         */
+        public Builder setTestOnReturn(boolean testOnReturn) {
+            this.testOnReturn = testOnReturn;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code testWhileIdle} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * Setting this to true will also set default idle-testing parameters 
provided in Jedis
+         * @see redis.clients.jedis.JedisPoolConfig
+         *
+         * @param testWhileIdle Whether objects sitting idle in the pool will 
be validated by the idle object evictor
+         * @return Builder itself
+         */
+        public Builder setTestWhileIdle(boolean testWhileIdle) {
+            this.testWhileIdle = testWhileIdle;
+            return this;
+        }
+
+        /**
          * Builds JedisClusterConfig.
          *
          * @return JedisClusterConfig
          */
         public FlinkJedisClusterConfig build() {
-            return new FlinkJedisClusterConfig(nodes, timeout, 
maxRedirections, maxTotal, maxIdle, minIdle, password);
+            return new FlinkJedisClusterConfig(nodes, timeout, 
maxRedirections, maxTotal, maxIdle, minIdle, password, testOnBorrow, 
testOnReturn, testWhileIdle);
         }
     }
 
     @Override
     public String toString() {
         return "FlinkJedisClusterConfig{" +
-            "nodes=" + nodes +
-            ", timeout=" + connectionTimeout +
-            ", maxRedirections=" + maxRedirections +
-            ", maxTotal=" + maxTotal +
-            ", maxIdle=" + maxIdle +
-            ", minIdle=" + minIdle +
-            '}';
+          "nodes=" + nodes +
+          ", maxRedirections=" + maxRedirections +
+          ", maxTotal=" + maxTotal +
+          ", maxIdle=" + maxIdle +
+          ", minIdle=" + minIdle +
+          ", connectionTimeout=" + connectionTimeout +
+          ", password=" + password +
+          ", testOnBorrow=" + testOnBorrow +
+          ", testOnReturn=" + testOnReturn +
+          ", testWhileIdle=" + testWhileIdle +
+          '}';
     }
 }
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
index 84b1bf2..a41b0e0 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
@@ -33,7 +33,12 @@ public abstract class FlinkJedisConfigBase implements 
Serializable {
     protected final int connectionTimeout;
     protected final String password;
 
-    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int 
maxIdle, int minIdle, String password) {
+    protected final boolean testOnBorrow;
+    protected final boolean testOnReturn;
+    protected final boolean testWhileIdle;
+
+    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int 
maxIdle, int minIdle, String password, boolean testOnBorrow, boolean 
testOnReturn, boolean testWhileIdle) {
+
         Util.checkArgument(connectionTimeout >= 0, "connection timeout can not 
be negative");
         Util.checkArgument(maxTotal >= 0, "maxTotal value can not be 
negative");
         Util.checkArgument(maxIdle >= 0, "maxIdle value can not be negative");
@@ -43,6 +48,9 @@ public abstract class FlinkJedisConfigBase implements 
Serializable {
         this.maxTotal = maxTotal;
         this.maxIdle = maxIdle;
         this.minIdle = minIdle;
+        this.testOnBorrow = testOnBorrow;
+        this.testOnReturn = testOnReturn;
+        this.testWhileIdle = testWhileIdle;
         this.password = password;
     }
 
@@ -99,4 +107,40 @@ public abstract class FlinkJedisConfigBase implements 
Serializable {
     public String getPassword() {
         return password;
     }
+
+    /**
+     * Get the value for the {@code testOnBorrow} configuration attribute
+     * for pools to be created with this configuration instance.
+     *
+     * @return  The current setting of {@code testOnBorrow} for this
+     *          configuration instance
+     * @see GenericObjectPoolConfig#getTestOnBorrow()
+     */
+    public boolean getTestOnBorrow() {
+        return testOnBorrow;
+    }
+
+    /**
+     * Get the value for the {@code testOnReturn} configuration attribute
+     * for pools to be created with this configuration instance.
+     *
+     * @return  The current setting of {@code testOnReturn} for this
+     *          configuration instance
+     * @see GenericObjectPoolConfig#getTestOnReturn()
+     */
+    public boolean getTestOnReturn() {
+        return testOnReturn;
+    }
+
+    /**
+     * Get the value for the {@code testWhileIdle} configuration attribute
+     * for pools to be created with this configuration instance.
+     *
+     * @return  The current setting of {@code testWhileIdle} for this
+     *          configuration instance
+     * @see GenericObjectPoolConfig#getTestWhileIdle()
+     */
+    public boolean getTestWhileIdle() {
+        return testWhileIdle;
+    }
 }
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
index 3f8fc2f..5012da1 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
@@ -45,11 +45,16 @@ public class FlinkJedisPoolConfig extends 
FlinkJedisConfigBase {
      * @param maxTotal the maximum number of objects that can be allocated by 
the pool, default value is 8
      * @param maxIdle the cap on the number of "idle" instances in the pool, 
default value is 8
      * @param minIdle the minimum number of idle objects to maintain in the 
pool, default value is 0
+     * @param testOnBorrow Whether objects borrowed from the pool will be 
validated before being returned, default value is false
+     * @param testOnReturn Whether objects borrowed from the pool will be 
validated when they are returned to the pool, default value is false
+     * @param testWhileIdle Whether objects sitting idle in the pool will be 
validated by the idle object evictor, default value is false
      * @throws NullPointerException if parameter {@code host} is {@code null}
      */
     private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, 
String password, int database,
-                                int maxTotal, int maxIdle, int minIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
+                                 int maxTotal, int maxIdle, int minIdle,
+                                 boolean testOnBorrow, boolean testOnReturn, 
boolean testWhileIdle) {
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, 
testOnBorrow, testOnReturn, testWhileIdle);
+
         Objects.requireNonNull(host, "Host information should be presented");
         this.host = host;
         this.port = port;
@@ -96,6 +101,9 @@ public class FlinkJedisPoolConfig extends 
FlinkJedisConfigBase {
         private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
         private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
         private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+        private boolean testOnBorrow = 
GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
+        private boolean testOnReturn = 
GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
+        private boolean testWhileIdle = 
GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
 
         /**
          * Sets value for the {@code maxTotal} configuration attribute
@@ -188,6 +196,44 @@ public class FlinkJedisPoolConfig extends 
FlinkJedisConfigBase {
             return this;
         }
 
+        /**
+         * Sets value for the {@code testOnBorrow} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param testOnBorrow Whether objects borrowed from the pool will be 
validated before being returned
+         * @return Builder itself
+         */
+        public Builder setTestOnBorrow(boolean testOnBorrow) {
+            this.testOnBorrow = testOnBorrow;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code testOnReturn} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param testOnReturn Whether objects borrowed from the pool will be 
validated when they are returned to the pool
+         * @return Builder itself
+         */
+        public Builder setTestOnReturn(boolean testOnReturn) {
+            this.testOnReturn = testOnReturn;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code testWhileIdle} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * Setting this to true will also set default idle-testing parameters 
provided in Jedis
+         * @see redis.clients.jedis.JedisPoolConfig
+         *
+         * @param testWhileIdle Whether objects sitting idle in the pool will 
be validated by the idle object evictor
+         * @return Builder itself
+         */
+        public Builder setTestWhileIdle(boolean testWhileIdle) {
+            this.testWhileIdle = testWhileIdle;
+            return this;
+        }
 
         /**
          * Builds JedisPoolConfig.
@@ -195,20 +241,24 @@ public class FlinkJedisPoolConfig extends 
FlinkJedisConfigBase {
          * @return JedisPoolConfig
          */
         public FlinkJedisPoolConfig build() {
-            return new FlinkJedisPoolConfig(host, port, timeout, password, 
database, maxTotal, maxIdle, minIdle);
+            return new FlinkJedisPoolConfig(host, port, timeout, password, 
database, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, 
testWhileIdle);
         }
     }
 
     @Override
     public String toString() {
         return "FlinkJedisPoolConfig{" +
-            "host='" + host + '\'' +
-            ", port=" + port +
-            ", timeout=" + connectionTimeout +
-            ", database=" + database +
-            ", maxTotal=" + maxTotal +
-            ", maxIdle=" + maxIdle +
-            ", minIdle=" + minIdle +
-            '}';
+          "host=" + host +
+          ", port=" + port +
+          ", database=" + database +
+          ", maxTotal=" + maxTotal +
+          ", maxIdle=" + maxIdle +
+          ", minIdle=" + minIdle +
+          ", connectionTimeout=" +
+          ", password=" + password +
+          ", testOnBorrow=" + testOnBorrow +
+          ", testOnReturn=" + testOnReturn +
+          ", testWhileIdle=" + testWhileIdle +
+          '}';
     }
 }
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
index 928f5e8..340eb4e 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
@@ -52,15 +52,19 @@ public class FlinkJedisSentinelConfig extends 
FlinkJedisConfigBase {
      * @param maxTotal maxTotal the maximum number of objects that can be 
allocated by the pool
      * @param maxIdle the cap on the number of "idle" instances in the pool
      * @param minIdle the minimum number of idle objects to maintain in the 
pool
-     *
+     * @param testOnBorrow Whether objects borrowed from the pool will be 
validated before being returned, default value is false
+     * @param testOnReturn Whether objects borrowed from the pool will be 
validated when they are returned to the pool, default value is false
+     * @param testWhileIdle Whether objects sitting idle in the pool will be 
validated by the idle object evictor, default value is false
      * @throws NullPointerException if {@code masterName} or {@code sentinels} 
is {@code null}
      * @throws IllegalArgumentException if {@code sentinels} are empty
      */
     private FlinkJedisSentinelConfig(String masterName, Set<String> sentinels,
-                                    int connectionTimeout, int soTimeout,
-                                    String password, int database,
-                                    int maxTotal, int maxIdle, int minIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
+                                     int connectionTimeout, int soTimeout,
+                                     String password, int database,
+                                     int maxTotal, int maxIdle, int minIdle,
+                                     boolean testOnBorrow, boolean 
testOnReturn, boolean testWhileIdle) {
+        super(connectionTimeout, maxTotal, maxIdle, minIdle, password, 
testOnBorrow, testOnReturn, testWhileIdle);
+
         Objects.requireNonNull(masterName, "Master name should be presented");
         Objects.requireNonNull(sentinels, "Sentinels information should be 
presented");
         Util.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be 
empty");
@@ -120,6 +124,9 @@ public class FlinkJedisSentinelConfig extends 
FlinkJedisConfigBase {
         private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
         private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
         private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+        private boolean testOnBorrow = 
GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
+        private boolean testOnReturn = 
GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
+        private boolean testWhileIdle = 
GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
 
         /**
          * Sets master name of the replica set.
@@ -224,26 +231,70 @@ public class FlinkJedisSentinelConfig extends 
FlinkJedisConfigBase {
         }
 
         /**
+         * Sets value for the {@code testOnBorrow} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param testOnBorrow Whether objects borrowed from the pool will be 
validated before being returned
+         * @return Builder itself
+         */
+        public Builder setTestOnBorrow(boolean testOnBorrow) {
+            this.testOnBorrow = testOnBorrow;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code testOnReturn} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param testOnReturn Whether objects borrowed from the pool will be 
validated when they are returned to the pool
+         * @return Builder itself
+         */
+        public Builder setTestOnReturn(boolean testOnReturn) {
+            this.testOnReturn = testOnReturn;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code testWhileIdle} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * Setting this to true will also set default idle-testing parameters 
provided in Jedis
+         * @see redis.clients.jedis.JedisPoolConfig
+         *
+         * @param testWhileIdle Whether objects sitting idle in the pool will 
be validated by the idle object evictor
+         * @return Builder itself
+         */
+        public Builder setTestWhileIdle(boolean testWhileIdle) {
+            this.testWhileIdle = testWhileIdle;
+            return this;
+        }
+
+        /**
          * Builds JedisSentinelConfig.
          *
          * @return JedisSentinelConfig
          */
         public FlinkJedisSentinelConfig build(){
             return new FlinkJedisSentinelConfig(masterName, sentinels, 
connectionTimeout, soTimeout,
-                password, database, maxTotal, maxIdle, minIdle);
+                password, database, maxTotal, maxIdle, minIdle, testOnBorrow, 
testOnReturn, testWhileIdle);
         }
     }
 
     @Override
     public String toString() {
         return "FlinkJedisSentinelConfig{" +
-            "masterName='" + masterName + '\'' +
-            ", connectionTimeout=" + connectionTimeout +
-            ", soTimeout=" + soTimeout +
-            ", database=" + database +
-            ", maxTotal=" + maxTotal +
-            ", maxIdle=" + maxIdle +
-            ", minIdle=" + minIdle +
-            '}';
+          "masterName=" + masterName +
+          ", sentinels=" + sentinels +
+          ", soTimeout=" + soTimeout +
+          ", database=" + database +
+          ", maxTotal=" + maxTotal +
+          ", maxIdle=" + maxIdle +
+          ", minIdle=" + minIdle +
+          ", connectionTimeout=" + connectionTimeout +
+          ", password=" + password +
+          ", testOnBorrow=" + testOnBorrow +
+          ", testOnReturn=" + testOnReturn +
+          ", testWhileIdle=" + testWhileIdle +
+          '}';
     }
 }
diff --git 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index bdb9fed..b06a6e9 100644
--- 
a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ 
b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolC
 import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
 import redis.clients.jedis.JedisCluster;
 import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
 import redis.clients.jedis.JedisSentinelPool;
 
 import java.util.Objects;
@@ -65,8 +66,8 @@ public class RedisCommandsContainerBuilder {
         GenericObjectPoolConfig genericObjectPoolConfig = 
getGenericObjectPoolConfig(jedisPoolConfig);
 
         JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, 
jedisPoolConfig.getHost(),
-            jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), 
jedisPoolConfig.getPassword(),
-            jedisPoolConfig.getDatabase());
+          jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), 
jedisPoolConfig.getPassword(),
+          jedisPoolConfig.getDatabase());
         return new RedisContainer(jedisPool);
     }
 
@@ -83,11 +84,11 @@ public class RedisCommandsContainerBuilder {
         GenericObjectPoolConfig genericObjectPoolConfig = 
getGenericObjectPoolConfig(jedisClusterConfig);
 
         JedisCluster jedisCluster = new 
JedisCluster(jedisClusterConfig.getNodes(),
-                jedisClusterConfig.getConnectionTimeout(),
-                jedisClusterConfig.getConnectionTimeout(),
-                jedisClusterConfig.getMaxRedirections(),
-                jedisClusterConfig.getPassword(),
-                genericObjectPoolConfig);
+          jedisClusterConfig.getConnectionTimeout(),
+          jedisClusterConfig.getConnectionTimeout(),
+          jedisClusterConfig.getMaxRedirections(),
+          jedisClusterConfig.getPassword(),
+          genericObjectPoolConfig);
         return new RedisClusterContainer(jedisCluster);
     }
 
@@ -104,17 +105,20 @@ public class RedisCommandsContainerBuilder {
         GenericObjectPoolConfig genericObjectPoolConfig = 
getGenericObjectPoolConfig(jedisSentinelConfig);
 
         JedisSentinelPool jedisSentinelPool = new 
JedisSentinelPool(jedisSentinelConfig.getMasterName(),
-            jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
-            jedisSentinelConfig.getConnectionTimeout(), 
jedisSentinelConfig.getSoTimeout(),
-            jedisSentinelConfig.getPassword(), 
jedisSentinelConfig.getDatabase());
+          jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
+          jedisSentinelConfig.getConnectionTimeout(), 
jedisSentinelConfig.getSoTimeout(),
+          jedisSentinelConfig.getPassword(), 
jedisSentinelConfig.getDatabase());
         return new RedisContainer(jedisSentinelPool);
     }
 
-    private static GenericObjectPoolConfig 
getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) {
-        GenericObjectPoolConfig genericObjectPoolConfig = new 
GenericObjectPoolConfig();
+    public static GenericObjectPoolConfig 
getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) {
+        GenericObjectPoolConfig genericObjectPoolConfig = 
jedisConfig.getTestWhileIdle() ? new JedisPoolConfig(): new 
GenericObjectPoolConfig();
         genericObjectPoolConfig.setMaxIdle(jedisConfig.getMaxIdle());
         genericObjectPoolConfig.setMaxTotal(jedisConfig.getMaxTotal());
         genericObjectPoolConfig.setMinIdle(jedisConfig.getMinIdle());
+        genericObjectPoolConfig.setTestOnBorrow(jedisConfig.getTestOnBorrow());
+        genericObjectPoolConfig.setTestOnReturn(jedisConfig.getTestOnReturn());
+
         return genericObjectPoolConfig;
     }
 }
diff --git 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
index 6f519ed..80189df 100644
--- 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
+++ 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
@@ -23,28 +23,28 @@ public class FlinkJedisConfigBaseTest extends TestLogger {
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIllegalArgumentExceptionIfTimeOutIsNegative(){
-        new TestConfig(-1, 0, 0, 0);
+        new TestConfig(-1, 0, 0, 0, false, false, false);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIllegalArgumentExceptionIfMaxTotalIsNegative(){
-        new TestConfig(1, -1, 0, 0);
+        new TestConfig(1, -1, 0, 0, false, false, false);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIllegalArgumentExceptionIfMaxIdleIsNegative(){
-        new TestConfig(0, 0, -1, 0);
+        new TestConfig(0, 0, -1, 0, false, false, false);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIllegalArgumentExceptionIfMinIdleIsNegative(){
-        new TestConfig(0, 0, 0, -1);
+        new TestConfig(0, 0, 0, -1, false, false, false);
     }
 
     private class TestConfig extends FlinkJedisConfigBase {
-
-        protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, 
int minIdle) {
-            super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy");
+        protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, 
int minIdle,
+                             boolean testOnBorrow, boolean testOnReturn, 
boolean testWhileIdle) {
+            super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy", 
testOnBorrow, testOnReturn, testWhileIdle);
         }
     }
 }
diff --git 
a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
new file mode 100644
index 0000000..eac5ca0
--- /dev/null
+++ 
b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.junit.Test;
+import redis.clients.jedis.JedisPoolConfig;
+
+public class RedisCommandsContainerBuilderTest extends AbstractTestBase {
+
+    @Test
+    public void testNotTestWhileIdle() {
+        FlinkJedisPoolConfig flinkJedisPoolConfig = new 
FlinkJedisPoolConfig.Builder().setHost("host").setPort(0).setDatabase(0).build();
+        GenericObjectPoolConfig genericObjectPoolConfig = 
RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
+        assertFalse(genericObjectPoolConfig.getTestWhileIdle());
+        assertEqualConfig(flinkJedisPoolConfig, genericObjectPoolConfig);
+    }
+
+    @Test
+    public void testTestWhileIdle() {
+        FlinkJedisPoolConfig flinkJedisPoolConfig = new 
FlinkJedisPoolConfig.Builder().setHost("host").setPort(0).setDatabase(0).setTestWhileIdle(true).build();
+        GenericObjectPoolConfig genericObjectPoolConfig = 
RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
+        assertTrue(genericObjectPoolConfig.getTestWhileIdle());
+        assertEqualConfig(flinkJedisPoolConfig, genericObjectPoolConfig);
+
+        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
+        assertEquals(genericObjectPoolConfig.getMinEvictableIdleTimeMillis(), 
jedisPoolConfig.getMinEvictableIdleTimeMillis());
+        
assertEquals(genericObjectPoolConfig.getTimeBetweenEvictionRunsMillis(), 
jedisPoolConfig.getTimeBetweenEvictionRunsMillis());
+        assertEquals(genericObjectPoolConfig.getNumTestsPerEvictionRun(), 
jedisPoolConfig.getNumTestsPerEvictionRun());
+    }
+
+    private void assertEqualConfig(FlinkJedisPoolConfig flinkJedisPoolConfig, 
GenericObjectPoolConfig genericObjectPoolConfig) {
+        assertEquals(genericObjectPoolConfig.getMaxIdle(), 
flinkJedisPoolConfig.getMaxIdle());
+        assertEquals(genericObjectPoolConfig.getMinIdle(), 
flinkJedisPoolConfig.getMinIdle());
+        assertEquals(genericObjectPoolConfig.getMaxTotal(), 
flinkJedisPoolConfig.getMaxTotal());
+        assertEquals(genericObjectPoolConfig.getTestWhileIdle(), 
flinkJedisPoolConfig.getTestWhileIdle());
+        assertEquals(genericObjectPoolConfig.getTestOnBorrow(), 
flinkJedisPoolConfig.getTestOnBorrow());
+        assertEquals(genericObjectPoolConfig.getTestOnReturn(), 
flinkJedisPoolConfig.getTestOnReturn());
+    }
+}

Reply via email to