This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new de88ce7  NIFI-7906: Implemented RecordSetWriter support for ExecuteGraphQueryRecord
de88ce7 is described below

commit de88ce7a618aac4f7c886ccd1be39d3047313adb
Author: Matthew Burgess <[email protected]>
AuthorDate: Fri Dec 4 12:23:07 2020 -0500

    NIFI-7906: Implemented RecordSetWriter support for ExecuteGraphQueryRecord
    
    This closes #4704
    
    Signed-off-by: Mike Thomsen <[email protected]>
---
 .../processors/graph/ExecuteGraphQueryRecord.java  | 113 ++++++++++++---------
 .../graph/ExecuteGraphQueryRecordTest.java         |  67 ++++++++----
 .../processors/graph/util/InMemoryGraphClient.java |  14 ++-
 3 files changed, 122 insertions(+), 72 deletions(-)

diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
index e750b6c..c73aa91 100644
--- a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
@@ -20,20 +20,20 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.graph.GraphClientService;
-import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.record.path.FieldValue;
 import org.apache.nifi.record.path.RecordPath;
@@ -41,28 +41,31 @@ import org.apache.nifi.record.path.RecordPathResult;
 import org.apache.nifi.record.path.util.RecordPathCache;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
-
+import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
 import java.util.Set;
-import java.util.HashSet;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.stream.Collectors;
 
-@Tags({"graph, gremlin"})
-@CapabilityDescription("This uses a flowfile as input to perform graph 
mutations.")
+@Tags({"graph", "gremlin", "cypher"})
+@CapabilityDescription("This uses FlowFile records as input to perform graph 
mutations. Each record is associated with an individual query/mutation, and a 
FlowFile will "
+    + "be output for each successful operation. Failed records will be sent as 
a single FlowFile to the failure relationship.")
 @WritesAttributes({
         @WritesAttribute(attribute = 
ExecuteGraphQueryRecord.GRAPH_OPERATION_TIME, description = "The amount of time 
it took to execute all of the graph operations."),
-        @WritesAttribute(attribute = ExecuteGraphQueryRecord.RECORD_COUNT, 
description = "The amount of record processed")
+        @WritesAttribute(attribute = ExecuteGraphQueryRecord.RECORD_COUNT, 
description = "The number of records unsuccessfully processed (written on 
FlowFiles routed to the "
+                + "'failure' relationship.")
 })
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @DynamicProperty(name = "A dynamic property to be used as a parameter in the 
graph script",
@@ -154,7 +157,7 @@ public class ExecuteGraphQueryRecord extends  
AbstractGraphExecutor {
     private GraphClientService clientService;
     private RecordReaderFactory recordReaderFactory;
     private RecordSetWriterFactory recordSetWriterFactory;
-    private ObjectMapper mapper = new ObjectMapper();
+    private final ObjectMapper mapper = new ObjectMapper();
 
     @OnScheduled
     public void onScheduled(ProcessContext context) {
@@ -179,8 +182,6 @@ public class ExecuteGraphQueryRecord extends  
AbstractGraphExecutor {
             return;
         }
 
-        List<FlowFile> graphList = new ArrayList<>();
-
         String recordScript = context.getProperty(SUBMISSION_SCRIPT)
                 .evaluateAttributeExpressions(input)
                 .getValue();
@@ -199,56 +200,70 @@ public class ExecuteGraphQueryRecord extends  
AbstractGraphExecutor {
                                     .getValue()))
                 );
 
-
-        boolean failed = false;
-        long delta = 0;
+        long delta;
+        FlowFile failedRecords = session.create(input);
+        WriteResult failedWriteResult = null;
         try (InputStream is = session.read(input);
              RecordReader reader = 
recordReaderFactory.createRecordReader(input, is, getLogger());
+             OutputStream os = session.write(failedRecords);
+             RecordSetWriter failedWriter = 
recordSetWriterFactory.createWriter(getLogger(), reader.getSchema(), os, 
input.getAttributes())
         ) {
             Record record;
-
             long start = System.currentTimeMillis();
+            failedWriter.beginRecordSet();
             while ((record = reader.nextRecord()) != null) {
                 FlowFile graph = session.create(input);
 
-                List<Map<String,Object>> graphResponses = new ArrayList<>();
-
-                Map<String, Object> dynamicPropertyMap = new HashMap<>();
-                for (String entry : dynamic.keySet()) {
-                        if(!dynamicPropertyMap.containsKey(entry)) {
+                try {
+                    Map<String, Object> dynamicPropertyMap = new HashMap<>();
+                    for (String entry : dynamic.keySet()) {
+                        if (!dynamicPropertyMap.containsKey(entry)) {
                             dynamicPropertyMap.put(entry, 
getRecordValue(record, dynamic.get(entry)));
                         }
-                }
+                    }
 
-                dynamicPropertyMap.putAll(input.getAttributes());
-                graphResponses.addAll(executeQuery(recordScript, 
dynamicPropertyMap));
+                    dynamicPropertyMap.putAll(input.getAttributes());
+                    List<Map<String, Object>> graphResponses = new 
ArrayList<>(executeQuery(recordScript, dynamicPropertyMap));
 
-                OutputStream graphOutputStream = session.write(graph);
-                String graphOutput = 
mapper.writerWithDefaultPrettyPrinter().writeValueAsString(graphResponses);
-                
graphOutputStream.write(graphOutput.getBytes(StandardCharsets.UTF_8));
-                graphList.add(graph);
-                graphOutputStream.close();
+                    OutputStream graphOutputStream = session.write(graph);
+                    String graphOutput = 
mapper.writerWithDefaultPrettyPrinter().writeValueAsString(graphResponses);
+                    
graphOutputStream.write(graphOutput.getBytes(StandardCharsets.UTF_8));
+                    graphOutputStream.close();
+                    session.transfer(graph, GRAPH);
+                } catch (Exception e) {
+                    // write failed records to a flowfile destined for the 
failure relationship
+                    failedWriter.write(record);
+                    session.remove(graph);
+                }
             }
             long end = System.currentTimeMillis();
             delta = (end - start) / 1000;
             if (getLogger().isDebugEnabled()){
                 getLogger().debug(String.format("Took %s seconds.", delta));
             }
+            failedWriteResult = failedWriter.finishRecordSet();
+            failedWriter.flush();
+
         } catch (Exception ex) {
-            getLogger().error("", ex);
-            failed = true;
-        } finally {
-            if (failed) {
-                graphList.forEach(session::remove);
-                session.transfer(input, FAILURE);
-            } else {
-                input = session.putAttribute(input, GRAPH_OPERATION_TIME, 
String.valueOf(delta));
-                session.getProvenanceReporter().send(input, 
clientService.getTransitUrl(), delta*1000);
-                session.transfer(input, SUCCESS);
-                graphList.forEach(it -> {
-                   session.transfer(it, GRAPH);
-                });
-            }
+            getLogger().error("Error reading records, routing input FlowFile 
to failure", ex);
+            session.remove(failedRecords);
+            session.transfer(input, FAILURE);
+            return;
+        }
+
+        // Generate provenance and send input flowfile to success
+        session.getProvenanceReporter().send(input, 
clientService.getTransitUrl(), delta*1000);
+
+        if (failedWriteResult.getRecordCount() < 1) {
+            // No failed records, remove the failure flowfile and send the 
input flowfile to success
+            session.remove(failedRecords);
+            input = session.putAttribute(input, GRAPH_OPERATION_TIME, 
String.valueOf(delta));
+            session.transfer(input, SUCCESS);
+        } else {
+            failedRecords = session.putAttribute(failedRecords, RECORD_COUNT, 
String.valueOf(failedWriteResult.getRecordCount()));
+            session.transfer(failedRecords, FAILURE);
+            // There were failures, don't send the input flowfile to SUCCESS
+            session.remove(input);
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java
index 7c260b2..d16507c 100644
--- a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java
@@ -41,14 +41,12 @@ import static org.junit.Assert.assertTrue;
 
 public class ExecuteGraphQueryRecordTest {
     private TestRunner runner;
-    private JsonTreeReader reader;
-    private InMemoryGraphClient graphClient;
     Map<String, String> enqueProperties = new HashMap<>();
 
     @Before
     public void setup() throws InitializationException {
         MockRecordWriter writer = new MockRecordWriter();
-        reader = new JsonTreeReader();
+        JsonTreeReader reader = new JsonTreeReader();
         runner = TestRunners.newTestRunner(ExecuteGraphQueryRecord.class);
         runner.addControllerService("reader", reader);
         runner.addControllerService("writer", writer);
@@ -57,23 +55,12 @@ public class ExecuteGraphQueryRecordTest {
 
         runner.enableControllerService(writer);
         runner.enableControllerService(reader);
-
-        graphClient = new InMemoryGraphClient();
-
-
-        runner.addControllerService("graphClient", graphClient);
-
-        runner.setProperty(ExecuteGraphQueryRecord.CLIENT_SERVICE, 
"graphClient");
-        runner.enableControllerService(graphClient);
-        runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, "[ 
'testProperty': 'testResponse' ]");
-        runner.assertValid();
-        enqueProperties.put("graph.name", "graph");
-
     }
 
     @Test
-    public void testFlowFileContent() throws IOException {
-        List<Map> test = new ArrayList<>();
+    public void testFlowFileContent() throws Exception {
+        setupGraphClient(false);
+        List<Map<String,Object>> test = new ArrayList<>();
         Map<String, Object> tempMap = new HashMap<>();
         tempMap.put("M", 1);
         test.add(tempMap);
@@ -96,8 +83,9 @@ public class ExecuteGraphQueryRecordTest {
     }
 
     @Test
-    public void testFlowFileList() throws IOException {
-        List<Map> test = new ArrayList<>();
+    public void testFlowFileList() throws Exception {
+        setupGraphClient(false);
+        List<Map<String,Object>> test = new ArrayList<>();
         Map<String, Object> tempMap = new HashMap<>();
         tempMap.put("M", new ArrayList<Integer>(){
             {
@@ -127,8 +115,9 @@ public class ExecuteGraphQueryRecordTest {
     }
 
     @Test
-    public void testComplexFlowFile() throws IOException {
-        List<Map> test = new ArrayList<>();
+    public void testComplexFlowFile() throws Exception {
+        setupGraphClient(false);
+        List<Map<String,Object>> test = new ArrayList<>();
         Map<String, Object> tempMap = new HashMap<>();
         tempMap.put("tMap", "123");
         tempMap.put("L", new ArrayList<Integer>(){
@@ -159,7 +148,8 @@ public class ExecuteGraphQueryRecordTest {
     }
 
     @Test
-    public void testAttributes() throws IOException {
+    public void testAttributes() throws Exception {
+        setupGraphClient(false);
         List<Map<String, Object>> test = new ArrayList<>();
         Map<String, Object> tempMap = new HashMap<>();
         tempMap.put("tMap", "123");
@@ -193,4 +183,37 @@ public class ExecuteGraphQueryRecordTest {
 
         return expected.equals(content);
     }
+
+    @Test
+    public void testExceptionOnQuery() throws Exception {
+        setupGraphClient(true);
+        List<Map<String,Object>> test = new ArrayList<>();
+        Map<String, Object> tempMap = new HashMap<>();
+        tempMap.put("M", 1);
+        test.add(tempMap);
+
+        byte[] json = JsonOutput.toJson(test).getBytes();
+        String submissionScript;
+        submissionScript = "[ 'M': M[0] ]";
+
+        runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, 
submissionScript);
+        runner.setProperty("M", "/M");
+        runner.enqueue(json, enqueProperties);
+
+        runner.run();
+        runner.assertTransferCount(ExecuteGraphQueryRecord.GRAPH, 0);
+        runner.assertTransferCount(ExecuteGraphQueryRecord.SUCCESS, 0);
+        runner.assertTransferCount(ExecuteGraphQueryRecord.FAILURE, 1);
+    }
+
+    private void setupGraphClient(boolean failOnQuery) throws 
InitializationException {
+        InMemoryGraphClient graphClient = new InMemoryGraphClient(failOnQuery);
+        runner.addControllerService("graphClient", graphClient);
+
+        runner.setProperty(ExecuteGraphQueryRecord.CLIENT_SERVICE, 
"graphClient");
+        runner.enableControllerService(graphClient);
+        runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, "[ 
'testProperty': 'testResponse' ]");
+        runner.assertValid();
+        enqueProperties.put("graph.name", "graph");
+    }
 }
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/util/InMemoryGraphClient.java b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/util/InMemoryGraphClient.java
index a12d5b8..0288f69 100644
--- a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/util/InMemoryGraphClient.java
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/util/InMemoryGraphClient.java
@@ -29,13 +29,22 @@ import org.janusgraph.core.JanusGraphFactory;
 import javax.script.ScriptEngine;
 import javax.script.ScriptEngineManager;
 import javax.script.ScriptException;
+import java.util.AbstractMap.SimpleEntry;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.AbstractMap.SimpleEntry;
 
 public class InMemoryGraphClient extends AbstractControllerService implements 
GraphClientService {
     private Graph graph;
+    private boolean generateExceptionOnQuery = false;
+
+    public InMemoryGraphClient() {
+        this(false);
+    }
+
+    public InMemoryGraphClient(final boolean generateExceptionOnQuery) {
+        this.generateExceptionOnQuery = generateExceptionOnQuery;
+    }
 
     @OnEnabled
     void onEnabled(ConfigurationContext context) {
@@ -48,6 +57,9 @@ public class InMemoryGraphClient extends 
AbstractControllerService implements Gr
 
     @Override
     public Map<String, String> executeQuery(String query, Map<String, Object> 
parameters, GraphQueryResultCallback graphQueryResultCallback) {
+        if(generateExceptionOnQuery) {
+            throw new ProcessException("Generated test exception");
+        }
         ScriptEngine engine = new 
ScriptEngineManager().getEngineByName("groovy");
         parameters.entrySet().stream().forEach( it -> {
             engine.put(it.getKey(), it.getValue());

Reply via email to