exceptionfactory commented on code in PR #6943:
URL: https://github.com/apache/nifi/pull/6943#discussion_r1103281079


##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisRecord.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "redis", "hash", "record"})
+@CapabilityDescription("Puts record field data into Redis using a specified 
hash value, which is determined by a RecordPath to a field in each record 
containing the hash value. The record fields "
+        + "and values are stored as key/value pairs associated by the hash 
value. NOTE: Neither the evaluated hash value nor any of the field values can 
be null. If the hash value is null, "
+        + "the FlowFile will be routed to failure. For each of the field 
values, if the value is null that field will be not set in Redis.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "putredisrecord.successful.record.count", 
description = "The number of records successfully written to Redis. This can be 
compared to the 'record.count' "
+                + "attribute if it exists on the incoming FlowFile")
+})
+public class PutRedisRecord extends AbstractProcessor {
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+            .name("redis-record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HASH_VALUE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-hash-value-record-path")
+            .displayName("Hash Value Record Path")
+            .description("Specifies a RecordPath to evaluate against each 
Record in order to determine the hash value associated with all the record 
fields/values "
+                    + "(see 'hset' in Redis documentation for more details). 
The RecordPath must point at exactly one field or an error will occur.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    static final PropertyDescriptor DATA_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-data-record-path")
+            .displayName("Data Record Path")
+            .description("This property denotes a RecordPath that will be 
evaluated against each incoming Record and the Record that results from 
evaluating the RecordPath will be sent to" +
+                    " Redis instead of sending the entire incoming Record. The 
property defaults to the root '/' which corresponds to a 'flat' record (all 
fields/values at the top level of " +
+                    " the Record.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .defaultValue("/")
+            .expressionLanguageSupported(NONE)
+            .build();
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("redis-charset")
+            .displayName("Character Set")
+            .description("Specifies the character set to use when storing 
record field values as strings. All fields will be converted to strings using 
this character set "
+                    + "before being stored in Redis.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that are unsuccessfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(RECORD_READER_FACTORY);
+        props.add(REDIS_CONNECTION_POOL);
+        props.add(HASH_VALUE_RECORD_PATH);
+        props.add(DATA_RECORD_PATH);
+        props.add(CHARSET);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    private volatile RedisConnectionPool redisConnectionPool;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.redisConnectionPool = 
context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.redisConnectionPool = null;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+        int count = 0;
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, 
in, getLogger());
+             RedisConnection redisConnection = 
redisConnectionPool.getConnection()) {
+
+            final String hashValueRecordPathValue = 
context.getProperty(HASH_VALUE_RECORD_PATH).getValue();
+            final RecordPath hashValueRecordPath = 
RecordPath.compile(hashValueRecordPathValue);
+
+            final String dataRecordPathValue = 
context.getProperty(DATA_RECORD_PATH).getValue();
+            final RecordPath dataRecordPath = 
RecordPath.compile(dataRecordPathValue);
+
+            final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                final RecordPathResult recordPathResult = 
hashValueRecordPath.evaluate(record);
+                final List<FieldValue> resultList = 
recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
+                if (resultList.isEmpty()) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record but got no results");

Review Comment:
   Recommend adjusting the wording to indicate the `Hash Value Record Path` and the record count for troubleshooting.
   ```suggestion
                    throw new ProcessException(String.format("No results found for Record [%d] Hash Value Record Path: %s", count, hashValueRecordPath.getPath()));
   ```



##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisRecord.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "redis", "hash", "record"})
+@CapabilityDescription("Puts record field data into Redis using a specified 
hash value, which is determined by a RecordPath to a field in each record 
containing the hash value. The record fields "
+        + "and values are stored as key/value pairs associated by the hash 
value. NOTE: Neither the evaluated hash value nor any of the field values can 
be null. If the hash value is null, "
+        + "the FlowFile will be routed to failure. For each of the field 
values, if the value is null that field will be not set in Redis.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "putredisrecord.successful.record.count", 
description = "The number of records successfully written to Redis. This can be 
compared to the 'record.count' "
+                + "attribute if it exists on the incoming FlowFile")
+})
+public class PutRedisRecord extends AbstractProcessor {
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+            .name("redis-record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HASH_VALUE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-hash-value-record-path")
+            .displayName("Hash Value Record Path")
+            .description("Specifies a RecordPath to evaluate against each 
Record in order to determine the hash value associated with all the record 
fields/values "
+                    + "(see 'hset' in Redis documentation for more details). 
The RecordPath must point at exactly one field or an error will occur.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    static final PropertyDescriptor DATA_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-data-record-path")
+            .displayName("Data Record Path")
+            .description("This property denotes a RecordPath that will be 
evaluated against each incoming Record and the Record that results from 
evaluating the RecordPath will be sent to" +
+                    " Redis instead of sending the entire incoming Record. The 
property defaults to the root '/' which corresponds to a 'flat' record (all 
fields/values at the top level of " +
+                    " the Record.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .defaultValue("/")
+            .expressionLanguageSupported(NONE)
+            .build();
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("redis-charset")
+            .displayName("Character Set")
+            .description("Specifies the character set to use when storing 
record field values as strings. All fields will be converted to strings using 
this character set "
+                    + "before being stored in Redis.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that are unsuccessfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(RECORD_READER_FACTORY);
+        props.add(REDIS_CONNECTION_POOL);
+        props.add(HASH_VALUE_RECORD_PATH);
+        props.add(DATA_RECORD_PATH);
+        props.add(CHARSET);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    private volatile RedisConnectionPool redisConnectionPool;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.redisConnectionPool = 
context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.redisConnectionPool = null;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+        int count = 0;
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, 
in, getLogger());
+             RedisConnection redisConnection = 
redisConnectionPool.getConnection()) {
+
+            final String hashValueRecordPathValue = 
context.getProperty(HASH_VALUE_RECORD_PATH).getValue();
+            final RecordPath hashValueRecordPath = 
RecordPath.compile(hashValueRecordPathValue);
+
+            final String dataRecordPathValue = 
context.getProperty(DATA_RECORD_PATH).getValue();
+            final RecordPath dataRecordPath = 
RecordPath.compile(dataRecordPathValue);
+
+            final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                final RecordPathResult recordPathResult = 
hashValueRecordPath.evaluate(record);
+                final List<FieldValue> resultList = 
recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
+                if (resultList.isEmpty()) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record but got no results");
+                }
+
+                if (resultList.size() > 1) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record and received multiple distinct 
results (" + resultList + ")");
+                }
+
+                final FieldValue hashValueFieldValue = resultList.get(0);
+                final Object hashValueObject = hashValueFieldValue.getValue();
+                if (hashValueObject == null) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record and received a null value, 
which is not allowed. Ensure "
+                            + "the RecordPath is pointing at a field that 
exists at that location and has a non-null value.");
+                }
+                final String hashValue = (String) 
DataTypeUtils.convertType(hashValueObject, 
RecordFieldType.STRING.getDataType(), charset.name());
+
+                List<Record> dataRecords = getDataRecords(dataRecordPath, 
record);
+
+                for (Record dataRecord : dataRecords) {
+                    RecordSchema dataRecordSchema = dataRecord.getSchema();
+
+                    for (RecordField recordField : 
dataRecordSchema.getFields()) {
+                        final String fieldName = recordField.getFieldName();
+                        final Object value = record.getValue(fieldName);
+                        if (fieldName == null || value == null) {
+                            getLogger().debug("Record fieldname or value is 
null, skipping this field");
+                        } else {
+                            final String stringValue = (String) 
DataTypeUtils.convertType(value, RecordFieldType.STRING.getDataType(), 
charset.name());
+                            
redisConnection.hashCommands().hSet(hashValue.getBytes(charset), 
fieldName.getBytes(charset), stringValue.getBytes(charset));
+                        }
+                    }

Review Comment:
   These loops are nested multiple levels deep; could they be extracted to a 
separate method?



##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisRecord.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "redis", "hash", "record"})
+@CapabilityDescription("Puts record field data into Redis using a specified 
hash value, which is determined by a RecordPath to a field in each record 
containing the hash value. The record fields "
+        + "and values are stored as key/value pairs associated by the hash 
value. NOTE: Neither the evaluated hash value nor any of the field values can 
be null. If the hash value is null, "
+        + "the FlowFile will be routed to failure. For each of the field 
values, if the value is null that field will be not set in Redis.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "putredisrecord.successful.record.count", 
description = "The number of records successfully written to Redis. This can be 
compared to the 'record.count' "
+                + "attribute if it exists on the incoming FlowFile")
+})
+public class PutRedisRecord extends AbstractProcessor {
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+            .name("redis-record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HASH_VALUE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-hash-value-record-path")
+            .displayName("Hash Value Record Path")
+            .description("Specifies a RecordPath to evaluate against each 
Record in order to determine the hash value associated with all the record 
fields/values "
+                    + "(see 'hset' in Redis documentation for more details). 
The RecordPath must point at exactly one field or an error will occur.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    static final PropertyDescriptor DATA_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-data-record-path")
+            .displayName("Data Record Path")
+            .description("This property denotes a RecordPath that will be 
evaluated against each incoming Record and the Record that results from 
evaluating the RecordPath will be sent to" +
+                    " Redis instead of sending the entire incoming Record. The 
property defaults to the root '/' which corresponds to a 'flat' record (all 
fields/values at the top level of " +
+                    " the Record.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .defaultValue("/")
+            .expressionLanguageSupported(NONE)
+            .build();
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("redis-charset")
+            .displayName("Character Set")
+            .description("Specifies the character set to use when storing 
record field values as strings. All fields will be converted to strings using 
this character set "
+                    + "before being stored in Redis.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that are unsuccessfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(RECORD_READER_FACTORY);
+        props.add(REDIS_CONNECTION_POOL);
+        props.add(HASH_VALUE_RECORD_PATH);
+        props.add(DATA_RECORD_PATH);
+        props.add(CHARSET);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    private volatile RedisConnectionPool redisConnectionPool;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.redisConnectionPool = 
context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.redisConnectionPool = null;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+        int count = 0;
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, 
in, getLogger());
+             RedisConnection redisConnection = 
redisConnectionPool.getConnection()) {
+
+            final String hashValueRecordPathValue = 
context.getProperty(HASH_VALUE_RECORD_PATH).getValue();
+            final RecordPath hashValueRecordPath = 
RecordPath.compile(hashValueRecordPathValue);
+
+            final String dataRecordPathValue = 
context.getProperty(DATA_RECORD_PATH).getValue();
+            final RecordPath dataRecordPath = 
RecordPath.compile(dataRecordPathValue);
+
+            final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                final RecordPathResult recordPathResult = 
hashValueRecordPath.evaluate(record);
+                final List<FieldValue> resultList = 
recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
+                if (resultList.isEmpty()) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record but got no results");
+                }
+
+                if (resultList.size() > 1) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record and received multiple distinct 
results (" + resultList + ")");
+                }
+
+                final FieldValue hashValueFieldValue = resultList.get(0);
+                final Object hashValueObject = hashValueFieldValue.getValue();
+                if (hashValueObject == null) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record and received a null value, 
which is not allowed. Ensure "
+                            + "the RecordPath is pointing at a field that 
exists at that location and has a non-null value.");
+                }
+                final String hashValue = (String) 
DataTypeUtils.convertType(hashValueObject, 
RecordFieldType.STRING.getDataType(), charset.name());
+
+                List<Record> dataRecords = getDataRecords(dataRecordPath, 
record);
+
+                for (Record dataRecord : dataRecords) {
+                    RecordSchema dataRecordSchema = dataRecord.getSchema();
+
+                    for (RecordField recordField : 
dataRecordSchema.getFields()) {
+                        final String fieldName = recordField.getFieldName();
+                        final Object value = record.getValue(fieldName);
+                        if (fieldName == null || value == null) {
+                            getLogger().debug("Record fieldname or value is 
null, skipping this field");
+                        } else {
+                            final String stringValue = (String) 
DataTypeUtils.convertType(value, RecordFieldType.STRING.getDataType(), 
charset.name());
+                            
redisConnection.hashCommands().hSet(hashValue.getBytes(charset), 
fieldName.getBytes(charset), stringValue.getBytes(charset));
+                        }
+                    }
+                    count++;
+                }
+            }
+
+        } catch (MalformedRecordException e) {
+            getLogger().error("Couldn't read records from input. Transferring 
FlowFile to failure", e);

Review Comment:
   Simplifying the error message and including the FlowFile reference could be 
helpful for troubleshooting.
   ```suggestion
               getLogger().error("Read Records failed {}", flowFile, e);
   ```



##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisRecord.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "redis", "hash", "record"})
+@CapabilityDescription("Puts record field data into Redis using a specified 
hash value, which is determined by a RecordPath to a field in each record 
containing the hash value. The record fields "
+        + "and values are stored as key/value pairs associated by the hash 
value. NOTE: Neither the evaluated hash value nor any of the field values can 
be null. If the hash value is null, "
+        + "the FlowFile will be routed to failure. For each of the field 
values, if the value is null that field will be not set in Redis.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "putredisrecord.successful.record.count", 
description = "The number of records successfully written to Redis. This can be 
compared to the 'record.count' "
+                + "attribute if it exists on the incoming FlowFile")
+})
+public class PutRedisRecord extends AbstractProcessor {
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+            .name("redis-record-reader")

Review Comment:
   Recommend removing the `redis` prefix from the property names.
   ```suggestion
               .name("record-reader")
   ```



##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisRecord.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "redis", "hash", "record"})
+@CapabilityDescription("Puts record field data into Redis using a specified 
hash value, which is determined by a RecordPath to a field in each record 
containing the hash value. The record fields "
+        + "and values are stored as key/value pairs associated by the hash 
value. NOTE: Neither the evaluated hash value nor any of the field values can 
be null. If the hash value is null, "
+        + "the FlowFile will be routed to failure. For each of the field 
values, if the value is null that field will be not set in Redis.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "putredisrecord.successful.record.count", 
description = "The number of records successfully written to Redis. This can be 
compared to the 'record.count' "
+                + "attribute if it exists on the incoming FlowFile")
+})
+public class PutRedisRecord extends AbstractProcessor {
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+            .name("redis-record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HASH_VALUE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-hash-value-record-path")
+            .displayName("Hash Value Record Path")
+            .description("Specifies a RecordPath to evaluate against each 
Record in order to determine the hash value associated with all the record 
fields/values "
+                    + "(see 'hset' in Redis documentation for more details). 
The RecordPath must point at exactly one field or an error will occur.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    static final PropertyDescriptor DATA_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-data-record-path")
+            .displayName("Data Record Path")
+            .description("This property denotes a RecordPath that will be 
evaluated against each incoming Record and the Record that results from 
evaluating the RecordPath will be sent to" +
+                    " Redis instead of sending the entire incoming Record. The 
property defaults to the root '/' which corresponds to a 'flat' record (all 
fields/values at the top level of " +
+                    " the Record.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .defaultValue("/")
+            .expressionLanguageSupported(NONE)
+            .build();
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("redis-charset")
+            .displayName("Character Set")
+            .description("Specifies the character set to use when storing 
record field values as strings. All fields will be converted to strings using 
this character set "
+                    + "before being stored in Redis.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that are unsuccessfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(RECORD_READER_FACTORY);
+        props.add(REDIS_CONNECTION_POOL);
+        props.add(HASH_VALUE_RECORD_PATH);
+        props.add(DATA_RECORD_PATH);
+        props.add(CHARSET);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    private volatile RedisConnectionPool redisConnectionPool;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.redisConnectionPool = 
context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.redisConnectionPool = null;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+        int count = 0;
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, 
in, getLogger());
+             RedisConnection redisConnection = 
redisConnectionPool.getConnection()) {
+
+            final String hashValueRecordPathValue = 
context.getProperty(HASH_VALUE_RECORD_PATH).getValue();
+            final RecordPath hashValueRecordPath = 
RecordPath.compile(hashValueRecordPathValue);
+
+            final String dataRecordPathValue = 
context.getProperty(DATA_RECORD_PATH).getValue();
+            final RecordPath dataRecordPath = 
RecordPath.compile(dataRecordPathValue);
+
+            final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                final RecordPathResult recordPathResult = 
hashValueRecordPath.evaluate(record);
+                final List<FieldValue> resultList = 
recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
+                if (resultList.isEmpty()) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record but got no results");
+                }
+
+                if (resultList.size() > 1) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record and received multiple distinct 
results (" + resultList + ")");
+                }
+
+                final FieldValue hashValueFieldValue = resultList.get(0);
+                final Object hashValueObject = hashValueFieldValue.getValue();
+                if (hashValueObject == null) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record and received a null value, 
which is not allowed. Ensure "
+                            + "the RecordPath is pointing at a field that 
exists at that location and has a non-null value.");

Review Comment:
   ```suggestion
                       throw new ProcessException(String.format("Null value 
found for Record [%d] Hash Value Record Path: %s", count, 
hashValueRecordPath.getPath()));
   ```



##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/processor/TestPutRedisRecord.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+
+import org.apache.nifi.redis.service.RedisConnectionPoolService;
+import org.apache.nifi.redis.util.RedisUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.stubbing.Answer;
+import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.RedisHashCommands;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestPutRedisRecord {
+
+    private TestRunner runner;
+    private PutRedisRecord processor;
+    private MockRecordParser parser;
+
+    private MockRedisConnectionPoolService connectionPoolService;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        processor = new PutRedisRecord();
+        runner = TestRunners.newTestRunner(processor);
+        parser = new MockRecordParser();
+        try {
+            runner.addControllerService("parser", parser);
+        } catch (InitializationException e) {
+            throw new IOException(e);
+        }
+        runner.enableControllerService(parser);
+        runner.setProperty(PutRedisRecord.RECORD_READER_FACTORY, "parser");
+
+        connectionPoolService = new MockRedisConnectionPoolService();
+        connectionPoolService.setFailAfterN(0);
+        try {
+            runner.addControllerService("connectionPool", 
connectionPoolService);
+        } catch (InitializationException e) {
+            throw new IOException(e);
+        }
+        runner.setProperty(connectionPoolService, 
RedisUtils.CONNECTION_STRING, "localhost:6379");
+        runner.setProperty(RedisUtils.REDIS_CONNECTION_POOL, "connectionPool");
+        // Tests should provide a field named 'hash' with unique values per 
record, unless testing failure conditions
+        runner.setProperty(PutRedisRecord.HASH_VALUE_RECORD_PATH, "/hash");
+    }
+
+    @Test
+    public void testPutRecords() {
+        runner.assertNotValid();
+        runner.enableControllerService(connectionPoolService);
+        parser.addSchemaField("hash", RecordFieldType.STRING);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("age", RecordFieldType.INT);
+
+        parser.addRecord("abc", "John Doe", 48);
+        parser.addRecord("def", "Jane Doe", 47);
+        parser.addRecord("ghi", "Jimmy Doe", 14);
+
+        runner.enqueue("hello".getBytes(StandardCharsets.UTF_8));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred("success", 1);
+        final List<MockFlowFile> result = 
runner.getFlowFilesForRelationship("success");

Review Comment:
   Recommend replacing `success` with the `REL_SUCCESS` Relationship reference.



##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/processor/TestPutRedisRecord.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+
+import org.apache.nifi.redis.service.RedisConnectionPoolService;
+import org.apache.nifi.redis.util.RedisUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.stubbing.Answer;
+import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.RedisHashCommands;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestPutRedisRecord {
+
+    private TestRunner runner;
+    private PutRedisRecord processor;
+    private MockRecordParser parser;
+
+    private MockRedisConnectionPoolService connectionPoolService;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        processor = new PutRedisRecord();
+        runner = TestRunners.newTestRunner(processor);
+        parser = new MockRecordParser();
+        try {
+            runner.addControllerService("parser", parser);
+        } catch (InitializationException e) {
+            throw new IOException(e);
+        }
+        runner.enableControllerService(parser);
+        runner.setProperty(PutRedisRecord.RECORD_READER_FACTORY, "parser");
+
+        connectionPoolService = new MockRedisConnectionPoolService();
+        connectionPoolService.setFailAfterN(0);
+        try {
+            runner.addControllerService("connectionPool", 
connectionPoolService);
+        } catch (InitializationException e) {
+            throw new IOException(e);
+        }
+        runner.setProperty(connectionPoolService, 
RedisUtils.CONNECTION_STRING, "localhost:6379");
+        runner.setProperty(RedisUtils.REDIS_CONNECTION_POOL, "connectionPool");
+        // Tests should provide a field named 'hash' with unique values per 
record, unless testing failure conditions
+        runner.setProperty(PutRedisRecord.HASH_VALUE_RECORD_PATH, "/hash");
+    }
+
+    @Test
+    public void testPutRecords() {
+        runner.assertNotValid();
+        runner.enableControllerService(connectionPoolService);
+        parser.addSchemaField("hash", RecordFieldType.STRING);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("age", RecordFieldType.INT);
+
+        parser.addRecord("abc", "John Doe", 48);
+        parser.addRecord("def", "Jane Doe", 47);
+        parser.addRecord("ghi", "Jimmy Doe", 14);
+
+        runner.enqueue("hello".getBytes(StandardCharsets.UTF_8));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred("success", 1);
+        final List<MockFlowFile> result = 
runner.getFlowFilesForRelationship("success");
+        assertEquals(1, result.size());
+        MockFlowFile ff = result.get(0);
+        ff.assertAttributeEquals("putredisrecord.successful.record.count", 
"3");

Review Comment:
   Recommend replacing this attribute name with a new shared static value that 
can be reused.



##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/processor/TestPutRedisRecord.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+
+import org.apache.nifi.redis.service.RedisConnectionPoolService;
+import org.apache.nifi.redis.util.RedisUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.stubbing.Answer;
+import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.RedisHashCommands;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestPutRedisRecord {
+
+    private TestRunner runner;
+    private PutRedisRecord processor;
+    private MockRecordParser parser;
+
+    private MockRedisConnectionPoolService connectionPoolService;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        processor = new PutRedisRecord();
+        runner = TestRunners.newTestRunner(processor);
+        parser = new MockRecordParser();
+        try {
+            runner.addControllerService("parser", parser);
+        } catch (InitializationException e) {
+            throw new IOException(e);
+        }

Review Comment:
   Is there a reason for catching and wrapping this exception instead of letting the 
`InitializationException` propagate out of the method?



##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisRecord.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "redis", "hash", "record"})
+@CapabilityDescription("Puts record field data into Redis using a specified 
hash value, which is determined by a RecordPath to a field in each record 
containing the hash value. The record fields "
+        + "and values are stored as key/value pairs associated by the hash 
value. NOTE: Neither the evaluated hash value nor any of the field values can 
be null. If the hash value is null, "
+        + "the FlowFile will be routed to failure. For each of the field 
values, if the value is null that field will be not set in Redis.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "putredisrecord.successful.record.count", 
description = "The number of records successfully written to Redis. This can be 
compared to the 'record.count' "
+                + "attribute if it exists on the incoming FlowFile")
+})
+public class PutRedisRecord extends AbstractProcessor {
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+            .name("redis-record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HASH_VALUE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-hash-value-record-path")
+            .displayName("Hash Value Record Path")
+            .description("Specifies a RecordPath to evaluate against each 
Record in order to determine the hash value associated with all the record 
fields/values "
+                    + "(see 'hset' in Redis documentation for more details). 
The RecordPath must point at exactly one field or an error will occur.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    static final PropertyDescriptor DATA_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-data-record-path")
+            .displayName("Data Record Path")
+            .description("This property denotes a RecordPath that will be 
evaluated against each incoming Record and the Record that results from 
evaluating the RecordPath will be sent to" +
+                    " Redis instead of sending the entire incoming Record. The 
property defaults to the root '/' which corresponds to a 'flat' record (all 
fields/values at the top level of " +
+                    " the Record.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .defaultValue("/")
+            .expressionLanguageSupported(NONE)
+            .build();
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("redis-charset")
+            .displayName("Character Set")
+            .description("Specifies the character set to use when storing 
record field values as strings. All fields will be converted to strings using 
this character set "
+                    + "before being stored in Redis.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that are unsuccessfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(RECORD_READER_FACTORY);
+        props.add(REDIS_CONNECTION_POOL);
+        props.add(HASH_VALUE_RECORD_PATH);
+        props.add(DATA_RECORD_PATH);
+        props.add(CHARSET);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
+    }
+
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        // Returns the immutable property list assembled in the static initializer.
        return PROPERTY_DESCRIPTORS;
    }
+
    @Override
    public Set<Relationship> getRelationships() {
        // Returns the immutable set containing REL_SUCCESS and REL_FAILURE.
        return RELATIONSHIPS;
    }
+
+    private volatile RedisConnectionPool redisConnectionPool;
+
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        // Cache the configured Redis connection pool controller service for use in
        // onTrigger(); the field is volatile so trigger threads see the latest reference.
        this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
    }
+
    @OnStopped
    public void onStopped() {
        // Release the cached controller-service reference when the processor stops.
        this.redisConnectionPool = null;
    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+        int count = 0;
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, 
in, getLogger());
+             RedisConnection redisConnection = 
redisConnectionPool.getConnection()) {
+
+            final String hashValueRecordPathValue = 
context.getProperty(HASH_VALUE_RECORD_PATH).getValue();
+            final RecordPath hashValueRecordPath = 
RecordPath.compile(hashValueRecordPathValue);
+
+            final String dataRecordPathValue = 
context.getProperty(DATA_RECORD_PATH).getValue();
+            final RecordPath dataRecordPath = 
RecordPath.compile(dataRecordPathValue);
+
+            final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                final RecordPathResult recordPathResult = 
hashValueRecordPath.evaluate(record);
+                final List<FieldValue> resultList = 
recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
+                if (resultList.isEmpty()) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record but got no results");
+                }
+
+                if (resultList.size() > 1) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record and received multiple distinct 
results (" + resultList + ")");
+                }
+
+                final FieldValue hashValueFieldValue = resultList.get(0);
+                final Object hashValueObject = hashValueFieldValue.getValue();
+                if (hashValueObject == null) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record and received a null value, 
which is not allowed. Ensure "
+                            + "the RecordPath is pointing at a field that 
exists at that location and has a non-null value.");
+                }
+                final String hashValue = (String) 
DataTypeUtils.convertType(hashValueObject, 
RecordFieldType.STRING.getDataType(), charset.name());
+
+                List<Record> dataRecords = getDataRecords(dataRecordPath, 
record);
+
+                for (Record dataRecord : dataRecords) {
+                    RecordSchema dataRecordSchema = dataRecord.getSchema();
+
+                    for (RecordField recordField : 
dataRecordSchema.getFields()) {
+                        final String fieldName = recordField.getFieldName();
+                        final Object value = record.getValue(fieldName);
+                        if (fieldName == null || value == null) {
+                            getLogger().debug("Record fieldname or value is 
null, skipping this field");
+                        } else {
+                            final String stringValue = (String) 
DataTypeUtils.convertType(value, RecordFieldType.STRING.getDataType(), 
charset.name());
+                            
redisConnection.hashCommands().hSet(hashValue.getBytes(charset), 
fieldName.getBytes(charset), stringValue.getBytes(charset));
+                        }
+                    }
+                    count++;
+                }
+            }
+
+        } catch (MalformedRecordException e) {
+            getLogger().error("Couldn't read records from input. Transferring 
FlowFile to failure", e);
+            flowFile = session.putAttribute(flowFile, 
"putredisrecord.successful.record.count", String.valueOf(count));
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        } catch (SchemaNotFoundException e) {
+            getLogger().error("Couldn't create record writer. Transferring 
FlowFile to failure", e);

Review Comment:
   ```suggestion
               getLogger().error("Record Schema not found {}", flowFile, e);
   ```



##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisRecord.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "redis", "hash", "record"})
+@CapabilityDescription("Puts record field data into Redis using a specified 
hash value, which is determined by a RecordPath to a field in each record 
containing the hash value. The record fields "
+        + "and values are stored as key/value pairs associated by the hash 
value. NOTE: Neither the evaluated hash value nor any of the field values can 
be null. If the hash value is null, "
+        + "the FlowFile will be routed to failure. For each of the field 
values, if the value is null that field will be not set in Redis.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "putredisrecord.successful.record.count", 
description = "The number of records successfully written to Redis. This can be 
compared to the 'record.count' "
+                + "attribute if it exists on the incoming FlowFile")
+})
+public class PutRedisRecord extends AbstractProcessor {
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+            .name("redis-record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HASH_VALUE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-hash-value-record-path")
+            .displayName("Hash Value Record Path")
+            .description("Specifies a RecordPath to evaluate against each 
Record in order to determine the hash value associated with all the record 
fields/values "
+                    + "(see 'hset' in Redis documentation for more details). 
The RecordPath must point at exactly one field or an error will occur.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    static final PropertyDescriptor DATA_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-data-record-path")
+            .displayName("Data Record Path")
+            .description("This property denotes a RecordPath that will be 
evaluated against each incoming Record and the Record that results from 
evaluating the RecordPath will be sent to" +
+                    " Redis instead of sending the entire incoming Record. The 
property defaults to the root '/' which corresponds to a 'flat' record (all 
fields/values at the top level of " +
+                    " the Record.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .defaultValue("/")
+            .expressionLanguageSupported(NONE)
+            .build();
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("redis-charset")
+            .displayName("Character Set")
+            .description("Specifies the character set to use when storing 
record field values as strings. All fields will be converted to strings using 
this character set "
+                    + "before being stored in Redis.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that are unsuccessfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(RECORD_READER_FACTORY);
+        props.add(REDIS_CONNECTION_POOL);
+        props.add(HASH_VALUE_RECORD_PATH);
+        props.add(DATA_RECORD_PATH);
+        props.add(CHARSET);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
+    }
+
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        // Returns the immutable property list assembled in the static initializer.
        return PROPERTY_DESCRIPTORS;
    }
+
    @Override
    public Set<Relationship> getRelationships() {
        // Returns the immutable set containing REL_SUCCESS and REL_FAILURE.
        return RELATIONSHIPS;
    }
+
+    private volatile RedisConnectionPool redisConnectionPool;
+
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        // Cache the configured Redis connection pool controller service for use in
        // onTrigger(); the field is volatile so trigger threads see the latest reference.
        this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
    }
+
    @OnStopped
    public void onStopped() {
        // Release the cached controller-service reference when the processor stops.
        this.redisConnectionPool = null;
    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+        int count = 0;
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, 
in, getLogger());
+             RedisConnection redisConnection = 
redisConnectionPool.getConnection()) {
+
+            final String hashValueRecordPathValue = 
context.getProperty(HASH_VALUE_RECORD_PATH).getValue();
+            final RecordPath hashValueRecordPath = 
RecordPath.compile(hashValueRecordPathValue);
+
+            final String dataRecordPathValue = 
context.getProperty(DATA_RECORD_PATH).getValue();
+            final RecordPath dataRecordPath = 
RecordPath.compile(dataRecordPathValue);
+
+            final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                final RecordPathResult recordPathResult = 
hashValueRecordPath.evaluate(record);
+                final List<FieldValue> resultList = 
recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
+                if (resultList.isEmpty()) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record but got no results");
+                }
+
+                if (resultList.size() > 1) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record and received multiple distinct 
results (" + resultList + ")");
+                }
+
+                final FieldValue hashValueFieldValue = resultList.get(0);
+                final Object hashValueObject = hashValueFieldValue.getValue();
+                if (hashValueObject == null) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record and received a null value, 
which is not allowed. Ensure "
+                            + "the RecordPath is pointing at a field that 
exists at that location and has a non-null value.");
+                }
+                final String hashValue = (String) 
DataTypeUtils.convertType(hashValueObject, 
RecordFieldType.STRING.getDataType(), charset.name());
+
+                List<Record> dataRecords = getDataRecords(dataRecordPath, 
record);
+
+                for (Record dataRecord : dataRecords) {
+                    RecordSchema dataRecordSchema = dataRecord.getSchema();
+
+                    for (RecordField recordField : 
dataRecordSchema.getFields()) {
+                        final String fieldName = recordField.getFieldName();
+                        final Object value = record.getValue(fieldName);
+                        if (fieldName == null || value == null) {
+                            getLogger().debug("Record fieldname or value is 
null, skipping this field");
+                        } else {
+                            final String stringValue = (String) 
DataTypeUtils.convertType(value, RecordFieldType.STRING.getDataType(), 
charset.name());
+                            
redisConnection.hashCommands().hSet(hashValue.getBytes(charset), 
fieldName.getBytes(charset), stringValue.getBytes(charset));
+                        }
+                    }
+                    count++;
+                }
+            }
+
+        } catch (MalformedRecordException e) {
+            getLogger().error("Couldn't read records from input. Transferring 
FlowFile to failure", e);
+            flowFile = session.putAttribute(flowFile, 
"putredisrecord.successful.record.count", String.valueOf(count));
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        } catch (SchemaNotFoundException e) {
+            getLogger().error("Couldn't create record writer. Transferring 
FlowFile to failure", e);
+            flowFile = session.putAttribute(flowFile, 
"putredisrecord.successful.record.count", String.valueOf(count));
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        } catch (Exception e) {
+            getLogger().error("Failed to put records to Redis. Transferring 
FlowFile to failure", e);

Review Comment:
   ```suggestion
               getLogger().error("Put Records failed {}", flowFile, e);
   ```



##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisRecord.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "redis", "hash", "record"})
+@CapabilityDescription("Puts record field data into Redis using a specified 
hash value, which is determined by a RecordPath to a field in each record 
containing the hash value. The record fields "
+        + "and values are stored as key/value pairs associated by the hash 
value. NOTE: Neither the evaluated hash value nor any of the field values can 
be null. If the hash value is null, "
+        + "the FlowFile will be routed to failure. For each of the field 
values, if the value is null that field will be not set in Redis.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "putredisrecord.successful.record.count", 
description = "The number of records successfully written to Redis. This can be 
compared to the 'record.count' "
+                + "attribute if it exists on the incoming FlowFile")
+})
+public class PutRedisRecord extends AbstractProcessor {
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+            .name("redis-record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HASH_VALUE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-hash-value-record-path")
+            .displayName("Hash Value Record Path")
+            .description("Specifies a RecordPath to evaluate against each 
Record in order to determine the hash value associated with all the record 
fields/values "
+                    + "(see 'hset' in Redis documentation for more details). 
The RecordPath must point at exactly one field or an error will occur.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    static final PropertyDescriptor DATA_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-data-record-path")
+            .displayName("Data Record Path")
+            .description("This property denotes a RecordPath that will be 
evaluated against each incoming Record and the Record that results from 
evaluating the RecordPath will be sent to" +
+                    " Redis instead of sending the entire incoming Record. The 
property defaults to the root '/' which corresponds to a 'flat' record (all 
fields/values at the top level of " +
+                    " the Record.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .defaultValue("/")
+            .expressionLanguageSupported(NONE)
+            .build();
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("redis-charset")
+            .displayName("Character Set")
+            .description("Specifies the character set to use when storing 
record field values as strings. All fields will be converted to strings using 
this character set "
+                    + "before being stored in Redis.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that are unsuccessfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(RECORD_READER_FACTORY);
+        props.add(REDIS_CONNECTION_POOL);
+        props.add(HASH_VALUE_RECORD_PATH);
+        props.add(DATA_RECORD_PATH);
+        props.add(CHARSET);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    private volatile RedisConnectionPool redisConnectionPool;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.redisConnectionPool = 
context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.redisConnectionPool = null;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+        int count = 0;
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, 
in, getLogger());
+             RedisConnection redisConnection = 
redisConnectionPool.getConnection()) {
+
+            final String hashValueRecordPathValue = 
context.getProperty(HASH_VALUE_RECORD_PATH).getValue();
+            final RecordPath hashValueRecordPath = 
RecordPath.compile(hashValueRecordPathValue);
+
+            final String dataRecordPathValue = 
context.getProperty(DATA_RECORD_PATH).getValue();
+            final RecordPath dataRecordPath = 
RecordPath.compile(dataRecordPathValue);
+
+            final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                final RecordPathResult recordPathResult = 
hashValueRecordPath.evaluate(record);
+                final List<FieldValue> resultList = 
recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
+                if (resultList.isEmpty()) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record but got no results");
+                }
+
+                if (resultList.size() > 1) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record and received multiple distinct 
results (" + resultList + ")");
+                }
+
+                final FieldValue hashValueFieldValue = resultList.get(0);
+                final Object hashValueObject = hashValueFieldValue.getValue();
+                if (hashValueObject == null) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record and received a null value, 
which is not allowed. Ensure "
+                            + "the RecordPath is pointing at a field that 
exists at that location and has a non-null value.");
+                }
+                final String hashValue = (String) 
DataTypeUtils.convertType(hashValueObject, 
RecordFieldType.STRING.getDataType(), charset.name());
+
+                List<Record> dataRecords = getDataRecords(dataRecordPath, 
record);
+
+                for (Record dataRecord : dataRecords) {
+                    RecordSchema dataRecordSchema = dataRecord.getSchema();
+
+                    for (RecordField recordField : 
dataRecordSchema.getFields()) {
+                        final String fieldName = recordField.getFieldName();
+                        final Object value = record.getValue(fieldName);
+                        if (fieldName == null || value == null) {
+                            getLogger().debug("Record fieldname or value is 
null, skipping this field");
+                        } else {
+                            final String stringValue = (String) 
DataTypeUtils.convertType(value, RecordFieldType.STRING.getDataType(), 
charset.name());
+                            
redisConnection.hashCommands().hSet(hashValue.getBytes(charset), 
fieldName.getBytes(charset), stringValue.getBytes(charset));
+                        }
+                    }
+                    count++;
+                }
+            }
+
+        } catch (MalformedRecordException e) {
+            getLogger().error("Couldn't read records from input. Transferring 
FlowFile to failure", e);
+            flowFile = session.putAttribute(flowFile, 
"putredisrecord.successful.record.count", String.valueOf(count));
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        } catch (SchemaNotFoundException e) {
+            getLogger().error("Couldn't create record writer. Transferring 
FlowFile to failure", e);
+            flowFile = session.putAttribute(flowFile, 
"putredisrecord.successful.record.count", String.valueOf(count));
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        } catch (Exception e) {
+            getLogger().error("Failed to put records to Redis. Transferring 
FlowFile to failure", e);
+            flowFile = session.putAttribute(flowFile, 
"putredisrecord.successful.record.count", String.valueOf(count));
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        flowFile = session.putAttribute(flowFile, 
"putredisrecord.successful.record.count", String.valueOf(count));

Review Comment:
   The FlowFile attribute name could be extracted to a static final variable 
and reused in multiple places.
   ```suggestion
           flowFile = session.putAttribute(flowFile, SUCCESS_RECORD_COUNT, 
String.valueOf(count));
   ```



##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisRecord.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "redis", "hash", "record"})
+@CapabilityDescription("Puts record field data into Redis using a specified 
hash value, which is determined by a RecordPath to a field in each record 
containing the hash value. The record fields "
+        + "and values are stored as key/value pairs associated by the hash 
value. NOTE: Neither the evaluated hash value nor any of the field values can 
be null. If the hash value is null, "
+        + "the FlowFile will be routed to failure. For each of the field 
values, if the value is null that field will be not set in Redis.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "putredisrecord.successful.record.count", 
description = "The number of records successfully written to Redis. This can be 
compared to the 'record.count' "
+                + "attribute if it exists on the incoming FlowFile")
+})
+public class PutRedisRecord extends AbstractProcessor {
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+            .name("redis-record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HASH_VALUE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-hash-value-record-path")
+            .displayName("Hash Value Record Path")
+            .description("Specifies a RecordPath to evaluate against each 
Record in order to determine the hash value associated with all the record 
fields/values "
+                    + "(see 'hset' in Redis documentation for more details). 
The RecordPath must point at exactly one field or an error will occur.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    static final PropertyDescriptor DATA_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-data-record-path")
+            .displayName("Data Record Path")
+            .description("This property denotes a RecordPath that will be 
evaluated against each incoming Record and the Record that results from 
evaluating the RecordPath will be sent to" +
+                    " Redis instead of sending the entire incoming Record. The 
property defaults to the root '/' which corresponds to a 'flat' record (all 
fields/values at the top level of " +
+                    " the Record.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .defaultValue("/")
+            .expressionLanguageSupported(NONE)
+            .build();
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("redis-charset")
+            .displayName("Character Set")
+            .description("Specifies the character set to use when storing 
record field values as strings. All fields will be converted to strings using 
this character set "
+                    + "before being stored in Redis.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully entered into Redis 
will be routed to this relationship")

Review Comment:
   ```suggestion
               .description("FlowFiles having all Records stored in Redis will 
be routed to this relationship")
   ```



##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/processor/TestPutRedisRecord.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+
+import org.apache.nifi.redis.service.RedisConnectionPoolService;
+import org.apache.nifi.redis.util.RedisUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.stubbing.Answer;
+import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.RedisHashCommands;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestPutRedisRecord {
+
+    private TestRunner runner;
+    private PutRedisRecord processor;
+    private MockRecordParser parser;
+
+    private MockRedisConnectionPoolService connectionPoolService;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        processor = new PutRedisRecord();
+        runner = TestRunners.newTestRunner(processor);
+        parser = new MockRecordParser();
+        try {
+            runner.addControllerService("parser", parser);
+        } catch (InitializationException e) {
+            throw new IOException(e);
+        }
+        runner.enableControllerService(parser);
+        runner.setProperty(PutRedisRecord.RECORD_READER_FACTORY, "parser");

Review Comment:
   Recommend extracting the `"parser"` service identifier string to a static final constant and reusing it wherever the controller service is registered and referenced.



##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisRecord.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "redis", "hash", "record"})
+@CapabilityDescription("Puts record field data into Redis using a specified 
hash value, which is determined by a RecordPath to a field in each record 
containing the hash value. The record fields "
+        + "and values are stored as key/value pairs associated by the hash 
value. NOTE: Neither the evaluated hash value nor any of the field values can 
be null. If the hash value is null, "
+        + "the FlowFile will be routed to failure. For each of the field 
values, if the value is null that field will be not set in Redis.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "putredisrecord.successful.record.count", 
description = "The number of records successfully written to Redis. This can be 
compared to the 'record.count' "
+                + "attribute if it exists on the incoming FlowFile")
+})
+public class PutRedisRecord extends AbstractProcessor {

Review Comment:
   With the variety of methods available to interact with Redis, what do you 
think about giving this a more specific name? Perhaps `PutRedisHashRecord`?



##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml:
##########
@@ -68,13 +68,35 @@
             <artifactId>nifi-utils</artifactId>
             <version>2.0.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>compile</scope>

Review Comment:
   The `compile` scope declaration is the default, so it should not be 
necessary.



##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisRecord.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "redis", "hash", "record"})
+@CapabilityDescription("Puts record field data into Redis using a specified 
hash value, which is determined by a RecordPath to a field in each record 
containing the hash value. The record fields "
+        + "and values are stored as key/value pairs associated by the hash 
value. NOTE: Neither the evaluated hash value nor any of the field values can 
be null. If the hash value is null, "
+        + "the FlowFile will be routed to failure. For each of the field 
values, if the value is null that field will be not set in Redis.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "putredisrecord.successful.record.count", 
description = "The number of records successfully written to Redis. This can be 
compared to the 'record.count' "
+                + "attribute if it exists on the incoming FlowFile")
+})
+public class PutRedisRecord extends AbstractProcessor {
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+            .name("redis-record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HASH_VALUE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-hash-value-record-path")
+            .displayName("Hash Value Record Path")
+            .description("Specifies a RecordPath to evaluate against each 
Record in order to determine the hash value associated with all the record 
fields/values "
+                    + "(see 'hset' in Redis documentation for more details). 
The RecordPath must point at exactly one field or an error will occur.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    static final PropertyDescriptor DATA_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-data-record-path")
+            .displayName("Data Record Path")
+            .description("This property denotes a RecordPath that will be 
evaluated against each incoming Record and the Record that results from 
evaluating the RecordPath will be sent to" +
+                    " Redis instead of sending the entire incoming Record. The 
property defaults to the root '/' which corresponds to a 'flat' record (all 
fields/values at the top level of " +
+                    " the Record.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .defaultValue("/")
+            .expressionLanguageSupported(NONE)
+            .build();
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("redis-charset")
+            .displayName("Character Set")
+            .description("Specifies the character set to use when storing 
record field values as strings. All fields will be converted to strings using 
this character set "
+                    + "before being stored in Redis.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that are unsuccessfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(RECORD_READER_FACTORY);
+        props.add(REDIS_CONNECTION_POOL);
+        props.add(HASH_VALUE_RECORD_PATH);
+        props.add(DATA_RECORD_PATH);
+        props.add(CHARSET);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    private volatile RedisConnectionPool redisConnectionPool;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.redisConnectionPool = 
context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.redisConnectionPool = null;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+        int count = 0;
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, 
in, getLogger());
+             RedisConnection redisConnection = 
redisConnectionPool.getConnection()) {
+
+            final String hashValueRecordPathValue = 
context.getProperty(HASH_VALUE_RECORD_PATH).getValue();
+            final RecordPath hashValueRecordPath = 
RecordPath.compile(hashValueRecordPathValue);
+
+            final String dataRecordPathValue = 
context.getProperty(DATA_RECORD_PATH).getValue();
+            final RecordPath dataRecordPath = 
RecordPath.compile(dataRecordPathValue);
+
+            final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                final RecordPathResult recordPathResult = 
hashValueRecordPath.evaluate(record);
+                final List<FieldValue> resultList = 
recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
+                if (resultList.isEmpty()) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record but got no results");
+                }
+
+                if (resultList.size() > 1) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record and received multiple distinct 
results (" + resultList + ")");

Review Comment:
   ```suggestion
                       throw new ProcessException(String.format("Multiple 
results [%d] found for Record [%d] Hash Value Record Path: %s", 
resultList.size(), count, hashValueRecordPath.getPath()));
   ```



##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisRecord.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "redis", "hash", "record"})
+@CapabilityDescription("Puts record field data into Redis using a specified 
hash value, which is determined by a RecordPath to a field in each record 
containing the hash value. The record fields "
+        + "and values are stored as key/value pairs associated by the hash 
value. NOTE: Neither the evaluated hash value nor any of the field values can 
be null. If the hash value is null, "
+        + "the FlowFile will be routed to failure. For each of the field 
values, if the value is null that field will be not set in Redis.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "putredisrecord.successful.record.count", 
description = "The number of records successfully written to Redis. This can be 
compared to the 'record.count' "
+                + "attribute if it exists on the incoming FlowFile")
+})
+public class PutRedisRecord extends AbstractProcessor {
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+            .name("redis-record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HASH_VALUE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-hash-value-record-path")
+            .displayName("Hash Value Record Path")
+            .description("Specifies a RecordPath to evaluate against each 
Record in order to determine the hash value associated with all the record 
fields/values "
+                    + "(see 'hset' in Redis documentation for more details). 
The RecordPath must point at exactly one field or an error will occur.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    static final PropertyDescriptor DATA_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-data-record-path")
+            .displayName("Data Record Path")
+            .description("This property denotes a RecordPath that will be 
evaluated against each incoming Record and the Record that results from 
evaluating the RecordPath will be sent to" +
+                    " Redis instead of sending the entire incoming Record. The 
property defaults to the root '/' which corresponds to a 'flat' record (all 
fields/values at the top level of " +
+                    " the Record.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .defaultValue("/")
+            .expressionLanguageSupported(NONE)
+            .build();
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("redis-charset")
+            .displayName("Character Set")
+            .description("Specifies the character set to use when storing 
record field values as strings. All fields will be converted to strings using 
this character set "
+                    + "before being stored in Redis.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that are unsuccessfully entered into Redis 
will be routed to this relationship")

Review Comment:
   It appears that some records could be stored in Redis while others fail — is 
that correct? This could also occur for errors not related to Redis, such as 
malformed records.
   ```suggestion
               .description("FlowFiles containing Records with processing 
errors will be routed to this relationship")
   ```



##########
nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/processor/PutRedisRecord.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.processor;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "redis", "hash", "record"})
+@CapabilityDescription("Puts record field data into Redis using a specified 
hash value, which is determined by a RecordPath to a field in each record 
containing the hash value. The record fields "
+        + "and values are stored as key/value pairs associated by the hash 
value. NOTE: Neither the evaluated hash value nor any of the field values can 
be null. If the hash value is null, "
+        + "the FlowFile will be routed to failure. For each of the field 
values, if the value is null that field will be not set in Redis.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "putredisrecord.successful.record.count", 
description = "The number of records successfully written to Redis. This can be 
compared to the 'record.count' "
+                + "attribute if it exists on the incoming FlowFile")
+})
+public class PutRedisRecord extends AbstractProcessor {
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+            .name("redis-record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HASH_VALUE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-hash-value-record-path")
+            .displayName("Hash Value Record Path")
+            .description("Specifies a RecordPath to evaluate against each 
Record in order to determine the hash value associated with all the record 
fields/values "
+                    + "(see 'hset' in Redis documentation for more details). 
The RecordPath must point at exactly one field or an error will occur.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(NONE)
+            .build();
+
+    static final PropertyDescriptor DATA_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("redis-data-record-path")
+            .displayName("Data Record Path")
+            .description("This property denotes a RecordPath that will be 
evaluated against each incoming Record and the Record that results from 
evaluating the RecordPath will be sent to" +
+                    " Redis instead of sending the entire incoming Record. The 
property defaults to the root '/' which corresponds to a 'flat' record (all 
fields/values at the top level of " +
+                    " the Record.")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            .defaultValue("/")
+            .expressionLanguageSupported(NONE)
+            .build();
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("redis-charset")
+            .displayName("Character Set")
+            .description("Specifies the character set to use when storing 
record field values as strings. All fields will be converted to strings using 
this character set "
+                    + "before being stored in Redis.")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that are unsuccessfully entered into Redis 
will be routed to this relationship")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(RECORD_READER_FACTORY);
+        props.add(REDIS_CONNECTION_POOL);
+        props.add(HASH_VALUE_RECORD_PATH);
+        props.add(DATA_RECORD_PATH);
+        props.add(CHARSET);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    private volatile RedisConnectionPool redisConnectionPool;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.redisConnectionPool = 
context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
+    }
+
+    @OnStopped
+    public void onStopped() {
+        this.redisConnectionPool = null;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+        int count = 0;
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, 
in, getLogger());
+             RedisConnection redisConnection = 
redisConnectionPool.getConnection()) {
+
+            final String hashValueRecordPathValue = 
context.getProperty(HASH_VALUE_RECORD_PATH).getValue();
+            final RecordPath hashValueRecordPath = 
RecordPath.compile(hashValueRecordPathValue);
+
+            final String dataRecordPathValue = 
context.getProperty(DATA_RECORD_PATH).getValue();
+            final RecordPath dataRecordPath = 
RecordPath.compile(dataRecordPathValue);
+
+            final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                final RecordPathResult recordPathResult = 
hashValueRecordPath.evaluate(record);
+                final List<FieldValue> resultList = 
recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
+                if (resultList.isEmpty()) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record but got no results");
+                }
+
+                if (resultList.size() > 1) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record and received multiple distinct 
results (" + resultList + ")");
+                }
+
+                final FieldValue hashValueFieldValue = resultList.get(0);
+                final Object hashValueObject = hashValueFieldValue.getValue();
+                if (hashValueObject == null) {
+                    throw new ProcessException("Evaluated RecordPath " + 
hashValueRecordPath.getPath() + " against Record and received a null value, 
which is not allowed. Ensure "
+                            + "the RecordPath is pointing at a field that 
exists at that location and has a non-null value.");
+                }
+                final String hashValue = (String) 
DataTypeUtils.convertType(hashValueObject, 
RecordFieldType.STRING.getDataType(), charset.name());
+
+                List<Record> dataRecords = getDataRecords(dataRecordPath, 
record);
+
+                for (Record dataRecord : dataRecords) {
+                    RecordSchema dataRecordSchema = dataRecord.getSchema();
+
+                    for (RecordField recordField : 
dataRecordSchema.getFields()) {
+                        final String fieldName = recordField.getFieldName();
+                        final Object value = record.getValue(fieldName);
+                        if (fieldName == null || value == null) {
+                            getLogger().debug("Record fieldname or value is 
null, skipping this field");

Review Comment:
   Recommend including the fieldName and value, even when null, to aid 
troubleshooting.
   ```suggestion
                               getLogger().debug("Record field missing required 
elements: name [{}] value [{}]", fieldName, value);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to