This is an automated email from the ASF dual-hosted git repository.

otto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a61d5bdbf NIFI-11794 - Fix NPE + configure max attempts for Redis State Provider (#7473)
8a61d5bdbf is described below

commit 8a61d5bdbf60a0f4cec3e9ded1488bee3f495859
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Jul 14 17:51:18 2023 +0200

    NIFI-11794 - Fix NPE + configure max attempts for Redis State Provider (#7473)
    
    Signed-off-by: Otto Fowler<[email protected]>
    
    This closes #7473.
---
 .../src/main/resources/conf/state-management.xml     |  3 +++
 .../apache/nifi/redis/state/RedisStateProvider.java  | 20 +++++++++++++++++---
 .../nifi/redis/state/TestRedisStateProvider.java     |  1 +
 3 files changed, 21 insertions(+), 3 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
index b4ffa2d4b6..8689480bf5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
@@ -105,6 +105,9 @@
                     nifi.properties.  This means that a TLS-enabled Redis connection is only possible if the Apache NiFi instance is running in secure mode.
                     If this property is false, an insecure Redis connection will be used even if the Apache NiFi instance is secure (default false).
 
+            Max Attempts - Maximum number of attempts when setting/clearing the state for a component. This number should be higher than the number of nodes
+                    in the NiFi cluster to account for the case where each node may concurrently try to clear a state with a local scope (default is 20).
+
             Pool - Max Total - The maximum number of connections that can be allocated by the pool (checked out to clients, or idle awaiting checkout).
                         A negative value indicates that there is no limit.
 
diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
index 3a3c06b512..0f581a5558 100644
--- 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
@@ -73,12 +73,22 @@ public class RedisStateProvider extends 
AbstractConfigurableComponent implements
             .defaultValue("false")
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
+    public static final PropertyDescriptor MAX_ATTEMPTS = new 
PropertyDescriptor.Builder()
+            .name("Max Attempts")
+            .displayName("Max Attempts")
+            .description("Maximum number of attempts when setting/clearing the 
state for a component. This number should be higher than the number of nodes "
+                    + "in the NiFi cluster to account for the case where each 
node may concurrently try to clear a state with a local scope.")
+            .required(true)
+            .defaultValue("20")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
 
     static final List<PropertyDescriptor> STATE_PROVIDER_PROPERTIES;
     static {
         final List<PropertyDescriptor> props = new 
ArrayList<>(RedisUtils.REDIS_CONNECTION_PROPERTY_DESCRIPTORS);
         props.add(KEY_PREFIX);
         props.add(ENABLE_TLS);
+        props.add(MAX_ATTEMPTS);
         STATE_PROVIDER_PROPERTIES = Collections.unmodifiableList(props);
     }
 
@@ -90,6 +100,7 @@ public class RedisStateProvider extends 
AbstractConfigurableComponent implements
 
     private String identifier;
     private String keyPrefix;
+    private int maxAttempts;
     private ComponentLog logger;
     private PropertyContext context;
     private SSLContext sslContext;
@@ -113,6 +124,8 @@ public class RedisStateProvider extends 
AbstractConfigurableComponent implements
             keyPrefix = keyPrefix + "/";
         }
         this.keyPrefix = keyPrefix;
+
+        this.maxAttempts = context.getProperty(MAX_ATTEMPTS).asInteger();
     }
 
     @Override
@@ -184,7 +197,7 @@ public class RedisStateProvider extends 
AbstractConfigurableComponent implements
         int attempted = 0;
         boolean updated = false;
 
-        while (!updated && attempted < 20) {
+        while (!updated && attempted < this.maxAttempts) {
             updated = replace(currStateMap, state, componentId, true);
             attempted++;
         }
@@ -257,7 +270,8 @@ public class RedisStateProvider extends 
AbstractConfigurableComponent implements
             final List<Object> results = redisConnection.exec();
 
             // if we have a result then the replace succeeded
-            if (results.size() > 0) {
+            // results can be null if the transaction has been aborted
+            if (results != null && results.size() > 0) {
                 replaced = true;
             }
 
@@ -270,7 +284,7 @@ public class RedisStateProvider extends 
AbstractConfigurableComponent implements
         int attempted = 0;
         boolean updated = false;
 
-        while (!updated && attempted < 20) {
+        while (!updated && attempted < this.maxAttempts) {
             final StateMap currStateMap = getState(componentId);
             updated = replace(currStateMap, Collections.emptyMap(), 
componentId, true);
 
diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java
index b0c5a16ef0..cac9c0acd5 100644
--- 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java
@@ -45,6 +45,7 @@ public class TestRedisStateProvider {
 
         // Set up mock state provider init context
         
Mockito.when(context.getProperty(RedisStateProvider.KEY_PREFIX)).thenReturn(new 
MockPropertyValue("/nifi/components/"));
+        
Mockito.when(context.getProperty(RedisStateProvider.MAX_ATTEMPTS)).thenReturn(new
 MockPropertyValue("20"));
 
         // Set up mock validation context
         
Mockito.when(validationContext.getProperty(RedisUtils.CONNECTION_STRING)).thenReturn(new
 MockPropertyValue("localhost:6379"));

Reply via email to