Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1918#discussion_r122761385
--- Diff:
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
---
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.util;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.redis.RedisType;
+import org.apache.nifi.util.StringUtils;
+import org.springframework.data.redis.connection.RedisClusterConfiguration;
+import
org.springframework.data.redis.connection.RedisSentinelConfiguration;
+import
org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.JedisShardInfo;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class RedisUtils {
+
+ // These properties are shared between the connection pool controller
service and the state provider, the name
+ // is purposely set to be more human-readable since that will be
referenced in state-management.xml
+
+ public static final AllowableValue REDIS_MODE_STANDALONE = new
AllowableValue(RedisType.STANDALONE.getDisplayName(),
RedisType.STANDALONE.getDisplayName(), RedisType.STANDALONE.getDescription());
+ public static final AllowableValue REDIS_MODE_SENTINEL = new
AllowableValue(RedisType.SENTINEL.getDisplayName(),
RedisType.SENTINEL.getDisplayName(), RedisType.SENTINEL.getDescription());
+ public static final AllowableValue REDIS_MODE_CLUSTER = new
AllowableValue(RedisType.CLUSTER.getDisplayName(),
RedisType.CLUSTER.getDisplayName(), RedisType.CLUSTER.getDescription());
+
+ public static final PropertyDescriptor REDIS_MODE = new
PropertyDescriptor.Builder()
+ .name("Redis Mode")
+ .displayName("Redis Mode")
+ .description("The type of Redis being communicated with -
standalone, sentinel, or clustered.")
+ .allowableValues(REDIS_MODE_STANDALONE, REDIS_MODE_SENTINEL,
REDIS_MODE_CLUSTER)
+ .defaultValue(REDIS_MODE_STANDALONE.getValue())
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor CONNECTION_STRING = new
PropertyDescriptor.Builder()
+ .name("Connection String")
+ .displayName("Connection String")
+ .description("The connection string for Redis. In a standalone
instance this value will be of the form hostname:port. " +
+ "In a sentinel instance this value will be the
comma-separated list of sentinels, such as host1:port1,host2:port2,host3:port3.
" +
+ "In a clustered instance this value will be the
comma-separated list of cluster masters, such as
host1:port,host2:port,host3:port.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor DATABASE = new
PropertyDescriptor.Builder()
+ .name("Database Index")
+ .displayName("Database Index")
+ .description("The database index to be used by connections
created from this connection pool. " +
+ "See the databases property in redis.conf, by default
databases 0-15 will be available.")
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .defaultValue("0")
+ .expressionLanguageSupported(true)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor COMMUNICATION_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("Communication Timeout")
+ .displayName("Communication Timeout")
+ .description("The timeout to use when attempting to
communicate with Redis.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("10 seconds")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor CLUSTER_MAX_REDIRECTS = new
PropertyDescriptor.Builder()
+ .name("Cluster Max Redirects")
+ .displayName("Cluster Max Redirects")
+ .description("The maximum number of redirects that can be
performed when clustered.")
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .defaultValue("5")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor SENTINEL_MASTER = new
PropertyDescriptor.Builder()
+ .name("Sentinel Master")
+ .displayName("Sentinel Master")
+ .description("The name of the sentinel master, require when
Mode is set to Sentinel")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor PASSWORD = new
PropertyDescriptor.Builder()
+ .name("Password")
+ .displayName("Password")
+ .description("The password used to authenticate to the Redis
server. See the requirepass property in redis.conf.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .sensitive(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_MAX_TOTAL = new
PropertyDescriptor.Builder()
+ .name("Pool - Max Total")
+ .displayName("Pool - Max Total")
+ .description("The maximum number of connections that can be
allocated by the pool (checked out to clients, or idle awaiting checkout). " +
+ "A negative value indicates that there is no limit.")
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .defaultValue("8")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_MAX_IDLE = new
PropertyDescriptor.Builder()
+ .name("Pool - Max Idle")
+ .displayName("Pool - Max Idle")
+ .description("The maximum number of idle connections that can
be held in the pool, or a negative value if there is no limit.")
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .defaultValue("8")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_MIN_IDLE = new
PropertyDescriptor.Builder()
+ .name("Pool - Min Idle")
+ .displayName("Pool - Min Idle")
+ .description("The target for the minimum number of idle
connections to maintain in the pool. If the configured value of Min Idle is " +
+ "greater than the configured value for Max Idle, then
the value of Max Idle will be used instead.")
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .defaultValue("0")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_BLOCK_WHEN_EXHAUSTED = new
PropertyDescriptor.Builder()
+ .name("Pool - Block When Exhausted")
+ .displayName("Pool - Block When Exhausted")
+ .description("Whether or not clients should block and wait
when trying to obtain a connection from the pool when the pool has no available
connections. " +
+ "Setting this to false means an error will occur
immediately when a client requests a connection and none are available.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_MAX_WAIT_TIME = new
PropertyDescriptor.Builder()
+ .name("Pool - Max Wait Time")
+ .displayName("Pool - Max Wait Time")
+ .description("The amount of time to wait for an available
connection when Block When Exhausted is set to true.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("10 seconds")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_MIN_EVICTABLE_IDLE_TIME =
new PropertyDescriptor.Builder()
+ .name("Pool - Min Evictable Idle Time")
+ .displayName("Pool - Min Evictable Idle Time")
+ .description("The minimum amount of time an object may sit
idle in the pool before it is eligible for eviction.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("60 seconds")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_TIME_BETWEEN_EVICTION_RUNS
= new PropertyDescriptor.Builder()
+ .name("Pool - Time Between Eviction Runs")
+ .displayName("Pool - Time Between Eviction Runs")
+ .description("The amount of time between attempting to evict
idle connections from the pool.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("30 seconds")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_NUM_TESTS_PER_EVICTION_RUN
= new PropertyDescriptor.Builder()
+ .name("Pool - Num Tests Per Eviction Run")
+ .displayName("Pool - Num Tests Per Eviction Run")
+ .description("The number of connections to tests per eviction
attempt. A negative value indicates to test all connections.")
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .defaultValue("-1")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_TEST_ON_CREATE = new
PropertyDescriptor.Builder()
+ .name("Pool - Test On Create")
+ .displayName("Pool - Test On Create")
+ .description("Whether or not connections should be tested upon
creation.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_TEST_ON_BORROW = new
PropertyDescriptor.Builder()
+ .name("Pool - Test On Borrow")
+ .displayName("Pool - Test On Borrow")
+ .description("Whether or not connections should be tested upon
borrowing from the pool.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_TEST_ON_RETURN = new
PropertyDescriptor.Builder()
+ .name("Pool - Test On Return")
+ .displayName("Pool - Test On Return")
+ .description("Whether or not connections should be tested upon
returning to the pool.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_TEST_WHILE_IDLE = new
PropertyDescriptor.Builder()
+ .name("Pool - Test While Idle")
+ .displayName("Pool - Test While Idle")
+ .description("Whether or not connections should be tested
while idle.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
+ .build();
+
+ public static final List<PropertyDescriptor>
REDIS_CONNECTION_PROPERTY_DESCRIPTORS;
+ static {
+ final List<PropertyDescriptor> props = new ArrayList<>();
+ props.add(RedisUtils.REDIS_MODE);
+ props.add(RedisUtils.CONNECTION_STRING);
+ props.add(RedisUtils.DATABASE);
+ props.add(RedisUtils.COMMUNICATION_TIMEOUT);
+ props.add(RedisUtils.CLUSTER_MAX_REDIRECTS);
+ props.add(RedisUtils.SENTINEL_MASTER);
+ props.add(RedisUtils.PASSWORD);
+ props.add(RedisUtils.POOL_MAX_TOTAL);
+ props.add(RedisUtils.POOL_MAX_IDLE);
+ props.add(RedisUtils.POOL_MIN_IDLE);
+ props.add(RedisUtils.POOL_BLOCK_WHEN_EXHAUSTED);
+ props.add(RedisUtils.POOL_MAX_WAIT_TIME);
+ props.add(RedisUtils.POOL_MIN_EVICTABLE_IDLE_TIME);
+ props.add(RedisUtils.POOL_TIME_BETWEEN_EVICTION_RUNS);
+ props.add(RedisUtils.POOL_NUM_TESTS_PER_EVICTION_RUN);
+ props.add(RedisUtils.POOL_TEST_ON_CREATE);
+ props.add(RedisUtils.POOL_TEST_ON_BORROW);
+ props.add(RedisUtils.POOL_TEST_ON_RETURN);
+ props.add(RedisUtils.POOL_TEST_WHILE_IDLE);
+ REDIS_CONNECTION_PROPERTY_DESCRIPTORS =
Collections.unmodifiableList(props);
+ }
+
+
+ public static JedisConnectionFactory createConnectionFactory(final
PropertyContext context, final ComponentLog logger) {
+ final String redisMode =
context.getProperty(RedisUtils.REDIS_MODE).getValue();
+ final String connectionString =
context.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue();
+ final Integer dbIndex =
context.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger();
+ final String password =
context.getProperty(RedisUtils.PASSWORD).evaluateAttributeExpressions().getValue();
+ final Integer timeout =
context.getProperty(RedisUtils.COMMUNICATION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ final JedisPoolConfig poolConfig = createJedisPoolConfig(context);
+
+ JedisConnectionFactory connectionFactory;
+
+ if (RedisUtils.REDIS_MODE_STANDALONE.getValue().equals(redisMode))
{
+ final JedisShardInfo jedisShardInfo =
createJedisShardInfo(connectionString, timeout, password);
+
+ logger.info("Connecting to Redis in standalone mode at " +
connectionString);
+ connectionFactory = new JedisConnectionFactory(jedisShardInfo);
+
+ } else if
(RedisUtils.REDIS_MODE_SENTINEL.getValue().equals(redisMode)) {
+ final String[] sentinels = connectionString.split("[,]");
+ final String sentinelMaster =
context.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue();
+ final RedisSentinelConfiguration sentinelConfiguration = new
RedisSentinelConfiguration(sentinelMaster, new
HashSet<>(Arrays.asList(sentinels)));
+ final JedisShardInfo jedisShardInfo =
createJedisShardInfo(sentinels[0], timeout, password);
+
+ logger.info("Connecting to Redis in sentinel mode...");
+ logger.info("Redis master = " + sentinelMaster);
+
+ for (final String sentinel : sentinels) {
+ logger.info("Redis sentinel at " + sentinel);
+ }
+
+ connectionFactory = new
JedisConnectionFactory(sentinelConfiguration, poolConfig);
+ connectionFactory.setShardInfo(jedisShardInfo);
+
+ } else {
+ final String[] clusterNodes = connectionString.split("[,]");
+ final Integer maxRedirects =
context.getProperty(RedisUtils.CLUSTER_MAX_REDIRECTS).asInteger();
+
+ final RedisClusterConfiguration clusterConfiguration = new
RedisClusterConfiguration(Arrays.asList(clusterNodes));
+ clusterConfiguration.setMaxRedirects(maxRedirects);
+
+ logger.info("Connecting to Redis in clustered mode...");
+ for (final String clusterNode : clusterNodes) {
+ logger.info("Redis cluster node at " + clusterNode);
+ }
+
+ connectionFactory = new
JedisConnectionFactory(clusterConfiguration, poolConfig);
+ }
+
+ connectionFactory.setUsePool(true);
+ connectionFactory.setPoolConfig(poolConfig);
+ connectionFactory.setDatabase(dbIndex);
+ connectionFactory.setTimeout(timeout);
+
+ if (!StringUtils.isBlank(password)) {
+ connectionFactory.setPassword(password);
+ }
+
+ // need to call this to initialize the pool/connections
+ connectionFactory.afterPropertiesSet();
+ return connectionFactory;
+ }
+
+ private static JedisShardInfo createJedisShardInfo(final String
hostAndPort, final Integer timeout, final String password) {
+ final String[] hostAndPortSplit = hostAndPort.split("[:]");
+ final String host = hostAndPortSplit[0].trim();
+ final Integer port = Integer.parseInt(hostAndPortSplit[1].trim());
+
+ final JedisShardInfo jedisShardInfo = new JedisShardInfo(host,
port);
+ jedisShardInfo.setConnectionTimeout(timeout);
+ jedisShardInfo.setSoTimeout(timeout);
+
+ if (!StringUtils.isEmpty(password)) {
+ jedisShardInfo.setPassword(password);
+ }
+
+ return jedisShardInfo;
+ }
+
+ private static JedisPoolConfig createJedisPoolConfig(final
PropertyContext context) {
+ final JedisPoolConfig poolConfig = new JedisPoolConfig();
+
poolConfig.setMaxTotal(context.getProperty(RedisUtils.POOL_MAX_TOTAL).asInteger());
+
poolConfig.setMaxIdle(context.getProperty(RedisUtils.POOL_MAX_IDLE).asInteger());
+
poolConfig.setMinIdle(context.getProperty(RedisUtils.POOL_MIN_IDLE).asInteger());
+
poolConfig.setBlockWhenExhausted(context.getProperty(RedisUtils.POOL_BLOCK_WHEN_EXHAUSTED).asBoolean());
+
poolConfig.setMaxWaitMillis(context.getProperty(RedisUtils.POOL_MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS));
+
poolConfig.setMinEvictableIdleTimeMillis(context.getProperty(RedisUtils.POOL_MIN_EVICTABLE_IDLE_TIME).asTimePeriod(TimeUnit.MILLISECONDS));
+
poolConfig.setTimeBetweenEvictionRunsMillis(context.getProperty(RedisUtils.POOL_TIME_BETWEEN_EVICTION_RUNS).asTimePeriod(TimeUnit.MILLISECONDS));
+
poolConfig.setNumTestsPerEvictionRun(context.getProperty(RedisUtils.POOL_NUM_TESTS_PER_EVICTION_RUN).asInteger());
+
poolConfig.setTestOnCreate(context.getProperty(RedisUtils.POOL_TEST_ON_CREATE).asBoolean());
+
poolConfig.setTestOnBorrow(context.getProperty(RedisUtils.POOL_TEST_ON_BORROW).asBoolean());
+
poolConfig.setTestOnReturn(context.getProperty(RedisUtils.POOL_TEST_ON_RETURN).asBoolean());
+
poolConfig.setTestWhileIdle(context.getProperty(RedisUtils.POOL_TEST_WHILE_IDLE).asBoolean());
+ return poolConfig;
+ }
+
+ public static Collection<ValidationResult> validate(ValidationContext
validationContext) {
+ final List<ValidationResult> results = new ArrayList<>();
+
+ final String redisMode =
validationContext.getProperty(RedisUtils.REDIS_MODE).getValue();
+ final String connectionString =
validationContext.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue();
+ final Integer dbIndex =
validationContext.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger();
+
+ if (StringUtils.isBlank(connectionString)) {
+ results.add(new ValidationResult.Builder()
+ .subject(RedisUtils.CONNECTION_STRING.getDisplayName())
+ .valid(false)
+ .explanation("Connection String cannot be blank")
+ .build());
+ } else if
(RedisUtils.REDIS_MODE_STANDALONE.getValue().equals(redisMode)) {
+ final String[] hostAndPort = connectionString.split("[:]");
+ if (hostAndPort == null || hostAndPort.length != 2 ||
!isInteger(hostAndPort[1])) {
+ results.add(new ValidationResult.Builder()
+
.subject(RedisUtils.CONNECTION_STRING.getDisplayName())
+ .input(connectionString)
+ .valid(false)
+ .explanation("Standalone Connection String must be
in the form host:port")
+ .build());
+ }
+ } else {
+ for (final String connection : connectionString.split("[,]")) {
+ final String[] hostAndPort = connection.split("[:]");
--- End diff --
Does the Redis connection support trimming values, such as host1:port1,
host2:port2, ... instead of host1:port1,host2:port2 ?? If so, we should make
sure that we allow that here. If not, then that's ok
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---