This is an automated email from the ASF dual-hosted git repository.
otto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 8a61d5bdbf NIFI-11794 - Fix NPE + configure max attempts for Redis State Provider (#7473)
8a61d5bdbf is described below
commit 8a61d5bdbf60a0f4cec3e9ded1488bee3f495859
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Jul 14 17:51:18 2023 +0200
NIFI-11794 - Fix NPE + configure max attempts for Redis State Provider (#7473)
Signed-off-by: Otto Fowler <[email protected]>
This closes #7473.
---
.../src/main/resources/conf/state-management.xml | 3 +++
.../apache/nifi/redis/state/RedisStateProvider.java | 20 +++++++++++++++++---
.../nifi/redis/state/TestRedisStateProvider.java | 1 +
3 files changed, 21 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
index b4ffa2d4b6..8689480bf5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
@@ -105,6 +105,9 @@
nifi.properties. This means that a TLS-enabled Redis
connection is only possible if the Apache NiFi instance is running in secure
mode.
If this property is false, an insecure Redis connection
will be used even if the Apache NiFi instance is secure (default false).
+ Max Attempts - Maximum number of attempts when setting/clearing the state for a component. This number should be higher than the number of nodes
+ in the NiFi cluster to account for the case where each node may concurrently try to clear a state with a local scope (default is 20).
+
Pool - Max Total - The maximum number of connections that can be
allocated by the pool (checked out to clients, or idle awaiting checkout).
A negative value indicates that there is no limit.
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
index 3a3c06b512..0f581a5558 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
@@ -73,12 +73,22 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
+ public static final PropertyDescriptor MAX_ATTEMPTS = new PropertyDescriptor.Builder()
+ .name("Max Attempts")
+ .displayName("Max Attempts")
+ .description("Maximum number of attempts when setting/clearing the state for a component. This number should be higher than the number of nodes "
+ + "in the NiFi cluster to account for the case where each node may concurrently try to clear a state with a local scope.")
+ .required(true)
+ .defaultValue("20")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
static final List<PropertyDescriptor> STATE_PROVIDER_PROPERTIES;
static {
final List<PropertyDescriptor> props = new ArrayList<>(RedisUtils.REDIS_CONNECTION_PROPERTY_DESCRIPTORS);
props.add(KEY_PREFIX);
props.add(ENABLE_TLS);
+ props.add(MAX_ATTEMPTS);
STATE_PROVIDER_PROPERTIES = Collections.unmodifiableList(props);
}
@@ -90,6 +100,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
private String identifier;
private String keyPrefix;
+ private int maxAttempts;
private ComponentLog logger;
private PropertyContext context;
private SSLContext sslContext;
@@ -113,6 +124,8 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
keyPrefix = keyPrefix + "/";
}
this.keyPrefix = keyPrefix;
+
+ this.maxAttempts = context.getProperty(MAX_ATTEMPTS).asInteger();
}
@Override
@@ -184,7 +197,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
int attempted = 0;
boolean updated = false;
- while (!updated && attempted < 20) {
+ while (!updated && attempted < this.maxAttempts) {
updated = replace(currStateMap, state, componentId, true);
attempted++;
}
@@ -257,7 +270,8 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
final List<Object> results = redisConnection.exec();
// if we have a result then the replace succeeded
- if (results.size() > 0) {
+ // results can be null if the transaction has been aborted
+ if (results != null && results.size() > 0) {
replaced = true;
}
@@ -270,7 +284,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
int attempted = 0;
boolean updated = false;
- while (!updated && attempted < 20) {
+ while (!updated && attempted < this.maxAttempts) {
final StateMap currStateMap = getState(componentId);
updated = replace(currStateMap, Collections.emptyMap(), componentId, true);
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java
index b0c5a16ef0..cac9c0acd5 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java
@@ -45,6 +45,7 @@ public class TestRedisStateProvider {
// Set up mock state provider init context
Mockito.when(context.getProperty(RedisStateProvider.KEY_PREFIX)).thenReturn(new MockPropertyValue("/nifi/components/"));
+ Mockito.when(context.getProperty(RedisStateProvider.MAX_ATTEMPTS)).thenReturn(new MockPropertyValue("20"));
// Set up mock validation context
Mockito.when(validationContext.getProperty(RedisUtils.CONNECTION_STRING)).thenReturn(new MockPropertyValue("localhost:6379"));