This is an automated email from the ASF dual-hosted git repository. eskabetxe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
commit ef3d18b07ca5ab2d4b653171777122ae257e747c Author: netsi <[email protected]> AuthorDate: Tue Oct 11 17:43:55 2022 +0200 [BAHIR-315] update jedis dependency to 4.3.0 --- flink-connector-redis/pom.xml | 2 +- .../common/container/RedisClusterContainer.java | 9 +------- .../container/RedisCommandsContainerBuilder.java | 14 ++++++++----- .../connectors/redis/RedisSinkITCase.java | 24 +++++++++++----------- .../streaming/connectors/redis/RedisSinkTest.java | 4 ++-- .../RedisCommandsContainerBuilderTest.java | 7 ++++--- 6 files changed, 29 insertions(+), 31 deletions(-) diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml index d7d67de..71c5eff 100644 --- a/flink-connector-redis/pom.xml +++ b/flink-connector-redis/pom.xml @@ -34,7 +34,7 @@ under the License. <name>flink-connector-redis</name> <properties> - <jedis.version>2.9.0</jedis.version> + <jedis.version>4.3.0</jedis.version> </properties> <dependencyManagement> diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java index d61716b..d2aa13e 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java @@ -47,14 +47,7 @@ public class RedisClusterContainer implements RedisCommandsContainer, Closeable } @Override - public void open() throws Exception { - - // echo() tries to open a connection and echos back the - // message passed as argument. Here we use it to monitor - // if we can communicate with the cluster. - - jedisCluster.echo("Test"); - } + public void open() throws Exception {} @Override public void hset(final String key, final String hashField, final String value, final Integer ttl) { diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java index b06a6e9..0ffbbe0 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java @@ -25,6 +25,8 @@ import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import redis.clients.jedis.JedisSentinelPool; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Connection; import java.util.Objects; @@ -32,6 +34,7 @@ import java.util.Objects; * The builder for {@link RedisCommandsContainer}. */ public class RedisCommandsContainerBuilder { + private static final String DEFAULT_CLIENT_NAME = "default_client"; /** * Initialize the {@link RedisCommandsContainer} based on the instance type. @@ -63,7 +66,7 @@ public class RedisCommandsContainerBuilder { public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) { Objects.requireNonNull(jedisPoolConfig, "Redis pool config should not be Null"); - GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisPoolConfig); + GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisPoolConfig); JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(), jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(), @@ -81,13 +84,14 @@ public class RedisCommandsContainerBuilder { public static RedisCommandsContainer build(FlinkJedisClusterConfig jedisClusterConfig) { Objects.requireNonNull(jedisClusterConfig, "Redis cluster config should not be Null"); - GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisClusterConfig); + GenericObjectPoolConfig<Connection> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisClusterConfig); JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout(), jedisClusterConfig.getConnectionTimeout(), jedisClusterConfig.getMaxRedirections(), jedisClusterConfig.getPassword(), + DEFAULT_CLIENT_NAME, genericObjectPoolConfig); return new RedisClusterContainer(jedisCluster); } @@ -102,7 +106,7 @@ public class RedisCommandsContainerBuilder { public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) { Objects.requireNonNull(jedisSentinelConfig, "Redis sentinel config should not be Null"); - GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisSentinelConfig); + GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = getGenericObjectPoolConfig(jedisSentinelConfig); JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(), jedisSentinelConfig.getSentinels(), genericObjectPoolConfig, @@ -111,8 +115,8 @@ public class RedisCommandsContainerBuilder { return new RedisContainer(jedisSentinelPool); } - public static GenericObjectPoolConfig getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) { - GenericObjectPoolConfig genericObjectPoolConfig = jedisConfig.getTestWhileIdle() ? new JedisPoolConfig(): new GenericObjectPoolConfig(); + public static <T> GenericObjectPoolConfig<T> getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) { + GenericObjectPoolConfig<T> genericObjectPoolConfig = jedisConfig.getTestWhileIdle() ? (GenericObjectPoolConfig<T>) new JedisPoolConfig() : new GenericObjectPoolConfig<T>(); genericObjectPoolConfig.setMaxIdle(jedisConfig.getMaxIdle()); genericObjectPoolConfig.setMaxTotal(jedisConfig.getMaxTotal()); genericObjectPoolConfig.setMinIdle(jedisConfig.getMinIdle()); diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java index ee1cc7f..5858eb1 100644 --- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java +++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java @@ -69,7 +69,7 @@ public class RedisSinkITCase extends RedisITCaseBase { source.addSink(redisSink); env.execute("Test Redis List Data Type"); - assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY)); + assertEquals(NUM_ELEMENTS.longValue(), jedis.llen(REDIS_KEY)); jedis.del(REDIS_KEY); } @@ -83,7 +83,7 @@ public class RedisSinkITCase extends RedisITCaseBase { source.addSink(redisSink); env.execute("Test Redis Set Data Type"); - assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY)); + assertEquals(NUM_ELEMENTS.longValue(), jedis.scard(REDIS_KEY)); jedis.del(REDIS_KEY); } @@ -97,8 +97,8 @@ public class RedisSinkITCase extends RedisITCaseBase { source.addSink(redisSink); env.execute("Test Redis Set Data Type With TTL"); - assertEquals(TEST_MESSAGE_LENGTH, jedis.strlen(REDIS_KEY)); - assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_KEY)); + assertEquals(TEST_MESSAGE_LENGTH.longValue(), jedis.strlen(REDIS_KEY)); + assertEquals(REDIS_TTL_IN_SECS.longValue(), jedis.ttl(REDIS_KEY)); jedis.del(REDIS_KEY); } @@ -126,7 +126,7 @@ public class RedisSinkITCase extends RedisITCaseBase { source.addSink(redisZaddSink); env.execute("Test ZADD"); - assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY)); + assertEquals(NUM_ELEMENTS.longValue(), jedis.zcard(REDIS_ADDITIONAL_KEY)); RedisSink<Tuple2<String, String>> redisZremSink = new RedisSink<>(jedisPoolConfig, new RedisAdditionalDataMapper(RedisCommand.ZREM)); @@ -134,7 +134,7 @@ public class RedisSinkITCase extends RedisITCaseBase { source.addSink(redisZremSink); env.execute("Test ZREM"); - assertEquals(ZERO, jedis.zcard(REDIS_ADDITIONAL_KEY)); + assertEquals(ZERO.longValue(), jedis.zcard(REDIS_ADDITIONAL_KEY)); jedis.del(REDIS_ADDITIONAL_KEY); } @@ -148,8 +148,8 @@ public class RedisSinkITCase extends RedisITCaseBase { source.addSink(redisSink); env.execute("Test Redis Hash Data Type"); - assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY)); - assertEquals(REDIS_NOT_ASSOCIATED_EXPIRE_FLAG, jedis.ttl(REDIS_ADDITIONAL_KEY)); + assertEquals(NUM_ELEMENTS.longValue(), jedis.hlen(REDIS_ADDITIONAL_KEY)); + assertEquals(REDIS_NOT_ASSOCIATED_EXPIRE_FLAG.longValue(), jedis.ttl(REDIS_ADDITIONAL_KEY)); jedis.del(REDIS_ADDITIONAL_KEY); } @@ -163,8 +163,8 @@ public class RedisSinkITCase extends RedisITCaseBase { source.addSink(redisSink); env.execute("Test Redis Hash Data Type"); - assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY)); - assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY)); + assertEquals(NUM_ELEMENTS.longValue(), jedis.hlen(REDIS_ADDITIONAL_KEY)); + assertEquals(REDIS_TTL_IN_SECS.longValue(), jedis.ttl(REDIS_ADDITIONAL_KEY)); jedis.del(REDIS_ADDITIONAL_KEY); } @@ -178,8 +178,8 @@ public class RedisSinkITCase extends RedisITCaseBase { source.addSink(redisSink); env.execute("Test Redis Hash Data Type 2"); - assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY)); - assertEquals(REDIS_TTL_IN_SECS, jedis.ttl(REDIS_ADDITIONAL_KEY)); + assertEquals(NUM_ELEMENTS.longValue(), jedis.hlen(REDIS_ADDITIONAL_KEY)); + assertEquals(REDIS_TTL_IN_SECS.longValue(), jedis.ttl(REDIS_ADDITIONAL_KEY)); jedis.del(REDIS_ADDITIONAL_KEY); } diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java index 0ec4cd5..338e14e 100644 --- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java +++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java @@ -27,7 +27,7 @@ import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDes import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; import org.apache.flink.util.TestLogger; import org.junit.Test; -import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisException; import java.net.InetSocketAddress; import java.util.HashSet; @@ -110,7 +110,7 @@ public class RedisSinkTest extends TestLogger { Throwable t = e; int depth = 0; - while (!(t instanceof JedisConnectionException)) { + while (!(t instanceof JedisException)) { t = t.getCause(); if (t == null || depth++ == 20) { throw e; diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java index eac5ca0..2b89ec6 100644 --- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java +++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java @@ -25,13 +25,14 @@ import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolC import org.apache.flink.test.util.AbstractTestBase; import org.junit.Test; import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.Jedis; public class RedisCommandsContainerBuilderTest extends AbstractTestBase { @Test public void testNotTestWhileIdle() { FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("host").setPort(0).setDatabase(0).build(); - GenericObjectPoolConfig genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig); + GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig); assertFalse(genericObjectPoolConfig.getTestWhileIdle()); assertEqualConfig(flinkJedisPoolConfig, genericObjectPoolConfig); } @@ -39,7 +40,7 @@ public class RedisCommandsContainerBuilderTest extends AbstractTestBase { @Test public void testTestWhileIdle() { FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("host").setPort(0).setDatabase(0).setTestWhileIdle(true).build(); - GenericObjectPoolConfig genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig); + GenericObjectPoolConfig<Jedis> genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig); assertTrue(genericObjectPoolConfig.getTestWhileIdle()); assertEqualConfig(flinkJedisPoolConfig, genericObjectPoolConfig); @@ -49,7 +50,7 @@ public class RedisCommandsContainerBuilderTest extends AbstractTestBase { assertEquals(genericObjectPoolConfig.getNumTestsPerEvictionRun(), jedisPoolConfig.getNumTestsPerEvictionRun()); } - private void assertEqualConfig(FlinkJedisPoolConfig flinkJedisPoolConfig, GenericObjectPoolConfig genericObjectPoolConfig) { + private <T> void assertEqualConfig(FlinkJedisPoolConfig flinkJedisPoolConfig, GenericObjectPoolConfig<T> genericObjectPoolConfig) { assertEquals(genericObjectPoolConfig.getMaxIdle(), flinkJedisPoolConfig.getMaxIdle()); assertEquals(genericObjectPoolConfig.getMinIdle(), flinkJedisPoolConfig.getMinIdle()); assertEquals(genericObjectPoolConfig.getMaxTotal(), flinkJedisPoolConfig.getMaxTotal());
