This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new a74c411079 NIFI-12104 Separate a non-atomic Redis DMC implementation
from the existing one for use in Put/Fetch DMC when Redis is clustered
a74c411079 is described below
commit a74c411079efe7d886ddae58314d1f2cee74f58e
Author: Bryan Bende <[email protected]>
AuthorDate: Tue Sep 26 11:37:02 2023 -0400
NIFI-12104 Separate a non-atomic Redis DMC implementation from the existing
one for use in Put/Fetch DMC when Redis is clustered
This closes #7796
Signed-off-by: Mike Thomsen <[email protected]>
---
.../RedisDistributedMapCacheClientService.java | 248 +--------------------
...mpleRedisDistributedMapCacheClientService.java} | 134 ++---------
.../org.apache.nifi.controller.ControllerService | 3 +-
3 files changed, 25 insertions(+), 360 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
index 6a36d485c5..722ca63649 100644
---
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
+++
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
@@ -18,63 +18,30 @@ package org.apache.nifi.redis.service;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.redis.RedisType;
-import org.apache.nifi.redis.util.RedisAction;
-import org.apache.nifi.util.Tuple;
-import org.springframework.data.redis.connection.RedisConnection;
-import org.springframework.data.redis.core.Cursor;
-import org.springframework.data.redis.core.ScanOptions;
-import org.springframework.data.redis.core.types.Expiration;
-import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
-import static org.apache.nifi.redis.util.RedisUtils.TTL;
@Tags({ "redis", "distributed", "cache", "map" })
@CapabilityDescription("An implementation of DistributedMapCacheClient that
uses Redis as the backing cache. This service relies on " +
"the WATCH, MULTI, and EXEC commands in Redis, which are not fully
supported when Redis is clustered. As a result, this service " +
"can only be used with a Redis Connection Pool that is configured for
standalone or sentinel mode. Sentinel mode can be used to " +
"provide high-availability configurations.")
-public class RedisDistributedMapCacheClientService extends
AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
-
- static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
- static {
- final List<PropertyDescriptor> props = new ArrayList<>();
- props.add(REDIS_CONNECTION_POOL);
- props.add(TTL);
- PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
- }
-
- private volatile RedisConnectionPool redisConnectionPool;
- private Long ttl;
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return PROPERTY_DESCRIPTORS;
- }
+public class RedisDistributedMapCacheClientService extends
SimpleRedisDistributedMapCacheClientService implements
AtomicDistributedMapCacheClient<byte[]> {
@Override
protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
@@ -96,179 +63,6 @@ public class RedisDistributedMapCacheClientService extends
AbstractControllerSer
return results;
}
- @OnEnabled
- public void onEnabled(final ConfigurationContext context) {
- this.redisConnectionPool =
context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
- this.ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS);
-
- if (ttl == 0) {
- this.ttl = -1L;
- }
- }
-
- @OnDisabled
- public void onDisabled() {
- this.redisConnectionPool = null;
- }
-
- @Override
- public <K, V> boolean putIfAbsent(final K key, final V value, final
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws
IOException {
- return withConnection(redisConnection -> {
- final Tuple<byte[],byte[]> kv = serialize(key, value,
keySerializer, valueSerializer);
- boolean set = redisConnection.setNX(kv.getKey(), kv.getValue());
-
- if (ttl != -1L && set) {
- redisConnection.expire(kv.getKey(), ttl);
- }
-
- return set;
- });
- }
-
- @Override
- public <K, V> V getAndPutIfAbsent(final K key, final V value, final
Serializer<K> keySerializer, final Serializer<V> valueSerializer, final
Deserializer<V> valueDeserializer) throws IOException {
- return withConnection(redisConnection -> {
- final Tuple<byte[],byte[]> kv = serialize(key, value,
keySerializer, valueSerializer);
- do {
- // start a watch on the key and retrieve the current value
- redisConnection.watch(kv.getKey());
- final byte[] existingValue = redisConnection.get(kv.getKey());
-
- // start a transaction and perform the put-if-absent
- redisConnection.multi();
- redisConnection.setNX(kv.getKey(), kv.getValue());
-
- // Set the TTL only if the key doesn't exist already
- if (ttl != -1L && existingValue == null) {
- redisConnection.expire(kv.getKey(), ttl);
- }
-
- // execute the transaction
- final List<Object> results = redisConnection.exec();
-
- // if the results list was empty, then the transaction failed
(i.e. key was modified after we started watching), so keep looping to retry
- // if the results list was null, then the transaction failed
- // if the results list has results, then the transaction
succeeded and it should have the result of the setNX operation
- if (results != null && results.size() > 0) {
- final Object firstResult = results.get(0);
- if (firstResult instanceof Boolean) {
- final Boolean absent = (Boolean) firstResult;
- return absent ? null :
valueDeserializer.deserialize(existingValue);
- } else {
- // this shouldn't really happen, but just in case
there is a non-boolean result then bounce out of the loop
- throw new IOException("Unexpected result from Redis
transaction: Expected Boolean result, but got "
- + firstResult.getClass().getName() + " with
value " + firstResult.toString());
- }
- }
- } while (isEnabled());
-
- return null;
- });
- }
-
- @Override
- public <K> boolean containsKey(final K key, final Serializer<K>
keySerializer) throws IOException {
- return withConnection(redisConnection -> {
- final byte[] k = serialize(key, keySerializer);
- return redisConnection.exists(k);
- });
- }
-
- @Override
- public <K, V> void put(final K key, final V value, final Serializer<K>
keySerializer, final Serializer<V> valueSerializer) throws IOException {
- withConnection(redisConnection -> {
- final Tuple<byte[],byte[]> kv = serialize(key, value,
keySerializer, valueSerializer);
- redisConnection.set(kv.getKey(), kv.getValue(),
Expiration.seconds(ttl), SetOption.upsert());
- return null;
- });
- }
-
- @Override
- public <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K>
keySerializer, Serializer<V> valueSerializer) throws IOException {
- withConnection(redisConnection -> {
- Map<byte[], byte[]> values = new HashMap<>();
- for (Map.Entry<K, V> entry : keysAndValues.entrySet()) {
- final Tuple<byte[],byte[]> kv = serialize(entry.getKey(),
entry.getValue(), keySerializer, valueSerializer);
- values.put(kv.getKey(), kv.getValue());
- }
-
- if (getLogger().isDebugEnabled()) {
- getLogger().debug(String.format("Queued up %d tuples to mset
on Redis connection.", values.size()));
- }
-
- if (!values.isEmpty()) {
- redisConnection.mSet(values);
- if (ttl != -1L) {
- values.keySet().forEach(k -> redisConnection.expire(k,
ttl));
- }
- }
- return null;
- });
- }
-
- @Override
- public <K, V> V get(final K key, final Serializer<K> keySerializer, final
Deserializer<V> valueDeserializer) throws IOException {
- return withConnection(redisConnection -> {
- final byte[] k = serialize(key, keySerializer);
- final byte[] v = redisConnection.get(k);
- return v == null ? null : valueDeserializer.deserialize(v);
- });
- }
-
- @Override
- public void close() throws IOException {
- // nothing to do
- }
-
- @Override
- public <K> boolean remove(final K key, final Serializer<K> keySerializer)
throws IOException {
- return withConnection(redisConnection -> {
- final byte[] k = serialize(key, keySerializer);
- final long numRemoved = redisConnection.del(k);
- return numRemoved > 0;
- });
- }
-
- @Override
- public long removeByPattern(final String regex) throws IOException {
- return withConnection(redisConnection -> {
- long deletedCount = 0;
- final List<byte[]> batchKeys = new ArrayList<>();
-
- // delete keys in batches of 1000 using the cursor
- final Cursor<byte[]> cursor =
redisConnection.scan(ScanOptions.scanOptions().count(100).match(regex).build());
- while (cursor.hasNext()) {
- batchKeys.add(cursor.next());
-
- if (batchKeys.size() == 1000) {
- deletedCount += redisConnection.del(getKeys(batchKeys));
- batchKeys.clear();
- }
- }
-
- // delete any left-over keys if some were added to the batch but
never reached 1000
- if (batchKeys.size() > 0) {
- deletedCount += redisConnection.del(getKeys(batchKeys));
- batchKeys.clear();
- }
-
- return deletedCount;
- });
- }
-
- /**
- * Convert the list of all keys to an array.
- */
- private byte[][] getKeys(final List<byte[]> keys) {
- final byte[][] allKeysArray = new byte[keys.size()][];
- for (int i=0; i < keys.size(); i++) {
- allKeysArray[i] = keys.get(i);
- }
- return allKeysArray;
- }
-
- // ----------------- Methods from AtomicDistributedMapCacheClient
------------------------
-
@Override
public <K, V> AtomicCacheEntry<K, V, byte[]> fetch(final K key, final
Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws
IOException {
return withConnection(redisConnection -> {
@@ -315,6 +109,7 @@ public class RedisDistributedMapCacheClientService extends
AbstractControllerSer
redisConnection.getSet(k, newVal);
// set the TTL if specified
+ final long ttl = getTtl();
if (ttl != -1L) {
redisConnection.expire(k, ttl);
}
@@ -332,43 +127,4 @@ public class RedisDistributedMapCacheClientService extends
AbstractControllerSer
});
}
- // ----------------- END Methods from AtomicDistributedMapCacheClient
------------------------
-
- private <K, V> Tuple<byte[],byte[]> serialize(final K key, final V value,
final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws
IOException {
- final ByteArrayOutputStream out = new ByteArrayOutputStream();
-
- keySerializer.serialize(key, out);
- final byte[] k = out.toByteArray();
-
- out.reset();
-
- valueSerializer.serialize(value, out);
- final byte[] v = out.toByteArray();
-
- return new Tuple<>(k, v);
- }
-
- private <K> byte[] serialize(final K key, final Serializer<K>
keySerializer) throws IOException {
- final ByteArrayOutputStream out = new ByteArrayOutputStream();
-
- keySerializer.serialize(key, out);
- return out.toByteArray();
- }
-
- private <T> T withConnection(final RedisAction<T> action) throws
IOException {
- RedisConnection redisConnection = null;
- try {
- redisConnection = redisConnectionPool.getConnection();
- return action.execute(redisConnection);
- } finally {
- if (redisConnection != null) {
- try {
- redisConnection.close();
- } catch (Exception e) {
- getLogger().warn("Error closing connection: " +
e.getMessage(), e);
- }
- }
- }
- }
-
}
diff --git
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/SimpleRedisDistributedMapCacheClientService.java
similarity index 66%
copy from
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
copy to
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/SimpleRedisDistributedMapCacheClientService.java
index 6a36d485c5..33f9340523 100644
---
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
+++
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/SimpleRedisDistributedMapCacheClientService.java
@@ -21,29 +21,23 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
-import
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.redis.RedisConnectionPool;
-import org.apache.nifi.redis.RedisType;
import org.apache.nifi.redis.util.RedisAction;
import org.apache.nifi.util.Tuple;
import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.types.Expiration;
-import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -54,11 +48,9 @@ import static
org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
import static org.apache.nifi.redis.util.RedisUtils.TTL;
@Tags({ "redis", "distributed", "cache", "map" })
-@CapabilityDescription("An implementation of DistributedMapCacheClient that
uses Redis as the backing cache. This service relies on " +
- "the WATCH, MULTI, and EXEC commands in Redis, which are not fully
supported when Redis is clustered. As a result, this service " +
- "can only be used with a Redis Connection Pool that is configured for
standalone or sentinel mode. Sentinel mode can be used to " +
- "provide high-availability configurations.")
-public class RedisDistributedMapCacheClientService extends
AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
+@CapabilityDescription("An implementation of DistributedMapCacheClient that
uses Redis as the backing cache. " +
+ "This service is intended to be used when a non-atomic
DistributedMapCacheClient is required.")
+public class SimpleRedisDistributedMapCacheClientService extends
AbstractControllerService implements DistributedMapCacheClient {
static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
static {
@@ -71,29 +63,13 @@ public class RedisDistributedMapCacheClientService extends
AbstractControllerSer
private volatile RedisConnectionPool redisConnectionPool;
private Long ttl;
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return PROPERTY_DESCRIPTORS;
+ protected Long getTtl() {
+ return ttl;
}
@Override
- protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
- final List<ValidationResult> results = new ArrayList<>();
-
- final RedisConnectionPool redisConnectionPool =
validationContext.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
- if (redisConnectionPool != null) {
- final RedisType redisType = redisConnectionPool.getRedisType();
- if (redisType != null && redisType == RedisType.CLUSTER) {
- results.add(new ValidationResult.Builder()
- .subject(REDIS_CONNECTION_POOL.getDisplayName())
- .valid(false)
- .explanation(REDIS_CONNECTION_POOL.getDisplayName()
- + " is configured in clustered mode, and this
service requires a non-clustered Redis")
- .build());
- }
- }
-
- return results;
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
}
@OnEnabled
@@ -178,7 +154,7 @@ public class RedisDistributedMapCacheClientService extends
AbstractControllerSer
public <K, V> void put(final K key, final V value, final Serializer<K>
keySerializer, final Serializer<V> valueSerializer) throws IOException {
withConnection(redisConnection -> {
final Tuple<byte[],byte[]> kv = serialize(key, value,
keySerializer, valueSerializer);
- redisConnection.set(kv.getKey(), kv.getValue(),
Expiration.seconds(ttl), SetOption.upsert());
+ redisConnection.set(kv.getKey(), kv.getValue(),
Expiration.seconds(ttl), RedisStringCommands.SetOption.upsert());
return null;
});
}
@@ -259,7 +235,7 @@ public class RedisDistributedMapCacheClientService extends
AbstractControllerSer
/**
* Convert the list of all keys to an array.
*/
- private byte[][] getKeys(final List<byte[]> keys) {
+ protected byte[][] getKeys(final List<byte[]> keys) {
final byte[][] allKeysArray = new byte[keys.size()][];
for (int i=0; i < keys.size(); i++) {
allKeysArray[i] = keys.get(i);
@@ -267,74 +243,7 @@ public class RedisDistributedMapCacheClientService extends
AbstractControllerSer
return allKeysArray;
}
- // ----------------- Methods from AtomicDistributedMapCacheClient
------------------------
-
- @Override
- public <K, V> AtomicCacheEntry<K, V, byte[]> fetch(final K key, final
Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws
IOException {
- return withConnection(redisConnection -> {
- final byte[] k = serialize(key, keySerializer);
-
- final byte[] v = redisConnection.get(k);
- if (v == null) {
- return null;
- }
-
- // for Redis we are going to use the raw bytes of the value as the
revision
- return new AtomicCacheEntry<>(key,
valueDeserializer.deserialize(v), v);
- });
- }
-
- @Override
- public <K, V> boolean replace(final AtomicCacheEntry<K, V, byte[]> entry,
final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws
IOException {
- return withConnection(redisConnection -> {
- final ByteArrayOutputStream out = new ByteArrayOutputStream();
-
- keySerializer.serialize(entry.getKey(), out);
- final byte[] k = out.toByteArray();
- out.reset();
-
- valueSerializer.serialize(entry.getValue(), out);
- final byte[] newVal = out.toByteArray();
-
- // the revision of the cache entry holds the value of the key from
a previous fetch
- final byte[] prevVal = entry.getRevision().orElse(null);
-
- boolean replaced = false;
-
- // start a watch on the key and retrieve the current value
- redisConnection.watch(k);
- final byte[] currValue = redisConnection.get(k);
-
- // start a transaction
- redisConnection.multi();
-
- // compare-and-set
- if (Arrays.equals(prevVal, currValue)) {
- // if we use set(k, newVal) then the results list will always
have size == 0 b/c when convertPipelineAndTxResults is set to true,
- // status responses like "OK" are skipped over, so by using
getSet we can rely on the results list to know if the transaction succeeded
- redisConnection.getSet(k, newVal);
-
- // set the TTL if specified
- if (ttl != -1L) {
- redisConnection.expire(k, ttl);
- }
- }
-
- // execute the transaction
- final List<Object> results = redisConnection.exec();
-
- // if we have a result then the replace succeeded
- if (results != null && results.size() > 0) {
- replaced = true;
- }
-
- return replaced;
- });
- }
-
- // ----------------- END Methods from AtomicDistributedMapCacheClient
------------------------
-
- private <K, V> Tuple<byte[],byte[]> serialize(final K key, final V value,
final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws
IOException {
+ protected <K, V> Tuple<byte[],byte[]> serialize(final K key, final V
value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer)
throws IOException {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
keySerializer.serialize(key, out);
@@ -348,27 +257,26 @@ public class RedisDistributedMapCacheClientService
extends AbstractControllerSer
return new Tuple<>(k, v);
}
- private <K> byte[] serialize(final K key, final Serializer<K>
keySerializer) throws IOException {
+ protected <K> byte[] serialize(final K key, final Serializer<K>
keySerializer) throws IOException {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
keySerializer.serialize(key, out);
return out.toByteArray();
}
- private <T> T withConnection(final RedisAction<T> action) throws
IOException {
+ protected <T> T withConnection(final RedisAction<T> action) throws
IOException {
RedisConnection redisConnection = null;
try {
redisConnection = redisConnectionPool.getConnection();
return action.execute(redisConnection);
} finally {
- if (redisConnection != null) {
- try {
- redisConnection.close();
- } catch (Exception e) {
- getLogger().warn("Error closing connection: " +
e.getMessage(), e);
- }
- }
+ if (redisConnection != null) {
+ try {
+ redisConnection.close();
+ } catch (Exception e) {
+ getLogger().warn("Error closing connection: " +
e.getMessage(), e);
+ }
+ }
}
}
-
}
diff --git
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 5d4073fdd5..c7da33be1f 100644
---
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.redis.service.RedisConnectionPoolService
-org.apache.nifi.redis.service.RedisDistributedMapCacheClientService
\ No newline at end of file
+org.apache.nifi.redis.service.RedisDistributedMapCacheClientService
+org.apache.nifi.redis.service.SimpleRedisDistributedMapCacheClientService
\ No newline at end of file