[
https://issues.apache.org/jira/browse/NIFI-4061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16054823#comment-16054823
]
ASF GitHub Bot commented on NIFI-4061:
--------------------------------------
Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1918#discussion_r122837003
--- Diff:
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.state;
+
+import org.apache.nifi.components.AbstractConfigurableComponent;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.components.state.StateProvider;
+import org.apache.nifi.components.state.StateProviderInitializationContext;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.redis.RedisType;
+import org.apache.nifi.redis.util.RedisAction;
+import org.apache.nifi.redis.util.RedisUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+import
org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A StateProvider backed by Redis.
+ */
+public class RedisStateProvider extends AbstractConfigurableComponent
implements StateProvider {
+
+ // Encoding version stamped onto every RedisStateMap this provider builds.
+ static final int ENCODING_VERSION = 1;
+
+ // Assigned in initialize() from the StateProviderInitializationContext.
+ private String identifier;
+ private PropertyContext context;
+ private ComponentLog logger;
+
+ // Toggled by enable()/disable() and reported by isEnabled().
+ private volatile boolean enabled;
+ // Destroyed and reset to null in shutdown().
+ private volatile JedisConnectionFactory connectionFactory;
+
+ // Converts RedisStateMap instances to and from the bytes stored in Redis.
+ private final RedisStateMapSerDe serDe = new RedisStateMapJsonSerDe();
+
+ /**
+  * Captures the identifier, logger and property context handed over at initialization.
+  * No Redis connection is opened here.
+  *
+  * @param context the initialization context to read the identifier and logger from
+  * @throws IOException declared by the signature; this implementation does not throw it
+  */
+ @Override
+ public final void initialize(final StateProviderInitializationContext context) throws IOException {
+     this.identifier = context.getIdentifier();
+     this.logger = context.getLogger();
+     this.context = context;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return RedisUtils.REDIS_CONNECTION_PROPERTY_DESCRIPTORS;
+ }
+
+ @Override
+ protected Collection<ValidationResult>
customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> results = new
ArrayList<>(RedisUtils.validate(validationContext));
+
+ final RedisType redisType =
RedisType.fromDisplayName(validationContext.getProperty(RedisUtils.REDIS_MODE).getValue());
+ if (redisType != null && redisType == RedisType.CLUSTER) {
+ results.add(new ValidationResult.Builder()
+ .subject(RedisUtils.REDIS_MODE.getDisplayName())
+ .valid(false)
+ .explanation(RedisUtils.REDIS_MODE.getDisplayName()
+ + " is configured in clustered mode, and this
service requires a non-clustered Redis")
+ .build());
+ }
+
+ return results;
+ }
+
+ /**
+  * @return the identifier captured during {@code initialize}
+  */
+ @Override
+ public String getIdentifier() {
+     return this.identifier;
+ }
+
+ /**
+  * Marks this provider as enabled.
+  */
+ @Override
+ public void enable() {
+     this.enabled = true;
+ }
+
+ /**
+  * Marks this provider as disabled.
+  */
+ @Override
+ public void disable() {
+     this.enabled = false;
+ }
+
+ /**
+  * @return {@code true} after {@code enable()} and until {@code disable()} is called
+  */
+ @Override
+ public boolean isEnabled() {
+     return this.enabled;
+ }
+
+ /**
+  * Destroys the connection factory, if one exists, and clears the reference to it.
+  */
+ @Override
+ public void shutdown() {
+     if (connectionFactory == null) {
+         // nothing to release
+         return;
+     }
+
+     connectionFactory.destroy();
+     connectionFactory = null;
+ }
+
+ /**
+  * Stores the given state for a component, retrying the underlying compare-and-set
+  * up to 20 times.
+  *
+  * @param state the state values to store
+  * @param componentId the id of the component the state belongs to
+  * @throws IOException if the state could not be written within the allowed number of
+  *         attempts, or if reading/writing the state fails
+  */
+ @Override
+ public void setState(final Map<String, String> state, final String componentId) throws IOException {
+     verifyEnabled();
+
+     final int maxAttempts = 20;
+     boolean updated = false;
+
+     for (int attempted = 0; !updated && attempted < maxAttempts; attempted++) {
+         // Re-read the current state on every attempt. A snapshot taken once before the loop
+         // keeps a stale version after a concurrent write, so every retry would compare
+         // against the same outdated version and fail.
+         final StateMap currStateMap = getState(componentId);
+         updated = replace(currStateMap, state, componentId, true);
+     }
+
+     if (!updated) {
+         throw new IOException("Unable to update state due to concurrent modifications");
+     }
+ }
+
+ /**
+  * Reads and deserializes the state stored under the component's key.
+  *
+  * @param componentId the id of the component whose state is requested
+  * @return the deserialized state map, or a freshly built map carrying only the current
+  *         encoding version when deserialization yields {@code null}
+  * @throws IOException if the Redis interaction fails
+  */
+ @Override
+ public StateMap getState(final String componentId) throws IOException {
+     return withConnection(redisConnection -> {
+         final String componentPath = getComponentPath(componentId);
+         final byte[] stored = redisConnection.get(componentPath.getBytes(StandardCharsets.UTF_8));
+
+         final RedisStateMap existing = serDe.deserialize(stored);
+         if (existing != null) {
+             return existing;
+         }
+
+         return new RedisStateMap.Builder().encodingVersion(ENCODING_VERSION).build();
+     });
+ }
+
+ /**
+  * Compare-and-set replacement of a component's state that refuses to create a value
+  * which does not exist yet.
+  *
+  * @param oldValue the state the caller believes is current
+  * @param newValue the state values to store
+  * @param componentId the id of the component the state belongs to
+  * @return {@code true} if the state was replaced, {@code false} otherwise
+  * @throws IOException if the Redis interaction fails
+  */
+ @Override
+ public boolean replace(final StateMap oldValue, final Map<String, String> newValue, final String componentId) throws IOException {
+     final boolean allowReplaceMissing = false;
+     return replace(oldValue, newValue, componentId, allowReplaceMissing);
+ }
+
+ /**
+  * Performs a versioned compare-and-set of a component's state inside a Redis
+  * WATCH / MULTI / EXEC sequence.
+  *
+  * @param oldValue the state the caller believes is current; {@code null} is treated as version -1
+  * @param newValue the state values to store
+  * @param componentId the id of the component the state belongs to
+  * @param allowReplaceMissing when {@code false}, a key with no stored state is never written
+  * @return {@code true} if the new state was written, {@code false} if the versions did not
+  *         match, the value was missing and may not be created, or the transaction was aborted
+  * @throws IOException if the Redis interaction fails
+  */
+ private boolean replace(final StateMap oldValue, final Map<String, String> newValue, final String componentId, final boolean allowReplaceMissing)
+         throws IOException {
+     return withConnection(redisConnection -> {
+
+         // start a watch on the key and retrieve the current value
+         final byte[] key = getComponentPath(componentId).getBytes(StandardCharsets.UTF_8);
+         redisConnection.watch(key);
+
+         final long prevVersion = oldValue == null ? -1L : oldValue.getVersion();
+
+         final byte[] currValue = redisConnection.get(key);
+         final RedisStateMap currStateMap = serDe.deserialize(currValue);
+         final long currVersion = currStateMap == null ? -1L : currStateMap.getVersion();
+
+         // the replace API expects that you can't call replace on a non-existing value, so unwatch and return
+         if (!allowReplaceMissing && currVersion == -1) {
+             redisConnection.unwatch();
+             return false;
+         }
+
+         // start a transaction
+         redisConnection.multi();
+
+         // compare-and-set
+         if (prevVersion == currVersion) {
+             // build the new RedisStateMap incrementing the version, using latest encoding, and using the passed in values
+             final RedisStateMap newStateMap = new RedisStateMap.Builder()
+                     .version(currVersion + 1)
+                     .encodingVersion(ENCODING_VERSION)
+                     .stateValues(newValue)
+                     .build();
+
+             // if we use set(k, newVal) then the results list will always have size == 0 b/c when convertPipelineAndTxResults is set to true,
+             // status responses like "OK" are skipped over, so by using getSet we can rely on the results list to know if the transaction succeeded
+             redisConnection.getSet(key, serDe.serialize(newStateMap));
+         }
+
+         // execute the transaction
+         final List<Object> results = redisConnection.exec();
+
+         // exec() can return null when the watched key changed and the transaction was aborted;
+         // treat that as a failed compare-and-set instead of dereferencing null.
+         // Otherwise a non-empty result list means the getSet ran and the replace succeeded.
+         return results != null && !results.isEmpty();
+     });
+ }
+
+ /**
+  * Replaces a component's state with an empty map, retrying the underlying
+  * compare-and-set up to 20 times.
+  *
+  * @param componentId the id of the component whose state should be cleared
+  * @throws IOException if the state could not be cleared within the allowed number of
+  *         attempts, or if reading/writing the state fails
+  */
+ @Override
+ public void clear(final String componentId) throws IOException {
+     final int maxAttempts = 20;
+     boolean updated = false;
+
+     for (int attempted = 0; !updated && attempted < maxAttempts; attempted++) {
+         // Re-read the current state on every attempt. A snapshot taken once before the loop
+         // keeps a stale version after a concurrent write, so every retry would compare
+         // against the same outdated version and fail.
+         final StateMap currStateMap = getState(componentId);
+         updated = replace(currStateMap, Collections.emptyMap(), componentId, true);
+     }
+
+     if (!updated) {
+         throw new IOException("Unable to update state due to concurrent modifications");
+     }
+ }
+
+ /**
+  * Deletes the Redis key holding the state of a removed component.
+  *
+  * @param componentId the id of the component that was removed
+  * @throws IOException if the Redis interaction fails
+  */
+ @Override
+ public void onComponentRemoved(final String componentId) throws IOException {
+     withConnection(redisConnection -> {
+         final String componentPath = getComponentPath(componentId);
+         final byte[] redisKey = componentPath.getBytes(StandardCharsets.UTF_8);
+         redisConnection.del(redisKey);
+         return true;
+     });
+ }
+
+ /**
+  * @return a new array containing only {@code Scope.CLUSTER}
+  */
+ @Override
+ public Scope[] getSupportedScopes() {
+     final Scope[] supported = {Scope.CLUSTER};
+     return supported;
+ }
+
+ private String getComponentPath(final String componentId) {
--- End diff --
I like that idea. I added a property called "Key Prefix", since with Redis
it isn't as much of a path as it is with ZooKeeper. Even without the Key Prefix,
they could still use the Database Index to segregate the keys, but I still like
making the prefix configurable.
> Implement a StateProvider backed by Redis
> -----------------------------------------
>
> Key: NIFI-4061
> URL: https://issues.apache.org/jira/browse/NIFI-4061
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Bryan Bende
> Assignee: Bryan Bende
> Priority: Minor
>
> We currently have only one clustered state provider which is a ZooKeeper
> implementation. Redis would make a good candidate to provide another option.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)