hleonps commented on code in PR #10208:
URL: https://github.com/apache/nifi/pull/10208#discussion_r2737009585


##########
nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/EnrichGraphRecord.java:
##########
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.graph;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.graph.GraphClientService;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Tags({"graph", "gremlin", "cypher", "enrich", "record"})
+@CapabilityDescription("This processor uses fields from FlowFile records to 
add property values to nodes or edges in a graph. Each record is associated 
with an individual node/edge "
+        + "(associated by the specified 'identifier' field value), and a 
single FlowFile will be output for all successful operations. Failed records 
will be sent as "
+        + "individual FlowFiles to the failure relationship.")
+@WritesAttributes({
+        @WritesAttribute(attribute = EnrichGraphRecord.GRAPH_OPERATION_TIME, 
description = "The amount of time it took to execute all of the graph 
operations."),
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@DynamicProperty(name = "Field(s) containing values to be added to the matched 
node/edge as properties. If no user-defined properties are added, all fields 
except the identifier "
+        + "will be added as properties on the node/edge",
+        value = "The variable name to be set", expressionLanguageScope = 
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "A dynamic property specifying a RecordField Expression 
identifying field(s) for whose values will be added to the matched node as 
properties")
+public class EnrichGraphRecord extends AbstractGraphExecutor {
+
+    private static final AllowableValue NODES = new AllowableValue(
+            GraphClientService.NODES_TYPE,
+            GraphClientService.NODES_TYPE,
+            "Enrich nodes in the graph with properties from the incoming 
records. The node identifier is determined by the 'Identifier Field(s)' 
property."
+    );
+
+    private static final AllowableValue EDGES = new AllowableValue(
+            GraphClientService.EDGES_TYPE,
+            GraphClientService.EDGES_TYPE,
+            "Enrich edges in the graph with properties from the incoming 
records. The edge identifier is determined by the 'Identifier Field(s)' 
property."
+    );
+
+    public static final PropertyDescriptor CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Graph Client Service")
+            .description("The graph client service for connecting to a graph 
database.")
+            .identifiesControllerService(GraphClientService.class)
+            .addValidator(Validator.VALID)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor READER_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .description("The record reader to use with this processor to read 
incoming records.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final PropertyDescriptor WRITER_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Failed Record Writer")
+            .description("The record writer to use for writing failed 
records.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final PropertyDescriptor UPDATE_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Components to Enrich")
+            .description("The components in the graph to enrich with 
properties from the incoming records.")
+            .addValidator(Validator.VALID)
+            .allowableValues(NODES, EDGES)
+            .defaultValue(NODES.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor IDENTIFIER_FIELD = new 
PropertyDescriptor.Builder()
+            .name("Identifier Field(s)")
+            .description("A RecordPath Expression for field(s) in the record 
used to match the node identifier(s) in order to set properties on that node")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor NODE_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Node/Edge Type")
+            .description("The type of the nodes or edges to match on. Setting 
this can result in faster execution")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
+    public static final Relationship ORIGINAL = new 
Relationship.Builder().name("original")
+            .description("Original flow files that successfully interacted 
with " +
+                    "graph server.")
+            .build();
+    public static final Relationship FAILURE = new 
Relationship.Builder().name("failure")
+            .description("Flow files that fail to interact with graph server.")
+            .build();
+    public static final Relationship GRAPH = new 
Relationship.Builder().name("response")
+            .description("The response object from the graph server.")
+            .autoTerminateDefault(true)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
+            CLIENT_SERVICE,
+            READER_SERVICE,
+            WRITER_SERVICE,
+            UPDATE_TYPE,
+            IDENTIFIER_FIELD,
+            NODE_TYPE
+    );
+
+    private static final Set<Relationship> RELATIONSHIPS = Set.of(
+            ORIGINAL,
+            FAILURE,
+            GRAPH
+    );
+
+    public static final String RECORD_COUNT = "record.count";
+    public static final String GRAPH_OPERATION_TIME = "graph.operations.took";
+    private volatile RecordPathCache recordPathCache;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    private GraphClientService clientService;
+    private RecordReaderFactory recordReaderFactory;
+    private RecordSetWriterFactory recordSetWriterFactory;
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        clientService = 
context.getProperty(CLIENT_SERVICE).asControllerService(GraphClientService.class);
+        recordReaderFactory = 
context.getProperty(READER_SERVICE).asControllerService(RecordReaderFactory.class);
+        recordSetWriterFactory = 
context.getProperty(WRITER_SERVICE).asControllerService(RecordSetWriterFactory.class);
+        recordPathCache = new RecordPathCache(100);
+    }
+
+    private List<FieldValue> getRecordValue(Record record, RecordPath 
recordPath) {
+        final RecordPathResult result = recordPath.evaluate(record);
+        final List<FieldValue> values = result.getSelectedFields().toList();
+        return values.isEmpty() ? null : values;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile input = session.get();
+        if (input == null) {
+            return;
+        }
+
+        Map<String, RecordPath> dynamic = new HashMap<>();
+
+        FlowFile finalInput = input;
+        context.getProperties()
+                .keySet().stream()
+                .filter(PropertyDescriptor::isDynamic)
+                .forEach(it ->
+                        dynamic.put(it.getName(), recordPathCache.getCompiled(
+                                context
+                                        .getProperty(it.getName())
+                                        
.evaluateAttributeExpressions(finalInput)
+                                        .getValue()))
+            );
+
+        long delta;
+        FlowFile failedRecords = session.create(input);
+        WriteResult failedWriteResult = null;
+        try (InputStream is = session.read(input);
+             RecordReader reader = 
recordReaderFactory.createRecordReader(input, is, getLogger());
+             OutputStream os = session.write(failedRecords);
+             RecordSetWriter failedWriter = 
recordSetWriterFactory.createWriter(getLogger(), reader.getSchema(), os, 
input.getAttributes())
+        ) {
+            Record record;
+            long start = System.currentTimeMillis();
+            failedWriter.beginRecordSet();
+            int records = 0;
+            while ((record = reader.nextRecord()) != null) {
+                FlowFile graph = session.create(input);
+
+                try {
+                    final String identifierField = 
context.getProperty(IDENTIFIER_FIELD).evaluateAttributeExpressions(input).getValue();
+                    final RecordPath identifierPath = 
recordPathCache.getCompiled(identifierField);
+                    final List<FieldValue> identifierValues = 
getRecordValue(record, identifierPath);
+                    if (identifierValues == null || 
identifierValues.isEmpty()) {
+                        throw new IOException("Identifier field(s) not found 
in record (check the RecordPath Expression), sending this record to failure");
+                    }
+                    Map<String, Object> dynamicPropertyMap = new HashMap<>();
+                    Set<String> keySet = dynamic.keySet();
+                    if (keySet.isEmpty()) {
+                        // Add all dynamic properties at the top level except 
the identifier field
+                        List<String> fieldNames = 
record.getSchema().getFieldNames();
+                        for (String fieldName : fieldNames) {
+                            if (fieldName.equals(identifierField)) {

Review Comment:
   This condition won´t meet due the `identifierField` contains the raw value 
set in the processor's properties (RecordPath)



##########
nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/EnrichGraphRecord.java:
##########
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.graph;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.graph.GraphClientService;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Tags({"graph", "gremlin", "cypher", "enrich", "record"})
+@CapabilityDescription("This processor uses fields from FlowFile records to 
add property values to nodes or edges in a graph. Each record is associated 
with an individual node/edge "
+        + "(associated by the specified 'identifier' field value), and a 
single FlowFile will be output for all successful operations. Failed records 
will be sent as "
+        + "individual FlowFiles to the failure relationship.")
+@WritesAttributes({
+        @WritesAttribute(attribute = EnrichGraphRecord.GRAPH_OPERATION_TIME, 
description = "The amount of time it took to execute all of the graph 
operations."),
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@DynamicProperty(name = "Field(s) containing values to be added to the matched 
node/edge as properties. If no user-defined properties are added, all fields 
except the identifier "
+        + "will be added as properties on the node/edge",
+        value = "The variable name to be set", expressionLanguageScope = 
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "A dynamic property specifying a RecordField Expression 
identifying field(s) for whose values will be added to the matched node as 
properties")
+public class EnrichGraphRecord extends AbstractGraphExecutor {
+
+    private static final AllowableValue NODES = new AllowableValue(
+            GraphClientService.NODES_TYPE,
+            GraphClientService.NODES_TYPE,
+            "Enrich nodes in the graph with properties from the incoming 
records. The node identifier is determined by the 'Identifier Field(s)' 
property."
+    );
+
+    private static final AllowableValue EDGES = new AllowableValue(
+            GraphClientService.EDGES_TYPE,
+            GraphClientService.EDGES_TYPE,
+            "Enrich edges in the graph with properties from the incoming 
records. The edge identifier is determined by the 'Identifier Field(s)' 
property."
+    );
+
+    public static final PropertyDescriptor CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Graph Client Service")
+            .description("The graph client service for connecting to a graph 
database.")
+            .identifiesControllerService(GraphClientService.class)
+            .addValidator(Validator.VALID)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor READER_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .description("The record reader to use with this processor to read 
incoming records.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final PropertyDescriptor WRITER_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Failed Record Writer")
+            .description("The record writer to use for writing failed 
records.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final PropertyDescriptor UPDATE_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Components to Enrich")
+            .description("The components in the graph to enrich with 
properties from the incoming records.")
+            .addValidator(Validator.VALID)
+            .allowableValues(NODES, EDGES)
+            .defaultValue(NODES.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor IDENTIFIER_FIELD = new 
PropertyDescriptor.Builder()
+            .name("Identifier Field(s)")
+            .description("A RecordPath Expression for field(s) in the record 
used to match the node identifier(s) in order to set properties on that node")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor NODE_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Node/Edge Type")
+            .description("The type of the nodes or edges to match on. Setting 
this can result in faster execution")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
+    public static final Relationship ORIGINAL = new 
Relationship.Builder().name("original")
+            .description("Original flow files that successfully interacted 
with " +
+                    "graph server.")
+            .build();
+    public static final Relationship FAILURE = new 
Relationship.Builder().name("failure")
+            .description("Flow files that fail to interact with graph server.")
+            .build();
+    public static final Relationship GRAPH = new 
Relationship.Builder().name("response")
+            .description("The response object from the graph server.")
+            .autoTerminateDefault(true)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
+            CLIENT_SERVICE,
+            READER_SERVICE,
+            WRITER_SERVICE,
+            UPDATE_TYPE,
+            IDENTIFIER_FIELD,
+            NODE_TYPE
+    );
+
+    private static final Set<Relationship> RELATIONSHIPS = Set.of(
+            ORIGINAL,
+            FAILURE,
+            GRAPH
+    );
+
+    public static final String RECORD_COUNT = "record.count";
+    public static final String GRAPH_OPERATION_TIME = "graph.operations.took";
+    private volatile RecordPathCache recordPathCache;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    private GraphClientService clientService;
+    private RecordReaderFactory recordReaderFactory;
+    private RecordSetWriterFactory recordSetWriterFactory;
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        clientService = 
context.getProperty(CLIENT_SERVICE).asControllerService(GraphClientService.class);
+        recordReaderFactory = 
context.getProperty(READER_SERVICE).asControllerService(RecordReaderFactory.class);
+        recordSetWriterFactory = 
context.getProperty(WRITER_SERVICE).asControllerService(RecordSetWriterFactory.class);
+        recordPathCache = new RecordPathCache(100);
+    }
+
+    private List<FieldValue> getRecordValue(Record record, RecordPath 
recordPath) {
+        final RecordPathResult result = recordPath.evaluate(record);
+        final List<FieldValue> values = result.getSelectedFields().toList();
+        return values.isEmpty() ? null : values;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile input = session.get();
+        if (input == null) {
+            return;
+        }
+
+        Map<String, RecordPath> dynamic = new HashMap<>();
+
+        FlowFile finalInput = input;
+        context.getProperties()
+                .keySet().stream()
+                .filter(PropertyDescriptor::isDynamic)
+                .forEach(it ->
+                        dynamic.put(it.getName(), recordPathCache.getCompiled(
+                                context
+                                        .getProperty(it.getName())
+                                        
.evaluateAttributeExpressions(finalInput)
+                                        .getValue()))
+            );
+
+        long delta;
+        FlowFile failedRecords = session.create(input);
+        WriteResult failedWriteResult = null;
+        try (InputStream is = session.read(input);
+             RecordReader reader = 
recordReaderFactory.createRecordReader(input, is, getLogger());
+             OutputStream os = session.write(failedRecords);
+             RecordSetWriter failedWriter = 
recordSetWriterFactory.createWriter(getLogger(), reader.getSchema(), os, 
input.getAttributes())
+        ) {
+            Record record;
+            long start = System.currentTimeMillis();
+            failedWriter.beginRecordSet();
+            int records = 0;
+            while ((record = reader.nextRecord()) != null) {
+                FlowFile graph = session.create(input);
+
+                try {
+                    final String identifierField = 
context.getProperty(IDENTIFIER_FIELD).evaluateAttributeExpressions(input).getValue();
+                    final RecordPath identifierPath = 
recordPathCache.getCompiled(identifierField);
+                    final List<FieldValue> identifierValues = 
getRecordValue(record, identifierPath);
+                    if (identifierValues == null || 
identifierValues.isEmpty()) {
+                        throw new IOException("Identifier field(s) not found 
in record (check the RecordPath Expression), sending this record to failure");
+                    }
+                    Map<String, Object> dynamicPropertyMap = new HashMap<>();
+                    Set<String> keySet = dynamic.keySet();
+                    if (keySet.isEmpty()) {
+                        // Add all dynamic properties at the top level except 
the identifier field
+                        List<String> fieldNames = 
record.getSchema().getFieldNames();
+                        for (String fieldName : fieldNames) {
+                            if (fieldName.equals(identifierField)) {
+                                continue;
+                            }
+                            final List<FieldValue> propertyValues = 
getRecordValue(record, recordPathCache.getCompiled("/" + fieldName));
+                            // Use the first value if multiple are found
+                            if (propertyValues == null || 
propertyValues.isEmpty() || propertyValues.getFirst().getValue() == null) {
+                                continue;
+                            }
+
+                            Object rawValue = 
propertyValues.getFirst().getValue();
+                            DataType rawDataType = 
propertyValues.getFirst().getField().getDataType();
+                            RecordFieldType rawValueType = 
rawDataType.getFieldType();
+                            // Change MapRecords to Maps recursively as needed
+                            if (RecordFieldType.ARRAY.equals(rawValueType)) {
+                                DataType arrayElementType = ((ArrayDataType) 
rawDataType).getElementType();
+                                if 
(RecordFieldType.RECORD.getDataType().equals(arrayElementType)) {
+                                    Object[] rawValueArray = (Object[]) 
rawValue;
+                                    Object[] mappedValueArray = new 
Object[rawValueArray.length];
+                                    for (int i = 0; i < rawValueArray.length; 
i++) {
+                                        MapRecord mapRecord = (MapRecord) 
rawValueArray[i];
+                                        mappedValueArray[i] = 
mapRecord.toMap(true);
+                                    }
+                                    dynamicPropertyMap.put(fieldName, 
mappedValueArray);
+                                }
+                            } else if 
(RecordFieldType.RECORD.equals(rawValueType)) {
+                                MapRecord mapRecord = (MapRecord) rawValue;
+                                dynamicPropertyMap.put(fieldName, 
mapRecord.toMap(true));
+                            } else if 
(RecordFieldType.STRING.equals(rawValueType)) {
+                                // Escape single quotes
+                                String stringValue = (String) rawValue;
+                                if (rawValue != null) {
+                                    stringValue = stringValue.replace('\'', 
'\\');
+                                    dynamicPropertyMap.put(fieldName, 
stringValue);
+                                }
+                            } else {
+                                dynamicPropertyMap.put(fieldName, rawValue);
+                            }

Review Comment:
   The mapping is working fine, but there are combinations where it might not 
work. For example: if the `ARRAY type` is not at top level attribute (i.e the 
array is inside a record), it won't map correctly.
   
   I suggest to move the mapping block to its own method and then process each 
attribute recursively like a tree. This way each nested attribute is mapped as 
expected.



##########
nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/EnrichGraphRecord.java:
##########
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.graph;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.graph.GraphClientService;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Tags({"graph", "gremlin", "cypher", "enrich", "record"})
+@CapabilityDescription("This processor uses fields from FlowFile records to 
add property values to nodes or edges in a graph. Each record is associated 
with an individual node/edge "
+        + "(associated by the specified 'identifier' field value), and a 
single FlowFile will be output for all successful operations. Failed records 
will be sent as "
+        + "individual FlowFiles to the failure relationship.")
+@WritesAttributes({
+        @WritesAttribute(attribute = EnrichGraphRecord.GRAPH_OPERATION_TIME, 
description = "The amount of time it took to execute all of the graph 
operations."),
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@DynamicProperty(name = "Field(s) containing values to be added to the matched 
node/edge as properties. If no user-defined properties are added, all fields 
except the identifier "
+        + "will be added as properties on the node/edge",
+        value = "The variable name to be set", expressionLanguageScope = 
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "A dynamic property specifying a RecordField Expression 
identifying field(s) for whose values will be added to the matched node as 
properties")
+public class EnrichGraphRecord extends AbstractGraphExecutor {
+
+    private static final AllowableValue NODES = new AllowableValue(
+            GraphClientService.NODES_TYPE,
+            GraphClientService.NODES_TYPE,
+            "Enrich nodes in the graph with properties from the incoming 
records. The node identifier is determined by the 'Identifier Field(s)' 
property."
+    );
+
+    private static final AllowableValue EDGES = new AllowableValue(
+            GraphClientService.EDGES_TYPE,
+            GraphClientService.EDGES_TYPE,
+            "Enrich edges in the graph with properties from the incoming 
records. The edge identifier is determined by the 'Identifier Field(s)' 
property."
+    );
+
+    public static final PropertyDescriptor CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Graph Client Service")
+            .description("The graph client service for connecting to a graph 
database.")
+            .identifiesControllerService(GraphClientService.class)
+            .addValidator(Validator.VALID)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor READER_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .description("The record reader to use with this processor to read 
incoming records.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final PropertyDescriptor WRITER_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Failed Record Writer")
+            .description("The record writer to use for writing failed 
records.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final PropertyDescriptor UPDATE_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Components to Enrich")
+            .description("The components in the graph to enrich with 
properties from the incoming records.")
+            .addValidator(Validator.VALID)
+            .allowableValues(NODES, EDGES)
+            .defaultValue(NODES.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor IDENTIFIER_FIELD = new 
PropertyDescriptor.Builder()
+            .name("Identifier Field(s)")
+            .description("A RecordPath Expression for field(s) in the record 
used to match the node identifier(s) in order to set properties on that node")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor NODE_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Node/Edge Type")
+            .description("The type of the nodes or edges to match on. Setting 
this can result in faster execution")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
+    public static final Relationship ORIGINAL = new 
Relationship.Builder().name("original")
+            .description("Original flow files that successfully interacted 
with " +
+                    "graph server.")
+            .build();
+    public static final Relationship FAILURE = new 
Relationship.Builder().name("failure")
+            .description("Flow files that fail to interact with graph server.")
+            .build();
+    public static final Relationship GRAPH = new 
Relationship.Builder().name("response")
+            .description("The response object from the graph server.")
+            .autoTerminateDefault(true)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
+            CLIENT_SERVICE,
+            READER_SERVICE,
+            WRITER_SERVICE,
+            UPDATE_TYPE,
+            IDENTIFIER_FIELD,
+            NODE_TYPE
+    );
+
+    private static final Set<Relationship> RELATIONSHIPS = Set.of(
+            ORIGINAL,
+            FAILURE,
+            GRAPH
+    );
+
+    public static final String RECORD_COUNT = "record.count";
+    public static final String GRAPH_OPERATION_TIME = "graph.operations.took";
+    private volatile RecordPathCache recordPathCache;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    private GraphClientService clientService;
+    private RecordReaderFactory recordReaderFactory;
+    private RecordSetWriterFactory recordSetWriterFactory;
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        clientService = 
context.getProperty(CLIENT_SERVICE).asControllerService(GraphClientService.class);
+        recordReaderFactory = 
context.getProperty(READER_SERVICE).asControllerService(RecordReaderFactory.class);
+        recordSetWriterFactory = 
context.getProperty(WRITER_SERVICE).asControllerService(RecordSetWriterFactory.class);
+        recordPathCache = new RecordPathCache(100);
+    }
+
+    private List<FieldValue> getRecordValue(Record record, RecordPath 
recordPath) {
+        final RecordPathResult result = recordPath.evaluate(record);
+        final List<FieldValue> values = result.getSelectedFields().toList();
+        return values.isEmpty() ? null : values;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile input = session.get();
+        if (input == null) {
+            return;
+        }
+
+        Map<String, RecordPath> dynamic = new HashMap<>();
+
+        FlowFile finalInput = input;
+        context.getProperties()
+                .keySet().stream()
+                .filter(PropertyDescriptor::isDynamic)
+                .forEach(it ->
+                        dynamic.put(it.getName(), recordPathCache.getCompiled(
+                                context
+                                        .getProperty(it.getName())
+                                        
.evaluateAttributeExpressions(finalInput)
+                                        .getValue()))
+            );
+
+        long delta;
+        FlowFile failedRecords = session.create(input);
+        WriteResult failedWriteResult = null;
+        try (InputStream is = session.read(input);
+             RecordReader reader = 
recordReaderFactory.createRecordReader(input, is, getLogger());
+             OutputStream os = session.write(failedRecords);
+             RecordSetWriter failedWriter = 
recordSetWriterFactory.createWriter(getLogger(), reader.getSchema(), os, 
input.getAttributes())
+        ) {
+            Record record;
+            long start = System.currentTimeMillis();
+            failedWriter.beginRecordSet();
+            int records = 0;
+            while ((record = reader.nextRecord()) != null) {
+                FlowFile graph = session.create(input);
+
+                try {
+                    final String identifierField = 
context.getProperty(IDENTIFIER_FIELD).evaluateAttributeExpressions(input).getValue();
+                    final RecordPath identifierPath = 
recordPathCache.getCompiled(identifierField);
+                    final List<FieldValue> identifierValues = 
getRecordValue(record, identifierPath);
+                    if (identifierValues == null || 
identifierValues.isEmpty()) {
+                        throw new IOException("Identifier field(s) not found 
in record (check the RecordPath Expression), sending this record to failure");
+                    }
+                    Map<String, Object> dynamicPropertyMap = new HashMap<>();
+                    Set<String> keySet = dynamic.keySet();
+                    if (keySet.isEmpty()) {
+                        // Add all dynamic properties at the top level except 
the identifier field
+                        List<String> fieldNames = 
record.getSchema().getFieldNames();
+                        for (String fieldName : fieldNames) {
+                            if (fieldName.equals(identifierField)) {
+                                continue;
+                            }
+                            final List<FieldValue> propertyValues = 
getRecordValue(record, recordPathCache.getCompiled("/" + fieldName));
+                            // Use the first value if multiple are found
+                            if (propertyValues == null || 
propertyValues.isEmpty() || propertyValues.getFirst().getValue() == null) {
+                                continue;
+                            }
+
+                            Object rawValue = 
propertyValues.getFirst().getValue();
+                            DataType rawDataType = 
propertyValues.getFirst().getField().getDataType();
+                            RecordFieldType rawValueType = 
rawDataType.getFieldType();
+                            // Change MapRecords to Maps recursively as needed
+                            if (RecordFieldType.ARRAY.equals(rawValueType)) {
+                                DataType arrayElementType = ((ArrayDataType) 
rawDataType).getElementType();
+                                if 
(RecordFieldType.RECORD.getDataType().equals(arrayElementType)) {
+                                    Object[] rawValueArray = (Object[]) 
rawValue;
+                                    Object[] mappedValueArray = new 
Object[rawValueArray.length];
+                                    for (int i = 0; i < rawValueArray.length; 
i++) {
+                                        MapRecord mapRecord = (MapRecord) 
rawValueArray[i];
+                                        mappedValueArray[i] = 
mapRecord.toMap(true);
+                                    }
+                                    dynamicPropertyMap.put(fieldName, 
mappedValueArray);
+                                }
+                            } else if 
(RecordFieldType.RECORD.equals(rawValueType)) {
+                                MapRecord mapRecord = (MapRecord) rawValue;
+                                dynamicPropertyMap.put(fieldName, 
mapRecord.toMap(true));
+                            } else if 
(RecordFieldType.STRING.equals(rawValueType)) {
+                                // Escape single quotes
+                                String stringValue = (String) rawValue;
+                                if (rawValue != null) {
+                                    stringValue = stringValue.replace('\'', 
'\\');
+                                    dynamicPropertyMap.put(fieldName, 
stringValue);
+                                }
+                            } else {
+                                dynamicPropertyMap.put(fieldName, rawValue);
+                            }
+                        }
+                    } else {
+                        for (String entry : keySet) {
+                            if (!dynamicPropertyMap.containsKey(entry)) {
+                                final List<FieldValue> propertyValues = 
getRecordValue(record, dynamic.get(entry));
+                                // Use the first value if multiple are found
+                                if (propertyValues == null || 
propertyValues.isEmpty() || propertyValues.getFirst().getValue() == null) {
+                                    throw new IOException("Dynamic property 
field(s) not found in record (check the RecordPath Expression), sending this 
record to failure");
+                                }
+
+                                dynamicPropertyMap.put(entry, 
propertyValues.getFirst().getValue());

Review Comment:
   Add call to the suggested mapping method for consistent results.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to