This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 551625f7bf NIFI-12149 Create nifi-redis-utils and minor improvements 
to util methods
551625f7bf is described below

commit 551625f7bf4662cac9ba6fb5d5f90355ee3bd705
Author: Bryan Bende <bbe...@apache.org>
AuthorDate: Fri Sep 29 10:15:22 2023 -0400

    NIFI-12149 Create nifi-redis-utils and minor improvements to util methods
    
    Signed-off-by: Joe Gresock <jgres...@gmail.com>
    This closes #7812.
---
 .../nifi-redis-extensions/pom.xml                  |   5 +
 .../redis/service/RedisConnectionPoolService.java  |   2 +-
 .../nifi/redis/state/RedisStateProvider.java       |   2 +-
 .../service/TestRedisConnectionPoolService.java    |   2 +-
 .../pom.xml                                        |  62 +------
 .../org/apache/nifi/redis/util/RedisAction.java    |   0
 .../org/apache/nifi/redis/util/RedisConfig.java    | 206 +++++++++++++++++++++
 .../org/apache/nifi/redis/util/RedisUtils.java     | 152 +++++++++++----
 nifi-nar-bundles/nifi-redis-bundle/pom.xml         |   1 +
 9 files changed, 335 insertions(+), 97 deletions(-)

diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml
index 82a3b96607..bf1a5f2793 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml
@@ -38,6 +38,11 @@
             <version>2.0.0-SNAPSHOT</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-redis-utils</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.springframework.data</groupId>
             <artifactId>spring-data-redis</artifactId>
diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java
index 70e5ca7f5f..abaa47c19f 100644
--- 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java
+++ 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java
@@ -90,7 +90,7 @@ RedisConnectionPoolService extends AbstractControllerService 
implements RedisCon
         if (connectionFactory == null) {
             synchronized (this) {
                 if (connectionFactory == null) {
-                    connectionFactory = 
RedisUtils.createConnectionFactory(context, getLogger(), sslContext);
+                    connectionFactory = 
RedisUtils.createConnectionFactory(context, sslContext);
                 }
             }
         }
diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
index b684a77d5d..4792523eef 100644
--- 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
@@ -344,7 +344,7 @@ public class RedisStateProvider extends 
AbstractConfigurableComponent implements
     // visible for testing
     synchronized RedisConnection getRedis() {
         if (connectionFactory == null) {
-            connectionFactory = RedisUtils.createConnectionFactory(context, 
logger, sslContext);
+            connectionFactory = RedisUtils.createConnectionFactory(context, 
sslContext);
         }
 
         return connectionFactory.getConnection();
diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java
index 5ec27d1996..c77e12e4fa 100644
--- 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java
+++ 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java
@@ -126,7 +126,7 @@ public class TestRedisConnectionPoolService {
             final SSLContextService sslContextService = 
configContext.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
             providedSslContext = sslContextService.createContext();
         }
-        JedisConnectionFactory connectionFactory = 
RedisUtils.createConnectionFactory(configContext, testRunner.getLogger(), 
providedSslContext);
+        JedisConnectionFactory connectionFactory = 
RedisUtils.createConnectionFactory(configContext, providedSslContext);
         return connectionFactory;
     }
 
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/pom.xml
similarity index 58%
copy from nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml
copy to nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/pom.xml
index 82a3b96607..cd3e1ee2f8 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/pom.xml
@@ -22,15 +22,13 @@
         <version>2.0.0-SNAPSHOT</version>
     </parent>
 
-    <artifactId>nifi-redis-extensions</artifactId>
+    <artifactId>nifi-redis-utils</artifactId>
     <packaging>jar</packaging>
 
     <dependencies>
-        <!-- Provided deps from nifi-redis-service-api -->
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
-            <scope>provided</scope>
+            <artifactId>nifi-api</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -38,74 +36,28 @@
             <version>2.0.0-SNAPSHOT</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.springframework.data</groupId>
             <artifactId>spring-data-redis</artifactId>
             <version>${spring.data.redis.version}</version>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>org.springframework</groupId>
-            <artifactId>spring-beans</artifactId>
-        </dependency>
         <dependency>
             <groupId>redis.clients</groupId>
             <artifactId>jedis</artifactId>
             <version>${jedis.version}</version>
             <scope>provided</scope>
         </dependency>
-        <!-- End Provided deps from nifi-redis-service-api -->
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-api</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-utils</artifactId>
             <version>2.0.0-SNAPSHOT</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-record-serialization-service-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-record</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-record-path</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-ssl-context-service-api</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <!-- Test dependencies -->
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-mock</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-mock-record-utils</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>testcontainers</artifactId>
-            <version>${testcontainers.version}</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>junit-jupiter</artifactId>
diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisAction.java
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisAction.java
similarity index 100%
rename from 
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisAction.java
rename to 
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisAction.java
diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisConfig.java
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisConfig.java
new file mode 100644
index 0000000000..c5da310093
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisConfig.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.util;
+
+import org.apache.nifi.redis.RedisType;
+
+import java.time.Duration;
+import java.util.Objects;
+
+public class RedisConfig {
+
+    private final RedisType redisMode;
+    private final String connectionString;
+
+    private String sentinelMaster;
+    private String sentinelPassword;
+    private String password;
+
+    private int dbIndex = 0;
+    private int timeout = 10000;
+    private int clusterMaxRedirects = 5;
+
+    private int poolMaxTotal = 8;
+    private int poolMaxIdle = 8;
+    private int poolMinIdle = 0;
+    private boolean blockWhenExhausted = true;
+    private Duration maxWaitTime = Duration.ofSeconds(10);
+    private Duration minEvictableIdleTime = Duration.ofSeconds(60);
+    private Duration timeBetweenEvictionRuns = Duration.ofSeconds(30);
+    private int numTestsPerEvictionRun = -1;
+    private boolean testOnCreate = true;
+    private boolean testOnBorrow = true;
+    private boolean testOnReturn = false;
+    private boolean testWhenIdle = true;
+
+    public RedisConfig(final RedisType redisMode, final String 
connectionString) {
+        this.redisMode = Objects.requireNonNull(redisMode);
+        this.connectionString = Objects.requireNonNull(connectionString);
+    }
+
+    public RedisType getRedisMode() {
+        return redisMode;
+    }
+
+    public String getConnectionString() {
+        return connectionString;
+    }
+
+    public String getSentinelMaster() {
+        return sentinelMaster;
+    }
+
+    public void setSentinelMaster(String sentinelMaster) {
+        this.sentinelMaster = sentinelMaster;
+    }
+
+    public String getSentinelPassword() {
+        return sentinelPassword;
+    }
+
+    public void setSentinelPassword(String sentinelPassword) {
+        this.sentinelPassword = sentinelPassword;
+    }
+
+    public int getDbIndex() {
+        return dbIndex;
+    }
+
+    public void setDbIndex(int dbIndex) {
+        this.dbIndex = dbIndex;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public int getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(int timeout) {
+        this.timeout = timeout;
+    }
+
+    public int getClusterMaxRedirects() {
+        return clusterMaxRedirects;
+    }
+
+    public void setClusterMaxRedirects(int clusterMaxRedirects) {
+        this.clusterMaxRedirects = clusterMaxRedirects;
+    }
+
+    public int getPoolMaxTotal() {
+        return poolMaxTotal;
+    }
+
+    public void setPoolMaxTotal(int poolMaxTotal) {
+        this.poolMaxTotal = poolMaxTotal;
+    }
+
+    public int getPoolMaxIdle() {
+        return poolMaxIdle;
+    }
+
+    public void setPoolMaxIdle(int poolMaxIdle) {
+        this.poolMaxIdle = poolMaxIdle;
+    }
+
+    public int getPoolMinIdle() {
+        return poolMinIdle;
+    }
+
+    public void setPoolMinIdle(int poolMinIdle) {
+        this.poolMinIdle = poolMinIdle;
+    }
+
+    public boolean getBlockWhenExhausted() {
+        return blockWhenExhausted;
+    }
+
+    public void setBlockWhenExhausted(boolean blockWhenExhausted) {
+        this.blockWhenExhausted = blockWhenExhausted;
+    }
+
+    public Duration getMaxWaitTime() {
+        return maxWaitTime;
+    }
+
+    public void setMaxWaitTime(Duration maxWaitTime) {
+        this.maxWaitTime = maxWaitTime;
+    }
+
+    public Duration getMinEvictableIdleTime() {
+        return minEvictableIdleTime;
+    }
+
+    public void setMinEvictableIdleTime(Duration minEvictableIdleTime) {
+        this.minEvictableIdleTime = minEvictableIdleTime;
+    }
+
+    public Duration getTimeBetweenEvictionRuns() {
+        return timeBetweenEvictionRuns;
+    }
+
+    public void setTimeBetweenEvictionRuns(Duration timeBetweenEvictionRuns) {
+        this.timeBetweenEvictionRuns = timeBetweenEvictionRuns;
+    }
+
+    public int getNumTestsPerEvictionRun() {
+        return numTestsPerEvictionRun;
+    }
+
+    public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) {
+        this.numTestsPerEvictionRun = numTestsPerEvictionRun;
+    }
+
+    public boolean getTestOnCreate() {
+        return testOnCreate;
+    }
+
+    public void setTestOnCreate(boolean testOnCreate) {
+        this.testOnCreate = testOnCreate;
+    }
+
+    public boolean getTestOnBorrow() {
+        return testOnBorrow;
+    }
+
+    public void setTestOnBorrow(boolean testOnBorrow) {
+        this.testOnBorrow = testOnBorrow;
+    }
+
+    public boolean getTestOnReturn() {
+        return testOnReturn;
+    }
+
+    public void setTestOnReturn(boolean testOnReturn) {
+        this.testOnReturn = testOnReturn;
+    }
+
+    public boolean getTestWhenIdle() {
+        return testWhenIdle;
+    }
+
+    public void setTestWhenIdle(boolean testWhenIdle) {
+        this.testWhenIdle = testWhenIdle;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
similarity index 76%
rename from 
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
rename to 
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
index f72ba136f5..51b17e3cd0 100644
--- 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
+++ 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
@@ -16,37 +16,47 @@
  */
 package org.apache.nifi.redis.util;
 
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import javax.net.ssl.SSLContext;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.redis.RedisConnectionPool;
 import org.apache.nifi.redis.RedisType;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.data.redis.connection.RedisClusterConfiguration;
 import org.springframework.data.redis.connection.RedisConfiguration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
 import org.springframework.data.redis.connection.RedisPassword;
 import org.springframework.data.redis.connection.RedisSentinelConfiguration;
 import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
 import 
org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
 import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
+import org.springframework.data.redis.connection.stream.MapRecord;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.data.redis.stream.StreamMessageListenerContainer;
 import org.springframework.lang.Nullable;
 import redis.clients.jedis.JedisPoolConfig;
 
+import javax.net.ssl.SSLContext;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 public class RedisUtils {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RedisUtils.class);
+
     // These properties are shared among the controller service(s) and 
processor(s) that use a RedisConnectionPool
 
     public static final PropertyDescriptor REDIS_CONNECTION_POOL = new 
PropertyDescriptor.Builder()
@@ -301,15 +311,44 @@ public class RedisUtils {
         REDIS_CONNECTION_PROPERTY_DESCRIPTORS = 
Collections.unmodifiableList(props);
     }
 
+    public static RedisConfig createRedisConfig(final PropertyContext context) 
{
+        final RedisType redisType = 
RedisType.fromDisplayName(context.getProperty(RedisUtils.REDIS_MODE).getValue());
+        final String connectString =   
context.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue();
+
+        final RedisConfig redisConfig = new RedisConfig(redisType, 
connectString);
+        
redisConfig.setSentinelMaster(context.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue());
+        
redisConfig.setDbIndex(context.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger());
+        
redisConfig.setPassword(context.getProperty(RedisUtils.PASSWORD).evaluateAttributeExpressions().getValue());
+        
redisConfig.setSentinelPassword(context.getProperty(RedisUtils.SENTINEL_PASSWORD).evaluateAttributeExpressions().getValue());
+        
redisConfig.setTimeout(context.getProperty(RedisUtils.COMMUNICATION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
+        
redisConfig.setClusterMaxRedirects(context.getProperty(RedisUtils.CLUSTER_MAX_REDIRECTS).asInteger());
+        
redisConfig.setPoolMaxTotal(context.getProperty(RedisUtils.POOL_MAX_TOTAL).asInteger());
+        
redisConfig.setPoolMaxIdle(context.getProperty(RedisUtils.POOL_MAX_IDLE).asInteger());
+        
redisConfig.setPoolMinIdle(context.getProperty(RedisUtils.POOL_MIN_IDLE).asInteger());
+        
redisConfig.setBlockWhenExhausted(context.getProperty(RedisUtils.POOL_BLOCK_WHEN_EXHAUSTED).asBoolean());
+        
redisConfig.setMaxWaitTime(Duration.ofMillis(context.getProperty(RedisUtils.POOL_MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS)));
+        
redisConfig.setMinEvictableIdleTime(Duration.ofMillis(context.getProperty(RedisUtils.POOL_MIN_EVICTABLE_IDLE_TIME).asTimePeriod(TimeUnit.MILLISECONDS)));
+        
redisConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(context.getProperty(RedisUtils.POOL_TIME_BETWEEN_EVICTION_RUNS).asTimePeriod(TimeUnit.MILLISECONDS)));
+        
redisConfig.setNumTestsPerEvictionRun(context.getProperty(RedisUtils.POOL_NUM_TESTS_PER_EVICTION_RUN).asInteger());
+        
redisConfig.setTestOnCreate(context.getProperty(RedisUtils.POOL_TEST_ON_CREATE).asBoolean());
+        
redisConfig.setTestOnBorrow(context.getProperty(RedisUtils.POOL_TEST_ON_BORROW).asBoolean());
+        
redisConfig.setTestOnReturn(context.getProperty(RedisUtils.POOL_TEST_ON_RETURN).asBoolean());
+        
redisConfig.setTestWhenIdle(context.getProperty(RedisUtils.POOL_TEST_WHILE_IDLE).asBoolean());
+        return redisConfig;
+    }
 
-    public static JedisConnectionFactory createConnectionFactory(final 
PropertyContext context, final ComponentLog logger, final SSLContext 
sslContext) {
-        final String redisMode = 
context.getProperty(RedisUtils.REDIS_MODE).getValue();
-        final String connectionString = 
context.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue();
-        final Integer dbIndex = 
context.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger();
-        final String password = 
context.getProperty(RedisUtils.PASSWORD).evaluateAttributeExpressions().getValue();
-        final String sentinelPassword = 
context.getProperty(RedisUtils.SENTINEL_PASSWORD).evaluateAttributeExpressions().getValue();
-        final Integer timeout = 
context.getProperty(RedisUtils.COMMUNICATION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
-        final JedisPoolConfig poolConfig = createJedisPoolConfig(context);
+    public static JedisConnectionFactory createConnectionFactory(final 
PropertyContext context, final SSLContext sslContext) {
+        return createConnectionFactory(createRedisConfig(context), sslContext);
+    }
+
+    public static JedisConnectionFactory createConnectionFactory(final 
RedisConfig redisConfig, final SSLContext sslContext) {
+        final RedisType redisMode = redisConfig.getRedisMode();
+        final String connectionString = redisConfig.getConnectionString();
+        final Integer dbIndex = redisConfig.getDbIndex();
+        final String password = redisConfig.getPassword();
+        final String sentinelPassword = redisConfig.getSentinelPassword();
+        final Integer timeout = redisConfig.getTimeout();
+        final JedisPoolConfig poolConfig = createJedisPoolConfig(redisConfig);
 
         JedisClientConfiguration.JedisClientConfigurationBuilder builder = 
JedisClientConfiguration.builder()
                 .connectTimeout(Duration.ofMillis(timeout))
@@ -328,8 +367,8 @@ public class RedisUtils {
         final JedisClientConfiguration jedisClientConfiguration = 
builder.build();
         JedisConnectionFactory connectionFactory;
 
-        if (RedisUtils.REDIS_MODE_STANDALONE.getValue().equals(redisMode)) {
-            logger.info("Connecting to Redis in standalone mode at " + 
connectionString);
+        if (RedisType.STANDALONE == redisMode) {
+            LOGGER.info("Connecting to Redis in standalone mode at " + 
connectionString);
             final String[] hostAndPortSplit = connectionString.split("[:]");
             final String host = hostAndPortSplit[0].trim();
             final Integer port = Integer.parseInt(hostAndPortSplit[1].trim());
@@ -338,32 +377,32 @@ public class RedisUtils {
 
             connectionFactory = new 
JedisConnectionFactory(redisStandaloneConfiguration, jedisClientConfiguration);
 
-        } else if 
(RedisUtils.REDIS_MODE_SENTINEL.getValue().equals(redisMode)) {
+        } else if (RedisType.SENTINEL == redisMode) {
             final String[] sentinels = connectionString.split("[,]");
-            final String sentinelMaster = 
context.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue();
+            final String sentinelMaster = redisConfig.getSentinelMaster();
             final RedisSentinelConfiguration sentinelConfiguration = new 
RedisSentinelConfiguration(sentinelMaster, new 
HashSet<>(getTrimmedValues(sentinels)));
             enrichRedisConfiguration(sentinelConfiguration, dbIndex, password, 
sentinelPassword);
 
-            logger.info("Connecting to Redis in sentinel mode...");
-            logger.info("Redis master = " + sentinelMaster);
+            LOGGER.info("Connecting to Redis in sentinel mode...");
+            LOGGER.info("Redis master = " + sentinelMaster);
 
             for (final String sentinel : sentinels) {
-                logger.info("Redis sentinel at " + sentinel);
+                LOGGER.info("Redis sentinel at " + sentinel);
             }
 
             connectionFactory = new 
JedisConnectionFactory(sentinelConfiguration, jedisClientConfiguration);
 
         } else {
             final String[] clusterNodes = connectionString.split("[,]");
-            final Integer maxRedirects = 
context.getProperty(RedisUtils.CLUSTER_MAX_REDIRECTS).asInteger();
+            final Integer maxRedirects = redisConfig.getClusterMaxRedirects();
 
             final RedisClusterConfiguration clusterConfiguration = new 
RedisClusterConfiguration(getTrimmedValues(clusterNodes));
             enrichRedisConfiguration(clusterConfiguration, dbIndex, password, 
sentinelPassword);
             clusterConfiguration.setMaxRedirects(maxRedirects);
 
-            logger.info("Connecting to Redis in clustered mode...");
+            LOGGER.info("Connecting to Redis in clustered mode...");
             for (final String clusterNode : clusterNodes) {
-                logger.info("Redis cluster node at " + clusterNode);
+                LOGGER.info("Redis cluster node at " + clusterNode);
             }
 
             connectionFactory = new 
JedisConnectionFactory(clusterConfiguration, jedisClientConfiguration);
@@ -397,20 +436,20 @@ public class RedisUtils {
         }
     }
 
-    private static JedisPoolConfig createJedisPoolConfig(final PropertyContext 
context) {
+    private static JedisPoolConfig createJedisPoolConfig(final RedisConfig 
redisConfig) {
         final JedisPoolConfig poolConfig = new JedisPoolConfig();
-        
poolConfig.setMaxTotal(context.getProperty(RedisUtils.POOL_MAX_TOTAL).asInteger());
-        
poolConfig.setMaxIdle(context.getProperty(RedisUtils.POOL_MAX_IDLE).asInteger());
-        
poolConfig.setMinIdle(context.getProperty(RedisUtils.POOL_MIN_IDLE).asInteger());
-        
poolConfig.setBlockWhenExhausted(context.getProperty(RedisUtils.POOL_BLOCK_WHEN_EXHAUSTED).asBoolean());
-        
poolConfig.setMaxWait(context.getProperty(RedisUtils.POOL_MAX_WAIT_TIME).asDuration());
-        
poolConfig.setMinEvictableIdleTime(context.getProperty(RedisUtils.POOL_MIN_EVICTABLE_IDLE_TIME).asDuration());
-        
poolConfig.setTimeBetweenEvictionRuns(context.getProperty(RedisUtils.POOL_TIME_BETWEEN_EVICTION_RUNS).asDuration());
-        
poolConfig.setNumTestsPerEvictionRun(context.getProperty(RedisUtils.POOL_NUM_TESTS_PER_EVICTION_RUN).asInteger());
-        
poolConfig.setTestOnCreate(context.getProperty(RedisUtils.POOL_TEST_ON_CREATE).asBoolean());
-        
poolConfig.setTestOnBorrow(context.getProperty(RedisUtils.POOL_TEST_ON_BORROW).asBoolean());
-        
poolConfig.setTestOnReturn(context.getProperty(RedisUtils.POOL_TEST_ON_RETURN).asBoolean());
-        
poolConfig.setTestWhileIdle(context.getProperty(RedisUtils.POOL_TEST_WHILE_IDLE).asBoolean());
+        poolConfig.setMaxTotal(redisConfig.getPoolMaxTotal());
+        poolConfig.setMaxIdle(redisConfig.getPoolMaxIdle());
+        poolConfig.setMinIdle(redisConfig.getPoolMinIdle());
+        poolConfig.setBlockWhenExhausted(redisConfig.getBlockWhenExhausted());
+        poolConfig.setMaxWait(redisConfig.getMaxWaitTime());
+        
poolConfig.setMinEvictableIdleTime(redisConfig.getMinEvictableIdleTime());
+        
poolConfig.setTimeBetweenEvictionRuns(redisConfig.getTimeBetweenEvictionRuns());
+        
poolConfig.setNumTestsPerEvictionRun(redisConfig.getNumTestsPerEvictionRun());
+        poolConfig.setTestOnCreate(redisConfig.getTestOnCreate());
+        poolConfig.setTestOnBorrow(redisConfig.getTestOnBorrow());
+        poolConfig.setTestOnReturn(redisConfig.getTestOnReturn());
+        poolConfig.setTestWhileIdle(redisConfig.getTestWhenIdle());
         return poolConfig;
     }
 
@@ -482,4 +521,39 @@ public class RedisUtils {
         }
     }
 
+    public static RedisTemplate<String, String> 
createRedisTemplateForStreams(final RedisConnectionFactory connectionFactory) {
+        final 
StreamMessageListenerContainer.StreamMessageListenerContainerOptionsBuilder<String,
 MapRecord<String, String, byte[]>> builder =
+                
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
+                        .hashValueSerializer(RedisSerializer.byteArray());
+
+        final 
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, 
MapRecord<String, String, byte[]>> containerOptions = builder.build();
+        return createRedisTemplateForStreams(connectionFactory, 
containerOptions);
+    }
+
+    public static RedisTemplate<String, String> createRedisTemplateForStreams(
+            final RedisConnectionFactory connectionFactory,
+            final 
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, 
MapRecord<String, String, byte[]>> containerOptions) {
+        final RedisTemplate<String,String> redisTemplate = new 
RedisTemplate<>();
+        redisTemplate.setKeySerializer(containerOptions.getKeySerializer());
+        redisTemplate.setValueSerializer(containerOptions.getKeySerializer());
+        
redisTemplate.setHashKeySerializer(containerOptions.getHashKeySerializer());
+        
redisTemplate.setHashValueSerializer(containerOptions.getHashValueSerializer());
+        redisTemplate.setConnectionFactory(connectionFactory);
+        redisTemplate.afterPropertiesSet();
+        return redisTemplate;
+    }
+
+    public static RedisTemplate<String, String> 
createRedisTemplateForKeyValues(final RedisConnectionFactory connectionFactory) 
{
+        final RedisTemplate<String, String> redisTemplate = new 
StringRedisTemplate(connectionFactory);
+        redisTemplate.afterPropertiesSet();
+        return redisTemplate;
+    }
+
+    public static RedisTemplate<String, byte[]> 
createRedisTemplateForPubSub(final RedisConnectionFactory connectionFactory) {
+        final RedisTemplate<String, byte[]> redisTemplate = new 
RedisTemplate<>();
+        redisTemplate.setConnectionFactory(connectionFactory);
+        redisTemplate.setValueSerializer(RedisSerializer.byteArray());
+        redisTemplate.afterPropertiesSet();
+        return redisTemplate;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-redis-bundle/pom.xml 
b/nifi-nar-bundles/nifi-redis-bundle/pom.xml
index 06e9c45e47..62d3c833f1 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-redis-bundle/pom.xml
@@ -31,6 +31,7 @@
     </properties>
 
     <modules>
+        <module>nifi-redis-utils</module>
         <module>nifi-redis-service-api</module>
         <module>nifi-redis-service-api-nar</module>
         <module>nifi-redis-extensions</module>

Reply via email to