This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 00a6478c06 NIFI-11794 - Fix NPE + configure max attempts for Redis State Provider (#7473)
00a6478c06 is described below
commit 00a6478c06bc167d1c6fddd6f6ca3c25f192f49c
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Jul 14 17:51:18 2023 +0200
NIFI-11794 - Fix NPE + configure max attempts for Redis State Provider (#7473)
Signed-off-by: Otto Fowler <[email protected]>
This closes #7473.
---
.../src/main/resources/conf/state-management.xml | 3 +++
.../apache/nifi/redis/state/RedisStateProvider.java | 20 +++++++++++++++++---
.../nifi/redis/state/TestRedisStateProvider.java | 1 +
3 files changed, 21 insertions(+), 3 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
index 85fb6c6dcf..c16d4fbb18 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
@@ -99,6 +99,9 @@
nifi.properties. This means that a TLS-enabled Redis
connection is only possible if the Apache NiFi instance is running in secure
mode.
If this property is false, an insecure Redis connection
will be used even if the Apache NiFi instance is secure (default false).
+ Max Attempts - Maximum number of attempts when setting/clearing the state for a component. This number should be higher than the number of nodes
+ in the NiFi cluster to account for the case where each node may concurrently try to clear a state with a local scope (default is 20).
+
Pool - Max Total - The maximum number of connections that can be
allocated by the pool (checked out to clients, or idle awaiting checkout).
A negative value indicates that there is no limit.
diff --git
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
index 6d552dc771..07a36a8557 100644
---
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
+++
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
@@ -68,17 +68,28 @@ public class RedisStateProvider extends
AbstractConfigurableComponent implements
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
+ public static final PropertyDescriptor MAX_ATTEMPTS = new
PropertyDescriptor.Builder()
+ .name("Max Attempts")
+ .displayName("Max Attempts")
+ .description("Maximum number of attempts when setting/clearing the
state for a component. This number should be higher than the number of nodes "
+ + "in the NiFi cluster to account for the case where each
node may concurrently try to clear a state with a local scope.")
+ .required(true)
+ .defaultValue("20")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
static final List<PropertyDescriptor> STATE_PROVIDER_PROPERTIES;
static {
final List<PropertyDescriptor> props = new
ArrayList<>(RedisUtils.REDIS_CONNECTION_PROPERTY_DESCRIPTORS);
props.add(KEY_PREFIX);
props.add(ENABLE_TLS);
+ props.add(MAX_ATTEMPTS);
STATE_PROVIDER_PROPERTIES = Collections.unmodifiableList(props);
}
private String identifier;
private String keyPrefix;
+ private int maxAttempts;
private ComponentLog logger;
private PropertyContext context;
private SSLContext sslContext;
@@ -102,6 +113,8 @@ public class RedisStateProvider extends
AbstractConfigurableComponent implements
keyPrefix = keyPrefix + "/";
}
this.keyPrefix = keyPrefix;
+
+ this.maxAttempts = context.getProperty(MAX_ATTEMPTS).asInteger();
}
@Override
@@ -173,7 +186,7 @@ public class RedisStateProvider extends
AbstractConfigurableComponent implements
int attempted = 0;
boolean updated = false;
- while (!updated && attempted < 20) {
+ while (!updated && attempted < this.maxAttempts) {
updated = replace(currStateMap, state, componentId, true);
attempted++;
}
@@ -245,7 +258,8 @@ public class RedisStateProvider extends
AbstractConfigurableComponent implements
final List<Object> results = redisConnection.exec();
// if we have a result then the replace succeeded
- if (results.size() > 0) {
+ // results can be null if the transaction has been aborted
+ if (results != null && results.size() > 0) {
replaced = true;
}
@@ -258,7 +272,7 @@ public class RedisStateProvider extends
AbstractConfigurableComponent implements
int attempted = 0;
boolean updated = false;
- while (!updated && attempted < 20) {
+ while (!updated && attempted < this.maxAttempts) {
final StateMap currStateMap = getState(componentId);
updated = replace(currStateMap, Collections.emptyMap(),
componentId, true);
diff --git
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java
index b0c5a16ef0..cac9c0acd5 100644
---
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java
+++
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java
@@ -45,6 +45,7 @@ public class TestRedisStateProvider {
// Set up mock state provider init context
Mockito.when(context.getProperty(RedisStateProvider.KEY_PREFIX)).thenReturn(new
MockPropertyValue("/nifi/components/"));
+
Mockito.when(context.getProperty(RedisStateProvider.MAX_ATTEMPTS)).thenReturn(new
MockPropertyValue("20"));
// Set up mock validation context
Mockito.when(validationContext.getProperty(RedisUtils.CONNECTION_STRING)).thenReturn(new
MockPropertyValue("localhost:6379"));