dan-s1 commented on code in PR #11316:
URL: https://github.com/apache/nifi/pull/11316#discussion_r3444138758


##########
nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/EnrichGraphRecord.java:
##########
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.graph;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.graph.GraphClientService;
+import org.apache.nifi.graph.GraphElementType;
+import org.apache.nifi.graph.GraphClientTransientException;
+import org.apache.nifi.graph.GraphMutation;
+import org.apache.nifi.graph.GraphQueryGeneratorService;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Tags({"graph", "gremlin", "cypher", "enrich", "record"})
+@CapabilityDescription("This processor uses fields from FlowFile records to 
add property values to graph elements. Each record is associated "
+        + "with an individual graph element using the specified identifier 
field values. A single FlowFile containing successful graph responses is "
+        + "written to the response relationship. Failed records are written to 
a single FlowFile routed to the failure relationship.")
+@WritesAttributes({
+        @WritesAttribute(attribute = 
EnrichGraphRecord.GRAPH_OPERATIONS_TIME_SECONDS, description = "The amount of 
time in seconds that it took to execute all graph operations."),
+        @WritesAttribute(attribute = EnrichGraphRecord.RECORD_COUNT, 
description = "The number of records unsuccessfully processed.")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@DynamicProperty(name = "Field(s) containing values to be added to matched 
elements as properties. If no user-defined properties are added, all fields "
+        + "except identifier fields are added as element properties.",
+        value = "The property name to be set in the graph query",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "A dynamic property specifying a RecordPath Expression 
identifying field(s) whose values are added as properties")
+public class EnrichGraphRecord extends AbstractGraphExecutor {
+    /** Element Type choice that enriches graph nodes; its value is the GraphElementType.NODE constant name. */
+    private static final AllowableValue NODE =
+            new AllowableValue(GraphElementType.NODE.name(), "Node", "Enrich nodes in the graph with properties from incoming records.");
+
+    /** Element Type choice that enriches graph edges; its value is the GraphElementType.EDGE constant name. */
+    private static final AllowableValue EDGE =
+            new AllowableValue(GraphElementType.EDGE.name(), "Edge", "Enrich edges in the graph with properties from incoming records.");
+
+    /** Required graph client service reference; resolved in onScheduled. */
+    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
+            .name("Graph Client Service")
+            .description("The graph client service for connecting to a graph database.")
+            .required(true)
+            .identifiesControllerService(GraphClientService.class)
+            .addValidator(Validator.VALID)
+            .build();
+
+    /** Required query generator service reference used to build mutation statements; resolved in onScheduled. */
+    public static final PropertyDescriptor QUERY_GENERATOR_SERVICE = new PropertyDescriptor.Builder()
+            .name("Graph Query Generator Service")
+            .description("The graph query generator service used to build mutation statements for the selected graph implementation.")
+            .required(true)
+            .identifiesControllerService(GraphQueryGeneratorService.class)
+            .addValidator(Validator.VALID)
+            .build();
+
+    /** Record reader that onTrigger uses to parse the incoming FlowFile. */
+    public static final PropertyDescriptor READER_SERVICE = new PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .description("The record reader to use with this processor to read incoming records.")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .addValidator(Validator.VALID)
+            .build();
+
+    /** Record writer that onTrigger uses for the failed-records FlowFile, created with the reader's schema. */
+    public static final PropertyDescriptor WRITER_SERVICE = new PropertyDescriptor.Builder()
+            .name("Failed Record Writer")
+            .description("The record writer to use for writing failed records.")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .addValidator(Validator.VALID)
+            .build();
+
+    /** Kind of graph element to enrich (node or edge); defaults to node and is parsed with GraphElementType.valueOf. */
+    public static final PropertyDescriptor ELEMENT_TYPE = new PropertyDescriptor.Builder()
+            .name("Element Type")
+            .description("The graph element type to enrich with properties from incoming records.")
+            .required(true)
+            .allowableValues(NODE, EDGE)
+            .defaultValue(NODE.getValue())
+            .addValidator(Validator.VALID)
+            .build();
+
+    /**
+     * RecordPath selecting the identifier field(s) of each record, evaluated against FlowFile attributes.
+     * onTrigger throws an IOException for a record when this path selects no fields.
+     */
+    public static final PropertyDescriptor IDENTIFIER_FIELD = new PropertyDescriptor.Builder()
+            .name("Identifier Field(s)")
+            .description("A RecordPath Expression for field(s) in the record used to match identifiers when setting properties.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    /** Optional element label, evaluated against FlowFile attributes in onTrigger. */
+    public static final PropertyDescriptor ELEMENT_LABEL = new PropertyDescriptor.Builder()
+            .name("Element Label")
+            .description("The graph element label used for matching in the graph query. Setting this can result in faster execution.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    /** Destination for input FlowFiles that interacted with the graph server successfully. */
+    public static final Relationship ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("Original FlowFiles that successfully interacted with graph server.")
+            .build();
+
+    /** Destination for FlowFiles that failed to interact with the graph server. */
+    public static final Relationship FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that fail to interact with graph server.")
+            .build();
+
+    /** Destination for graph server responses; auto-terminated unless the user connects it. */
+    public static final Relationship RESPONSE = new Relationship.Builder()
+            .name("response")
+            .autoTerminateDefault(true)
+            .description("The response object from the graph server.")
+            .build();
+
+    /** Properties returned by getSupportedPropertyDescriptors, in declaration order. */
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
+            CLIENT_SERVICE, QUERY_GENERATOR_SERVICE, READER_SERVICE, WRITER_SERVICE,
+            ELEMENT_TYPE, IDENTIFIER_FIELD, ELEMENT_LABEL
+    );
+
+    /** Relationships returned by getRelationships. */
+    private static final Set<Relationship> RELATIONSHIPS = Set.of(ORIGINAL, FAILURE, RESPONSE);
+
+    /** Attribute name advertised through @WritesAttribute for the record count. */
+    public static final String RECORD_COUNT = "record.count";
+    /** Attribute name advertised through @WritesAttribute for the graph operations duration. */
+    public static final String GRAPH_OPERATIONS_TIME_SECONDS = "graph.operations.took";
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    // Controller services resolved from the configured properties in onScheduled.
+    private volatile GraphClientService clientService;
+    private volatile GraphQueryGeneratorService graphQueryGeneratorService;
+    private volatile RecordReaderFactory recordReaderFactory;
+    private volatile RecordSetWriterFactory recordSetWriterFactory;
+
+    // Cache of compiled RecordPaths; recreated in onScheduled.
+    private volatile RecordPathCache recordPathCache;
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .dynamic(true)
+                .build();
+    }
+
+    /**
+     * @return the fixed relationship set: original, failure and response
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return EnrichGraphRecord.RELATIONSHIPS;
+    }
+
+    /**
+     * @return the fixed, immutable list of supported (non-dynamic) properties
+     */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return EnrichGraphRecord.PROPERTY_DESCRIPTORS;
+    }
+
+    /**
+     * Resolves the configured controller services and creates a new RecordPath cache each time the
+     * processor is scheduled.
+     *
+     * @param context the process context providing the configured properties
+     */
+    @Override
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        // NOTE(review): this overrides the inherited onScheduled without calling super.onScheduled(context);
+        // confirm AbstractGraphExecutor has no scheduling logic that is being skipped.
+        clientService = context.getProperty(CLIENT_SERVICE)
+                .asControllerService(GraphClientService.class);
+        graphQueryGeneratorService = context.getProperty(QUERY_GENERATOR_SERVICE)
+                .asControllerService(GraphQueryGeneratorService.class);
+        recordReaderFactory = context.getProperty(READER_SERVICE)
+                .asControllerService(RecordReaderFactory.class);
+        recordSetWriterFactory = context.getProperty(WRITER_SERVICE)
+                .asControllerService(RecordSetWriterFactory.class);
+
+        // onTrigger looks up compiled RecordPaths through this cache for every FlowFile.
+        final int recordPathCacheSize = 100;
+        recordPathCache = new RecordPathCache(recordPathCacheSize);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile input = session.get();
+        if (input == null) {
+            return;
+        }
+
+        final String identifierRecordPathValue = 
context.getProperty(IDENTIFIER_FIELD).evaluateAttributeExpressions(input).getValue();
+        final RecordPath identifierRecordPath = 
recordPathCache.getCompiled(identifierRecordPathValue);
+        final String elementLabel = 
context.getProperty(ELEMENT_LABEL).evaluateAttributeExpressions(input).getValue();
+        final GraphElementType elementType = 
GraphElementType.valueOf(context.getProperty(ELEMENT_TYPE).getValue());
+
+        final Map<String, RecordPath> dynamicPropertyRecordPaths = new 
HashMap<>();
+        for (final Map.Entry<PropertyDescriptor, String> configuredProperty : 
context.getProperties().entrySet()) {
+            final PropertyDescriptor propertyDescriptor = 
configuredProperty.getKey();
+            if (propertyDescriptor.isDynamic()) {
+                final String recordPathValue = 
context.getProperty(propertyDescriptor).evaluateAttributeExpressions(input).getValue();
+                dynamicPropertyRecordPaths.put(propertyDescriptor.getName(), 
recordPathCache.getCompiled(recordPathValue));
+            }
+        }
+
+        Duration graphOperationsDuration = Duration.ZERO;
+        int successfulRecordCount = 0;
+        final AtomicBoolean wroteGraphResponse = new AtomicBoolean(false);
+        WriteResult failedWriteResult;
+
+        FlowFile failedRecords = session.create(input);
+        FlowFile graphResponse = session.create(input);
+        try (
+                InputStream inputStream = session.read(input);
+                RecordReader recordReader = 
recordReaderFactory.createRecordReader(input, inputStream, getLogger());
+
+                OutputStream failedOutputStream = session.write(failedRecords);
+                RecordSetWriter failedWriter = 
recordSetWriterFactory.createWriter(getLogger(), recordReader.getSchema(), 
failedOutputStream, input.getAttributes());
+
+                OutputStream graphOutputStream = session.write(graphResponse)
+        ) {
+            final long processingStartNanos = System.nanoTime();
+
+            failedWriter.beginRecordSet();
+
+            graphOutputStream.write("[".getBytes(StandardCharsets.UTF_8));
+
+            int recordIndex = 0;
+            Record record;
+            while ((record = recordReader.nextRecord()) != null) {
+                try {
+                    final List<FieldValue> identifierFieldValues = 
getFieldValues(record, identifierRecordPath);
+                    if (identifierFieldValues.isEmpty()) {
+                        throw new IOException("Identifier field(s) not found 
in record, check the RecordPath expression");
+                    }
+
+                    final LinkedHashMap<String, Object> 
identifierFieldNameToValue = new LinkedHashMap<>(identifierFieldValues.size());

Review Comment:
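   I believe PMD (likely its LooseCoupling rule) is complaining that the variable is declared with the concrete `LinkedHashMap` type; it should be declared as the `Map` interface instead.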
   
   ```suggestion
                       final Map<String, Object> identifierFieldNameToValue = new LinkedHashMap<>(identifierFieldValues.size());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to