Repository: nifi Updated Branches: refs/heads/master d75ba167c -> 06d45c3a6
NIFI-4987: Added TTL to RedisDistributedMapCacheClientService NIFI-4987: PR Review Fixes - Reverted getAndPutIfAbsent and added TTL setting with a different approach NIFI-4987: PR Review Fixes - Added TTL to putIfAbsent() This closes #2726. Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/06d45c3a Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/06d45c3a Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/06d45c3a Branch: refs/heads/master Commit: 06d45c3a6ea30c5dc51d5f320d423dafc10271f8 Parents: d75ba16 Author: zenfenan <[email protected]> Authored: Sun May 20 19:22:26 2018 +0530 Committer: Bryan Bende <[email protected]> Committed: Wed May 23 15:28:39 2018 -0400 ---------------------------------------------------------------------- .../RedisDistributedMapCacheClientService.java | 35 ++++++++++++++++++-- 1 file changed, 32 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/06d45c3a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java index 94b195c..604c5ef 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java @@ -29,6 +29,7 @@ import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.redis.RedisConnectionPool; import org.apache.nifi.redis.RedisType; import org.apache.nifi.redis.util.RedisAction; @@ -36,6 +37,7 @@ import org.apache.nifi.util.Tuple; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.ScanOptions; +import org.springframework.data.redis.core.types.Expiration; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -44,6 +46,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; @Tags({ "redis", "distributed", "cache", "map" }) @CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service relies on " + @@ -59,14 +62,25 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer .required(true) .build(); + public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder() + .name("redis-cache-ttl") + .displayName("TTL") + .description("Indicates how long the data should exist in Redis. Setting '0 secs' would mean the data would exist forever") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .required(true) + .defaultValue("0 secs") + .build(); + static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS; static { final List<PropertyDescriptor> props = new ArrayList<>(); props.add(REDIS_CONNECTION_POOL); + props.add(TTL); PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props); } private volatile RedisConnectionPool redisConnectionPool; + private Long ttl; @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { @@ -96,6 +110,11 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer @OnEnabled public void onEnabled(final ConfigurationContext context) { this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class); + this.ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS); + + if (ttl == 0) { + this.ttl = -1L; + } } @OnDisabled @@ -107,7 +126,13 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { return withConnection(redisConnection -> { final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer); - return redisConnection.setNX(kv.getKey(), kv.getValue()); + boolean set = redisConnection.setNX(kv.getKey(), kv.getValue()); + + if (ttl != -1L && set) { + redisConnection.expire(kv.getKey(), ttl); + } + + return set; }); } @@ -124,6 +149,11 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer redisConnection.multi(); redisConnection.setNX(kv.getKey(), kv.getValue()); + // Set the TTL only if the key doesn't exist already + if (ttl != -1L && existingValue == null) { + redisConnection.expire(kv.getKey(), ttl); + } + // execute the transaction final List<Object> results = redisConnection.exec(); @@ -146,7 +176,6 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer }); } - @Override public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException { return withConnection(redisConnection -> { @@ -159,7 +188,7 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { withConnection(redisConnection -> { final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer); - redisConnection.set(kv.getKey(), kv.getValue()); + redisConnection.set(kv.getKey(), kv.getValue(), Expiration.seconds(ttl), null); return null; }); }
