This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 00a6478c06 NIFI-11794 - Fix NPE + configure max attempts for Redis State Provider (#7473)
00a6478c06 is described below

commit 00a6478c06bc167d1c6fddd6f6ca3c25f192f49c
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Jul 14 17:51:18 2023 +0200

    NIFI-11794 - Fix NPE + configure max attempts for Redis State Provider (#7473)
    
    Signed-off-by: Otto Fowler <[email protected]>
    
    This closes #7473.
---
 .../src/main/resources/conf/state-management.xml     |  3 +++
 .../apache/nifi/redis/state/RedisStateProvider.java  | 20 +++++++++++++++++---
 .../nifi/redis/state/TestRedisStateProvider.java     |  1 +
 3 files changed, 21 insertions(+), 3 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
index 85fb6c6dcf..c16d4fbb18 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
@@ -99,6 +99,9 @@
                     nifi.properties.  This means that a TLS-enabled Redis 
connection is only possible if the Apache NiFi instance is running in secure 
mode.
                     If this property is false, an insecure Redis connection 
will be used even if the Apache NiFi instance is secure (default false).
 
+            Max Attempts - Maximum number of attempts when setting/clearing 
the state for a component. This number should be higher than the number of 
nodes 
+                    in the NiFi cluster to account for the case where each 
node may concurrently try to clear a state with a local scope (default is 20).
+
             Pool - Max Total - The maximum number of connections that can be 
allocated by the pool (checked out to clients, or idle awaiting checkout).
                         A negative value indicates that there is no limit.
 
diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
index 6d552dc771..07a36a8557 100644
--- 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
@@ -68,17 +68,28 @@ public class RedisStateProvider extends 
AbstractConfigurableComponent implements
             .defaultValue("false")
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
+    public static final PropertyDescriptor MAX_ATTEMPTS = new 
PropertyDescriptor.Builder()
+            .name("Max Attempts")
+            .displayName("Max Attempts")
+            .description("Maximum number of attempts when setting/clearing the 
state for a component. This number should be higher than the number of nodes "
+                    + "in the NiFi cluster to account for the case where each 
node may concurrently try to clear a state with a local scope.")
+            .required(true)
+            .defaultValue("20")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
 
     static final List<PropertyDescriptor> STATE_PROVIDER_PROPERTIES;
     static {
         final List<PropertyDescriptor> props = new 
ArrayList<>(RedisUtils.REDIS_CONNECTION_PROPERTY_DESCRIPTORS);
         props.add(KEY_PREFIX);
         props.add(ENABLE_TLS);
+        props.add(MAX_ATTEMPTS);
         STATE_PROVIDER_PROPERTIES = Collections.unmodifiableList(props);
     }
 
     private String identifier;
     private String keyPrefix;
+    private int maxAttempts;
     private ComponentLog logger;
     private PropertyContext context;
     private SSLContext sslContext;
@@ -102,6 +113,8 @@ public class RedisStateProvider extends 
AbstractConfigurableComponent implements
             keyPrefix = keyPrefix + "/";
         }
         this.keyPrefix = keyPrefix;
+
+        this.maxAttempts = context.getProperty(MAX_ATTEMPTS).asInteger();
     }
 
     @Override
@@ -173,7 +186,7 @@ public class RedisStateProvider extends 
AbstractConfigurableComponent implements
         int attempted = 0;
         boolean updated = false;
 
-        while (!updated && attempted < 20) {
+        while (!updated && attempted < this.maxAttempts) {
             updated = replace(currStateMap, state, componentId, true);
             attempted++;
         }
@@ -245,7 +258,8 @@ public class RedisStateProvider extends 
AbstractConfigurableComponent implements
             final List<Object> results = redisConnection.exec();
 
             // if we have a result then the replace succeeded
-            if (results.size() > 0) {
+            // results can be null if the transaction has been aborted
+            if (results != null && results.size() > 0) {
                 replaced = true;
             }
 
@@ -258,7 +272,7 @@ public class RedisStateProvider extends 
AbstractConfigurableComponent implements
         int attempted = 0;
         boolean updated = false;
 
-        while (!updated && attempted < 20) {
+        while (!updated && attempted < this.maxAttempts) {
             final StateMap currStateMap = getState(componentId);
             updated = replace(currStateMap, Collections.emptyMap(), 
componentId, true);
 
diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java
index b0c5a16ef0..cac9c0acd5 100644
--- 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java
@@ -45,6 +45,7 @@ public class TestRedisStateProvider {
 
         // Set up mock state provider init context
         
Mockito.when(context.getProperty(RedisStateProvider.KEY_PREFIX)).thenReturn(new 
MockPropertyValue("/nifi/components/"));
+        
Mockito.when(context.getProperty(RedisStateProvider.MAX_ATTEMPTS)).thenReturn(new
 MockPropertyValue("20"));
 
         // Set up mock validation context
         
Mockito.when(validationContext.getProperty(RedisUtils.CONNECTION_STRING)).thenReturn(new
 MockPropertyValue("localhost:6379"));

Reply via email to