[
https://issues.apache.org/jira/browse/NIFI-4061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16054826#comment-16054826
]
ASF GitHub Bot commented on NIFI-4061:
--------------------------------------
Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1918#discussion_r122837202
--- Diff:
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
---
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.util;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.redis.RedisType;
+import org.apache.nifi.util.StringUtils;
+import org.springframework.data.redis.connection.RedisClusterConfiguration;
+import
org.springframework.data.redis.connection.RedisSentinelConfiguration;
+import
org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.JedisShardInfo;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class RedisUtils {
+
+ // These properties are shared between the connection pool controller
service and the state provider, the name
+ // is purposely set to be more human-readable since that will be
referenced in state-management.xml
+
+ public static final AllowableValue REDIS_MODE_STANDALONE = new
AllowableValue(RedisType.STANDALONE.getDisplayName(),
RedisType.STANDALONE.getDisplayName(), RedisType.STANDALONE.getDescription());
+ public static final AllowableValue REDIS_MODE_SENTINEL = new
AllowableValue(RedisType.SENTINEL.getDisplayName(),
RedisType.SENTINEL.getDisplayName(), RedisType.SENTINEL.getDescription());
+ public static final AllowableValue REDIS_MODE_CLUSTER = new
AllowableValue(RedisType.CLUSTER.getDisplayName(),
RedisType.CLUSTER.getDisplayName(), RedisType.CLUSTER.getDescription());
+
+ public static final PropertyDescriptor REDIS_MODE = new
PropertyDescriptor.Builder()
+ .name("Redis Mode")
+ .displayName("Redis Mode")
+ .description("The type of Redis being communicated with -
standalone, sentinel, or clustered.")
+ .allowableValues(REDIS_MODE_STANDALONE, REDIS_MODE_SENTINEL,
REDIS_MODE_CLUSTER)
+ .defaultValue(REDIS_MODE_STANDALONE.getValue())
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor CONNECTION_STRING = new
PropertyDescriptor.Builder()
+ .name("Connection String")
+ .displayName("Connection String")
+ .description("The connection string for Redis. In a standalone
instance this value will be of the form hostname:port. " +
+ "In a sentinel instance this value will be the
comma-separated list of sentinels, such as host1:port1,host2:port2,host3:port3.
" +
+ "In a clustered instance this value will be the
comma-separated list of cluster masters, such as
host1:port,host2:port,host3:port.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor DATABASE = new
PropertyDescriptor.Builder()
+ .name("Database Index")
+ .displayName("Database Index")
+ .description("The database index to be used by connections
created from this connection pool. " +
+ "See the databases property in redis.conf, by default
databases 0-15 will be available.")
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .defaultValue("0")
+ .expressionLanguageSupported(true)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor COMMUNICATION_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("Communication Timeout")
+ .displayName("Communication Timeout")
+ .description("The timeout to use when attempting to
communicate with Redis.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("10 seconds")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor CLUSTER_MAX_REDIRECTS = new
PropertyDescriptor.Builder()
+ .name("Cluster Max Redirects")
+ .displayName("Cluster Max Redirects")
+ .description("The maximum number of redirects that can be
performed when clustered.")
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .defaultValue("5")
+ .required(true)
+ .build();
+
+ /**
+  * Name of the sentinel master. Optional at the descriptor level (not marked required),
+  * must be non-blank when set, and supports expression language.
+  */
+ public static final PropertyDescriptor SENTINEL_MASTER = new PropertyDescriptor.Builder()
+         .name("Sentinel Master")
+         .displayName("Sentinel Master")
+         .description("The name of the sentinel master, required when Mode is set to Sentinel")
+         .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+         .expressionLanguageSupported(true)
+         .build();
+
+ public static final PropertyDescriptor PASSWORD = new
PropertyDescriptor.Builder()
+ .name("Password")
+ .displayName("Password")
+ .description("The password used to authenticate to the Redis
server. See the requirepass property in redis.conf.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .sensitive(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_MAX_TOTAL = new
PropertyDescriptor.Builder()
+ .name("Pool - Max Total")
+ .displayName("Pool - Max Total")
+ .description("The maximum number of connections that can be
allocated by the pool (checked out to clients, or idle awaiting checkout). " +
+ "A negative value indicates that there is no limit.")
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .defaultValue("8")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_MAX_IDLE = new
PropertyDescriptor.Builder()
+ .name("Pool - Max Idle")
+ .displayName("Pool - Max Idle")
+ .description("The maximum number of idle connections that can
be held in the pool, or a negative value if there is no limit.")
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .defaultValue("8")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_MIN_IDLE = new
PropertyDescriptor.Builder()
+ .name("Pool - Min Idle")
+ .displayName("Pool - Min Idle")
+ .description("The target for the minimum number of idle
connections to maintain in the pool. If the configured value of Min Idle is " +
+ "greater than the configured value for Max Idle, then
the value of Max Idle will be used instead.")
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .defaultValue("0")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_BLOCK_WHEN_EXHAUSTED = new
PropertyDescriptor.Builder()
+ .name("Pool - Block When Exhausted")
+ .displayName("Pool - Block When Exhausted")
+ .description("Whether or not clients should block and wait
when trying to obtain a connection from the pool when the pool has no available
connections. " +
+ "Setting this to false means an error will occur
immediately when a client requests a connection and none are available.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_MAX_WAIT_TIME = new
PropertyDescriptor.Builder()
+ .name("Pool - Max Wait Time")
+ .displayName("Pool - Max Wait Time")
+ .description("The amount of time to wait for an available
connection when Block When Exhausted is set to true.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("10 seconds")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_MIN_EVICTABLE_IDLE_TIME =
new PropertyDescriptor.Builder()
+ .name("Pool - Min Evictable Idle Time")
+ .displayName("Pool - Min Evictable Idle Time")
+ .description("The minimum amount of time an object may sit
idle in the pool before it is eligible for eviction.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("60 seconds")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_TIME_BETWEEN_EVICTION_RUNS
= new PropertyDescriptor.Builder()
+ .name("Pool - Time Between Eviction Runs")
+ .displayName("Pool - Time Between Eviction Runs")
+ .description("The amount of time between attempting to evict
idle connections from the pool.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("30 seconds")
+ .required(true)
+ .build();
+
+ /**
+  * Number of pooled connections examined on each eviction run. Required, validated as an
+  * integer, and defaults to -1.
+  */
+ public static final PropertyDescriptor POOL_NUM_TESTS_PER_EVICTION_RUN = new PropertyDescriptor.Builder()
+         .name("Pool - Num Tests Per Eviction Run")
+         .displayName("Pool - Num Tests Per Eviction Run")
+         .description("The number of connections to test per eviction attempt. A negative value indicates to test all connections.")
+         .addValidator(StandardValidators.INTEGER_VALIDATOR)
+         .defaultValue("-1")
+         .required(true)
+         .build();
+
+ public static final PropertyDescriptor POOL_TEST_ON_CREATE = new
PropertyDescriptor.Builder()
+ .name("Pool - Test On Create")
+ .displayName("Pool - Test On Create")
+ .description("Whether or not connections should be tested upon
creation.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_TEST_ON_BORROW = new
PropertyDescriptor.Builder()
+ .name("Pool - Test On Borrow")
+ .displayName("Pool - Test On Borrow")
+ .description("Whether or not connections should be tested upon
borrowing from the pool.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_TEST_ON_RETURN = new
PropertyDescriptor.Builder()
+ .name("Pool - Test On Return")
+ .displayName("Pool - Test On Return")
+ .description("Whether or not connections should be tested upon
returning to the pool.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor POOL_TEST_WHILE_IDLE = new
PropertyDescriptor.Builder()
+ .name("Pool - Test While Idle")
+ .displayName("Pool - Test While Idle")
+ .description("Whether or not connections should be tested
while idle.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
+ .build();
+
+ public static final List<PropertyDescriptor>
REDIS_CONNECTION_PROPERTY_DESCRIPTORS;
+ static {
+ final List<PropertyDescriptor> props = new ArrayList<>();
+ props.add(RedisUtils.REDIS_MODE);
+ props.add(RedisUtils.CONNECTION_STRING);
+ props.add(RedisUtils.DATABASE);
+ props.add(RedisUtils.COMMUNICATION_TIMEOUT);
+ props.add(RedisUtils.CLUSTER_MAX_REDIRECTS);
+ props.add(RedisUtils.SENTINEL_MASTER);
+ props.add(RedisUtils.PASSWORD);
+ props.add(RedisUtils.POOL_MAX_TOTAL);
+ props.add(RedisUtils.POOL_MAX_IDLE);
+ props.add(RedisUtils.POOL_MIN_IDLE);
+ props.add(RedisUtils.POOL_BLOCK_WHEN_EXHAUSTED);
+ props.add(RedisUtils.POOL_MAX_WAIT_TIME);
+ props.add(RedisUtils.POOL_MIN_EVICTABLE_IDLE_TIME);
+ props.add(RedisUtils.POOL_TIME_BETWEEN_EVICTION_RUNS);
+ props.add(RedisUtils.POOL_NUM_TESTS_PER_EVICTION_RUN);
+ props.add(RedisUtils.POOL_TEST_ON_CREATE);
+ props.add(RedisUtils.POOL_TEST_ON_BORROW);
+ props.add(RedisUtils.POOL_TEST_ON_RETURN);
+ props.add(RedisUtils.POOL_TEST_WHILE_IDLE);
+ REDIS_CONNECTION_PROPERTY_DESCRIPTORS =
Collections.unmodifiableList(props);
+ }
+
+
+ /**
+  * Builds and initializes a {@link JedisConnectionFactory} from the Redis properties of the given context.
+  *
+  * <p>The Redis Mode property selects the branch: standalone uses the connection string as a single
+  * host:port, sentinel treats it as a comma-separated list of sentinels plus the Sentinel Master
+  * property, and any other mode is handled as a cluster with a comma-separated list of nodes.
+  * Each comma-separated entry is trimmed before use so that values such as
+  * {@code "host1:port1, host2:port2"} are accepted.</p>
+  *
+  * @param context source of the property values
+  * @param logger  receives info-level messages describing the endpoints being connected to
+  * @return a connection factory on which {@code afterPropertiesSet()} has already been called
+  */
+ public static JedisConnectionFactory createConnectionFactory(final PropertyContext context, final ComponentLog logger) {
+     final String redisMode = context.getProperty(RedisUtils.REDIS_MODE).getValue();
+     final String connectionString = context.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue();
+     final Integer dbIndex = context.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger();
+     final String password = context.getProperty(RedisUtils.PASSWORD).evaluateAttributeExpressions().getValue();
+     final Integer timeout = context.getProperty(RedisUtils.COMMUNICATION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+     final JedisPoolConfig poolConfig = createJedisPoolConfig(context);
+
+     final JedisConnectionFactory connectionFactory;
+
+     if (RedisUtils.REDIS_MODE_STANDALONE.getValue().equals(redisMode)) {
+         final JedisShardInfo jedisShardInfo = createJedisShardInfo(connectionString, timeout, password);
+
+         logger.info("Connecting to Redis in standalone mode at " + connectionString);
+         connectionFactory = new JedisConnectionFactory(jedisShardInfo);
+
+     } else if (RedisUtils.REDIS_MODE_SENTINEL.getValue().equals(redisMode)) {
+         // trim each entry so whitespace after a comma does not end up in a host name or port
+         final List<String> sentinels = trimEntries(connectionString.split("[,]"));
+         final String sentinelMaster = context.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue();
+         final RedisSentinelConfiguration sentinelConfiguration = new RedisSentinelConfiguration(sentinelMaster, new HashSet<>(sentinels));
+         final JedisShardInfo jedisShardInfo = createJedisShardInfo(sentinels.get(0), timeout, password);
+
+         logger.info("Connecting to Redis in sentinel mode...");
+         logger.info("Redis master = " + sentinelMaster);
+
+         for (final String sentinel : sentinels) {
+             logger.info("Redis sentinel at " + sentinel);
+         }
+
+         connectionFactory = new JedisConnectionFactory(sentinelConfiguration, poolConfig);
+         connectionFactory.setShardInfo(jedisShardInfo);
+
+     } else {
+         // trim each entry so whitespace after a comma does not end up in a host name or port
+         final List<String> clusterNodes = trimEntries(connectionString.split("[,]"));
+         final Integer maxRedirects = context.getProperty(RedisUtils.CLUSTER_MAX_REDIRECTS).asInteger();
+
+         final RedisClusterConfiguration clusterConfiguration = new RedisClusterConfiguration(clusterNodes);
+         clusterConfiguration.setMaxRedirects(maxRedirects);
+
+         logger.info("Connecting to Redis in clustered mode...");
+         for (final String clusterNode : clusterNodes) {
+             logger.info("Redis cluster node at " + clusterNode);
+         }
+
+         connectionFactory = new JedisConnectionFactory(clusterConfiguration, poolConfig);
+     }
+
+     connectionFactory.setUsePool(true);
+     connectionFactory.setPoolConfig(poolConfig);
+     connectionFactory.setDatabase(dbIndex);
+     connectionFactory.setTimeout(timeout);
+
+     if (!StringUtils.isBlank(password)) {
+         connectionFactory.setPassword(password);
+     }
+
+     // need to call this to initialize the pool/connections
+     connectionFactory.afterPropertiesSet();
+     return connectionFactory;
+ }
+
+ /**
+  * Returns a new list holding every element of the given array with leading and trailing whitespace removed.
+  * Order and element count are preserved; empty entries are kept as empty strings.
+  */
+ private static List<String> trimEntries(final String[] values) {
+     final List<String> trimmed = new ArrayList<>(values.length);
+     for (final String value : values) {
+         trimmed.add(value.trim());
+     }
+     return trimmed;
+ }
+
+ /**
+  * Builds the {@link JedisShardInfo} for a single {@code host:port} entry.
+  *
+  * <p>The host and port are trimmed individually. No format validation happens here: an entry
+  * without a ':' separator or with a non-numeric port fails with a runtime exception.</p>
+  *
+  * @param hostAndPort the endpoint in the form host:port
+  * @param timeout     applied as both the connection timeout and the socket timeout
+  * @param password    set on the shard info only when it is not blank
+  * @return the configured shard info
+  */
+ private static JedisShardInfo createJedisShardInfo(final String hostAndPort, final Integer timeout, final String password) {
+     final String[] hostAndPortSplit = hostAndPort.split("[:]");
+     final String host = hostAndPortSplit[0].trim();
+     final int port = Integer.parseInt(hostAndPortSplit[1].trim());
+
+     final JedisShardInfo jedisShardInfo = new JedisShardInfo(host, port);
+     jedisShardInfo.setConnectionTimeout(timeout);
+     jedisShardInfo.setSoTimeout(timeout);
+
+     // same blank check createConnectionFactory applies before setting the password on the factory,
+     // so a whitespace-only password is ignored in both places
+     if (!StringUtils.isBlank(password)) {
+         jedisShardInfo.setPassword(password);
+     }
+
+     return jedisShardInfo;
+ }
+
+ /**
+  * Translates the "Pool - ..." properties of the given context into a {@link JedisPoolConfig}.
+  * Time-based properties are converted to milliseconds before being applied.
+  *
+  * @param context source of the pool property values
+  * @return a newly created pool configuration
+  */
+ private static JedisPoolConfig createJedisPoolConfig(final PropertyContext context) {
+     final JedisPoolConfig config = new JedisPoolConfig();
+
+     // pool sizing
+     final Integer maxTotal = context.getProperty(POOL_MAX_TOTAL).asInteger();
+     config.setMaxTotal(maxTotal);
+     final Integer maxIdle = context.getProperty(POOL_MAX_IDLE).asInteger();
+     config.setMaxIdle(maxIdle);
+     final Integer minIdle = context.getProperty(POOL_MIN_IDLE).asInteger();
+     config.setMinIdle(minIdle);
+
+     // behaviour when no connection is available
+     final Boolean blockWhenExhausted = context.getProperty(POOL_BLOCK_WHEN_EXHAUSTED).asBoolean();
+     config.setBlockWhenExhausted(blockWhenExhausted);
+     final Long maxWaitMillis = context.getProperty(POOL_MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+     config.setMaxWaitMillis(maxWaitMillis);
+
+     // eviction of idle connections
+     final Long minEvictableIdleMillis = context.getProperty(POOL_MIN_EVICTABLE_IDLE_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+     config.setMinEvictableIdleTimeMillis(minEvictableIdleMillis);
+     final Long evictionIntervalMillis = context.getProperty(POOL_TIME_BETWEEN_EVICTION_RUNS).asTimePeriod(TimeUnit.MILLISECONDS);
+     config.setTimeBetweenEvictionRunsMillis(evictionIntervalMillis);
+     final Integer testsPerEvictionRun = context.getProperty(POOL_NUM_TESTS_PER_EVICTION_RUN).asInteger();
+     config.setNumTestsPerEvictionRun(testsPerEvictionRun);
+
+     // connection testing
+     final Boolean testOnCreate = context.getProperty(POOL_TEST_ON_CREATE).asBoolean();
+     config.setTestOnCreate(testOnCreate);
+     final Boolean testOnBorrow = context.getProperty(POOL_TEST_ON_BORROW).asBoolean();
+     config.setTestOnBorrow(testOnBorrow);
+     final Boolean testOnReturn = context.getProperty(POOL_TEST_ON_RETURN).asBoolean();
+     config.setTestOnReturn(testOnReturn);
+     final Boolean testWhileIdle = context.getProperty(POOL_TEST_WHILE_IDLE).asBoolean();
+     config.setTestWhileIdle(testWhileIdle);
+
+     return config;
+ }
+
+ public static Collection<ValidationResult> validate(ValidationContext
validationContext) {
+ final List<ValidationResult> results = new ArrayList<>();
+
+ final String redisMode =
validationContext.getProperty(RedisUtils.REDIS_MODE).getValue();
+ final String connectionString =
validationContext.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue();
+ final Integer dbIndex =
validationContext.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger();
+
+ if (StringUtils.isBlank(connectionString)) {
+ results.add(new ValidationResult.Builder()
+ .subject(RedisUtils.CONNECTION_STRING.getDisplayName())
+ .valid(false)
+ .explanation("Connection String cannot be blank")
+ .build());
+ } else if
(RedisUtils.REDIS_MODE_STANDALONE.getValue().equals(redisMode)) {
+ final String[] hostAndPort = connectionString.split("[:]");
+ if (hostAndPort == null || hostAndPort.length != 2 ||
!isInteger(hostAndPort[1])) {
+ results.add(new ValidationResult.Builder()
+
.subject(RedisUtils.CONNECTION_STRING.getDisplayName())
+ .input(connectionString)
+ .valid(false)
+ .explanation("Standalone Connection String must be
in the form host:port")
+ .build());
+ }
+ } else {
+ for (final String connection : connectionString.split("[,]")) {
+ final String[] hostAndPort = connection.split("[:]");
--- End diff --
I tried to make the validation and trimming consistent since I was already
trimming the values on our end.
> Implement a StateProvider backed by Redis
> -----------------------------------------
>
> Key: NIFI-4061
> URL: https://issues.apache.org/jira/browse/NIFI-4061
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Bryan Bende
> Assignee: Bryan Bende
> Priority: Minor
>
> We currently have only one clustered state provider which is a ZooKeeper
> implementation. Redis would make a good candidate to provide another option.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)