[ 
https://issues.apache.org/jira/browse/NIFI-4061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16054823#comment-16054823
 ] 

ASF GitHub Bot commented on NIFI-4061:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1918#discussion_r122837003
  
    --- Diff: 
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
 ---
    @@ -0,0 +1,271 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.redis.state;
    +
    +import org.apache.nifi.components.AbstractConfigurableComponent;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.components.state.StateProvider;
    +import org.apache.nifi.components.state.StateProviderInitializationContext;
    +import org.apache.nifi.context.PropertyContext;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.redis.RedisType;
    +import org.apache.nifi.redis.util.RedisAction;
    +import org.apache.nifi.redis.util.RedisUtils;
    +import org.springframework.data.redis.connection.RedisConnection;
    +import 
org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * A StateProvider backed by Redis.
    + */
    +public class RedisStateProvider extends AbstractConfigurableComponent 
implements StateProvider {
    +
    +    static final int ENCODING_VERSION = 1;
    +
    +    private String identifier;
    +    private PropertyContext context;
    +    private ComponentLog logger;
    +
    +    private volatile boolean enabled;
    +    private volatile JedisConnectionFactory connectionFactory;
    +
    +    private final RedisStateMapSerDe serDe = new RedisStateMapJsonSerDe();
    +
    +    @Override
    +    public final void initialize(final StateProviderInitializationContext 
context) throws IOException {
    +        this.context = context;
    +        this.identifier = context.getIdentifier();
    +        this.logger = context.getLogger();
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return RedisUtils.REDIS_CONNECTION_PROPERTY_DESCRIPTORS;
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> 
customValidate(ValidationContext validationContext) {
    +        final List<ValidationResult> results = new 
ArrayList<>(RedisUtils.validate(validationContext));
    +
    +        final RedisType redisType = 
RedisType.fromDisplayName(validationContext.getProperty(RedisUtils.REDIS_MODE).getValue());
    +        if (redisType != null && redisType == RedisType.CLUSTER) {
    +            results.add(new ValidationResult.Builder()
    +                    .subject(RedisUtils.REDIS_MODE.getDisplayName())
    +                    .valid(false)
    +                    .explanation(RedisUtils.REDIS_MODE.getDisplayName()
    +                            + " is configured in clustered mode, and this 
service requires a non-clustered Redis")
    +                    .build());
    +        }
    +
    +        return results;
    +    }
    +
    +    @Override
    +    public String getIdentifier() {
    +        return identifier;
    +    }
    +
    +    @Override
    +    public void enable() {
    +        enabled = true;
    +    }
    +
    +    @Override
    +    public void disable() {
    +        enabled = false;
    +    }
    +
    +    @Override
    +    public boolean isEnabled() {
    +        return enabled;
    +    }
    +
    +    @Override
    +    public void shutdown() {
    +        if (connectionFactory != null) {
    +            connectionFactory.destroy();
    +            connectionFactory = null;
    +        }
    +    }
    +
    +    @Override
    +    public void setState(final Map<String, String> state, final String 
componentId) throws IOException {
    +        verifyEnabled();
    +
    +        final StateMap currStateMap = getState(componentId);
    +
    +        int attempted = 0;
    +        boolean updated = false;
    +
    +        while (!updated && attempted < 20) {
    +            updated = replace(currStateMap, state, componentId, true);
    +            attempted++;
    +        }
    +
    +        if (!updated) {
    +            throw new IOException("Unable to update state due to 
concurrent modifications");
    +        }
    +    }
    +
    +    @Override
    +    public StateMap getState(final String componentId) throws IOException {
    +        return withConnection(redisConnection -> {
    +            final byte[] key = 
getComponentPath(componentId).getBytes(StandardCharsets.UTF_8);
    +            final byte[] value = redisConnection.get(key);
    +
    +            final RedisStateMap stateMap = serDe.deserialize(value);
    +            if (stateMap == null) {
    +                return new 
RedisStateMap.Builder().encodingVersion(ENCODING_VERSION).build();
    +            } else {
    +                return stateMap;
    +            }
    +        });
    +    }
    +
    +    @Override
    +    public boolean replace(final StateMap oldValue, final Map<String, 
String> newValue, final String componentId) throws IOException {
    +        return replace(oldValue, newValue, componentId, false);
    +    }
    +
    +    private boolean replace(final StateMap oldValue, final Map<String, 
String> newValue, final String componentId, final boolean allowReplaceMissing) 
throws IOException {
    +        return withConnection(redisConnection -> {
    +
    +            boolean replaced = false;
    +
    +            // start a watch on the key and retrieve the current value
    +            final byte[] key = 
getComponentPath(componentId).getBytes(StandardCharsets.UTF_8);
    +            redisConnection.watch(key);
    +
    +            final long prevVersion = oldValue == null ? -1L : 
oldValue.getVersion();
    +
    +            final byte[] currValue = redisConnection.get(key);
    +            final RedisStateMap currStateMap = 
serDe.deserialize(currValue);
    +            final long currVersion = currStateMap == null ? -1L : 
currStateMap.getVersion();
    +
    +            // the replace API expects that you can't call replace on a 
non-existing value, so unwatch and return
    +            if (!allowReplaceMissing && currVersion == -1) {
    +                redisConnection.unwatch();
    +                return false;
    +            }
    +
    +            // start a transaction
    +            redisConnection.multi();
    +
    +            // compare-and-set
    +            if (prevVersion == currVersion) {
    +                // build the new RedisStateMap incrementing the version, 
using latest encoding, and using the passed in values
    +                final RedisStateMap newStateMap = new 
RedisStateMap.Builder()
    +                        .version(currVersion + 1)
    +                        .encodingVersion(ENCODING_VERSION)
    +                        .stateValues(newValue)
    +                        .build();
    +
    +                // if we use set(k, newVal) then the results list will 
always have size == 0 b/c when convertPipelineAndTxResults is set to true,
    +                // status responses like "OK" are skipped over, so by 
using getSet we can rely on the results list to know if the transaction 
succeeded
    +                redisConnection.getSet(key, serDe.serialize(newStateMap));
    +            }
    +
    +            // execute the transaction
    +            final List<Object> results = redisConnection.exec();
    +
    +            // if we have a result then the replace succeeded
    +            if (results.size() > 0) {
    +                replaced = true;
    +            }
    +
    +            return replaced;
    +        });
    +    }
    +
    +    @Override
    +    public void clear(final String componentId) throws IOException {
    +        final StateMap currStateMap = getState(componentId);
    +
    +        int attempted = 0;
    +        boolean updated = false;
    +
    +        while (!updated && attempted < 20) {
    +            updated = replace(currStateMap, Collections.emptyMap(), 
componentId, true);
    +            attempted++;
    +        }
    +
    +        if (!updated) {
    +            throw new IOException("Unable to update state due to 
concurrent modifications");
    +        }
    +    }
    +
    +    @Override
    +    public void onComponentRemoved(final String componentId) throws 
IOException {
    +        withConnection(redisConnection -> {
    +            final byte[] key = 
getComponentPath(componentId).getBytes(StandardCharsets.UTF_8);
    +            redisConnection.del(key);
    +            return true;
    +        });
    +    }
    +
    +    @Override
    +    public Scope[] getSupportedScopes() {
    +        return new Scope[] {Scope.CLUSTER};
    +    }
    +
    +    private String getComponentPath(final String componentId) {
    --- End diff --
    
    I like that idea, I added a property called "Key Prefix" since with Redis 
it isn't as much of a path as it is with ZooKeeper. Even without the Key Prefix 
they could still use the Database Index to segregate the keys, but I still like 
making the prefix configurable.


> Implement a StateProvider backed by Redis
> -----------------------------------------
>
>                 Key: NIFI-4061
>                 URL: https://issues.apache.org/jira/browse/NIFI-4061
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Bryan Bende
>            Assignee: Bryan Bende
>            Priority: Minor
>
> We currently have only one clustered state provider which is a ZooKeeper 
> implementation. Redis would make a good candidate to provide another option.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to