STREAMS:344: streams-persist-neo4j

Squashed commit of the following:

commit 76207b1577a0fb6f05992c8700151223db20e4b3
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Sun Jan 8 12:29:32 2017 -0600

    STREAMS-344: address PR feedback

    from https://github.com/apache/incubator-streams/pull/348

commit ee700fd16e8631bdb0fb453d686beef4167af13b
Author: Steve Blackmon <sblack...@apache.org>
Date:   Mon Jan 2 19:42:33 2017 -0600

    add constructor

commit 1f4e175cf84a208252d488c2858ea420af0642f9
Author: Steve Blackmon <sblack...@apache.org>
Date:   Mon Jan 2 18:11:01 2017 -0600

    new neo4j module with bolt:// and http:// support, and tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4bd22317
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4bd22317
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4bd22317

Branch: refs/heads/master
Commit: 4bd22317ea3a67b7dfdc0c9d3aba96a71f712e3a
Parents: 7810361
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Authored: Sun Jan 8 12:36:18 2017 -0600
Committer: Steve Blackmon @steveblackmon <sblack...@apache.org>
Committed: Sun Jan 8 12:36:18 2017 -0600

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   3 +-
 streams-contrib/streams-persist-graph/pom.xml   |  20 +-
 .../streams/graph/GraphHttpPersistWriter.java   | 250 --------------
 .../apache/streams/graph/GraphVertexReader.java | 126 -------
 .../apache/streams/graph/HttpGraphHelper.java   |   4 +-
 .../apache/streams/graph/QueryGraphHelper.java  |   4 +-
 .../graph/neo4j/CypherQueryGraphHelper.java     | 238 -------------
 .../graph/neo4j/Neo4jHttpGraphHelper.java       |  75 ----
 .../streams/graph/GraphBinaryConfiguration.json |  28 --
 .../streams/graph/GraphConfiguration.json       |  22 --
 .../streams/graph/GraphHttpConfiguration.json   |  22 --
 .../graph/neo4j/CypherQueryResponse.json        |  43 ---
 .../graph/test/TestCypherQueryGraphHelper.java  | 116 -------
 .../graph/test/TestNeo4jHttpVertexReader.java   |  81 -----
 streams-contrib/streams-persist-neo4j/pom.xml   | 263 ++++++++++++++
 .../streams/neo4j/CypherQueryGraphHelper.java   | 344 +++++++++++++++++++
 .../apache/streams/neo4j/Neo4jPersistUtil.java  | 151 ++++++++
 .../streams/neo4j/bolt/Neo4jBoltClient.java     |  92 +++++
 .../neo4j/bolt/Neo4jBoltPersistReader.java      | 326 ++++++++++++++++++
 .../neo4j/bolt/Neo4jBoltPersistWriter.java      |  77 +++++
 .../streams/neo4j/http/Neo4jHttpClient.java     |  74 ++++
 .../neo4j/http/Neo4jHttpGraphHelper.java        | 104 ++++++
 .../neo4j/http/Neo4jHttpPersistReader.java      | 173 ++++++++++
 .../neo4j/http/Neo4jHttpPersistWriter.java      | 171 +++++++++
 .../streams/neo4j/CypherQueryResponse.json      |  43 +++
 .../streams/neo4j/Neo4jConfiguration.json       |  27 ++
 .../streams/neo4j/Neo4jReaderConfiguration.json |  17 +
 .../streams/neo4j/test/Neo4jBoltPersistIT.java  | 156 +++++++++
 .../streams/neo4j/test/Neo4jHttpPersistIT.java  | 138 ++++++++
 .../neo4j/test/TestCypherQueryGraphHelper.java  | 150 ++++++++
 .../src/test/resources/Neo4jBoltPersistIT.conf  |  20 ++
 .../src/test/resources/Neo4jHttpPersistIT.conf  |  20 ++
 .../apache/streams/data/util/PropertyUtil.java  | 124 -------
 .../org/apache/streams/util/PropertyUtil.java   | 130 +++++++
 .../util/schema/test/PropertyUtilTest.java      |  25 ++
 35 files changed, 2510 insertions(+), 1147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 8408cef..aed60c9 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -37,8 +37,8 @@
     </properties>
 
     <modules>
-        <module>streams-persist-console</module>
         <module>streams-persist-cassandra</module>
+        <module>streams-persist-console</module>
         <module>streams-persist-elasticsearch</module>
         <module>streams-persist-filebuffer</module>
         <module>streams-persist-hbase</module>
@@ -46,6 +46,7 @@
         <module>streams-persist-graph</module>
         <module>streams-persist-kafka</module>
         <module>streams-persist-mongo</module>
+        <module>streams-persist-neo4j</module>
         <module>streams-amazon-aws</module>
         <module>streams-processor-jackson</module>
         <module>streams-processor-json</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/pom.xml 
b/streams-contrib/streams-persist-graph/pom.xml
index b8db538..996c706 100644
--- a/streams-contrib/streams-persist-graph/pom.xml
+++ b/streams-contrib/streams-persist-graph/pom.xml
@@ -147,25 +147,7 @@
                     </execution>
                 </executions>
             </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-dependency-plugin</artifactId>
-                <configuration>
-                    <includes>**/*.json</includes>
-                    
<outputDirectory>${project.build.directory}/test-classes</outputDirectory>
-                    <includeGroupIds>org.apache.streams</includeGroupIds>
-                    <includeTypes>test-jar</includeTypes>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>test-resource-dependencies</id>
-                        <phase>process-test-resources</phase>
-                        <goals>
-                            <goal>unpack-dependencies</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
+
             <!-- revisit using this if streams bumps to jdk8 -->
             <!--<plugin>-->
                 <!--<groupId>com.github.harti2006</groupId>-->

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
deleted file mode 100644
index 5b2dec6..0000000
--- 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph;
-
-import org.apache.streams.components.http.HttpPersistWriterConfiguration;
-import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.graph.neo4j.CypherQueryGraphHelper;
-import org.apache.streams.graph.neo4j.Neo4jHttpGraphHelper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.util.EntityUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * Adds activityobjects as vertices and activities as edges to a graph 
database with
- * an http rest endpoint (such as neo4j).
- */
-public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter {
-
-  public static final String STREAMS_ID = 
GraphHttpPersistWriter.class.getCanonicalName();
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(GraphHttpPersistWriter.class);
-  private static final long MAX_WRITE_LATENCY = 1000;
-
-  private GraphHttpConfiguration configuration;
-
-  private QueryGraphHelper queryGraphHelper;
-  private HttpGraphHelper httpGraphHelper;
-
-  private static ObjectMapper mapper;
-
-  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-  /**
-   * GraphHttpPersistWriter constructor - resolve GraphHttpConfiguration from 
JVM 'graph'.
-   */
-  public GraphHttpPersistWriter() {
-    this(new 
ComponentConfigurator<>(GraphHttpConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
-  }
-
-  /**
-   * GraphHttpPersistWriter constructor - use supplied GraphHttpConfiguration.
-   * @param configuration GraphHttpConfiguration
-   */
-  public GraphHttpPersistWriter(GraphHttpConfiguration configuration) {
-    super(StreamsJacksonMapper.getInstance().convertValue(configuration, 
HttpPersistWriterConfiguration.class));
-    if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
-      super.configuration.setResourcePath("/db/" + configuration.getGraph() + 
"/transaction/commit/");
-    } else if ( 
configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) {
-      super.configuration.setResourcePath("/graphs/" + 
configuration.getGraph());
-    }
-    this.configuration = configuration;
-  }
-
-  @Override
-  protected ObjectNode preparePayload(StreamsDatum entry) throws Exception {
-
-    Activity activity = null;
-    ActivityObject activityObject;
-    Object document = entry.getDocument();
-
-    if (document instanceof Activity) {
-      activity = (Activity) document;
-      activityObject = activity.getObject();
-    } else if (document instanceof ActivityObject) {
-      activityObject = (ActivityObject) document;
-    } else {
-      ObjectNode objectNode;
-      if (document instanceof ObjectNode) {
-        objectNode = (ObjectNode) document;
-      } else if ( document instanceof String) {
-        try {
-          objectNode = mapper.readValue((String) document, ObjectNode.class);
-        } catch (IOException ex) {
-          LOGGER.error("Can't handle input: ", entry);
-          throw ex;
-        }
-      } else {
-        LOGGER.error("Can't handle input: ", entry);
-        throw new Exception("Can't create payload from datum.");
-      }
-
-      if ( objectNode.get("verb") != null ) {
-        try {
-          activity = mapper.convertValue(objectNode, Activity.class);
-          activityObject = activity.getObject();
-        } catch (Exception ex) {
-          activityObject = mapper.convertValue(objectNode, 
ActivityObject.class);
-        }
-      } else {
-        activityObject = mapper.convertValue(objectNode, ActivityObject.class);
-      }
-    }
-
-    Preconditions.checkArgument(activity != null || activityObject != null);
-
-    ObjectNode request = mapper.createObjectNode();
-    ArrayNode statements = mapper.createArrayNode();
-
-    // always add vertices first
-
-    List<String> labels = Collections.singletonList("streams");
-
-    if ( activityObject != null ) {
-      if ( activityObject.getObjectType() != null ) {
-        labels.add(activityObject.getObjectType());
-      }
-      
statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject)));
-    }
-
-    if ( activity != null ) {
-
-      ActivityObject actor = activity.getActor();
-      Provider provider = activity.getProvider();
-
-      if (provider != null && StringUtils.isNotBlank(provider.getId())) {
-        labels.add(provider.getId());
-      }
-      if (actor != null && StringUtils.isNotBlank(actor.getId())) {
-        if (actor.getObjectType() != null) {
-          labels.add(actor.getObjectType());
-        }
-        
statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(actor)));
-      }
-
-      if (activityObject != null && 
StringUtils.isNotBlank(activityObject.getId())) {
-        if (activityObject.getObjectType() != null) {
-          labels.add(activityObject.getObjectType());
-        }
-        
statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject)));
-      }
-
-      // then add edge
-
-      if (StringUtils.isNotBlank(activity.getVerb())) {
-        
statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.createEdgeRequest(activity)));
-      }
-    }
-
-    request.put("statements", statements);
-    return request;
-
-  }
-
-  @Override
-  protected ObjectNode executePost(HttpPost httpPost) {
-
-    Objects.requireNonNull(httpPost);
-
-    ObjectNode result = null;
-
-    CloseableHttpResponse response = null;
-
-    String entityString = null;
-    try {
-      response = httpclient.execute(httpPost);
-      HttpEntity entity = response.getEntity();
-      if (response.getStatusLine().getStatusCode() == 200 || 
response.getStatusLine().getStatusCode() == 201 && entity != null) {
-        entityString = EntityUtils.toString(entity);
-        result = mapper.readValue(entityString, ObjectNode.class);
-      }
-      LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), 
response.getStatusLine().getStatusCode(), entityString);
-      if ( result == null
-           || (
-              result.get("errors") != null
-                  && result.get("errors").isArray()
-                  && result.get("errors").iterator().hasNext()
-              )
-          ) {
-        LOGGER.error("Write Error: " + result.get("errors"));
-      } else {
-        LOGGER.debug("Write Success");
-      }
-    } catch (IOException ex) {
-      LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, 
ex.getMessage());
-    } catch (Exception ex) {
-      LOGGER.error("Write Exception:\n{}\n{}\n{}", httpPost.toString(), 
response, ex.getMessage());
-    } finally {
-      try {
-        if ( response != null) {
-          response.close();
-        }
-      } catch (IOException ignored) {
-        LOGGER.trace("ignored IOException", ignored);
-      }
-    }
-    return result;
-  }
-
-  @Override
-  public void prepare(Object configurationObject) {
-
-    super.prepare(configuration);
-    mapper = StreamsJacksonMapper.getInstance();
-
-    if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
-      queryGraphHelper = new CypherQueryGraphHelper();
-      httpGraphHelper = new Neo4jHttpGraphHelper();
-    }
-
-    Objects.requireNonNull(queryGraphHelper);
-    Objects.requireNonNull(httpGraphHelper);
-  }
-
-  @Override
-  public void cleanUp() {
-
-    LOGGER.info("exiting");
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
deleted file mode 100644
index 9560083..0000000
--- 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph;
-
-import org.apache.streams.components.http.HttpProviderConfiguration;
-import org.apache.streams.components.http.provider.SimpleHttpProvider;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsPersistReader;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.data.util.PropertyUtil;
-import org.apache.streams.graph.neo4j.CypherQueryResponse;
-import org.apache.streams.graph.neo4j.ItemData;
-import org.apache.streams.graph.neo4j.ItemMetadata;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Reads a stream of activityobjects from vertices in a graph database with
- * an http rest endpoint (such as neo4j).
- */
-public class GraphVertexReader extends SimpleHttpProvider implements 
StreamsPersistReader {
-
-  public static final String STREAMS_ID = 
GraphVertexReader.class.getCanonicalName();
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(GraphVertexReader.class);
-
-  protected GraphReaderConfiguration configuration;
-
-  private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-  /**
-   * GraphVertexReader constructor - resolve GraphReaderConfiguration from JVM 
'graph'.
-   */
-  public GraphVertexReader() {
-    this(new 
ComponentConfigurator<>(GraphReaderConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
-  }
-
-  /**
-   * GraphVertexReader constructor - use supplied GraphReaderConfiguration.
-   * @param configuration GraphReaderConfiguration
-   */
-  public GraphVertexReader(GraphReaderConfiguration configuration) {
-    super(mapper.convertValue(configuration, HttpProviderConfiguration.class));
-    if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
-      super.configuration.setResourcePath("/db/" + configuration.getGraph() + 
"/transaction/commit");
-    } else if ( 
configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) {
-      super.configuration.setResourcePath("/graphs/" + 
configuration.getGraph());
-    }
-    this.configuration = configuration;
-  }
-
-  /**
-   * Neo API query returns something like this:
-   * { "columns": [ "v" ], "data": [ [ { "data": { props }, etc... } ], [ { 
"data": { props }, etc... } ] ] }
-   *
-   * @param jsonNode jsonNode
-   * @return result
-   */
-  public List<ObjectNode> parse(JsonNode jsonNode) {
-    List<ObjectNode> results = new ArrayList<>();
-
-    ObjectNode root = (ObjectNode) jsonNode;
-
-    CypherQueryResponse cypherQueryResponse = mapper.convertValue(root, 
CypherQueryResponse.class);
-
-    for ( List<List<ItemMetadata>> dataWrapper : 
cypherQueryResponse.getData()) {
-
-      for (List<ItemMetadata> itemMetadatas : dataWrapper) {
-
-        for (ItemMetadata itemMetadata : itemMetadatas) {
-
-          ItemData itemData = itemMetadata.getData();
-
-          LOGGER.debug("itemData: " + itemData);
-
-          
results.add(PropertyUtil.unflattenMap(itemData.getAdditionalProperties(), '.'));
-        }
-
-      }
-
-    }
-    return results;
-  }
-
-  @Override
-  public String getId() {
-    return STREAMS_ID;
-  }
-
-  @Override
-  public void prepare(Object configurationObject) {
-
-    super.prepare(configurationObject);
-
-  }
-
-  @Override
-  public StreamsResultSet readAll() {
-    return readCurrent();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
index ca1f4e4..804e9ff 100644
--- 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
+++ 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
@@ -29,6 +29,8 @@ import java.util.Map;
  */
 public interface HttpGraphHelper {
 
-  ObjectNode createHttpRequest(Pair<String, Map<String, Object>> 
queryPlusParameters);
+  ObjectNode readData(Pair<String, Map<String, Object>> queryPlusParameters);
+
+  ObjectNode writeData(Pair<String, Map<String, Object>> queryPlusParameters);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
index 1699aee..38ceb55 100644
--- 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
+++ 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
@@ -39,6 +39,8 @@ public interface QueryGraphHelper {
 
   public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject 
activityObject);
 
-  public Pair<String, Map<String, Object>> createEdgeRequest(Activity 
activity);
+  public Pair<String, Map<String, Object>> createActorObjectEdge(Activity 
activity);
+
+  public Pair<String, Map<String, Object>> createActorTargetEdge(Activity 
activity);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
deleted file mode 100644
index a361139..0000000
--- 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph.neo4j;
-
-import org.apache.streams.data.util.PropertyUtil;
-import org.apache.streams.graph.QueryGraphHelper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.javatuples.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.stringtemplate.v4.ST;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * Supporting class for interacting with neo4j via rest API
- */
-public class CypherQueryGraphHelper implements QueryGraphHelper {
-
-  private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(Neo4jHttpGraphHelper.class);
-
-  private static final String getVertexLongIdStatementTemplate = "MATCH (v) 
WHERE ID(v) = <id> RETURN v";
-  private static final String getVertexStringIdStatementTemplate = "MATCH (v 
{id: '<id>'} ) RETURN v";
-
-  private static final String createVertexStatementTemplate =
-      "MATCH (x {id: '<id>'}) "
-          + "CREATE UNIQUE (v:<type> { props }) "
-          + "ON CREATE SET v <labels> "
-          + "RETURN v";
-
-
-
-  private static final String mergeVertexStatementTemplate =
-      "MERGE (v:<type> {id: '<id>'}) "
-          + "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = 
timestamp() "
-          + "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = 
timestamp() "
-          + "RETURN v";
-
-  private static final String createEdgeStatementTemplate =
-      "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "
-          + "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "
-          + "RETURN r";
-
-  /**
-   * getVertexRequest.
-   * @param streamsId streamsId
-   * @return pair (streamsId, parameterMap)
-   */
-  public Pair<String, Map<String, Object>> getVertexRequest(String streamsId) {
-
-    ST getVertex = new ST(getVertexStringIdStatementTemplate);
-    getVertex.add("id", streamsId);
-
-    Pair<String, Map<String, Object>> queryPlusParameters = new 
Pair(getVertex.render(), null);
-
-    LOGGER.debug("getVertexRequest", queryPlusParameters.toString());
-
-    return queryPlusParameters;
-  }
-
-  /**
-   * getVertexRequest.
-   * @param vertexId numericId
-   * @return pair (streamsId, parameterMap)
-   */
-  @Override
-  public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId) {
-
-    ST getVertex = new ST(getVertexLongIdStatementTemplate);
-    getVertex.add("id", vertexId);
-
-    Pair<String, Map<String, Object>> queryPlusParameters = new 
Pair(getVertex.render(), null);
-
-    LOGGER.debug("getVertexRequest", queryPlusParameters.toString());
-
-    return queryPlusParameters;
-
-  }
-
-  /**
-   * createVertexRequest.
-   * @param activityObject activityObject
-   * @return pair (query, parameterMap)
-   */
-  public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject 
activityObject) {
-
-    Objects.requireNonNull(activityObject.getObjectType());
-
-    List<String> labels = getLabels(activityObject);
-
-    ST createVertex = new ST(createVertexStatementTemplate);
-    createVertex.add("id", activityObject.getId());
-    createVertex.add("type", activityObject.getObjectType());
-
-    if ( labels.size() > 0 ) {
-      createVertex.add("labels", String.join(" ", labels));
-    }
-
-    String query = createVertex.render();
-
-    ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
-    Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
-
-    Pair<String, Map<String, Object>> queryPlusParameters = new 
Pair(createVertex.render(), props);
-
-    LOGGER.debug("createVertexRequest: ({},{})", query, props);
-
-    return queryPlusParameters;
-  }
-
-  /**
-   * mergeVertexRequest.
-   * @param activityObject activityObject
-   * @return pair (query, parameterMap)
-   */
-  public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject 
activityObject) {
-
-    Objects.requireNonNull(activityObject.getObjectType());
-
-    Pair queryPlusParameters = new Pair(null, new HashMap<>());
-
-    List<String> labels = getLabels(activityObject);
-
-    ST mergeVertex = new ST(mergeVertexStatementTemplate);
-    mergeVertex.add("id", activityObject.getId());
-    mergeVertex.add("type", activityObject.getObjectType());
-    if ( labels.size() > 0 ) {
-      mergeVertex.add("labels", String.join(" ", labels));
-    }
-    String query = mergeVertex.render();
-
-    ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
-    Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
-
-    LOGGER.debug("mergeVertexRequest: ({},{})", query, props);
-
-    queryPlusParameters = queryPlusParameters.setAt0(query);
-    queryPlusParameters = queryPlusParameters.setAt1(props);
-
-    return queryPlusParameters;
-  }
-
-  /**
-   * createEdgeRequest.
-   * @param activity activity
-   * @return pair (query, parameterMap)
-   */
-  public Pair<String, Map<String, Object>> createEdgeRequest(Activity 
activity) {
-
-    Pair queryPlusParameters = new Pair(null, new HashMap<>());
-
-    ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class);
-    Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
-
-    ST mergeEdge = new ST(createEdgeStatementTemplate);
-    mergeEdge.add("s_id", activity.getActor().getId());
-    mergeEdge.add("s_type", activity.getActor().getObjectType());
-    mergeEdge.add("d_id", activity.getObject().getId());
-    mergeEdge.add("d_type", activity.getObject().getObjectType());
-    mergeEdge.add("r_id", activity.getId());
-    mergeEdge.add("r_type", activity.getVerb());
-    mergeEdge.add("r_props", getPropertyCreater(props));
-
-    // set the activityObject's and extensions null, because their properties 
don't need to appear on the relationship
-    activity.setActor(null);
-    activity.setObject(null);
-    activity.setTarget(null);
-    activity.getAdditionalProperties().put("extensions", null);
-
-    String statement = mergeEdge.render();
-    queryPlusParameters = queryPlusParameters.setAt0(statement);
-    queryPlusParameters = queryPlusParameters.setAt1(props);
-
-    LOGGER.debug("createEdgeRequest: ({},{})", statement, props);
-
-    return queryPlusParameters;
-  }
-
-  /**
-   * getPropertyCreater.
-   * @param map paramMap
-   * @return PropertyCreater string
-   */
-  public static String getPropertyCreater(Map<String, Object> map) {
-    StringBuilder builder = new StringBuilder();
-    builder.append("{");
-    List<String> parts = new ArrayList<>();
-    for ( Map.Entry<String, Object> entry : map.entrySet()) {
-      if ( entry.getValue() instanceof String ) {
-        String propVal = (String) (entry.getValue());
-        parts.add("`" + entry.getKey() + "`:'" + propVal + "'");
-      }
-    }
-    builder.append(String.join(",", parts));
-    builder.append("}");
-    return builder.toString();
-  }
-
-  private List<String> getLabels(ActivityObject activityObject) {
-    List<String> labels = Collections.singletonList(":streams");
-    if ( activityObject.getAdditionalProperties().containsKey("labels") ) {
-      List<String> extraLabels = 
(List<String>)activityObject.getAdditionalProperties().get("labels");
-      for ( String extraLabel : extraLabels ) {
-        labels.add(":" + extraLabel);
-      }
-    }
-    return labels;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
deleted file mode 100644
index 9f47058..0000000
--- 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph.neo4j;
-
-import org.apache.streams.graph.HttpGraphHelper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.javatuples.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * Supporting class for interacting with neo4j via rest API.
- */
-public class Neo4jHttpGraphHelper implements HttpGraphHelper {
-
-  private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(Neo4jHttpGraphHelper.class);
-
-  private static final String statementKey = "statement";
-  private static final String paramsKey = "parameters";
-  private static final String propsKey = "props";
-
-  /**
-   * createHttpRequest neo4j rest json payload.
-   *
-   * @param queryPlusParameters (query, parameter map)
-   * @return ObjectNode
-   */
-  public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> 
queryPlusParameters) {
-
-    LOGGER.debug("createHttpRequest: ", queryPlusParameters);
-
-    Objects.requireNonNull(queryPlusParameters);
-    Objects.requireNonNull(queryPlusParameters.getValue0());
-    Objects.requireNonNull(queryPlusParameters.getValue1());
-
-    ObjectNode request = MAPPER.createObjectNode();
-
-    request.put(statementKey, queryPlusParameters.getValue0());
-
-    ObjectNode params = MAPPER.createObjectNode();
-    ObjectNode props = MAPPER.convertValue(queryPlusParameters.getValue1(), 
ObjectNode.class);
-
-    params.put(propsKey, props);
-    request.put(paramsKey, params);
-
-    LOGGER.debug("createHttpRequest: ", request);
-
-    return request;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json
 
b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json
deleted file mode 100644
index 04a70e1..0000000
--- 
a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json
+++ /dev/null
@@ -1,28 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema";,
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0";
-    ],
-    "id": "#",
-    "javaType" : "org.apache.streams.graph.GraphBinaryConfiguration",
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "type": {
-            "type": "string",
-            "description": "Graph DB type",
-            "enum" : ["neo4j", "gremlin"]
-        },
-        "file": {
-            "type": "string",
-            "description": "New Graph DB File"
-        },
-        "indexFields": {
-            "type": "array",
-            "items": {
-                "type": "string"
-            },
-            "description": "Fields to index under streams label"
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
 
b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
deleted file mode 100644
index de92443..0000000
--- 
a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema";,
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0";
-    ],
-    "id": "#",
-    "javaType" : "org.apache.streams.graph.GraphConfiguration",
-    "extends" : 
{"$ref":"../../../../../../../../../streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json"},
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "type": {
-            "type": "string",
-            "description": "Graph DB type",
-            "enum" : ["neo4j", "rexster"]
-        },
-        "graph": {
-            "type": "string",
-            "description": "Graph DB Graph ID"
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json
 
b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json
deleted file mode 100644
index c63e0fb..0000000
--- 
a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema";,
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0";
-    ],
-    "id": "#",
-    "javaType" : "org.apache.streams.graph.GraphHttpConfiguration",
-    "extends" : 
{"$ref":"../../../../../../../../../streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json"},
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "type": {
-            "type": "string",
-            "description": "Graph DB type",
-            "enum" : ["neo4j", "rexster"]
-        },
-        "graph": {
-            "type": "string",
-            "description": "Graph DB Graph ID"
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json
 
b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json
deleted file mode 100644
index 77c6fd7..0000000
--- 
a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json
+++ /dev/null
@@ -1,43 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema";,
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0";
-    ],
-    "id": "#",
-    "javaType" : "org.apache.streams.graph.neo4j.CypherQueryResponse",
-    "properties": {
-        "columns": {
-            "type": "array",
-            "id": "http://jsonschema.net/columns";,
-            "required": false,
-            "items": {
-                "type": "string",
-                "id": "http://jsonschema.net/columns/0";,
-                "required": false
-            }
-        },
-        "data": {
-            "type": "array",
-            "required": false,
-            "items": {
-                "type": "array",
-                "required": false,
-                "items": {
-                    "type": "array",
-                    "required": false,
-                    "items": {
-                        "type": "object",
-                        "javaType" : 
"org.apache.streams.graph.neo4j.ItemMetadata",
-                        "properties": {
-                            "data": {
-                                "type": "object",
-                                "javaType" : 
"org.apache.streams.graph.neo4j.ItemData"
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java
 
b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java
deleted file mode 100644
index c29c8b7..0000000
--- 
a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph.test;
-
-import org.apache.streams.graph.neo4j.CypherQueryGraphHelper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-
-import org.javatuples.Pair;
-import org.junit.Test;
-
-import java.util.Map;
-
-/**
- * TestCypherQueryGraphHelper tests
- * @see org.apache.streams.graph.neo4j.CypherQueryGraphHelper
- */
-public class TestCypherQueryGraphHelper {
-
-  CypherQueryGraphHelper helper = new CypherQueryGraphHelper();
-
-  @Test
-  public void getVertexRequestIdTest() throws Exception {
-
-    Pair<String, Map<String, Object>> queryAndParams = 
helper.getVertexRequest("id");
-    assert(queryAndParams != null);
-    assert(queryAndParams.getValue0() != null);
-
-  }
-
-  @Test
-  public void getVertexRequestLongTest() throws Exception {
-
-    Pair<String, Map<String, Object>> queryAndParams = 
helper.getVertexRequest(new Long(1));
-
-    assert(queryAndParams != null);
-    assert(queryAndParams.getValue0() != null);
-
-  }
-
-  @Test
-  public void createVertexRequestTest() throws Exception {
-
-    ActivityObject activityObject = new ActivityObject();
-    activityObject.setId("id");
-    activityObject.setObjectType("type");
-    activityObject.setContent("content");
-
-    Pair<String, Map<String, Object>> queryAndParams = 
helper.createVertexRequest(activityObject);
-    assert(queryAndParams != null);
-    assert(queryAndParams.getValue0() != null);
-    assert(queryAndParams.getValue1() != null);
-
-  }
-
-  @Test
-  public void mergeVertexRequestTest() throws Exception {
-
-    ActivityObject activityObject = new ActivityObject();
-    activityObject.setId("id");
-    activityObject.setObjectType("type");
-    activityObject.setContent("content");
-
-    Pair<String, Map<String, Object>> queryAndParams = 
helper.mergeVertexRequest(activityObject);
-    assert(queryAndParams != null);
-    assert(queryAndParams.getValue0() != null);
-    assert(queryAndParams.getValue1() != null);
-
-  }
-
-  @Test
-  public void createEdgeRequestTest() throws Exception {
-
-    ActivityObject actor = new ActivityObject();
-    actor.setId("actor");
-    actor.setObjectType("type");
-    actor.setContent("content");
-
-    ActivityObject object = new ActivityObject();
-    object.setId("object");
-    object.setObjectType("type");
-    object.setContent("content");
-
-    Activity activity = new Activity();
-    activity.setId("activity");
-    activity.setVerb("verb");
-    activity.setContent("content");
-
-    activity.setActor(actor);
-    activity.setObject(object);
-
-    Pair<String, Map<String, Object>> queryAndParams = 
helper.createEdgeRequest(activity);
-
-    assert(queryAndParams != null);
-    assert(queryAndParams.getValue0() != null);
-    assert(queryAndParams.getValue1() != null);
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
 
b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
deleted file mode 100644
index 24ddd65..0000000
--- 
a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph.test;
-
-import org.apache.streams.graph.GraphHttpConfiguration;
-import org.apache.streams.graph.GraphReaderConfiguration;
-import org.apache.streams.graph.GraphVertexReader;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.io.IOUtils;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
-/**
- * Unit test for {@link org.apache.streams.graph.GraphVertexReader}
- *
- * Test that graph db responses can be converted to streams data.
- */
-public class TestNeo4jHttpVertexReader {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(TestNeo4jHttpVertexReader.class);
-
-  private static final ObjectMapper mapper = 
StreamsJacksonMapper.getInstance();
-
-  private JsonNode sampleReaderResult;
-
-  private GraphReaderConfiguration testConfiguration;
-
-  private GraphVertexReader graphPersistReader;
-
-  @Before
-  public void prepareTest() throws IOException {
-
-    testConfiguration = new GraphReaderConfiguration();
-    testConfiguration.setType(GraphHttpConfiguration.Type.NEO_4_J);
-
-    graphPersistReader = new GraphVertexReader(testConfiguration);
-    InputStream testActivityFileStream = 
TestNeo4jHttpVertexReader.class.getClassLoader()
-        .getResourceAsStream("sampleReaderResult.json");
-    String sampleText = IOUtils.toString(testActivityFileStream, "utf-8");
-    sampleReaderResult = mapper.readValue(sampleText, JsonNode.class);
-
-  }
-
-  @Test
-  public void testParseNeoResult() throws IOException {
-
-    List<ObjectNode> result = graphPersistReader.parse(sampleReaderResult);
-
-    assert( result.size() == 10);
-
-    for( int i = 0 ; i < 10; i++ )
-      assert( result.get(i).get("extensions").size() == 5);
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-neo4j/pom.xml 
b/streams-contrib/streams-persist-neo4j/pom.xml
new file mode 100644
index 0000000..d117558
--- /dev/null
+++ b/streams-contrib/streams-persist-neo4j/pom.xml
@@ -0,0 +1,263 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>streams-contrib</artifactId>
+        <groupId>org.apache.streams</groupId>
+        <version>0.5-incubating-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streams-persist-neo4j</artifactId>
+    <name>streams-persist-neo4j</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+            <version>1.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-converters</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-util</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-http</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-persist-graph</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.antlr</groupId>
+            <artifactId>stringtemplate</artifactId>
+            <version>4.0.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.javatuples</groupId>
+            <artifactId>javatuples</artifactId>
+            <version>1.2</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.neo4j.driver</groupId>
+            <artifactId>neo4j-java-driver</artifactId>
+            <version>1.0.6</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-schema-activitystreams</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-testing</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+    </dependencies>
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.streams.plugins</groupId>
+                <artifactId>streams-plugin-pojo</artifactId>
+                <version>${project.version}</version>
+                <configuration>
+                    <sourcePaths>
+                        
<sourcePath>${project.basedir}/src/main/jsonschema</sourcePath>
+                    </sourcePaths>
+                    
<targetDirectory>${project.basedir}/target/generated-sources/pojo</targetDirectory>
+                    
<targetPackage>org.apache.streams.graph.pojo</targetPackage>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate-sources</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.streams</groupId>
+                        <artifactId>streams-http</artifactId>
+                        <version>${project.version}</version>
+                    </dependency>
+                </dependencies>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <includes>**/*.json</includes>
+                    
<outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+                    <includeGroupIds>org.apache.streams</includeGroupIds>
+                    <includeTypes>test-jar</includeTypes>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test-resource-dependencies</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>unpack-dependencies</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>dockerITs</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+                <property>
+                    <name>skipITs</name>
+                    <value>false</value>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>io.fabric8</groupId>
+                        <artifactId>docker-maven-plugin</artifactId>
+                        <configuration combine.self="override">
+                            <watchInterval>500</watchInterval>
+                            <logDate>default</logDate>
+                            <verbose>true</verbose>
+                            <autoPull>on</autoPull>
+                            <images>
+                                <image>
+                                    <name>neo4j:3.0.6</name>
+                                    <alias>neo4j</alias>
+                                    <run>
+                                        <env>
+                                            <NEO4J_AUTH>none</NEO4J_AUTH>
+                                        </env>
+                                        <namingStrategy>none</namingStrategy>
+                                        <ports>
+                                            
<port>${neo4j.http.host}:${neo4j.http.port}:7474</port>
+                                            
<port>${neo4j.tcp.host}:${neo4j.tcp.port}:7687</port>
+                                        </ports>
+                                        
<portPropertyFile>neo4j.properties</portPropertyFile>
+                                        <wait>
+                                            <log>neo4j startup</log>
+                                            <http>
+                                                
<url>http://${neo4j.http.host}:${neo4j.http.port}</url>
+                                                <method>GET</method>
+                                                <status>200</status>
+                                            </http>
+                                            <time>20000</time>
+                                            <kill>1000</kill>
+                                            <shutdown>500</shutdown>
+                                            <!--<tcp>-->
+                                            
<!--<host>${es.transport.host}</host>-->
+                                            <!--<ports>-->
+                                            
<!--<port>${es.transport.port}</port>-->
+                                            <!--</ports>-->
+                                            <!--</tcp>-->
+                                        </wait>
+                                        <log>
+                                            <enabled>true</enabled>
+                                            <date>default</date>
+                                            <color>cyan</color>
+                                        </log>
+                                    </run>
+                                    <watch>
+                                        <mode>none</mode>
+                                    </watch>
+                                </image>
+
+                            </images>
+                        </configuration>
+
+                    </plugin>
+
+                </plugins>
+            </build>
+
+        </profile>
+    </profiles>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/CypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/CypherQueryGraphHelper.java
 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/CypherQueryGraphHelper.java
new file mode 100644
index 0000000..c117c16
--- /dev/null
+++ 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/CypherQueryGraphHelper.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.neo4j;
+
+import org.apache.streams.graph.QueryGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.util.PropertyUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.stringtemplate.v4.ST;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Supporting class for interacting with neo4j via rest API
+ */
+public class CypherQueryGraphHelper implements QueryGraphHelper, Serializable {
+
+  private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CypherQueryGraphHelper.class);
+
+  public static final String getVertexLongIdStatementTemplate = "MATCH (v) 
WHERE ID(v) = <id> RETURN v";
+  public static final String getVertexStringIdStatementTemplate = "MATCH (v 
{id: '<id>'} ) RETURN v";
+  public static final String getVerticesLabelIdStatementTemplate = "MATCH 
(v:<type>) RETURN v";
+
+  public final static String createVertexStatementTemplate = "MATCH (x {id: 
'<id>'}) "+
+      "CREATE UNIQUE (v:`<type>` { props }) "+
+      "ON CREATE SET v <labels> "+
+      "RETURN v";
+
+  public final static String mergeVertexStatementTemplate = "MERGE (v:`<type>` 
{id: '<id>'}) "+
+      "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+
+      "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+
+      "RETURN v";
+
+  public final static String createEdgeStatementTemplate = "MATCH 
(s:`<s_type>` {id: '<s_id>'}),(d:`<d_type>` {id: '<d_id>'}) "+
+      "CREATE UNIQUE (s)-[r:`<r_type>` <r_props>]->(d) "+
+      "RETURN r";
+
+  public Pair<String, Map<String, Object>> getVertexRequest(String streamsId) {
+
+    ST getVertex = new ST(getVertexStringIdStatementTemplate);
+    getVertex.add("id", streamsId);
+
+    Pair<String, Map<String, Object>> queryPlusParameters = new 
Pair(getVertex.render(), null);
+
+    LOGGER.debug("getVertexRequest", queryPlusParameters.toString());
+
+    return queryPlusParameters;
+  }
+
+  /**
+   * getVertexRequest.
+   * @param vertexId numericId
+   * @return pair (streamsId, parameterMap)
+   */
+  public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId) {
+
+    ST getVertex = new ST(getVertexLongIdStatementTemplate);
+    getVertex.add("id", vertexId);
+
+    Pair<String, Map<String, Object>> queryPlusParameters = new 
Pair(getVertex.render(), null);
+
+    LOGGER.debug("getVertexRequest", queryPlusParameters.toString());
+
+    return queryPlusParameters;
+
+  }
+
+  /**
+   * createVertexRequest.
+   * @param activityObject activityObject
+   * @return pair (query, parameterMap)
+   */
+  public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject 
activityObject) {
+
+    Objects.requireNonNull(activityObject.getObjectType());
+
+    List<String> labels = getLabels(activityObject);
+
+    ST createVertex = new ST(createVertexStatementTemplate);
+    createVertex.add("id", activityObject.getId());
+    createVertex.add("type", activityObject.getObjectType());
+
+    if ( labels.size() > 0 ) {
+      createVertex.add("labels", String.join(" ", labels));
+    }
+
+    String query = createVertex.render();
+
+    ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
+    Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+    Pair<String, Map<String, Object>> queryPlusParameters = new 
Pair(createVertex.render(), props);
+
+    LOGGER.debug("createVertexRequest: ({},{})", query, props);
+
+    return queryPlusParameters;
+  }
+
+  /**
+   * getVerticesRequest gets all vertices with a label.
+   * @param labelId labelId
+   * @return pair (query, parameterMap)
+   */
+  public Pair<String, Map<String, Object>> getVerticesRequest(String labelId) {
+    ST getVertex = new ST(getVerticesLabelIdStatementTemplate);
+    getVertex.add("type", labelId);
+
+    Pair<String, Map<String, Object>> queryPlusParameters = new 
Pair(getVertex.render(), null);
+
+    LOGGER.debug("getVertexRequest", queryPlusParameters.toString());
+
+    return queryPlusParameters;
+  }
+
+  /**
+   * mergeVertexRequest.
+   * @param activityObject activityObject
+   * @return pair (query, parameterMap)
+   */
+  public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject 
activityObject) {
+
+    Objects.requireNonNull(activityObject.getObjectType());
+
+    Pair queryPlusParameters = new Pair(null, new HashMap<>());
+
+    List<String> labels = getLabels(activityObject);
+
+    ST mergeVertex = new ST(mergeVertexStatementTemplate);
+    mergeVertex.add("id", activityObject.getId());
+    mergeVertex.add("type", activityObject.getObjectType());
+    if ( labels.size() > 0 ) {
+      mergeVertex.add("labels", String.join(" ", labels));
+    }
+    String query = mergeVertex.render();
+
+    ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
+    Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+    LOGGER.debug("mergeVertexRequest: ({},{})", query, props);
+
+    queryPlusParameters = queryPlusParameters.setAt0(query);
+    queryPlusParameters = queryPlusParameters.setAt1(props);
+
+    return queryPlusParameters;
+  }
+
+  /**
+   * createActorObjectEdge.
+   * @param activity activity
+   * @return pair (query, parameterMap)
+   */
+  public Pair<String, Map<String, Object>> createActorObjectEdge(Activity 
activity) {
+
+    Pair queryPlusParameters = new Pair(null, new HashMap<>());
+
+    ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class);
+    Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+    ST mergeEdge = new ST(createEdgeStatementTemplate);
+    mergeEdge.add("s_id", activity.getActor().getId());
+    mergeEdge.add("s_type", activity.getActor().getObjectType());
+    mergeEdge.add("d_id", activity.getObject().getId());
+    mergeEdge.add("d_type", activity.getObject().getObjectType());
+    mergeEdge.add("r_id", activity.getId());
+    mergeEdge.add("r_type", activity.getVerb());
+    mergeEdge.add("r_props", getActorObjectEdgePropertyCreater(props));
+
+    String statement = mergeEdge.render();
+    queryPlusParameters = queryPlusParameters.setAt0(statement);
+    queryPlusParameters = queryPlusParameters.setAt1(props);
+
+    LOGGER.debug("createActorObjectEdge: ({},{})", statement, props);
+
+    return queryPlusParameters;
+  }
+
+  /**
+   * createActorTargetEdge.
+   * @param activity activity
+   * @return pair (query, parameterMap)
+   */
+  public Pair<String, Map<String, Object>> createActorTargetEdge(Activity 
activity) {
+
+    Pair queryPlusParameters = new Pair(null, new HashMap<>());
+
+    ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class);
+    Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+    ST mergeEdge = new ST(createEdgeStatementTemplate);
+    mergeEdge.add("s_id", activity.getActor().getId());
+    mergeEdge.add("s_type", activity.getActor().getObjectType());
+    mergeEdge.add("d_id", activity.getTarget().getId());
+    mergeEdge.add("d_type", activity.getTarget().getObjectType());
+    mergeEdge.add("r_id", activity.getId());
+    mergeEdge.add("r_type", activity.getVerb());
+    mergeEdge.add("r_props", getActorTargetEdgePropertyCreater(props));
+
+    String statement = mergeEdge.render();
+    queryPlusParameters = queryPlusParameters.setAt0(statement);
+    queryPlusParameters = queryPlusParameters.setAt1(props);
+
+    LOGGER.debug("createActorObjectEdge: ({},{})", statement, props);
+
+    return queryPlusParameters;
+  }
+
+  /**
+   * getPropertyValueSetter.
+   * @param map paramMap
+   * @return PropertyValueSetter string
+   */
+  public static String getPropertyValueSetter(Map<String, Object> map, String 
symbol) {
+    StringBuilder builder = new StringBuilder();
+    for( Map.Entry<String, Object> entry : map.entrySet()) {
+      if( entry.getValue() instanceof String ) {
+        String propVal = (String)(entry.getValue());
+        builder.append("," + symbol + ".`" + entry.getKey() + "` = '" + 
StringEscapeUtils.escapeJava(propVal) + "'");
+      }
+    }
+    return builder.toString();
+  }
+
+  /**
+   * getPropertyParamSetter.
+   * @param map paramMap
+   * @return PropertyParamSetter string
+   */
+  public static String getPropertyParamSetter(Map<String, Object> map, String 
symbol) {
+    StringBuilder builder = new StringBuilder();
+    for( Map.Entry<String, Object> entry : map.entrySet()) {
+      if( entry.getValue() instanceof String ) {
+        String propVal = (String)(entry.getValue());
+        builder.append("," + symbol + ".`" + entry.getKey() + "` = '" + 
StringEscapeUtils.escapeJava(propVal) + "'");
+      }
+    }
+    return builder.toString();
+  }
+
+  /**
+   * getPropertyCreater.
+   * @param map paramMap
+   * @return PropertyCreater string
+   */
+  public static String getPropertyCreater(Map<String, Object> map) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("{ ");
+    List<String> parts = new ArrayList<>();
+    for( Map.Entry<String, Object> entry : map.entrySet()) {
+      if( entry.getValue() instanceof String ) {
+        String propVal = (String) (entry.getValue());
+        parts.add("`"+entry.getKey() + "`:'" + 
StringEscapeUtils.escapeJava(propVal) + "'");
+      }
+    }
+    builder.append(String.join(" ", parts));
+    builder.append(" }");
+    return builder.toString();
+  }
+
+  private String getActorObjectEdgePropertyCreater(Map<String, Object> map) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("{ ");
+    List<String> parts = new ArrayList<>();
+    for( Map.Entry<String, Object> entry : map.entrySet()) {
+      if( entry.getValue() instanceof String ) {
+        String propVal = (String) (entry.getValue());
+        if( !entry.getKey().contains(".")) {
+          parts.add("`"+entry.getKey() + "`: '" + 
StringEscapeUtils.escapeJava(propVal) + "'");
+        }
+      }
+    }
+    builder.append(String.join(", ", parts));
+    builder.append(" }");
+    return builder.toString();
+  }
+
+  private String getActorTargetEdgePropertyCreater(Map<String, Object> map) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("{ ");
+    List<String> parts = new ArrayList<>();
+    for( Map.Entry<String, Object> entry : map.entrySet()) {
+      if( entry.getValue() instanceof String ) {
+        String propVal = (String) (entry.getValue());
+        if( !entry.getKey().contains(".")) {
+          parts.add("`"+entry.getKey() + "`: '" + 
StringEscapeUtils.escapeJava(propVal) + "'");
+        } else if( entry.getKey().startsWith("object.") && 
!entry.getKey().contains(".id")) {
+          parts.add("`"+entry.getKey().substring("object.".length()) + "`: '" 
+ StringEscapeUtils.escapeJava(propVal) + "'");
+        }
+      }
+    }
+    builder.append(String.join(", ", parts));
+    builder.append(" }");
+    return builder.toString();
+  }
+
+  /**
+   * getLabels.
+   * @param activityObject activityObject
+   * @return PropertyCreater string
+   */
+  public static List<String> getLabels(ActivityObject activityObject) {
+    List<String> labels = Collections.singletonList(":streams");
+    if ( activityObject.getAdditionalProperties().containsKey("labels") ) {
+      List<String> extraLabels = 
(List<String>)activityObject.getAdditionalProperties().get("labels");
+      for ( String extraLabel : extraLabels ) {
+        labels.add(":" + extraLabel);
+      }
+    }
+    return labels;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java
 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java
new file mode 100644
index 0000000..6058c66
--- /dev/null
+++ 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java
@@ -0,0 +1,151 @@
+package org.apache.streams.neo4j;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.neo4j.bolt.Neo4jBoltPersistWriter;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+import org.apache.commons.lang3.StringUtils;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by steve on 1/2/17.
+ */
+public class Neo4jPersistUtil {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(Neo4jBoltPersistWriter.class);
+
+  private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  private static CypherQueryGraphHelper helper = new CypherQueryGraphHelper();
+
+  public static List<Pair<String, Map<String, Object>>> 
prepareStatements(StreamsDatum entry) throws Exception {
+
+    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();
+
+    String id = entry.getId();
+    Activity activity = null;
+    ActivityObject activityObject = null;
+    Object document = entry.getDocument();
+
+    if (document instanceof Activity) {
+      activity = (Activity) document;
+    } else if (document instanceof ActivityObject) {
+      activityObject = (ActivityObject) document;
+    } else {
+      ObjectNode objectNode;
+      if (document instanceof ObjectNode) {
+        objectNode = (ObjectNode) document;
+      } else if ( document instanceof String) {
+        try {
+          objectNode = mapper.readValue((String) document, ObjectNode.class);
+        } catch (IOException ex) {
+          LOGGER.error("Can't handle input: ", entry);
+          throw ex;
+        }
+      } else {
+        LOGGER.error("Can't handle input: ", entry);
+        throw new Exception("Can't create statements from datum.");
+      }
+
+      if ( objectNode.get("verb") != null ) {
+        try {
+          activity = mapper.convertValue(objectNode, Activity.class);
+          activityObject = activity.getObject();
+        } catch (Exception ex) {
+          activityObject = mapper.convertValue(objectNode, 
ActivityObject.class);
+        }
+      } else {
+        activityObject = mapper.convertValue(objectNode, ActivityObject.class);
+      }
+
+    }
+
+    Preconditions.checkArgument(activity != null ^ activityObject != null);
+
+    if ( activityObject != null && 
!Strings.isNullOrEmpty(activityObject.getId())) {
+
+      statements.add(vertexStatement(activityObject));
+
+    } else if ( activity != null && !Strings.isNullOrEmpty(activity.getId())) {
+
+      statements.addAll(vertexStatements(activity));
+
+      statements.addAll(edgeStatements(activity));
+
+    }
+
+    return statements;
+  }
+
+  public static List<Pair<String, Map<String, Object>>> 
vertexStatements(Activity activity) {
+    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();;
+    ActivityObject actor = activity.getActor();
+    ActivityObject object = activity.getObject();
+    ActivityObject target = activity.getTarget();
+
+    if (actor != null && StringUtils.isNotBlank(actor.getId())) {
+      Pair<String, Map<String, Object>> actorStatement = 
vertexStatement(actor);
+      statements.add(actorStatement);
+    }
+
+    if (object != null && StringUtils.isNotBlank(object.getId())) {
+      Pair<String, Map<String, Object>> objectStatement = 
vertexStatement(object);
+      statements.add(objectStatement);
+    }
+
+    if (target != null && StringUtils.isNotBlank(target.getId())) {
+      Pair<String, Map<String, Object>> targetStatement = 
vertexStatement(target);
+      statements.add(targetStatement);
+    }
+
+    return statements;
+  }
+
+  public static List<Pair<String, Map<String, Object>>> 
edgeStatements(Activity activity) {
+    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();;
+    ActivityObject actor = activity.getActor();
+    ActivityObject object = activity.getObject();
+    ActivityObject target = activity.getTarget();
+
+    if (StringUtils.isNotBlank(actor.getId()) && object != null && 
StringUtils.isNotBlank(object.getId())) {
+      Pair<String, Map<String, Object>> actorObjectEdgeStatement = 
helper.createActorObjectEdge(activity);
+      Map<String, Object> props = new HashMap<>();
+      props.put("props", actorObjectEdgeStatement.getValue1());
+      actorObjectEdgeStatement = actorObjectEdgeStatement.setAt1(props);
+      statements.add(actorObjectEdgeStatement);
+    }
+
+    if (StringUtils.isNotBlank(actor.getId()) && target != null && 
StringUtils.isNotBlank(target.getId())) {
+      Pair<String, Map<String, Object>> actorTargetEdgeStatement = 
helper.createActorTargetEdge(activity);
+      Map<String, Object> props = new HashMap<>();
+      props.put("props", actorTargetEdgeStatement.getValue1());
+      actorTargetEdgeStatement = actorTargetEdgeStatement.setAt1(props);
+      statements.add(actorTargetEdgeStatement);
+    }
+
+    return statements;
+  }
+
+  public static Pair<String, Map<String, Object>> 
vertexStatement(ActivityObject activityObject) {
+    Pair<String, Map<String, Object>> mergeVertexRequest = 
helper.mergeVertexRequest(activityObject);
+    Map<String, Object> props = new HashMap<>();
+    props.put("props", mergeVertexRequest.getValue1());
+    mergeVertexRequest = mergeVertexRequest.setAt1(props);
+    return mergeVertexRequest;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java
 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java
new file mode 100644
index 0000000..9bfc049
--- /dev/null
+++ 
b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java
@@ -0,0 +1,92 @@
+package org.apache.streams.neo4j.bolt;
+
+import org.apache.streams.neo4j.Neo4jConfiguration;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.neo4j.driver.v1.AuthToken;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class Neo4jBoltClient {
+
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(Neo4jBoltClient.class);
+
+    private Driver client;
+
+    public Neo4jConfiguration config;
+
+    private Neo4jBoltClient(Neo4jConfiguration neo4jConfiguration) {
+        this.config = neo4jConfiguration;
+        try {
+            this.start();
+        } catch (Exception e) {
+            e.printStackTrace();
+            this.client = null;
+        }
+    }
+
+    private static Map<Neo4jConfiguration, Neo4jBoltClient> INSTANCE_MAP = new 
ConcurrentHashMap<Neo4jConfiguration, Neo4jBoltClient>();
+
+    public static Neo4jBoltClient getInstance(Neo4jConfiguration 
neo4jConfiguration) {
+        if ( INSTANCE_MAP != null &&
+             INSTANCE_MAP.size() > 0 &&
+             INSTANCE_MAP.containsKey(neo4jConfiguration)) {
+            return INSTANCE_MAP.get(neo4jConfiguration);
+        } else {
+            Neo4jBoltClient instance = new Neo4jBoltClient(neo4jConfiguration);
+            if( instance != null && instance.client != null ) {
+                INSTANCE_MAP.put(neo4jConfiguration, instance);
+                return instance;
+            } else {
+                return null;
+            }
+        }
+    }
+
+    public void start() throws Exception {
+
+        Objects.nonNull(config);
+        assertThat("config.getScheme().startsWith(\"tcp\")", 
config.getScheme().startsWith("tcp"));
+
+        LOGGER.info("Neo4jConfiguration.start {}", config);
+
+        AuthToken authToken = null;
+        if( StringUtils.isNotBlank(config.getUsername()) && 
StringUtils.isNotBlank(config.getPassword())) {
+            authToken = AuthTokens.basic( config.getUsername(), 
config.getPassword() );
+        }
+
+        if( authToken == null ) {
+            client = GraphDatabase.driver("bolt://" + config.getHosts().get(0) 
+ ":" + config.getPort());
+        } else {
+            client = GraphDatabase.driver("bolt://" + config.getHosts().get(0) 
+ ":" + config.getPort(), authToken);
+        }
+
+        Objects.nonNull(client);
+
+    }
+
+    public void stop() throws Exception {
+        this.client.session().close();
+        this.client = null;
+    }
+
+    public Neo4jConfiguration config() {
+        return config;
+    }
+
+    public Driver client() {
+        return client;
+    }
+}

Reply via email to