[
https://issues.apache.org/jira/browse/NIFI-4061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16054148#comment-16054148
]
ASF GitHub Bot commented on NIFI-4061:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1918#discussion_r122731744
--- Diff:
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
---
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.service;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import
org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.redis.RedisType;
+import org.apache.nifi.redis.util.RedisAction;
+import org.apache.nifi.util.Tuple;
+import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.core.Cursor;
+import org.springframework.data.redis.core.ScanOptions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+@Tags({ "redis", "distributed", "cache", "map" })
+@CapabilityDescription("An implementation of DistributedMapCacheClient
that uses Redis as the backing cache. This service relies on " +
+ "the WATCH, MULTI, and EXEC commands in Redis, which are not fully
supported when Redis is clustered. As a result, this service " +
+ "can only be used with a Redis Connection Pool that is configured
for standalone or sentinel mode. Sentinel mode can be used to " +
+ "provide high-availability configurations.")
+public class RedisDistributedMapCacheClientService extends
AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
+
+ public static final PropertyDescriptor REDIS_CONNECTION_POOL = new
PropertyDescriptor.Builder()
+ .name("redis-connection-pool")
+ .displayName("Redis Connection Pool")
+ .identifiesControllerService(RedisConnectionPool.class)
+ .required(true)
+ .build();
+
+ static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+ static {
+ final List<PropertyDescriptor> props = new ArrayList<>();
+ props.add(REDIS_CONNECTION_POOL);
+ PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
+ }
+
+ private volatile RedisConnectionPool redisConnectionPool;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
+ }
+
+ @Override
+ protected Collection<ValidationResult>
customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> results = new ArrayList<>();
+
+ final RedisConnectionPool redisConnectionPool =
validationContext.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
+ if (redisConnectionPool != null) {
+ final RedisType redisType = redisConnectionPool.getRedisType();
+ if (redisType != null && redisType == RedisType.CLUSTER) {
+ results.add(new ValidationResult.Builder()
+ .subject(REDIS_CONNECTION_POOL.getDisplayName())
+ .valid(false)
+ .explanation(REDIS_CONNECTION_POOL.getDisplayName()
+ + " is configured in clustered mode, and
this service requires a non-clustered Redis")
+ .build());
+ }
+ }
+
+ return results;
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ this.redisConnectionPool =
context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
+ }
+
+ @OnDisabled
+ public void onDisabled() {
+ this.redisConnectionPool = null;
+ }
+
+ @Override
+ public <K, V> boolean putIfAbsent(final K key, final V value, final
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws
IOException {
+ return withConnection(redisConnection -> {
+ final Tuple<byte[],byte[]> kv = serialize(key, value,
keySerializer, valueSerializer);
+ return redisConnection.setNX(kv.getKey(), kv.getValue());
+ });
+ }
+
+ @Override
+ public <K, V> V getAndPutIfAbsent(final K key, final V value, final
Serializer<K> keySerializer, final Serializer<V> valueSerializer, final
Deserializer<V> valueDeserializer) throws IOException {
+ return withConnection(redisConnection -> {
+ final Tuple<byte[],byte[]> kv = serialize(key, value,
keySerializer, valueSerializer);
+ do {
+ // start a watch on the key and retrieve the current value
+ redisConnection.watch(kv.getKey());
+ final byte[] existingValue =
redisConnection.get(kv.getKey());
+
+ // start a transaction and perform the put-if-absent
+ redisConnection.multi();
+ redisConnection.setNX(kv.getKey(), kv.getValue());
+
+ // execute the transaction
+ final List<Object> results = redisConnection.exec();
+
+ // if the results list was empty, then the transaction
failed (i.e. key was modified after we started watching), so keep looping to
retry
+ // if the results list has results, then the transaction
succeeded and it should have the result of the setNX operation
+ if (results.size() > 0) {
+ final Object firstResult = results.get(0);
+ if (firstResult instanceof Boolean) {
+ final Boolean absent = (Boolean) firstResult;
+ return absent ? null :
valueDeserializer.deserialize(existingValue);
+ } else {
+ // this shouldn't really happen, but just in case
there is a non-boolean result then bounce out of the loop
+ throw new IOException("Unexpected result from
Redis transaction: Expected Boolean result, but got "
+ + firstResult.getClass().getName() + "
with value " + firstResult.toString());
+ }
+ }
+ } while (true);
--- End diff --
Rather than looping with `while (true)`, we should probably loop only while the
Controller Service is still enabled. That would involve having an @OnEnabled method
that sets a volatile boolean `enabled` to true and an @OnDisabled method that
sets it back to false... I thought we had this in AbstractControllerService
already, but it looks like there is no `boolean isEnabled()` method. It is probably
worth just adding that to AbstractControllerService and calling it here?
> Implement a StateProvider backed by Redis
> -----------------------------------------
>
> Key: NIFI-4061
> URL: https://issues.apache.org/jira/browse/NIFI-4061
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Bryan Bende
> Assignee: Bryan Bende
> Priority: Minor
>
> We currently have only one clustered state provider which is a ZooKeeper
> implementation. Redis would make a good candidate to provide another option.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)