[ 
https://issues.apache.org/jira/browse/NIFI-4061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16054820#comment-16054820
 ] 

ASF GitHub Bot commented on NIFI-4061:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1918#discussion_r122836730
  
    --- Diff: 
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
 ---
    @@ -0,0 +1,313 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.redis.service;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnDisabled;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
    +import 
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
    +import org.apache.nifi.distributed.cache.client.Deserializer;
    +import org.apache.nifi.distributed.cache.client.Serializer;
    +import org.apache.nifi.redis.RedisConnectionPool;
    +import org.apache.nifi.redis.RedisType;
    +import org.apache.nifi.redis.util.RedisAction;
    +import org.apache.nifi.util.Tuple;
    +import org.springframework.data.redis.connection.RedisConnection;
    +import org.springframework.data.redis.core.Cursor;
    +import org.springframework.data.redis.core.ScanOptions;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +
    +@Tags({ "redis", "distributed", "cache", "map" })
    +@CapabilityDescription("An implementation of DistributedMapCacheClient 
that uses Redis as the backing cache. This service relies on " +
    +        "the WATCH, MULTI, and EXEC commands in Redis, which are not fully 
supported when Redis is clustered. As a result, this service " +
    +        "can only be used with a Redis Connection Pool that is configured 
for standalone or sentinel mode. Sentinel mode can be used to " +
    +        "provide high-availability configurations.")
    +public class RedisDistributedMapCacheClientService extends 
AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
    +
    +    public static final PropertyDescriptor REDIS_CONNECTION_POOL = new 
PropertyDescriptor.Builder()
    +            .name("redis-connection-pool")
    +            .displayName("Redis Connection Pool")
    +            .identifiesControllerService(RedisConnectionPool.class)
    +            .required(true)
    +            .build();
    +
    +    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
    +    static {
    +        final List<PropertyDescriptor> props = new ArrayList<>();
    +        props.add(REDIS_CONNECTION_POOL);
    +        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
    +    }
    +
    +    private volatile RedisConnectionPool redisConnectionPool;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTY_DESCRIPTORS;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> 
customValidate(ValidationContext validationContext) {
    +        final List<ValidationResult> results = new ArrayList<>();
    +
    +        final RedisConnectionPool redisConnectionPool = 
validationContext.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
    +        if (redisConnectionPool != null) {
    +            final RedisType redisType = redisConnectionPool.getRedisType();
    +            if (redisType != null && redisType == RedisType.CLUSTER) {
    +                results.add(new ValidationResult.Builder()
    +                        .subject(REDIS_CONNECTION_POOL.getDisplayName())
    +                        .valid(false)
    +                        .explanation(REDIS_CONNECTION_POOL.getDisplayName()
    +                                + " is configured in clustered mode, and 
this service requires a non-clustered Redis")
    +                        .build());
    +            }
    +        }
    +
    +        return results;
    +    }
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) {
    +        this.redisConnectionPool = 
context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
    +    }
    +
    +    @OnDisabled
    +    public void onDisabled() {
    +        this.redisConnectionPool = null;
    +    }
    +
    +    @Override
    +    public <K, V> boolean putIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
    +        return withConnection(redisConnection -> {
    +            final Tuple<byte[],byte[]> kv = serialize(key, value, 
keySerializer, valueSerializer);
    +            return redisConnection.setNX(kv.getKey(), kv.getValue());
    +        });
    +    }
    +
    +    @Override
    +    public <K, V> V getAndPutIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer, final 
Deserializer<V> valueDeserializer) throws IOException {
    +        return withConnection(redisConnection -> {
    +            final Tuple<byte[],byte[]> kv = serialize(key, value, 
keySerializer, valueSerializer);
    +            do {
    +                // start a watch on the key and retrieve the current value
    +                redisConnection.watch(kv.getKey());
    +                final byte[] existingValue = 
redisConnection.get(kv.getKey());
    +
    +                // start a transaction and perform the put-if-absent
    +                redisConnection.multi();
    +                redisConnection.setNX(kv.getKey(), kv.getValue());
    +
    +                // execute the transaction
    +                final List<Object> results = redisConnection.exec();
    +
    +                // if the results list was empty, then the transaction 
failed (i.e. key was modified after we started watching), so keep looping to 
retry
    +                // if the results list has results, then the transaction 
succeeded and it should have the result of the setNX operation
    +                if (results.size() > 0) {
    +                    final Object firstResult = results.get(0);
    +                    if (firstResult instanceof Boolean) {
    +                        final Boolean absent = (Boolean) firstResult;
    +                        return absent ? null : 
valueDeserializer.deserialize(existingValue);
    +                    } else {
    +                        // this shouldn't really happen, but just in case 
there is a non-boolean result then bounce out of the loop
    +                        throw new IOException("Unexpected result from 
Redis transaction: Expected Boolean result, but got "
    +                                + firstResult.getClass().getName() + " 
with value " + firstResult.toString());
    +                    }
    +                }
    +            } while (true);
    +        });
    +    }
    +
    +    @Override
    +    public <K> boolean containsKey(final K key, final Serializer<K> 
keySerializer) throws IOException {
    +        return withConnection(redisConnection -> {
    +            final byte[] k = serialize(key, keySerializer);
    +            return redisConnection.exists(k);
    +        });
    +    }
    +
    +    @Override
    +    public <K, V> void put(final K key, final V value, final Serializer<K> 
keySerializer, final Serializer<V> valueSerializer) throws IOException {
    +        withConnection(redisConnection -> {
    +            final Tuple<byte[],byte[]> kv = serialize(key, value, 
keySerializer, valueSerializer);
    +            redisConnection.set(kv.getKey(), kv.getValue());
    +            return null;
    +        });
    +    }
    +
    +    @Override
    +    public <K, V> V get(final K key, final Serializer<K> keySerializer, 
final Deserializer<V> valueDeserializer) throws IOException {
    +        return withConnection(redisConnection -> {
    +            final byte[] k = serialize(key, keySerializer);
    +            final byte[] v = redisConnection.get(k);
    +            return valueDeserializer.deserialize(v);
    +        });
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +        // nothing to do
    +    }
    +
    +    @Override
    +    public <K> boolean remove(final K key, final Serializer<K> 
keySerializer) throws IOException {
    +        return withConnection(redisConnection -> {
    +            final byte[] k = serialize(key, keySerializer);
    +            final long numRemoved = redisConnection.del(k);
    +            return numRemoved > 0;
    +        });
    +    }
    +
    +    @Override
    +    public long removeByPattern(final String regex) throws IOException {
    +        return withConnection(redisConnection -> {
    +            final List<byte[]> allKeys = new ArrayList<>();
    --- End diff --
    
    That makes sense, I hadn't thought about the heap usage... Since the cursor 
returned from scan is already paging through smaller key ranges at a time, I 
changed the logic to perform a delete for batches of 1000 and set the hint for 
the scan to return 1000 keys at a time.


> Implement a StateProvider backed by Redis
> -----------------------------------------
>
>                 Key: NIFI-4061
>                 URL: https://issues.apache.org/jira/browse/NIFI-4061
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Bryan Bende
>            Assignee: Bryan Bende
>            Priority: Minor
>
> We currently have only one clustered state provider which is a ZooKeeper 
> implementation. Redis would make a good candidate to provide another option.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to