This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new a74c411079 NIFI-12104 Separate a non-atomic Redis DMC implementation 
from the existing one for use in Put/Fetch DMC when Redis is clustered
a74c411079 is described below

commit a74c411079efe7d886ddae58314d1f2cee74f58e
Author: Bryan Bende <[email protected]>
AuthorDate: Tue Sep 26 11:37:02 2023 -0400

    NIFI-12104 Separate a non-atomic Redis DMC implementation from the existing 
one for use in Put/Fetch DMC when Redis is clustered
    
    This closes #7796
    
    Signed-off-by: Mike Thomsen <[email protected]>
---
 .../RedisDistributedMapCacheClientService.java     | 248 +--------------------
 ...mpleRedisDistributedMapCacheClientService.java} | 134 ++---------
 .../org.apache.nifi.controller.ControllerService   |   3 +-
 3 files changed, 25 insertions(+), 360 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
index 6a36d485c5..722ca63649 100644
--- 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
+++ 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
@@ -18,63 +18,30 @@ package org.apache.nifi.redis.service;
 
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
 import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.redis.RedisConnectionPool;
 import org.apache.nifi.redis.RedisType;
-import org.apache.nifi.redis.util.RedisAction;
-import org.apache.nifi.util.Tuple;
-import org.springframework.data.redis.connection.RedisConnection;
-import org.springframework.data.redis.core.Cursor;
-import org.springframework.data.redis.core.ScanOptions;
-import org.springframework.data.redis.core.types.Expiration;
-import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
-import static org.apache.nifi.redis.util.RedisUtils.TTL;
 
 @Tags({ "redis", "distributed", "cache", "map" })
 @CapabilityDescription("An implementation of DistributedMapCacheClient that 
uses Redis as the backing cache. This service relies on " +
         "the WATCH, MULTI, and EXEC commands in Redis, which are not fully 
supported when Redis is clustered. As a result, this service " +
         "can only be used with a Redis Connection Pool that is configured for 
standalone or sentinel mode. Sentinel mode can be used to " +
         "provide high-availability configurations.")
-public class RedisDistributedMapCacheClientService extends 
AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
-
-    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
-    static {
-        final List<PropertyDescriptor> props = new ArrayList<>();
-        props.add(REDIS_CONNECTION_POOL);
-        props.add(TTL);
-        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
-    }
-
-    private volatile RedisConnectionPool redisConnectionPool;
-    private Long ttl;
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return PROPERTY_DESCRIPTORS;
-    }
+public class RedisDistributedMapCacheClientService extends 
SimpleRedisDistributedMapCacheClientService implements 
AtomicDistributedMapCacheClient<byte[]> {
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
@@ -96,179 +63,6 @@ public class RedisDistributedMapCacheClientService extends 
AbstractControllerSer
         return results;
     }
 
-    @OnEnabled
-    public void onEnabled(final ConfigurationContext context) {
-        this.redisConnectionPool = 
context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
-        this.ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS);
-
-        if (ttl == 0) {
-            this.ttl = -1L;
-        }
-    }
-
-    @OnDisabled
-    public void onDisabled() {
-        this.redisConnectionPool = null;
-    }
-
-    @Override
-    public <K, V> boolean putIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
-        return withConnection(redisConnection -> {
-            final Tuple<byte[],byte[]> kv = serialize(key, value, 
keySerializer, valueSerializer);
-            boolean set = redisConnection.setNX(kv.getKey(), kv.getValue());
-
-            if (ttl != -1L && set) {
-                redisConnection.expire(kv.getKey(), ttl);
-            }
-
-            return set;
-        });
-    }
-
-    @Override
-    public <K, V> V getAndPutIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer, final 
Deserializer<V> valueDeserializer) throws IOException {
-        return withConnection(redisConnection -> {
-            final Tuple<byte[],byte[]> kv = serialize(key, value, 
keySerializer, valueSerializer);
-            do {
-                // start a watch on the key and retrieve the current value
-                redisConnection.watch(kv.getKey());
-                final byte[] existingValue = redisConnection.get(kv.getKey());
-
-                // start a transaction and perform the put-if-absent
-                redisConnection.multi();
-                redisConnection.setNX(kv.getKey(), kv.getValue());
-
-                // Set the TTL only if the key doesn't exist already
-                if (ttl != -1L && existingValue == null) {
-                    redisConnection.expire(kv.getKey(), ttl);
-                }
-
-                // execute the transaction
-                final List<Object> results = redisConnection.exec();
-
-                // if the results list was empty, then the transaction failed 
(i.e. key was modified after we started watching), so keep looping to retry
-                // if the results list was null, then the transaction failed
-                // if the results list has results, then the transaction 
succeeded and it should have the result of the setNX operation
-                if (results != null && results.size() > 0) {
-                    final Object firstResult = results.get(0);
-                    if (firstResult instanceof Boolean) {
-                        final Boolean absent = (Boolean) firstResult;
-                        return absent ? null : 
valueDeserializer.deserialize(existingValue);
-                    } else {
-                        // this shouldn't really happen, but just in case 
there is a non-boolean result then bounce out of the loop
-                        throw new IOException("Unexpected result from Redis 
transaction: Expected Boolean result, but got "
-                                + firstResult.getClass().getName() + " with 
value " + firstResult.toString());
-                    }
-                }
-            } while (isEnabled());
-
-            return null;
-        });
-    }
-
-    @Override
-    public <K> boolean containsKey(final K key, final Serializer<K> 
keySerializer) throws IOException {
-        return withConnection(redisConnection -> {
-            final byte[] k = serialize(key, keySerializer);
-            return redisConnection.exists(k);
-        });
-    }
-
-    @Override
-    public <K, V> void put(final K key, final V value, final Serializer<K> 
keySerializer, final Serializer<V> valueSerializer) throws IOException {
-        withConnection(redisConnection -> {
-            final Tuple<byte[],byte[]> kv = serialize(key, value, 
keySerializer, valueSerializer);
-            redisConnection.set(kv.getKey(), kv.getValue(), 
Expiration.seconds(ttl), SetOption.upsert());
-            return null;
-        });
-    }
-
-    @Override
-    public <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K> 
keySerializer, Serializer<V> valueSerializer) throws IOException {
-        withConnection(redisConnection -> {
-            Map<byte[], byte[]> values = new HashMap<>();
-            for (Map.Entry<K, V> entry : keysAndValues.entrySet()) {
-                final Tuple<byte[],byte[]> kv = serialize(entry.getKey(), 
entry.getValue(), keySerializer, valueSerializer);
-                values.put(kv.getKey(), kv.getValue());
-            }
-
-            if (getLogger().isDebugEnabled()) {
-                getLogger().debug(String.format("Queued up %d tuples to mset 
on Redis connection.", values.size()));
-            }
-
-            if (!values.isEmpty()) {
-                redisConnection.mSet(values);
-                if (ttl != -1L) {
-                    values.keySet().forEach(k -> redisConnection.expire(k, 
ttl));
-                }
-            }
-            return null;
-        });
-    }
-
-    @Override
-    public <K, V> V get(final K key, final Serializer<K> keySerializer, final 
Deserializer<V> valueDeserializer) throws IOException {
-        return withConnection(redisConnection -> {
-            final byte[] k = serialize(key, keySerializer);
-            final byte[] v = redisConnection.get(k);
-            return v == null ? null : valueDeserializer.deserialize(v);
-        });
-    }
-
-    @Override
-    public void close() throws IOException {
-        // nothing to do
-    }
-
-    @Override
-    public <K> boolean remove(final K key, final Serializer<K> keySerializer) 
throws IOException {
-        return withConnection(redisConnection -> {
-            final byte[] k = serialize(key, keySerializer);
-            final long numRemoved = redisConnection.del(k);
-            return numRemoved > 0;
-        });
-    }
-
-    @Override
-    public long removeByPattern(final String regex) throws IOException {
-        return withConnection(redisConnection -> {
-            long deletedCount = 0;
-            final List<byte[]> batchKeys = new ArrayList<>();
-
-            // delete keys in batches of 1000 using the cursor
-            final Cursor<byte[]> cursor = 
redisConnection.scan(ScanOptions.scanOptions().count(100).match(regex).build());
-            while (cursor.hasNext()) {
-                batchKeys.add(cursor.next());
-
-                if (batchKeys.size() == 1000) {
-                    deletedCount += redisConnection.del(getKeys(batchKeys));
-                    batchKeys.clear();
-                }
-            }
-
-            // delete any left-over keys if some were added to the batch but 
never reached 1000
-            if (batchKeys.size() > 0) {
-                deletedCount += redisConnection.del(getKeys(batchKeys));
-                batchKeys.clear();
-            }
-
-            return deletedCount;
-        });
-    }
-
-    /**
-     *  Convert the list of all keys to an array.
-     */
-    private byte[][] getKeys(final List<byte[]> keys) {
-        final byte[][] allKeysArray = new byte[keys.size()][];
-        for (int i=0; i < keys.size(); i++) {
-            allKeysArray[i] = keys.get(i);
-        }
-        return  allKeysArray;
-    }
-
-    // ----------------- Methods from AtomicDistributedMapCacheClient 
------------------------
-
     @Override
     public <K, V> AtomicCacheEntry<K, V, byte[]> fetch(final K key, final 
Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws 
IOException {
         return withConnection(redisConnection -> {
@@ -315,6 +109,7 @@ public class RedisDistributedMapCacheClientService extends 
AbstractControllerSer
                 redisConnection.getSet(k, newVal);
 
                 // set the TTL if specified
+                final long ttl = getTtl();
                 if (ttl != -1L) {
                     redisConnection.expire(k, ttl);
                 }
@@ -332,43 +127,4 @@ public class RedisDistributedMapCacheClientService extends 
AbstractControllerSer
         });
     }
 
-    // ----------------- END Methods from AtomicDistributedMapCacheClient 
------------------------
-
-    private <K, V> Tuple<byte[],byte[]> serialize(final K key, final V value, 
final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
-        final ByteArrayOutputStream out = new ByteArrayOutputStream();
-
-        keySerializer.serialize(key, out);
-        final byte[] k = out.toByteArray();
-
-        out.reset();
-
-        valueSerializer.serialize(value, out);
-        final byte[] v = out.toByteArray();
-
-        return new Tuple<>(k, v);
-    }
-
-    private <K> byte[] serialize(final K key, final Serializer<K> 
keySerializer) throws IOException {
-        final ByteArrayOutputStream out = new ByteArrayOutputStream();
-
-        keySerializer.serialize(key, out);
-        return out.toByteArray();
-    }
-
-    private <T> T withConnection(final RedisAction<T> action) throws 
IOException {
-        RedisConnection redisConnection = null;
-        try {
-            redisConnection = redisConnectionPool.getConnection();
-            return action.execute(redisConnection);
-        } finally {
-           if (redisConnection != null) {
-               try {
-                   redisConnection.close();
-               } catch (Exception e) {
-                   getLogger().warn("Error closing connection: " + 
e.getMessage(), e);
-               }
-           }
-        }
-    }
-
 }
diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/SimpleRedisDistributedMapCacheClientService.java
similarity index 66%
copy from 
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
copy to 
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/SimpleRedisDistributedMapCacheClientService.java
index 6a36d485c5..33f9340523 100644
--- 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
+++ 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/SimpleRedisDistributedMapCacheClientService.java
@@ -21,29 +21,23 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
-import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.redis.RedisConnectionPool;
-import org.apache.nifi.redis.RedisType;
 import org.apache.nifi.redis.util.RedisAction;
 import org.apache.nifi.util.Tuple;
 import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.RedisStringCommands;
 import org.springframework.data.redis.core.Cursor;
 import org.springframework.data.redis.core.ScanOptions;
 import org.springframework.data.redis.core.types.Expiration;
-import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -54,11 +48,9 @@ import static 
org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
 import static org.apache.nifi.redis.util.RedisUtils.TTL;
 
 @Tags({ "redis", "distributed", "cache", "map" })
-@CapabilityDescription("An implementation of DistributedMapCacheClient that 
uses Redis as the backing cache. This service relies on " +
-        "the WATCH, MULTI, and EXEC commands in Redis, which are not fully 
supported when Redis is clustered. As a result, this service " +
-        "can only be used with a Redis Connection Pool that is configured for 
standalone or sentinel mode. Sentinel mode can be used to " +
-        "provide high-availability configurations.")
-public class RedisDistributedMapCacheClientService extends 
AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
+@CapabilityDescription("An implementation of DistributedMapCacheClient that 
uses Redis as the backing cache. " +
+        "This service is intended to be used when a non-atomic 
DistributedMapCacheClient is required.")
+public class SimpleRedisDistributedMapCacheClientService extends 
AbstractControllerService implements DistributedMapCacheClient {
 
     static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
     static {
@@ -71,29 +63,13 @@ public class RedisDistributedMapCacheClientService extends 
AbstractControllerSer
     private volatile RedisConnectionPool redisConnectionPool;
     private Long ttl;
 
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return PROPERTY_DESCRIPTORS;
+    protected Long getTtl() {
+        return ttl;
     }
 
     @Override
-    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
-        final List<ValidationResult> results = new ArrayList<>();
-
-        final RedisConnectionPool redisConnectionPool = 
validationContext.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
-        if (redisConnectionPool != null) {
-            final RedisType redisType = redisConnectionPool.getRedisType();
-            if (redisType != null && redisType == RedisType.CLUSTER) {
-                results.add(new ValidationResult.Builder()
-                        .subject(REDIS_CONNECTION_POOL.getDisplayName())
-                        .valid(false)
-                        .explanation(REDIS_CONNECTION_POOL.getDisplayName()
-                                + " is configured in clustered mode, and this 
service requires a non-clustered Redis")
-                        .build());
-            }
-        }
-
-        return results;
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
     }
 
     @OnEnabled
@@ -178,7 +154,7 @@ public class RedisDistributedMapCacheClientService extends 
AbstractControllerSer
     public <K, V> void put(final K key, final V value, final Serializer<K> 
keySerializer, final Serializer<V> valueSerializer) throws IOException {
         withConnection(redisConnection -> {
             final Tuple<byte[],byte[]> kv = serialize(key, value, 
keySerializer, valueSerializer);
-            redisConnection.set(kv.getKey(), kv.getValue(), 
Expiration.seconds(ttl), SetOption.upsert());
+            redisConnection.set(kv.getKey(), kv.getValue(), 
Expiration.seconds(ttl), RedisStringCommands.SetOption.upsert());
             return null;
         });
     }
@@ -259,7 +235,7 @@ public class RedisDistributedMapCacheClientService extends 
AbstractControllerSer
     /**
      *  Convert the list of all keys to an array.
      */
-    private byte[][] getKeys(final List<byte[]> keys) {
+    protected byte[][] getKeys(final List<byte[]> keys) {
         final byte[][] allKeysArray = new byte[keys.size()][];
         for (int i=0; i < keys.size(); i++) {
             allKeysArray[i] = keys.get(i);
@@ -267,74 +243,7 @@ public class RedisDistributedMapCacheClientService extends 
AbstractControllerSer
         return  allKeysArray;
     }
 
-    // ----------------- Methods from AtomicDistributedMapCacheClient 
------------------------
-
-    @Override
-    public <K, V> AtomicCacheEntry<K, V, byte[]> fetch(final K key, final 
Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws 
IOException {
-        return withConnection(redisConnection -> {
-            final byte[] k = serialize(key, keySerializer);
-
-            final byte[] v = redisConnection.get(k);
-            if (v == null) {
-                return null;
-            }
-
-            // for Redis we are going to use the raw bytes of the value as the 
revision
-            return new AtomicCacheEntry<>(key, 
valueDeserializer.deserialize(v), v);
-        });
-    }
-
-    @Override
-    public <K, V> boolean replace(final AtomicCacheEntry<K, V, byte[]> entry, 
final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
-        return withConnection(redisConnection -> {
-            final ByteArrayOutputStream out = new ByteArrayOutputStream();
-
-            keySerializer.serialize(entry.getKey(), out);
-            final byte[] k = out.toByteArray();
-            out.reset();
-
-            valueSerializer.serialize(entry.getValue(), out);
-            final byte[] newVal = out.toByteArray();
-
-            // the revision of the cache entry holds the value of the key from 
a previous fetch
-            final byte[] prevVal = entry.getRevision().orElse(null);
-
-            boolean replaced = false;
-
-            // start a watch on the key and retrieve the current value
-            redisConnection.watch(k);
-            final byte[] currValue = redisConnection.get(k);
-
-            // start a transaction
-            redisConnection.multi();
-
-            // compare-and-set
-            if (Arrays.equals(prevVal, currValue)) {
-                // if we use set(k, newVal) then the results list will always 
have size == 0 b/c when convertPipelineAndTxResults is set to true,
-                // status responses like "OK" are skipped over, so by using 
getSet we can rely on the results list to know if the transaction succeeded
-                redisConnection.getSet(k, newVal);
-
-                // set the TTL if specified
-                if (ttl != -1L) {
-                    redisConnection.expire(k, ttl);
-                }
-            }
-
-            // execute the transaction
-            final List<Object> results = redisConnection.exec();
-
-            // if we have a result then the replace succeeded
-            if (results != null && results.size() > 0) {
-                replaced = true;
-            }
-
-            return replaced;
-        });
-    }
-
-    // ----------------- END Methods from AtomicDistributedMapCacheClient 
------------------------
-
-    private <K, V> Tuple<byte[],byte[]> serialize(final K key, final V value, 
final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
+    protected <K, V> Tuple<byte[],byte[]> serialize(final K key, final V 
value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) 
throws IOException {
         final ByteArrayOutputStream out = new ByteArrayOutputStream();
 
         keySerializer.serialize(key, out);
@@ -348,27 +257,26 @@ public class RedisDistributedMapCacheClientService 
extends AbstractControllerSer
         return new Tuple<>(k, v);
     }
 
-    private <K> byte[] serialize(final K key, final Serializer<K> 
keySerializer) throws IOException {
+    protected <K> byte[] serialize(final K key, final Serializer<K> 
keySerializer) throws IOException {
         final ByteArrayOutputStream out = new ByteArrayOutputStream();
 
         keySerializer.serialize(key, out);
         return out.toByteArray();
     }
 
-    private <T> T withConnection(final RedisAction<T> action) throws 
IOException {
+    protected <T> T withConnection(final RedisAction<T> action) throws 
IOException {
         RedisConnection redisConnection = null;
         try {
             redisConnection = redisConnectionPool.getConnection();
             return action.execute(redisConnection);
         } finally {
-           if (redisConnection != null) {
-               try {
-                   redisConnection.close();
-               } catch (Exception e) {
-                   getLogger().warn("Error closing connection: " + 
e.getMessage(), e);
-               }
-           }
+            if (redisConnection != null) {
+                try {
+                    redisConnection.close();
+                } catch (Exception e) {
+                    getLogger().warn("Error closing connection: " + 
e.getMessage(), e);
+                }
+            }
         }
     }
-
 }
diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 5d4073fdd5..c7da33be1f 100644
--- 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 org.apache.nifi.redis.service.RedisConnectionPoolService
-org.apache.nifi.redis.service.RedisDistributedMapCacheClientService
\ No newline at end of file
+org.apache.nifi.redis.service.RedisDistributedMapCacheClientService
+org.apache.nifi.redis.service.SimpleRedisDistributedMapCacheClientService
\ No newline at end of file

Reply via email to