This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new de88ce7 NIFI-7906: Implemented RecordSetWriter support for
ExecuteGraphQueryRecord
de88ce7 is described below
commit de88ce7a618aac4f7c886ccd1be39d3047313adb
Author: Matthew Burgess <[email protected]>
AuthorDate: Fri Dec 4 12:23:07 2020 -0500
NIFI-7906: Implemented RecordSetWriter support for ExecuteGraphQueryRecord
This closes #4704
Signed-off-by: Mike Thomsen <[email protected]>
---
.../processors/graph/ExecuteGraphQueryRecord.java | 113 ++++++++++++---------
.../graph/ExecuteGraphQueryRecordTest.java | 67 ++++++++----
.../processors/graph/util/InMemoryGraphClient.java | 14 ++-
3 files changed, 122 insertions(+), 72 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
index e750b6c..c73aa91 100644
---
a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
+++
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
@@ -20,20 +20,20 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.graph.GraphClientService;
-import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
@@ -41,28 +41,31 @@ import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
-
+import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.HashMap;
import java.util.Set;
-import java.util.HashSet;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.stream.Collectors;
-@Tags({"graph, gremlin"})
-@CapabilityDescription("This uses a flowfile as input to perform graph
mutations.")
+@Tags({"graph", "gremlin", "cypher"})
+@CapabilityDescription("This uses FlowFile records as input to perform graph
mutations. Each record is associated with an individual query/mutation, and a
FlowFile will "
+ + "be output for each successful operation. Failed records will be sent as
a single FlowFile to the failure relationship.")
@WritesAttributes({
@WritesAttribute(attribute =
ExecuteGraphQueryRecord.GRAPH_OPERATION_TIME, description = "The amount of time
it took to execute all of the graph operations."),
- @WritesAttribute(attribute = ExecuteGraphQueryRecord.RECORD_COUNT,
description = "The amount of record processed")
+ @WritesAttribute(attribute = ExecuteGraphQueryRecord.RECORD_COUNT,
description = "The number of records unsuccessfully processed (written on
FlowFiles routed to the "
+ + "'failure' relationship.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name = "A dynamic property to be used as a parameter in the
graph script",
@@ -154,7 +157,7 @@ public class ExecuteGraphQueryRecord extends
AbstractGraphExecutor {
private GraphClientService clientService;
private RecordReaderFactory recordReaderFactory;
private RecordSetWriterFactory recordSetWriterFactory;
- private ObjectMapper mapper = new ObjectMapper();
+ private final ObjectMapper mapper = new ObjectMapper();
@OnScheduled
public void onScheduled(ProcessContext context) {
@@ -179,8 +182,6 @@ public class ExecuteGraphQueryRecord extends
AbstractGraphExecutor {
return;
}
- List<FlowFile> graphList = new ArrayList<>();
-
String recordScript = context.getProperty(SUBMISSION_SCRIPT)
.evaluateAttributeExpressions(input)
.getValue();
@@ -199,56 +200,70 @@ public class ExecuteGraphQueryRecord extends
AbstractGraphExecutor {
.getValue()))
);
-
- boolean failed = false;
- long delta = 0;
+ long delta;
+ FlowFile failedRecords = session.create(input);
+ WriteResult failedWriteResult = null;
try (InputStream is = session.read(input);
RecordReader reader =
recordReaderFactory.createRecordReader(input, is, getLogger());
+ OutputStream os = session.write(failedRecords);
+ RecordSetWriter failedWriter =
recordSetWriterFactory.createWriter(getLogger(), reader.getSchema(), os,
input.getAttributes())
) {
Record record;
-
long start = System.currentTimeMillis();
+ failedWriter.beginRecordSet();
while ((record = reader.nextRecord()) != null) {
FlowFile graph = session.create(input);
- List<Map<String,Object>> graphResponses = new ArrayList<>();
-
- Map<String, Object> dynamicPropertyMap = new HashMap<>();
- for (String entry : dynamic.keySet()) {
- if(!dynamicPropertyMap.containsKey(entry)) {
+ try {
+ Map<String, Object> dynamicPropertyMap = new HashMap<>();
+ for (String entry : dynamic.keySet()) {
+ if (!dynamicPropertyMap.containsKey(entry)) {
dynamicPropertyMap.put(entry,
getRecordValue(record, dynamic.get(entry)));
}
- }
+ }
- dynamicPropertyMap.putAll(input.getAttributes());
- graphResponses.addAll(executeQuery(recordScript,
dynamicPropertyMap));
+ dynamicPropertyMap.putAll(input.getAttributes());
+ List<Map<String, Object>> graphResponses = new
ArrayList<>(executeQuery(recordScript, dynamicPropertyMap));
- OutputStream graphOutputStream = session.write(graph);
- String graphOutput =
mapper.writerWithDefaultPrettyPrinter().writeValueAsString(graphResponses);
-
graphOutputStream.write(graphOutput.getBytes(StandardCharsets.UTF_8));
- graphList.add(graph);
- graphOutputStream.close();
+ OutputStream graphOutputStream = session.write(graph);
+ String graphOutput =
mapper.writerWithDefaultPrettyPrinter().writeValueAsString(graphResponses);
+
graphOutputStream.write(graphOutput.getBytes(StandardCharsets.UTF_8));
+ graphOutputStream.close();
+ session.transfer(graph, GRAPH);
+ } catch (Exception e) {
+ // write failed records to a flowfile destined for the
failure relationship
+ failedWriter.write(record);
+ session.remove(graph);
+ }
}
long end = System.currentTimeMillis();
delta = (end - start) / 1000;
if (getLogger().isDebugEnabled()){
getLogger().debug(String.format("Took %s seconds.", delta));
}
+ failedWriteResult = failedWriter.finishRecordSet();
+ failedWriter.flush();
+
} catch (Exception ex) {
- getLogger().error("", ex);
- failed = true;
- } finally {
- if (failed) {
- graphList.forEach(session::remove);
- session.transfer(input, FAILURE);
- } else {
- input = session.putAttribute(input, GRAPH_OPERATION_TIME,
String.valueOf(delta));
- session.getProvenanceReporter().send(input,
clientService.getTransitUrl(), delta*1000);
- session.transfer(input, SUCCESS);
- graphList.forEach(it -> {
- session.transfer(it, GRAPH);
- });
- }
+ getLogger().error("Error reading records, routing input FlowFile
to failure", ex);
+ session.remove(failedRecords);
+ session.transfer(input, FAILURE);
+ return;
+ }
+
+ // Generate provenance and send input flowfile to success
+ session.getProvenanceReporter().send(input,
clientService.getTransitUrl(), delta*1000);
+
+ if (failedWriteResult.getRecordCount() < 1) {
+ // No failed records, remove the failure flowfile and send the
input flowfile to success
+ session.remove(failedRecords);
+ input = session.putAttribute(input, GRAPH_OPERATION_TIME,
String.valueOf(delta));
+ session.transfer(input, SUCCESS);
+ } else {
+ failedRecords = session.putAttribute(failedRecords, RECORD_COUNT,
String.valueOf(failedWriteResult.getRecordCount()));
+ session.transfer(failedRecords, FAILURE);
+ // There were failures, don't send the input flowfile to SUCCESS
+ session.remove(input);
}
}
diff --git
a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java
index 7c260b2..d16507c 100644
---
a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java
+++
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java
@@ -41,14 +41,12 @@ import static org.junit.Assert.assertTrue;
public class ExecuteGraphQueryRecordTest {
private TestRunner runner;
- private JsonTreeReader reader;
- private InMemoryGraphClient graphClient;
Map<String, String> enqueProperties = new HashMap<>();
@Before
public void setup() throws InitializationException {
MockRecordWriter writer = new MockRecordWriter();
- reader = new JsonTreeReader();
+ JsonTreeReader reader = new JsonTreeReader();
runner = TestRunners.newTestRunner(ExecuteGraphQueryRecord.class);
runner.addControllerService("reader", reader);
runner.addControllerService("writer", writer);
@@ -57,23 +55,12 @@ public class ExecuteGraphQueryRecordTest {
runner.enableControllerService(writer);
runner.enableControllerService(reader);
-
- graphClient = new InMemoryGraphClient();
-
-
- runner.addControllerService("graphClient", graphClient);
-
- runner.setProperty(ExecuteGraphQueryRecord.CLIENT_SERVICE,
"graphClient");
- runner.enableControllerService(graphClient);
- runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, "[
'testProperty': 'testResponse' ]");
- runner.assertValid();
- enqueProperties.put("graph.name", "graph");
-
}
@Test
- public void testFlowFileContent() throws IOException {
- List<Map> test = new ArrayList<>();
+ public void testFlowFileContent() throws Exception {
+ setupGraphClient(false);
+ List<Map<String,Object>> test = new ArrayList<>();
Map<String, Object> tempMap = new HashMap<>();
tempMap.put("M", 1);
test.add(tempMap);
@@ -96,8 +83,9 @@ public class ExecuteGraphQueryRecordTest {
}
@Test
- public void testFlowFileList() throws IOException {
- List<Map> test = new ArrayList<>();
+ public void testFlowFileList() throws Exception {
+ setupGraphClient(false);
+ List<Map<String,Object>> test = new ArrayList<>();
Map<String, Object> tempMap = new HashMap<>();
tempMap.put("M", new ArrayList<Integer>(){
{
@@ -127,8 +115,9 @@ public class ExecuteGraphQueryRecordTest {
}
@Test
- public void testComplexFlowFile() throws IOException {
- List<Map> test = new ArrayList<>();
+ public void testComplexFlowFile() throws Exception {
+ setupGraphClient(false);
+ List<Map<String,Object>> test = new ArrayList<>();
Map<String, Object> tempMap = new HashMap<>();
tempMap.put("tMap", "123");
tempMap.put("L", new ArrayList<Integer>(){
@@ -159,7 +148,8 @@ public class ExecuteGraphQueryRecordTest {
}
@Test
- public void testAttributes() throws IOException {
+ public void testAttributes() throws Exception {
+ setupGraphClient(false);
List<Map<String, Object>> test = new ArrayList<>();
Map<String, Object> tempMap = new HashMap<>();
tempMap.put("tMap", "123");
@@ -193,4 +183,37 @@ public class ExecuteGraphQueryRecordTest {
return expected.equals(content);
}
+
+ @Test
+ public void testExceptionOnQuery() throws Exception {
+ setupGraphClient(true);
+ List<Map<String,Object>> test = new ArrayList<>();
+ Map<String, Object> tempMap = new HashMap<>();
+ tempMap.put("M", 1);
+ test.add(tempMap);
+
+ byte[] json = JsonOutput.toJson(test).getBytes();
+ String submissionScript;
+ submissionScript = "[ 'M': M[0] ]";
+
+ runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT,
submissionScript);
+ runner.setProperty("M", "/M");
+ runner.enqueue(json, enqueProperties);
+
+ runner.run();
+ runner.assertTransferCount(ExecuteGraphQueryRecord.GRAPH, 0);
+ runner.assertTransferCount(ExecuteGraphQueryRecord.SUCCESS, 0);
+ runner.assertTransferCount(ExecuteGraphQueryRecord.FAILURE, 1);
+ }
+
+ private void setupGraphClient(boolean failOnQuery) throws
InitializationException {
+ InMemoryGraphClient graphClient = new InMemoryGraphClient(failOnQuery);
+ runner.addControllerService("graphClient", graphClient);
+
+ runner.setProperty(ExecuteGraphQueryRecord.CLIENT_SERVICE,
"graphClient");
+ runner.enableControllerService(graphClient);
+ runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, "[
'testProperty': 'testResponse' ]");
+ runner.assertValid();
+ enqueProperties.put("graph.name", "graph");
+ }
}
diff --git
a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/util/InMemoryGraphClient.java
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/util/InMemoryGraphClient.java
index a12d5b8..0288f69 100644
---
a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/util/InMemoryGraphClient.java
+++
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/util/InMemoryGraphClient.java
@@ -29,13 +29,22 @@ import org.janusgraph.core.JanusGraphFactory;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
+import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.AbstractMap.SimpleEntry;
public class InMemoryGraphClient extends AbstractControllerService implements
GraphClientService {
private Graph graph;
+ private boolean generateExceptionOnQuery = false;
+
+ public InMemoryGraphClient() {
+ this(false);
+ }
+
+ public InMemoryGraphClient(final boolean generateExceptionOnQuery) {
+ this.generateExceptionOnQuery = generateExceptionOnQuery;
+ }
@OnEnabled
void onEnabled(ConfigurationContext context) {
@@ -48,6 +57,9 @@ public class InMemoryGraphClient extends
AbstractControllerService implements Gr
@Override
public Map<String, String> executeQuery(String query, Map<String, Object>
parameters, GraphQueryResultCallback graphQueryResultCallback) {
+ if(generateExceptionOnQuery) {
+ throw new ProcessException("Generated test exception");
+ }
ScriptEngine engine = new
ScriptEngineManager().getEngineByName("groovy");
parameters.entrySet().stream().forEach( it -> {
engine.put(it.getKey(), it.getValue());