This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c29cced NIFI-7906: parameterized graph query NIFI-7906: addressing PR
concerns NIFI-7906: code styling fixes NIFI-7906: adding in license information
to new files + enables processor in META-INF NIFI-7906: exclude test files from
RAT NIFI-7906: PR refactor to streamline graph response NIFI-7906: removing
ERRORS output Unused after refactor Did a few cleanups for the contributor.
c29cced is described below
commit c29cced269dcce28fb9ba034025d01e76a79b037
Author: Levi Lentz <[email protected]>
AuthorDate: Thu Oct 22 19:00:34 2020 -0400
NIFI-7906: parameterized graph query
NIFI-7906: addressing PR concerns
NIFI-7906: code styling fixes
NIFI-7906: adding in license information to new files
+ enables processor in META-INF
NIFI-7906: exclude test files from RAT
NIFI-7906: PR refactor to streamline graph response
NIFI-7906: removing ERRORS output
Unused after refactor
Did a few cleanups for the contributor.
This closes #4638
Signed-off-by: Mike Thomsen <[email protected]>
---
.../nifi-graph-processors/pom.xml | 55 +++++
.../processors/graph/ExecuteGraphQueryRecord.java | 270 +++++++++++++++++++++
.../services/org.apache.nifi.processor.Processor | 1 +
.../graph/ExecuteGraphQueryRecordTest.java | 176 ++++++++++++++
.../processors/graph/util/InMemoryGraphClient.java | 113 +++++++++
.../src/test/resources/testAttributes.json | 3 +
.../src/test/resources/testComplexFlowFile.json | 9 +
.../src/test/resources/testFlowFileContent.json | 3 +
.../src/test/resources/testFlowFileList.json | 3 +
9 files changed, 633 insertions(+)
diff --git a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/pom.xml
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/pom.xml
index 9b68f45..1a9b6f6 100644
--- a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/pom.xml
@@ -78,5 +78,60 @@
<version>1.13.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-path</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-json-utils</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-services</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-registry-service-api</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ </dependency>
+
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes combine.children="append">
+
<exclude>src/test/resources/testAttributes.json</exclude>
+
<exclude>src/test/resources/testComplexFlowFile.json</exclude>
+
<exclude>src/test/resources/testFlowFileContent.json</exclude>
+
<exclude>src/test/resources/testFlowFileList.json</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git
a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
new file mode 100644
index 0000000..e750b6c
--- /dev/null
+++
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.graph;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.graph.GraphClientService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+@Tags({"graph, gremlin"})
+@CapabilityDescription("This uses a flowfile as input to perform graph
mutations.")
+@WritesAttributes({
+ @WritesAttribute(attribute =
ExecuteGraphQueryRecord.GRAPH_OPERATION_TIME, description = "The amount of time
it took to execute all of the graph operations."),
+ @WritesAttribute(attribute = ExecuteGraphQueryRecord.RECORD_COUNT,
description = "The amount of record processed")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@DynamicProperty(name = "A dynamic property to be used as a parameter in the
graph script",
+ value = "The variable name to be set", expressionLanguageScope =
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+ description = "Uses a record path to set a variable as a parameter in
the graph script")
+public class ExecuteGraphQueryRecord extends AbstractGraphExecutor {
+
+ public static final PropertyDescriptor CLIENT_SERVICE = new
PropertyDescriptor.Builder()
+ .name("client-service")
+ .displayName("Client Service")
+ .description("The graph client service for connecting to a graph
database.")
+ .identifiesControllerService(GraphClientService.class)
+ .addValidator(Validator.VALID)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor READER_SERVICE = new
PropertyDescriptor.Builder()
+ .name("reader-service")
+ .displayName("Record Reader")
+ .description("The record reader to use with this processor.")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final PropertyDescriptor WRITER_SERVICE = new
PropertyDescriptor.Builder()
+ .name("writer-service")
+ .displayName("Failed Record Writer")
+ .description("The record writer to use for writing failed
records.")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final PropertyDescriptor SUBMISSION_SCRIPT = new
PropertyDescriptor.Builder()
+ .name("record-script")
+ .displayName("Graph Record Script")
+ .description("Script to perform the business logic on graph, using
flow file attributes and custom properties " +
+ "as variable-value pairs in its logic.")
+ .required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .build();
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .required(false)
+
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+
.dynamic(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+ }
+
+ public static final List<PropertyDescriptor> DESCRIPTORS =
Collections.unmodifiableList(Arrays.asList(
+ CLIENT_SERVICE, READER_SERVICE, WRITER_SERVICE, SUBMISSION_SCRIPT
+ ));
+
+ public static final Relationship SUCCESS = new
Relationship.Builder().name("original")
+ .description("Original
flow files that successfully interacted with " +
+ "graph server.")
+ .build();
+ public static final Relationship FAILURE = new
Relationship.Builder().name("failure")
+ .description("Flow files
that fail to interact with graph server.")
+ .build();
+ public static final Relationship GRAPH = new
Relationship.Builder().name("response")
+ .description("The response
object from the graph server.")
+ .autoTerminateDefault(true)
+ .build();
+
+ public static final Set<Relationship> RELATIONSHIPS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ SUCCESS, FAILURE, GRAPH
+ )));
+
+ public static final String RECORD_COUNT = "records.count";
+ public static final String GRAPH_OPERATION_TIME = "graph.operations.took";
+ private volatile RecordPathCache recordPathCache;
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+ private GraphClientService clientService;
+ private RecordReaderFactory recordReaderFactory;
+ private RecordSetWriterFactory recordSetWriterFactory;
+ private ObjectMapper mapper = new ObjectMapper();
+
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+ clientService =
context.getProperty(CLIENT_SERVICE).asControllerService(GraphClientService.class);
+ recordReaderFactory =
context.getProperty(READER_SERVICE).asControllerService(RecordReaderFactory.class);
+ recordSetWriterFactory =
context.getProperty(WRITER_SERVICE).asControllerService(RecordSetWriterFactory.class);
+ recordPathCache = new RecordPathCache(100);
+ }
+
+ private List<Object> getRecordValue(Record record, RecordPath recordPath){
+ final RecordPathResult result = recordPath.evaluate(record);
+ return result.getSelectedFields()
+ .filter(fv -> fv.getValue() != null)
+ .map(FieldValue::getValue)
+ .collect( Collectors.toList());
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ FlowFile input = session.get();
+ if ( input == null ) {
+ return;
+ }
+
+ List<FlowFile> graphList = new ArrayList<>();
+
+ String recordScript = context.getProperty(SUBMISSION_SCRIPT)
+ .evaluateAttributeExpressions(input)
+ .getValue();
+
+ Map<String, RecordPath> dynamic = new HashMap<>();
+
+ FlowFile finalInput = input;
+ context.getProperties()
+ .keySet().stream()
+ .filter(PropertyDescriptor::isDynamic)
+ .forEach(it ->
+ dynamic.put(it.getName(), recordPathCache.getCompiled(
+ context
+ .getProperty(it.getName())
+ .evaluateAttributeExpressions(finalInput)
+ .getValue()))
+ );
+
+
+ boolean failed = false;
+ long delta = 0;
+ try (InputStream is = session.read(input);
+ RecordReader reader =
recordReaderFactory.createRecordReader(input, is, getLogger());
+ ) {
+ Record record;
+
+ long start = System.currentTimeMillis();
+ while ((record = reader.nextRecord()) != null) {
+ FlowFile graph = session.create(input);
+
+ List<Map<String,Object>> graphResponses = new ArrayList<>();
+
+ Map<String, Object> dynamicPropertyMap = new HashMap<>();
+ for (String entry : dynamic.keySet()) {
+ if(!dynamicPropertyMap.containsKey(entry)) {
+ dynamicPropertyMap.put(entry,
getRecordValue(record, dynamic.get(entry)));
+ }
+ }
+
+ dynamicPropertyMap.putAll(input.getAttributes());
+ graphResponses.addAll(executeQuery(recordScript,
dynamicPropertyMap));
+
+ OutputStream graphOutputStream = session.write(graph);
+ String graphOutput =
mapper.writerWithDefaultPrettyPrinter().writeValueAsString(graphResponses);
+
graphOutputStream.write(graphOutput.getBytes(StandardCharsets.UTF_8));
+ graphList.add(graph);
+ graphOutputStream.close();
+ }
+ long end = System.currentTimeMillis();
+ delta = (end - start) / 1000;
+ if (getLogger().isDebugEnabled()){
+ getLogger().debug(String.format("Took %s seconds.", delta));
+ }
+ } catch (Exception ex) {
+ getLogger().error("", ex);
+ failed = true;
+ } finally {
+ if (failed) {
+ graphList.forEach(session::remove);
+ session.transfer(input, FAILURE);
+ } else {
+ input = session.putAttribute(input, GRAPH_OPERATION_TIME,
String.valueOf(delta));
+ session.getProvenanceReporter().send(input,
clientService.getTransitUrl(), delta*1000);
+ session.transfer(input, SUCCESS);
+ graphList.forEach(it -> {
+ session.transfer(it, GRAPH);
+ });
+ }
+ }
+ }
+
+ private List<Map<String, Object>> executeQuery(String recordScript,
Map<String, Object> parameters) {
+ ObjectMapper mapper = new ObjectMapper();
+ List<Map<String, Object>> graphResponses = new ArrayList<>();
+ clientService.executeQuery(recordScript, parameters, (map, b) -> {
+ if (getLogger().isDebugEnabled()){
+ try {
+
getLogger().debug(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(map));
+ } catch (JsonProcessingException ex) {
+ getLogger().error("Error converted map to JSON ", ex);
+ }
+ }
+ graphResponses.add(map);
+ });
+ return graphResponses;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 615c785..2ab7e95 100644
---
a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,3 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.graph.ExecuteGraphQuery
+org.apache.nifi.processors.graph.ExecuteGraphQueryRecord
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java
new file mode 100644
index 0000000..8c6cb96
--- /dev/null
+++
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/ExecuteGraphQueryRecordTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.graph;
+
+import groovy.json.JsonOutput;
+import org.apache.nifi.processors.graph.util.InMemoryGraphClient;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+
+public class ExecuteGraphQueryRecordTest {
+ private TestRunner runner;
+ private JsonTreeReader reader;
+ private InMemoryGraphClient graphClient;
+ Map<String, String> enqueProperties = new HashMap<>();
+
+ @Before
+ public void setup() throws InitializationException {
+ MockRecordWriter writer = new MockRecordWriter();
+ reader = new JsonTreeReader();
+ runner = TestRunners.newTestRunner(ExecuteGraphQueryRecord.class);
+ runner.addControllerService("reader", reader);
+ runner.addControllerService("writer", writer);
+ runner.setProperty(ExecuteGraphQueryRecord.READER_SERVICE, "reader");
+ runner.setProperty(ExecuteGraphQueryRecord.WRITER_SERVICE, "writer");
+
+ runner.enableControllerService(writer);
+ runner.enableControllerService(reader);
+
+ graphClient = new InMemoryGraphClient();
+
+
+ runner.addControllerService("graphClient", graphClient);
+
+ runner.setProperty(ExecuteGraphQueryRecord.CLIENT_SERVICE,
"graphClient");
+ runner.enableControllerService(graphClient);
+ runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT, "[
'testProperty': 'testResponse' ]");
+ runner.assertValid();
+ enqueProperties.put("graph.name", "graph");
+
+ }
+
+ @Test
+ public void testFlowFileContent() throws IOException {
+ List<Map> test = new ArrayList<>();
+ Map<String, Object> tempMap = new HashMap<>();
+ tempMap.put("M", 1);
+ test.add(tempMap);
+
+ byte[] json = JsonOutput.toJson(test).getBytes();
+ String submissionScript;
+ submissionScript = "[ 'M': M[0] ]";
+
+ runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT,
submissionScript);
+ runner.setProperty("M", "/M");
+ runner.enqueue(json, enqueProperties);
+
+ runner.run();
+ runner.assertTransferCount(ExecuteGraphQueryRecord.GRAPH, 1);
+ runner.assertTransferCount(ExecuteGraphQueryRecord.SUCCESS, 1);
+ runner.assertTransferCount(ExecuteGraphQueryRecord.FAILURE, 0);
+ MockFlowFile relGraph =
runner.getFlowFilesForRelationship(ExecuteGraphQueryRecord.GRAPH).get(0);
+
relGraph.assertContentEquals(ExecuteGraphQueryRecordTest.class.getResourceAsStream("/testFlowFileContent.json"));
+ }
+
+ @Test
+ public void testFlowFileList() throws IOException {
+ List<Map> test = new ArrayList<>();
+ Map<String, Object> tempMap = new HashMap<>();
+ tempMap.put("M", new ArrayList<Integer>(){
+ {
+ add(1);
+ add(2);
+ add(3);
+ }
+ });
+ test.add(tempMap);
+
+ byte[] json = JsonOutput.toJson(test).getBytes();
+ String submissionScript = "[ " +
+ "'M': M[0] " +
+ "]";
+
+ runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT,
submissionScript);
+ runner.setProperty("M", "/M");
+ runner.enqueue(json, enqueProperties);
+
+ runner.run();
+ runner.assertTransferCount(ExecuteGraphQueryRecord.GRAPH, 1);
+ runner.assertTransferCount(ExecuteGraphQueryRecord.SUCCESS, 1);
+ runner.assertTransferCount(ExecuteGraphQueryRecord.FAILURE, 0);
+ MockFlowFile relGraph =
runner.getFlowFilesForRelationship(ExecuteGraphQueryRecord.GRAPH).get(0);
+
relGraph.assertContentEquals(ExecuteGraphQueryRecordTest.class.getResourceAsStream("/testFlowFileList.json"));
+ }
+
+ @Test
+ public void testComplexFlowFile() throws IOException {
+ List<Map> test = new ArrayList<>();
+ Map<String, Object> tempMap = new HashMap<>();
+ tempMap.put("tMap", "123");
+ tempMap.put("L", new ArrayList<Integer>(){
+ {
+ add(1);
+ add(2);
+ add(3);
+ }
+ });
+ test.add(tempMap);
+
+ byte[] json = JsonOutput.toJson(test).getBytes();
+ String submissionScript = "Map<String, Object> vertexHashes = new
HashMap()\n" +
+ "vertexHashes.put('1234', tMap[0])\n" +
+ "[ 'L': L[0], 'result': vertexHashes ]";
+ runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT,
submissionScript);
+ runner.setProperty("tMap", "/tMap");
+ runner.setProperty("L", "/L");
+ runner.enqueue(json, enqueProperties);
+
+ runner.run();
+ runner.assertTransferCount(ExecuteGraphQueryRecord.GRAPH, 1);
+ runner.assertTransferCount(ExecuteGraphQueryRecord.SUCCESS, 1);
+ runner.assertTransferCount(ExecuteGraphQueryRecord.FAILURE, 0);
+ MockFlowFile relGraph =
runner.getFlowFilesForRelationship(ExecuteGraphQueryRecord.GRAPH).get(0);
+
relGraph.assertContentEquals(ExecuteGraphQueryRecordTest.class.getResourceAsStream("/testComplexFlowFile.json"));
+ }
+
+ @Test
+ public void testAttributes() throws IOException {
+ List<Map<String, Object>> test = new ArrayList<>();
+ Map<String, Object> tempMap = new HashMap<>();
+ tempMap.put("tMap", "123");
+ test.add(tempMap);
+
+ byte[] json = JsonOutput.toJson(test).getBytes();
+ String submissionScript = "[ " +
+ "'testProperty': testProperty " +
+ "] ";
+ runner.setProperty(ExecuteGraphQueryRecord.SUBMISSION_SCRIPT,
submissionScript);
+ Map<String, String> enqueProperties = new HashMap<>();
+ enqueProperties.put("testProperty", "test");
+ runner.enqueue(json, enqueProperties);
+
+ runner.run();
+ runner.assertTransferCount(ExecuteGraphQueryRecord.GRAPH, 1);
+ runner.assertTransferCount(ExecuteGraphQueryRecord.SUCCESS, 1);
+ runner.assertTransferCount(ExecuteGraphQueryRecord.FAILURE, 0);
+ MockFlowFile relGraph =
runner.getFlowFilesForRelationship(ExecuteGraphQueryRecord.GRAPH).get(0);
+
relGraph.assertContentEquals(ExecuteGraphQueryRecordTest.class.getResourceAsStream("/testAttributes.json"));
+ }
+
+}
diff --git
a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/util/InMemoryGraphClient.java
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/util/InMemoryGraphClient.java
new file mode 100644
index 0000000..a12d5b8
--- /dev/null
+++
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/util/InMemoryGraphClient.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.graph.util;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.graph.GraphClientService;
+import org.apache.nifi.graph.GraphQueryResultCallback;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.janusgraph.core.JanusGraph;
+import org.janusgraph.core.JanusGraphFactory;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.AbstractMap.SimpleEntry;
+
+public class InMemoryGraphClient extends AbstractControllerService implements
GraphClientService {
+ private Graph graph;
+
+ @OnEnabled
+ void onEnabled(ConfigurationContext context) {
+ graph = buildGraph();
+ }
+
+ private static JanusGraph buildGraph() {
+ return JanusGraphFactory.build().set("storage.backend",
"inmemory").open();
+ }
+
+ @Override
+ public Map<String, String> executeQuery(String query, Map<String, Object>
parameters, GraphQueryResultCallback graphQueryResultCallback) {
+ ScriptEngine engine = new
ScriptEngineManager().getEngineByName("groovy");
+ parameters.entrySet().stream().forEach( it -> {
+ engine.put(it.getKey(), it.getValue());
+ });
+ if (graph == null) {
+ graph = buildGraph();
+ }
+ engine.put("graph", graph);
+ engine.put("g", graph.traversal());
+
+ Object response;
+ try {
+ response = engine.eval(query);
+ } catch (ScriptException ex) {
+ throw new ProcessException(ex);
+ }
+
+ if (response instanceof Map) {
+ //The below logic helps with the handling of complex Map<String,
Object> relationships
+ Map resultMap = (Map) response;
+ if (!resultMap.isEmpty()) {
+ // Convertex a resultMap to an entrySet iterator
+ Iterator outerResultSet = resultMap.entrySet().iterator();
+ // this loops over the outermost map
+ while(outerResultSet.hasNext()) {
+ Map.Entry<String, Object> innerResultSet =
(Map.Entry<String, Object>) outerResultSet.next();
+ // this is for edge case handling where innerResultSet is
also a Map
+ if (innerResultSet.getValue() instanceof Map) {
+ Iterator resultSet = ((Map)
innerResultSet.getValue()).entrySet().iterator();
+ // looping over each result in the inner map
+ while (resultSet.hasNext()) {
+ Map.Entry<String, Object> tempResult =
(Map.Entry<String, Object>) resultSet.next();
+ Map<String, Object> tempRetObject = new
HashMap<>();
+ tempRetObject.put(tempResult.getKey(),
tempResult.getValue());
+ SimpleEntry returnObject = new SimpleEntry<String,
Object>(tempResult.getKey(), tempRetObject);
+ Map<String, Object> resultReturnMap = new
HashMap<>();
+ resultReturnMap.put(innerResultSet.getKey(),
returnObject);
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug(resultReturnMap.toString());
+ }
+ // return the object to the
graphQueryResultCallback object
+ graphQueryResultCallback.process(resultReturnMap,
resultSet.hasNext());
+ }
+ } else {
+ // for non-maps, return objects need to be a
map<string, object> this simply converts the object
+ // to a map to be return to the
graphQueryResultCallback object
+ Map<String, Object> resultReturnMap = new HashMap<>();
+ resultReturnMap.put(innerResultSet.getKey(),
innerResultSet.getValue());
+ graphQueryResultCallback.process(resultReturnMap,
false);
+ }
+ }
+
+ }
+ }
+
+ return new HashMap<>();
+ }
+
+ @Override
+ public String getTransitUrl() {
+ return "memory://localhost/graph";
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testAttributes.json
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testAttributes.json
new file mode 100644
index 0000000..b8084ab
--- /dev/null
+++
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testAttributes.json
@@ -0,0 +1,3 @@
+[ {
+ "testProperty" : "test"
+} ]
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testComplexFlowFile.json
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testComplexFlowFile.json
new file mode 100644
index 0000000..19852fd
--- /dev/null
+++
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testComplexFlowFile.json
@@ -0,0 +1,9 @@
+[ {
+ "L" : [ 1, 2, 3 ]
+}, {
+ "result" : {
+ "1234" : {
+ "1234" : "123"
+ }
+ }
+} ]
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileContent.json
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileContent.json
new file mode 100644
index 0000000..fefbaf3
--- /dev/null
+++
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileContent.json
@@ -0,0 +1,3 @@
+[ {
+ "M" : 1
+} ]
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileList.json
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileList.json
new file mode 100644
index 0000000..cad1981
--- /dev/null
+++
b/nifi-nar-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/resources/testFlowFileList.json
@@ -0,0 +1,3 @@
+[ {
+ "M" : [ 1, 2, 3 ]
+} ]
\ No newline at end of file