Repository: nifi
Updated Branches:
  refs/heads/master d75ba167c -> 06d45c3a6


NIFI-4987: Added TTL to RedisDistributedMapCacheClientService

NIFI-4987: PR Review Fixes - Reverted getAndPutIfAbsent and added TTL setting 
with a different approach

NIFI-4987: PR Review Fixes - Added TTL to putIfAbsent()

This closes #2726.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/06d45c3a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/06d45c3a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/06d45c3a

Branch: refs/heads/master
Commit: 06d45c3a6ea30c5dc51d5f320d423dafc10271f8
Parents: d75ba16
Author: zenfenan <[email protected]>
Authored: Sun May 20 19:22:26 2018 +0530
Committer: Bryan Bende <[email protected]>
Committed: Wed May 23 15:28:39 2018 -0400

----------------------------------------------------------------------
 .../RedisDistributedMapCacheClientService.java  | 35 ++++++++++++++++++--
 1 file changed, 32 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/06d45c3a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
index 94b195c..604c5ef 100644
--- 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
+++ 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
@@ -29,6 +29,7 @@ import 
org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
 import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.redis.RedisConnectionPool;
 import org.apache.nifi.redis.RedisType;
 import org.apache.nifi.redis.util.RedisAction;
@@ -36,6 +37,7 @@ import org.apache.nifi.util.Tuple;
 import org.springframework.data.redis.connection.RedisConnection;
 import org.springframework.data.redis.core.Cursor;
 import org.springframework.data.redis.core.ScanOptions;
+import org.springframework.data.redis.core.types.Expiration;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -44,6 +46,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 @Tags({ "redis", "distributed", "cache", "map" })
 @CapabilityDescription("An implementation of DistributedMapCacheClient that 
uses Redis as the backing cache. This service relies on " +
@@ -59,14 +62,25 @@ public class RedisDistributedMapCacheClientService extends 
AbstractControllerSer
             .required(true)
             .build();
 
+    public static final PropertyDescriptor TTL = new 
PropertyDescriptor.Builder()
+            .name("redis-cache-ttl")
+            .displayName("TTL")
+            .description("Indicates how long the data should exist in Redis. 
Setting '0 secs' would mean the data would exist forever")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .required(true)
+            .defaultValue("0 secs")
+            .build();
+
     static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
     static {
         final List<PropertyDescriptor> props = new ArrayList<>();
         props.add(REDIS_CONNECTION_POOL);
+        props.add(TTL);
         PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
     }
 
     private volatile RedisConnectionPool redisConnectionPool;
+    private Long ttl;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -96,6 +110,11 @@ public class RedisDistributedMapCacheClientService extends 
AbstractControllerSer
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) {
         this.redisConnectionPool = 
context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
+        this.ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS);
+
+        if (ttl == 0) {
+            this.ttl = -1L;
+        }
     }
 
     @OnDisabled
@@ -107,7 +126,13 @@ public class RedisDistributedMapCacheClientService extends 
AbstractControllerSer
     public <K, V> boolean putIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
         return withConnection(redisConnection -> {
             final Tuple<byte[],byte[]> kv = serialize(key, value, 
keySerializer, valueSerializer);
-            return redisConnection.setNX(kv.getKey(), kv.getValue());
+            boolean set = redisConnection.setNX(kv.getKey(), kv.getValue());
+
+            if (ttl != -1L && set) {
+                redisConnection.expire(kv.getKey(), ttl);
+            }
+
+            return set;
         });
     }
 
@@ -124,6 +149,11 @@ public class RedisDistributedMapCacheClientService extends 
AbstractControllerSer
                 redisConnection.multi();
                 redisConnection.setNX(kv.getKey(), kv.getValue());
 
+                // Set the TTL only if the key doesn't exist already
+                if (ttl != -1L && existingValue == null) {
+                    redisConnection.expire(kv.getKey(), ttl);
+                }
+
                 // execute the transaction
                 final List<Object> results = redisConnection.exec();
 
@@ -146,7 +176,6 @@ public class RedisDistributedMapCacheClientService extends 
AbstractControllerSer
         });
     }
 
-
     @Override
     public <K> boolean containsKey(final K key, final Serializer<K> 
keySerializer) throws IOException {
         return withConnection(redisConnection -> {
@@ -159,7 +188,7 @@ public class RedisDistributedMapCacheClientService extends 
AbstractControllerSer
     public <K, V> void put(final K key, final V value, final Serializer<K> 
keySerializer, final Serializer<V> valueSerializer) throws IOException {
         withConnection(redisConnection -> {
             final Tuple<byte[],byte[]> kv = serialize(key, value, 
keySerializer, valueSerializer);
-            redisConnection.set(kv.getKey(), kv.getValue());
+            redisConnection.set(kv.getKey(), kv.getValue(), 
Expiration.seconds(ttl), null);
             return null;
         });
     }

Reply via email to