Repository: nifi Updated Branches: refs/heads/master f1e03b5ed -> 84c32f913
NIFI-5830 - RedisConnectionPoolService does not work with Standalone Redis using non-localhost deployment Signed-off-by: Pierre Villard <pierre.villard...@gmail.com> This closes #3176. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/84c32f91 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/84c32f91 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/84c32f91 Branch: refs/heads/master Commit: 84c32f913780d585ba16960c3f23e83313e1bcab Parents: f1e03b5 Author: Alexander Bukarev <buka...@yandex.ru> Authored: Tue Dec 4 22:15:37 2018 +0300 Committer: Pierre Villard <pierre.villard...@gmail.com> Committed: Thu Dec 6 23:31:25 2018 +0100 ---------------------------------------------------------------------- .../org/apache/nifi/redis/util/RedisUtils.java | 60 ++++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/84c32f91/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java index aed823b..6489fcc 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java @@ -27,11 +27,15 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.redis.RedisType; import org.apache.nifi.util.StringUtils; import org.springframework.data.redis.connection.RedisClusterConfiguration; +import org.springframework.data.redis.connection.RedisConfiguration; +import org.springframework.data.redis.connection.RedisPassword; import org.springframework.data.redis.connection.RedisSentinelConfiguration; +import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.jedis.JedisClientConfiguration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import redis.clients.jedis.JedisPoolConfig; -import redis.clients.jedis.JedisShardInfo; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -264,19 +268,29 @@ public class RedisUtils { final Integer timeout = context.getProperty(RedisUtils.COMMUNICATION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); final JedisPoolConfig poolConfig = createJedisPoolConfig(context); + final JedisClientConfiguration jedisClientConfiguration = JedisClientConfiguration.builder() + .connectTimeout(Duration.ofMillis(timeout)) + .readTimeout(Duration.ofMillis(timeout)) + .usePooling() + .poolConfig(poolConfig) + .build(); JedisConnectionFactory connectionFactory; if (RedisUtils.REDIS_MODE_STANDALONE.getValue().equals(redisMode)) { - final JedisShardInfo jedisShardInfo = createJedisShardInfo(connectionString, timeout, password); - logger.info("Connecting to Redis in standalone mode at " + connectionString); - connectionFactory = new JedisConnectionFactory(jedisShardInfo); + final String[] hostAndPortSplit = connectionString.split("[:]"); + final String host = hostAndPortSplit[0].trim(); + final Integer port = Integer.parseInt(hostAndPortSplit[1].trim()); + final RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(host, port); + enrichRedisConfiguration(redisStandaloneConfiguration, dbIndex, password); + + connectionFactory = new JedisConnectionFactory(redisStandaloneConfiguration, jedisClientConfiguration); } else if (RedisUtils.REDIS_MODE_SENTINEL.getValue().equals(redisMode)) { final String[] sentinels = connectionString.split("[,]"); final String sentinelMaster = context.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue(); final RedisSentinelConfiguration sentinelConfiguration = new RedisSentinelConfiguration(sentinelMaster, new HashSet<>(getTrimmedValues(sentinels))); - final JedisShardInfo jedisShardInfo = createJedisShardInfo(sentinels[0], timeout, password); + enrichRedisConfiguration(sentinelConfiguration, dbIndex, password); logger.info("Connecting to Redis in sentinel mode..."); logger.info("Redis master = " + sentinelMaster); @@ -285,14 +299,14 @@ public class RedisUtils { logger.info("Redis sentinel at " + sentinel); } - connectionFactory = new JedisConnectionFactory(sentinelConfiguration, poolConfig); - connectionFactory.setShardInfo(jedisShardInfo); + connectionFactory = new JedisConnectionFactory(sentinelConfiguration, jedisClientConfiguration); } else { final String[] clusterNodes = connectionString.split("[,]"); final Integer maxRedirects = context.getProperty(RedisUtils.CLUSTER_MAX_REDIRECTS).asInteger(); final RedisClusterConfiguration clusterConfiguration = new RedisClusterConfiguration(getTrimmedValues(clusterNodes)); + enrichRedisConfiguration(clusterConfiguration, dbIndex, password); clusterConfiguration.setMaxRedirects(maxRedirects); logger.info("Connecting to Redis in clustered mode..."); @@ -300,16 +314,7 @@ public class RedisUtils { logger.info("Redis cluster node at " + clusterNode); } - connectionFactory = new JedisConnectionFactory(clusterConfiguration, poolConfig); - } - - connectionFactory.setUsePool(true); - connectionFactory.setPoolConfig(poolConfig); - connectionFactory.setDatabase(dbIndex); - connectionFactory.setTimeout(timeout); - - if (!StringUtils.isBlank(password)) { - connectionFactory.setPassword(password); + connectionFactory = new JedisConnectionFactory(clusterConfiguration, jedisClientConfiguration); } // need to call this to initialize the pool/connections @@ -325,20 +330,15 @@ public class RedisUtils { return trimmedValues; } - private static JedisShardInfo createJedisShardInfo(final String hostAndPort, final Integer timeout, final String password) { - final String[] hostAndPortSplit = hostAndPort.split("[:]"); - final String host = hostAndPortSplit[0].trim(); - final Integer port = Integer.parseInt(hostAndPortSplit[1].trim()); - - final JedisShardInfo jedisShardInfo = new JedisShardInfo(host, port); - jedisShardInfo.setConnectionTimeout(timeout); - jedisShardInfo.setSoTimeout(timeout); - - if (!StringUtils.isEmpty(password)) { - jedisShardInfo.setPassword(password); + private static void enrichRedisConfiguration(final RedisConfiguration redisConfiguration, + final Integer dbIndex, + final String password) { + if (redisConfiguration instanceof RedisConfiguration.WithDatabaseIndex) { + ((RedisConfiguration.WithDatabaseIndex) redisConfiguration).setDatabase(dbIndex); + } + if (redisConfiguration instanceof RedisConfiguration.WithPassword) { + ((RedisConfiguration.WithPassword) redisConfiguration).setPassword(RedisPassword.of(password)); } - - return jedisShardInfo; } private static JedisPoolConfig createJedisPoolConfig(final PropertyContext context) {