STREAMS:344: streams-persist-neo4j Squashed commit of the following:
commit 76207b1577a0fb6f05992c8700151223db20e4b3 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Sun Jan 8 12:29:32 2017 -0600 STREAMS-344: address PR feedback from https://github.com/apache/incubator-streams/pull/348 commit ee700fd16e8631bdb0fb453d686beef4167af13b Author: Steve Blackmon <sblack...@apache.org> Date: Mon Jan 2 19:42:33 2017 -0600 add constructor commit 1f4e175cf84a208252d488c2858ea420af0642f9 Author: Steve Blackmon <sblack...@apache.org> Date: Mon Jan 2 18:11:01 2017 -0600 new neo4j module with bolt:// and http:// support, and tests Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4bd22317 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4bd22317 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4bd22317 Branch: refs/heads/master Commit: 4bd22317ea3a67b7dfdc0c9d3aba96a71f712e3a Parents: 7810361 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Authored: Sun Jan 8 12:36:18 2017 -0600 Committer: Steve Blackmon @steveblackmon <sblack...@apache.org> Committed: Sun Jan 8 12:36:18 2017 -0600 ---------------------------------------------------------------------- streams-contrib/pom.xml | 3 +- streams-contrib/streams-persist-graph/pom.xml | 20 +- .../streams/graph/GraphHttpPersistWriter.java | 250 -------------- .../apache/streams/graph/GraphVertexReader.java | 126 ------- .../apache/streams/graph/HttpGraphHelper.java | 4 +- .../apache/streams/graph/QueryGraphHelper.java | 4 +- .../graph/neo4j/CypherQueryGraphHelper.java | 238 ------------- .../graph/neo4j/Neo4jHttpGraphHelper.java | 75 ---- .../streams/graph/GraphBinaryConfiguration.json | 28 -- .../streams/graph/GraphConfiguration.json | 22 -- .../streams/graph/GraphHttpConfiguration.json | 22 -- .../graph/neo4j/CypherQueryResponse.json | 43 --- .../graph/test/TestCypherQueryGraphHelper.java | 116 ------- .../graph/test/TestNeo4jHttpVertexReader.java | 81 ----- streams-contrib/streams-persist-neo4j/pom.xml | 263 ++++++++++++++ .../streams/neo4j/CypherQueryGraphHelper.java | 344 +++++++++++++++++++ .../apache/streams/neo4j/Neo4jPersistUtil.java | 151 ++++++++ .../streams/neo4j/bolt/Neo4jBoltClient.java | 92 +++++ .../neo4j/bolt/Neo4jBoltPersistReader.java | 326 ++++++++++++++++++ .../neo4j/bolt/Neo4jBoltPersistWriter.java | 77 +++++ .../streams/neo4j/http/Neo4jHttpClient.java | 74 ++++ .../neo4j/http/Neo4jHttpGraphHelper.java | 104 ++++++ .../neo4j/http/Neo4jHttpPersistReader.java | 173 ++++++++++ .../neo4j/http/Neo4jHttpPersistWriter.java | 171 +++++++++ .../streams/neo4j/CypherQueryResponse.json | 43 +++ .../streams/neo4j/Neo4jConfiguration.json | 27 ++ .../streams/neo4j/Neo4jReaderConfiguration.json | 17 + .../streams/neo4j/test/Neo4jBoltPersistIT.java | 156 +++++++++ .../streams/neo4j/test/Neo4jHttpPersistIT.java | 138 ++++++++ .../neo4j/test/TestCypherQueryGraphHelper.java | 150 ++++++++ .../src/test/resources/Neo4jBoltPersistIT.conf | 20 ++ .../src/test/resources/Neo4jHttpPersistIT.conf | 20 ++ .../apache/streams/data/util/PropertyUtil.java | 124 ------- .../org/apache/streams/util/PropertyUtil.java | 130 +++++++ .../util/schema/test/PropertyUtilTest.java | 25 ++ 35 files changed, 2510 insertions(+), 1147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml index 8408cef..aed60c9 100644 --- a/streams-contrib/pom.xml +++ b/streams-contrib/pom.xml @@ -37,8 +37,8 @@ </properties> <modules> - <module>streams-persist-console</module> <module>streams-persist-cassandra</module> + <module>streams-persist-console</module> <module>streams-persist-elasticsearch</module> <module>streams-persist-filebuffer</module> <module>streams-persist-hbase</module> @@ -46,6 +46,7 @@ <module>streams-persist-graph</module> <module>streams-persist-kafka</module> <module>streams-persist-mongo</module> + <module>streams-persist-neo4j</module> <module>streams-amazon-aws</module> <module>streams-processor-jackson</module> <module>streams-processor-json</module> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/pom.xml b/streams-contrib/streams-persist-graph/pom.xml index b8db538..996c706 100644 --- a/streams-contrib/streams-persist-graph/pom.xml +++ b/streams-contrib/streams-persist-graph/pom.xml @@ -147,25 +147,7 @@ </execution> </executions> </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <configuration> - <includes>**/*.json</includes> - <outputDirectory>${project.build.directory}/test-classes</outputDirectory> - <includeGroupIds>org.apache.streams</includeGroupIds> - <includeTypes>test-jar</includeTypes> - </configuration> - <executions> - <execution> - <id>test-resource-dependencies</id> - <phase>process-test-resources</phase> - <goals> - <goal>unpack-dependencies</goal> - </goals> - </execution> - </executions> - </plugin> + <!-- revisit using this if streams bumps to jdk8 --> <!--<plugin>--> <!--<groupId>com.github.harti2006</groupId>--> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java deleted file mode 100644 index 5b2dec6..0000000 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.graph; - -import org.apache.streams.components.http.HttpPersistWriterConfiguration; -import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.graph.neo4j.CypherQueryGraphHelper; -import org.apache.streams.graph.neo4j.Neo4jHttpGraphHelper; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.ActivityObject; -import org.apache.streams.pojo.json.Provider; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; -import org.apache.commons.lang3.StringUtils; -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.util.EntityUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - * Adds activityobjects as vertices and activities as edges to a graph database with - * an http rest endpoint (such as neo4j). - */ -public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter { - - public static final String STREAMS_ID = GraphHttpPersistWriter.class.getCanonicalName(); - - private static final Logger LOGGER = LoggerFactory.getLogger(GraphHttpPersistWriter.class); - private static final long MAX_WRITE_LATENCY = 1000; - - private GraphHttpConfiguration configuration; - - private QueryGraphHelper queryGraphHelper; - private HttpGraphHelper httpGraphHelper; - - private static ObjectMapper mapper; - - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - - /** - * GraphHttpPersistWriter constructor - resolve GraphHttpConfiguration from JVM 'graph'. - */ - public GraphHttpPersistWriter() { - this(new ComponentConfigurator<>(GraphHttpConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph"))); - } - - /** - * GraphHttpPersistWriter constructor - use supplied GraphHttpConfiguration. - * @param configuration GraphHttpConfiguration - */ - public GraphHttpPersistWriter(GraphHttpConfiguration configuration) { - super(StreamsJacksonMapper.getInstance().convertValue(configuration, HttpPersistWriterConfiguration.class)); - if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) { - super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit/"); - } else if ( configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) { - super.configuration.setResourcePath("/graphs/" + configuration.getGraph()); - } - this.configuration = configuration; - } - - @Override - protected ObjectNode preparePayload(StreamsDatum entry) throws Exception { - - Activity activity = null; - ActivityObject activityObject; - Object document = entry.getDocument(); - - if (document instanceof Activity) { - activity = (Activity) document; - activityObject = activity.getObject(); - } else if (document instanceof ActivityObject) { - activityObject = (ActivityObject) document; - } else { - ObjectNode objectNode; - if (document instanceof ObjectNode) { - objectNode = (ObjectNode) document; - } else if ( document instanceof String) { - try { - objectNode = mapper.readValue((String) document, ObjectNode.class); - } catch (IOException ex) { - LOGGER.error("Can't handle input: ", entry); - throw ex; - } - } else { - LOGGER.error("Can't handle input: ", entry); - throw new Exception("Can't create payload from datum."); - } - - if ( objectNode.get("verb") != null ) { - try { - activity = mapper.convertValue(objectNode, Activity.class); - activityObject = activity.getObject(); - } catch (Exception ex) { - activityObject = mapper.convertValue(objectNode, ActivityObject.class); - } - } else { - activityObject = mapper.convertValue(objectNode, ActivityObject.class); - } - } - - Preconditions.checkArgument(activity != null || activityObject != null); - - ObjectNode request = mapper.createObjectNode(); - ArrayNode statements = mapper.createArrayNode(); - - // always add vertices first - - List<String> labels = Collections.singletonList("streams"); - - if ( activityObject != null ) { - if ( activityObject.getObjectType() != null ) { - labels.add(activityObject.getObjectType()); - } - statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject))); - } - - if ( activity != null ) { - - ActivityObject actor = activity.getActor(); - Provider provider = activity.getProvider(); - - if (provider != null && StringUtils.isNotBlank(provider.getId())) { - labels.add(provider.getId()); - } - if (actor != null && StringUtils.isNotBlank(actor.getId())) { - if (actor.getObjectType() != null) { - labels.add(actor.getObjectType()); - } - statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(actor))); - } - - if (activityObject != null && StringUtils.isNotBlank(activityObject.getId())) { - if (activityObject.getObjectType() != null) { - labels.add(activityObject.getObjectType()); - } - statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject))); - } - - // then add edge - - if (StringUtils.isNotBlank(activity.getVerb())) { - statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.createEdgeRequest(activity))); - } - } - - request.put("statements", statements); - return request; - - } - - @Override - protected ObjectNode executePost(HttpPost httpPost) { - - Objects.requireNonNull(httpPost); - - ObjectNode result = null; - - CloseableHttpResponse response = null; - - String entityString = null; - try { - response = httpclient.execute(httpPost); - HttpEntity entity = response.getEntity(); - if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 201 && entity != null) { - entityString = EntityUtils.toString(entity); - result = mapper.readValue(entityString, ObjectNode.class); - } - LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), response.getStatusLine().getStatusCode(), entityString); - if ( result == null - || ( - result.get("errors") != null - && result.get("errors").isArray() - && result.get("errors").iterator().hasNext() - ) - ) { - LOGGER.error("Write Error: " + result.get("errors")); - } else { - LOGGER.debug("Write Success"); - } - } catch (IOException ex) { - LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, ex.getMessage()); - } catch (Exception ex) { - LOGGER.error("Write Exception:\n{}\n{}\n{}", httpPost.toString(), response, ex.getMessage()); - } finally { - try { - if ( response != null) { - response.close(); - } - } catch (IOException ignored) { - LOGGER.trace("ignored IOException", ignored); - } - } - return result; - } - - @Override - public void prepare(Object configurationObject) { - - super.prepare(configuration); - mapper = StreamsJacksonMapper.getInstance(); - - if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) { - queryGraphHelper = new CypherQueryGraphHelper(); - httpGraphHelper = new Neo4jHttpGraphHelper(); - } - - Objects.requireNonNull(queryGraphHelper); - Objects.requireNonNull(httpGraphHelper); - } - - @Override - public void cleanUp() { - - LOGGER.info("exiting"); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java deleted file mode 100644 index 9560083..0000000 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.graph; - -import org.apache.streams.components.http.HttpProviderConfiguration; -import org.apache.streams.components.http.provider.SimpleHttpProvider; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsPersistReader; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.data.util.PropertyUtil; -import org.apache.streams.graph.neo4j.CypherQueryResponse; -import org.apache.streams.graph.neo4j.ItemData; -import org.apache.streams.graph.neo4j.ItemMetadata; -import org.apache.streams.jackson.StreamsJacksonMapper; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; - -/** - * Reads a stream of activityobjects from vertices in a graph database with - * an http rest endpoint (such as neo4j). - */ -public class GraphVertexReader extends SimpleHttpProvider implements StreamsPersistReader { - - public static final String STREAMS_ID = GraphVertexReader.class.getCanonicalName(); - - private static final Logger LOGGER = LoggerFactory.getLogger(GraphVertexReader.class); - - protected GraphReaderConfiguration configuration; - - private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - /** - * GraphVertexReader constructor - resolve GraphReaderConfiguration from JVM 'graph'. - */ - public GraphVertexReader() { - this(new ComponentConfigurator<>(GraphReaderConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph"))); - } - - /** - * GraphVertexReader constructor - use supplied GraphReaderConfiguration. - * @param configuration GraphReaderConfiguration - */ - public GraphVertexReader(GraphReaderConfiguration configuration) { - super(mapper.convertValue(configuration, HttpProviderConfiguration.class)); - if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) { - super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit"); - } else if ( configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) { - super.configuration.setResourcePath("/graphs/" + configuration.getGraph()); - } - this.configuration = configuration; - } - - /** - * Neo API query returns something like this: - * { "columns": [ "v" ], "data": [ [ { "data": { props }, etc... } ], [ { "data": { props }, etc... } ] ] } - * - * @param jsonNode jsonNode - * @return result - */ - public List<ObjectNode> parse(JsonNode jsonNode) { - List<ObjectNode> results = new ArrayList<>(); - - ObjectNode root = (ObjectNode) jsonNode; - - CypherQueryResponse cypherQueryResponse = mapper.convertValue(root, CypherQueryResponse.class); - - for ( List<List<ItemMetadata>> dataWrapper : cypherQueryResponse.getData()) { - - for (List<ItemMetadata> itemMetadatas : dataWrapper) { - - for (ItemMetadata itemMetadata : itemMetadatas) { - - ItemData itemData = itemMetadata.getData(); - - LOGGER.debug("itemData: " + itemData); - - results.add(PropertyUtil.unflattenMap(itemData.getAdditionalProperties(), '.')); - } - - } - - } - return results; - } - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - public void prepare(Object configurationObject) { - - super.prepare(configurationObject); - - } - - @Override - public StreamsResultSet readAll() { - return readCurrent(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java index ca1f4e4..804e9ff 100644 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java +++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java @@ -29,6 +29,8 @@ import java.util.Map; */ public interface HttpGraphHelper { - ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters); + ObjectNode readData(Pair<String, Map<String, Object>> queryPlusParameters); + + ObjectNode writeData(Pair<String, Map<String, Object>> queryPlusParameters); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java index 1699aee..38ceb55 100644 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java +++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java @@ -39,6 +39,8 @@ public interface QueryGraphHelper { public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject); - public Pair<String, Map<String, Object>> createEdgeRequest(Activity activity); + public Pair<String, Map<String, Object>> createActorObjectEdge(Activity activity); + + public Pair<String, Map<String, Object>> createActorTargetEdge(Activity activity); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java deleted file mode 100644 index a361139..0000000 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.graph.neo4j; - -import org.apache.streams.data.util.PropertyUtil; -import org.apache.streams.graph.QueryGraphHelper; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.ActivityObject; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.javatuples.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.stringtemplate.v4.ST; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -/** - * Supporting class for interacting with neo4j via rest API - */ -public class CypherQueryGraphHelper implements QueryGraphHelper { - - private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - - private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class); - - private static final String getVertexLongIdStatementTemplate = "MATCH (v) WHERE ID(v) = <id> RETURN v"; - private static final String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v"; - - private static final String createVertexStatementTemplate = - "MATCH (x {id: '<id>'}) " - + "CREATE UNIQUE (v:<type> { props }) " - + "ON CREATE SET v <labels> " - + "RETURN v"; - - - - private static final String mergeVertexStatementTemplate = - "MERGE (v:<type> {id: '<id>'}) " - + "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = timestamp() " - + "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = timestamp() " - + "RETURN v"; - - private static final String createEdgeStatementTemplate = - "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) " - + "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) " - + "RETURN r"; - - /** - * getVertexRequest. - * @param streamsId streamsId - * @return pair (streamsId, parameterMap) - */ - public Pair<String, Map<String, Object>> getVertexRequest(String streamsId) { - - ST getVertex = new ST(getVertexStringIdStatementTemplate); - getVertex.add("id", streamsId); - - Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null); - - LOGGER.debug("getVertexRequest", queryPlusParameters.toString()); - - return queryPlusParameters; - } - - /** - * getVertexRequest. - * @param vertexId numericId - * @return pair (streamsId, parameterMap) - */ - @Override - public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId) { - - ST getVertex = new ST(getVertexLongIdStatementTemplate); - getVertex.add("id", vertexId); - - Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null); - - LOGGER.debug("getVertexRequest", queryPlusParameters.toString()); - - return queryPlusParameters; - - } - - /** - * createVertexRequest. - * @param activityObject activityObject - * @return pair (query, parameterMap) - */ - public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject) { - - Objects.requireNonNull(activityObject.getObjectType()); - - List<String> labels = getLabels(activityObject); - - ST createVertex = new ST(createVertexStatementTemplate); - createVertex.add("id", activityObject.getId()); - createVertex.add("type", activityObject.getObjectType()); - - if ( labels.size() > 0 ) { - createVertex.add("labels", String.join(" ", labels)); - } - - String query = createVertex.render(); - - ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class); - Map<String, Object> props = PropertyUtil.flattenToMap(object, '.'); - - Pair<String, Map<String, Object>> queryPlusParameters = new Pair(createVertex.render(), props); - - LOGGER.debug("createVertexRequest: ({},{})", query, props); - - return queryPlusParameters; - } - - /** - * mergeVertexRequest. - * @param activityObject activityObject - * @return pair (query, parameterMap) - */ - public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject) { - - Objects.requireNonNull(activityObject.getObjectType()); - - Pair queryPlusParameters = new Pair(null, new HashMap<>()); - - List<String> labels = getLabels(activityObject); - - ST mergeVertex = new ST(mergeVertexStatementTemplate); - mergeVertex.add("id", activityObject.getId()); - mergeVertex.add("type", activityObject.getObjectType()); - if ( labels.size() > 0 ) { - mergeVertex.add("labels", String.join(" ", labels)); - } - String query = mergeVertex.render(); - - ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class); - Map<String, Object> props = PropertyUtil.flattenToMap(object, '.'); - - LOGGER.debug("mergeVertexRequest: ({},{})", query, props); - - queryPlusParameters = queryPlusParameters.setAt0(query); - queryPlusParameters = queryPlusParameters.setAt1(props); - - return queryPlusParameters; - } - - /** - * createEdgeRequest. - * @param activity activity - * @return pair (query, parameterMap) - */ - public Pair<String, Map<String, Object>> createEdgeRequest(Activity activity) { - - Pair queryPlusParameters = new Pair(null, new HashMap<>()); - - ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class); - Map<String, Object> props = PropertyUtil.flattenToMap(object, '.'); - - ST mergeEdge = new ST(createEdgeStatementTemplate); - mergeEdge.add("s_id", activity.getActor().getId()); - mergeEdge.add("s_type", activity.getActor().getObjectType()); - mergeEdge.add("d_id", activity.getObject().getId()); - mergeEdge.add("d_type", activity.getObject().getObjectType()); - mergeEdge.add("r_id", activity.getId()); - mergeEdge.add("r_type", activity.getVerb()); - mergeEdge.add("r_props", getPropertyCreater(props)); - - // set the activityObject's and extensions null, because their properties don't need to appear on the relationship - activity.setActor(null); - activity.setObject(null); - activity.setTarget(null); - activity.getAdditionalProperties().put("extensions", null); - - String statement = mergeEdge.render(); - queryPlusParameters = queryPlusParameters.setAt0(statement); - queryPlusParameters = queryPlusParameters.setAt1(props); - - LOGGER.debug("createEdgeRequest: ({},{})", statement, props); - - return queryPlusParameters; - } - - /** - * getPropertyCreater. - * @param map paramMap - * @return PropertyCreater string - */ - public static String getPropertyCreater(Map<String, Object> map) { - StringBuilder builder = new StringBuilder(); - builder.append("{"); - List<String> parts = new ArrayList<>(); - for ( Map.Entry<String, Object> entry : map.entrySet()) { - if ( entry.getValue() instanceof String ) { - String propVal = (String) (entry.getValue()); - parts.add("`" + entry.getKey() + "`:'" + propVal + "'"); - } - } - builder.append(String.join(",", parts)); - builder.append("}"); - return builder.toString(); - } - - private List<String> getLabels(ActivityObject activityObject) { - List<String> labels = Collections.singletonList(":streams"); - if ( activityObject.getAdditionalProperties().containsKey("labels") ) { - List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels"); - for ( String extraLabel : extraLabels ) { - labels.add(":" + extraLabel); - } - } - return labels; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java deleted file mode 100644 index 9f47058..0000000 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.graph.neo4j; - -import org.apache.streams.graph.HttpGraphHelper; -import org.apache.streams.jackson.StreamsJacksonMapper; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.javatuples.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.Objects; - -/** - * Supporting class for interacting with neo4j via rest API. - */ -public class Neo4jHttpGraphHelper implements HttpGraphHelper { - - private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - - private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class); - - private static final String statementKey = "statement"; - private static final String paramsKey = "parameters"; - private static final String propsKey = "props"; - - /** - * createHttpRequest neo4j rest json payload. - * - * @param queryPlusParameters (query, parameter map) - * @return ObjectNode - */ - public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters) { - - LOGGER.debug("createHttpRequest: ", queryPlusParameters); - - Objects.requireNonNull(queryPlusParameters); - Objects.requireNonNull(queryPlusParameters.getValue0()); - Objects.requireNonNull(queryPlusParameters.getValue1()); - - ObjectNode request = MAPPER.createObjectNode(); - - request.put(statementKey, queryPlusParameters.getValue0()); - - ObjectNode params = MAPPER.createObjectNode(); - ObjectNode props = MAPPER.convertValue(queryPlusParameters.getValue1(), ObjectNode.class); - - params.put(propsKey, props); - request.put(paramsKey, params); - - LOGGER.debug("createHttpRequest: ", request); - - return request; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json deleted file mode 100644 index 04a70e1..0000000 --- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphBinaryConfiguration.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "type": "object", - "$schema": "http://json-schema.org/draft-03/schema", - "$license": [ - "http://www.apache.org/licenses/LICENSE-2.0" - ], - "id": "#", - "javaType" : "org.apache.streams.graph.GraphBinaryConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "type": { - "type": "string", - "description": "Graph DB type", - "enum" : ["neo4j", "gremlin"] - }, - "file": { - "type": "string", - "description": "New Graph DB File" - }, - "indexFields": { - "type": "array", - "items": { - "type": "string" - }, - "description": "Fields to index under streams label" - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json deleted file mode 100644 index de92443..0000000 --- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "type": "object", - "$schema": "http://json-schema.org/draft-03/schema", - "$license": [ - "http://www.apache.org/licenses/LICENSE-2.0" - ], - "id": "#", - "javaType" : "org.apache.streams.graph.GraphConfiguration", - "extends" : {"$ref":"../../../../../../../../../streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json"}, - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "type": { - "type": "string", - "description": "Graph DB type", - "enum" : ["neo4j", "rexster"] - }, - "graph": { - "type": "string", - "description": "Graph DB Graph ID" - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json deleted file mode 100644 index c63e0fb..0000000 --- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphHttpConfiguration.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "type": "object", - "$schema": "http://json-schema.org/draft-03/schema", - "$license": [ - "http://www.apache.org/licenses/LICENSE-2.0" - ], - "id": "#", - "javaType" : "org.apache.streams.graph.GraphHttpConfiguration", - "extends" : {"$ref":"../../../../../../../../../streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpConfiguration.json"}, - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "type": { - "type": "string", - "description": "Graph DB type", - "enum" : ["neo4j", "rexster"] - }, - "graph": { - "type": "string", - "description": "Graph DB Graph ID" - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json deleted file mode 100644 index 77c6fd7..0000000 --- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/neo4j/CypherQueryResponse.json +++ /dev/null @@ -1,43 +0,0 @@ -{ - "type": "object", - "$schema": "http://json-schema.org/draft-03/schema", - "$license": [ - "http://www.apache.org/licenses/LICENSE-2.0" - ], - "id": "#", - "javaType" : "org.apache.streams.graph.neo4j.CypherQueryResponse", - "properties": { - "columns": { - "type": "array", - "id": "http://jsonschema.net/columns", - "required": false, - "items": { - "type": "string", - "id": "http://jsonschema.net/columns/0", - "required": false - } - }, - "data": { - "type": "array", - "required": false, - "items": { - "type": "array", - "required": false, - "items": { - "type": "array", - "required": false, - "items": { - "type": "object", - "javaType" : "org.apache.streams.graph.neo4j.ItemMetadata", - "properties": { - "data": { - "type": "object", - "javaType" : "org.apache.streams.graph.neo4j.ItemData" - } - } - } - } - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java deleted file mode 100644 index c29c8b7..0000000 --- a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.graph.test; - -import org.apache.streams.graph.neo4j.CypherQueryGraphHelper; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.ActivityObject; - -import org.javatuples.Pair; -import org.junit.Test; - -import java.util.Map; - -/** - * TestCypherQueryGraphHelper tests - * @see org.apache.streams.graph.neo4j.CypherQueryGraphHelper - */ -public class TestCypherQueryGraphHelper { - - CypherQueryGraphHelper helper = new CypherQueryGraphHelper(); - - @Test - public void getVertexRequestIdTest() throws Exception { - - Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest("id"); - assert(queryAndParams != null); - assert(queryAndParams.getValue0() != null); - - } - - @Test - public void getVertexRequestLongTest() throws Exception { - - Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest(new Long(1)); - - assert(queryAndParams != null); - assert(queryAndParams.getValue0() != null); - - } - - @Test - public void createVertexRequestTest() throws Exception { - - ActivityObject activityObject = new ActivityObject(); - activityObject.setId("id"); - activityObject.setObjectType("type"); - activityObject.setContent("content"); - - Pair<String, Map<String, Object>> queryAndParams = helper.createVertexRequest(activityObject); - assert(queryAndParams != null); - assert(queryAndParams.getValue0() != null); - assert(queryAndParams.getValue1() != null); - - } - - @Test - public void mergeVertexRequestTest() throws Exception { - - ActivityObject activityObject = new ActivityObject(); - activityObject.setId("id"); - activityObject.setObjectType("type"); - activityObject.setContent("content"); - - Pair<String, Map<String, Object>> queryAndParams = helper.mergeVertexRequest(activityObject); - assert(queryAndParams != null); - assert(queryAndParams.getValue0() != null); - assert(queryAndParams.getValue1() != null); - - } - - @Test - public void createEdgeRequestTest() throws Exception { - - ActivityObject actor = new ActivityObject(); - actor.setId("actor"); - actor.setObjectType("type"); - actor.setContent("content"); - - ActivityObject object = new ActivityObject(); - object.setId("object"); - object.setObjectType("type"); - object.setContent("content"); - - Activity activity = new Activity(); - activity.setId("activity"); - activity.setVerb("verb"); - activity.setContent("content"); - - activity.setActor(actor); - activity.setObject(object); - - Pair<String, Map<String, Object>> queryAndParams = helper.createEdgeRequest(activity); - - assert(queryAndParams != null); - assert(queryAndParams.getValue0() != null); - assert(queryAndParams.getValue1() != null); - - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java deleted file mode 100644 index 24ddd65..0000000 --- a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.graph.test; - -import org.apache.streams.graph.GraphHttpConfiguration; -import org.apache.streams.graph.GraphReaderConfiguration; -import org.apache.streams.graph.GraphVertexReader; -import org.apache.streams.jackson.StreamsJacksonMapper; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.commons.io.IOUtils; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.util.List; - -/** - * Unit test for {@link org.apache.streams.graph.GraphVertexReader} - * - * Test that graph db responses can be converted to streams data. - */ -public class TestNeo4jHttpVertexReader { - - private static final Logger LOGGER = LoggerFactory.getLogger(TestNeo4jHttpVertexReader.class); - - private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - private JsonNode sampleReaderResult; - - private GraphReaderConfiguration testConfiguration; - - private GraphVertexReader graphPersistReader; - - @Before - public void prepareTest() throws IOException { - - testConfiguration = new GraphReaderConfiguration(); - testConfiguration.setType(GraphHttpConfiguration.Type.NEO_4_J); - - graphPersistReader = new GraphVertexReader(testConfiguration); - InputStream testActivityFileStream = TestNeo4jHttpVertexReader.class.getClassLoader() - .getResourceAsStream("sampleReaderResult.json"); - String sampleText = IOUtils.toString(testActivityFileStream, "utf-8"); - sampleReaderResult = mapper.readValue(sampleText, JsonNode.class); - - } - - @Test - public void testParseNeoResult() throws IOException { - - List<ObjectNode> result = graphPersistReader.parse(sampleReaderResult); - - assert( result.size() == 10); - - for( int i = 0 ; i < 10; i++ ) - assert( result.get(i).get("extensions").size() == 5); - - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-neo4j/pom.xml b/streams-contrib/streams-persist-neo4j/pom.xml new file mode 100644 index 0000000..d117558 --- /dev/null +++ b/streams-contrib/streams-persist-neo4j/pom.xml @@ -0,0 +1,263 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>streams-contrib</artifactId> + <groupId>org.apache.streams</groupId> + <version>0.5-incubating-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>streams-persist-neo4j</artifactId> + <name>streams-persist-neo4j</name> + + <dependencies> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-config</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.typesafe</groupId> + <artifactId>config</artifactId> + <version>1.2.0</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-converters</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-pojo</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-util</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-http</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-persist-graph</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.antlr</groupId> + <artifactId>stringtemplate</artifactId> + <version>4.0.2</version> + </dependency> + <dependency> + <groupId>org.javatuples</groupId> + <artifactId>javatuples</artifactId> + <version>1.2</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.neo4j.driver</groupId> + <artifactId>neo4j-java-driver</artifactId> + <version>1.0.6</version> + </dependency> + + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-schema-activitystreams</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-testing</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + </dependencies> + <build> + <sourceDirectory>src/main/java</sourceDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + <testResources> + <testResource> + <directory>src/test/resources</directory> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.apache.streams.plugins</groupId> + <artifactId>streams-plugin-pojo</artifactId> + <version>${project.version}</version> + <configuration> + <sourcePaths> + <sourcePath>${project.basedir}/src/main/jsonschema</sourcePath> + </sourcePaths> + <targetDirectory>${project.basedir}/target/generated-sources/pojo</targetDirectory> + <targetPackage>org.apache.streams.graph.pojo</targetPackage> + </configuration> + <executions> + <execution> + <goals> + <goal>generate-sources</goal> + </goals> + </execution> + </executions> + <dependencies> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-http</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/pojo</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <configuration> + <includes>**/*.json</includes> + <outputDirectory>${project.build.directory}/test-classes</outputDirectory> + <includeGroupIds>org.apache.streams</includeGroupIds> + <includeTypes>test-jar</includeTypes> + </configuration> + <executions> + <execution> + <id>test-resource-dependencies</id> + <phase>process-test-resources</phase> + <goals> + <goal>unpack-dependencies</goal> + </goals> + </execution> + </executions> + </plugin> + + </plugins> + </build> + + <profiles> + <profile> + <id>dockerITs</id> + <activation> + <activeByDefault>false</activeByDefault> + <property> + <name>skipITs</name> + <value>false</value> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>io.fabric8</groupId> + <artifactId>docker-maven-plugin</artifactId> + <configuration combine.self="override"> + <watchInterval>500</watchInterval> + <logDate>default</logDate> + <verbose>true</verbose> + <autoPull>on</autoPull> + <images> + <image> + <name>neo4j:3.0.6</name> + <alias>neo4j</alias> + <run> + <env> + <NEO4J_AUTH>none</NEO4J_AUTH> + </env> + <namingStrategy>none</namingStrategy> + <ports> + <port>${neo4j.http.host}:${neo4j.http.port}:7474</port> + <port>${neo4j.tcp.host}:${neo4j.tcp.port}:7687</port> + </ports> + <portPropertyFile>neo4j.properties</portPropertyFile> + <wait> + <log>neo4j startup</log> + <http> + <url>http://${neo4j.http.host}:${neo4j.http.port}</url> + <method>GET</method> + <status>200</status> + </http> + <time>20000</time> + <kill>1000</kill> + <shutdown>500</shutdown> + <!--<tcp>--> + <!--<host>${es.transport.host}</host>--> + <!--<ports>--> + <!--<port>${es.transport.port}</port>--> + <!--</ports>--> + <!--</tcp>--> + </wait> + <log> + <enabled>true</enabled> + <date>default</date> + <color>cyan</color> + </log> + </run> + <watch> + <mode>none</mode> + </watch> + </image> + + </images> + </configuration> + + </plugin> + + </plugins> + </build> + + </profile> + </profiles> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/CypherQueryGraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/CypherQueryGraphHelper.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/CypherQueryGraphHelper.java new file mode 100644 index 0000000..c117c16 --- /dev/null +++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/CypherQueryGraphHelper.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.neo4j; + +import org.apache.streams.graph.QueryGraphHelper; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.ActivityObject; +import org.apache.streams.util.PropertyUtil; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import org.apache.commons.lang3.StringEscapeUtils; +import org.javatuples.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.stringtemplate.v4.ST; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Supporting class for interacting with neo4j via rest API + */ +public class CypherQueryGraphHelper implements QueryGraphHelper, Serializable { + + private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + + private static final Logger LOGGER = LoggerFactory.getLogger(CypherQueryGraphHelper.class); + + public static final String getVertexLongIdStatementTemplate = "MATCH (v) WHERE ID(v) = <id> RETURN v"; + public static final String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v"; + public static final String getVerticesLabelIdStatementTemplate = "MATCH (v:<type>) RETURN v"; + + public final static String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) "+ + "CREATE UNIQUE (v:`<type>` { props }) "+ + "ON CREATE SET v <labels> "+ + "RETURN v"; + + public final static String mergeVertexStatementTemplate = "MERGE (v:`<type>` {id: '<id>'}) "+ + "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+ + "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+ + "RETURN v"; + + public final static String createEdgeStatementTemplate = "MATCH (s:`<s_type>` {id: '<s_id>'}),(d:`<d_type>` {id: '<d_id>'}) "+ + "CREATE UNIQUE (s)-[r:`<r_type>` <r_props>]->(d) "+ + "RETURN r"; + + public Pair<String, Map<String, Object>> getVertexRequest(String streamsId) { + + ST getVertex = new ST(getVertexStringIdStatementTemplate); + getVertex.add("id", streamsId); + + Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null); + + LOGGER.debug("getVertexRequest", queryPlusParameters.toString()); + + return queryPlusParameters; + } + + /** + * getVertexRequest. + * @param vertexId numericId + * @return pair (streamsId, parameterMap) + */ + public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId) { + + ST getVertex = new ST(getVertexLongIdStatementTemplate); + getVertex.add("id", vertexId); + + Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null); + + LOGGER.debug("getVertexRequest", queryPlusParameters.toString()); + + return queryPlusParameters; + + } + + /** + * createVertexRequest. + * @param activityObject activityObject + * @return pair (query, parameterMap) + */ + public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject) { + + Objects.requireNonNull(activityObject.getObjectType()); + + List<String> labels = getLabels(activityObject); + + ST createVertex = new ST(createVertexStatementTemplate); + createVertex.add("id", activityObject.getId()); + createVertex.add("type", activityObject.getObjectType()); + + if ( labels.size() > 0 ) { + createVertex.add("labels", String.join(" ", labels)); + } + + String query = createVertex.render(); + + ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class); + Map<String, Object> props = PropertyUtil.flattenToMap(object, '.'); + + Pair<String, Map<String, Object>> queryPlusParameters = new Pair(createVertex.render(), props); + + LOGGER.debug("createVertexRequest: ({},{})", query, props); + + return queryPlusParameters; + } + + /** + * getVerticesRequest gets all vertices with a label. + * @param labelId labelId + * @return pair (query, parameterMap) + */ + public Pair<String, Map<String, Object>> getVerticesRequest(String labelId) { + ST getVertex = new ST(getVerticesLabelIdStatementTemplate); + getVertex.add("type", labelId); + + Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null); + + LOGGER.debug("getVertexRequest", queryPlusParameters.toString()); + + return queryPlusParameters; + } + + /** + * mergeVertexRequest. + * @param activityObject activityObject + * @return pair (query, parameterMap) + */ + public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject) { + + Objects.requireNonNull(activityObject.getObjectType()); + + Pair queryPlusParameters = new Pair(null, new HashMap<>()); + + List<String> labels = getLabels(activityObject); + + ST mergeVertex = new ST(mergeVertexStatementTemplate); + mergeVertex.add("id", activityObject.getId()); + mergeVertex.add("type", activityObject.getObjectType()); + if ( labels.size() > 0 ) { + mergeVertex.add("labels", String.join(" ", labels)); + } + String query = mergeVertex.render(); + + ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class); + Map<String, Object> props = PropertyUtil.flattenToMap(object, '.'); + + LOGGER.debug("mergeVertexRequest: ({},{})", query, props); + + queryPlusParameters = queryPlusParameters.setAt0(query); + queryPlusParameters = queryPlusParameters.setAt1(props); + + return queryPlusParameters; + } + + /** + * createActorObjectEdge. + * @param activity activity + * @return pair (query, parameterMap) + */ + public Pair<String, Map<String, Object>> createActorObjectEdge(Activity activity) { + + Pair queryPlusParameters = new Pair(null, new HashMap<>()); + + ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class); + Map<String, Object> props = PropertyUtil.flattenToMap(object, '.'); + + ST mergeEdge = new ST(createEdgeStatementTemplate); + mergeEdge.add("s_id", activity.getActor().getId()); + mergeEdge.add("s_type", activity.getActor().getObjectType()); + mergeEdge.add("d_id", activity.getObject().getId()); + mergeEdge.add("d_type", activity.getObject().getObjectType()); + mergeEdge.add("r_id", activity.getId()); + mergeEdge.add("r_type", activity.getVerb()); + mergeEdge.add("r_props", getActorObjectEdgePropertyCreater(props)); + + String statement = mergeEdge.render(); + queryPlusParameters = queryPlusParameters.setAt0(statement); + queryPlusParameters = queryPlusParameters.setAt1(props); + + LOGGER.debug("createActorObjectEdge: ({},{})", statement, props); + + return queryPlusParameters; + } + + /** + * createActorTargetEdge. + * @param activity activity + * @return pair (query, parameterMap) + */ + public Pair<String, Map<String, Object>> createActorTargetEdge(Activity activity) { + + Pair queryPlusParameters = new Pair(null, new HashMap<>()); + + ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class); + Map<String, Object> props = PropertyUtil.flattenToMap(object, '.'); + + ST mergeEdge = new ST(createEdgeStatementTemplate); + mergeEdge.add("s_id", activity.getActor().getId()); + mergeEdge.add("s_type", activity.getActor().getObjectType()); + mergeEdge.add("d_id", activity.getTarget().getId()); + mergeEdge.add("d_type", activity.getTarget().getObjectType()); + mergeEdge.add("r_id", activity.getId()); + mergeEdge.add("r_type", activity.getVerb()); + mergeEdge.add("r_props", getActorTargetEdgePropertyCreater(props)); + + String statement = mergeEdge.render(); + queryPlusParameters = queryPlusParameters.setAt0(statement); + queryPlusParameters = queryPlusParameters.setAt1(props); + + LOGGER.debug("createActorObjectEdge: ({},{})", statement, props); + + return queryPlusParameters; + } + + /** + * getPropertyValueSetter. + * @param map paramMap + * @return PropertyValueSetter string + */ + public static String getPropertyValueSetter(Map<String, Object> map, String symbol) { + StringBuilder builder = new StringBuilder(); + for( Map.Entry<String, Object> entry : map.entrySet()) { + if( entry.getValue() instanceof String ) { + String propVal = (String)(entry.getValue()); + builder.append("," + symbol + ".`" + entry.getKey() + "` = '" + StringEscapeUtils.escapeJava(propVal) + "'"); + } + } + return builder.toString(); + } + + /** + * getPropertyParamSetter. + * @param map paramMap + * @return PropertyParamSetter string + */ + public static String getPropertyParamSetter(Map<String, Object> map, String symbol) { + StringBuilder builder = new StringBuilder(); + for( Map.Entry<String, Object> entry : map.entrySet()) { + if( entry.getValue() instanceof String ) { + String propVal = (String)(entry.getValue()); + builder.append("," + symbol + ".`" + entry.getKey() + "` = '" + StringEscapeUtils.escapeJava(propVal) + "'"); + } + } + return builder.toString(); + } + + /** + * getPropertyCreater. + * @param map paramMap + * @return PropertyCreater string + */ + public static String getPropertyCreater(Map<String, Object> map) { + StringBuilder builder = new StringBuilder(); + builder.append("{ "); + List<String> parts = new ArrayList<>(); + for( Map.Entry<String, Object> entry : map.entrySet()) { + if( entry.getValue() instanceof String ) { + String propVal = (String) (entry.getValue()); + parts.add("`"+entry.getKey() + "`:'" + StringEscapeUtils.escapeJava(propVal) + "'"); + } + } + builder.append(String.join(" ", parts)); + builder.append(" }"); + return builder.toString(); + } + + private String getActorObjectEdgePropertyCreater(Map<String, Object> map) { + StringBuilder builder = new StringBuilder(); + builder.append("{ "); + List<String> parts = new ArrayList<>(); + for( Map.Entry<String, Object> entry : map.entrySet()) { + if( entry.getValue() instanceof String ) { + String propVal = (String) (entry.getValue()); + if( !entry.getKey().contains(".")) { + parts.add("`"+entry.getKey() + "`: '" + StringEscapeUtils.escapeJava(propVal) + "'"); + } + } + } + builder.append(String.join(", ", parts)); + builder.append(" }"); + return builder.toString(); + } + + private String getActorTargetEdgePropertyCreater(Map<String, Object> map) { + StringBuilder builder = new StringBuilder(); + builder.append("{ "); + List<String> parts = new ArrayList<>(); + for( Map.Entry<String, Object> entry : map.entrySet()) { + if( entry.getValue() instanceof String ) { + String propVal = (String) (entry.getValue()); + if( !entry.getKey().contains(".")) { + parts.add("`"+entry.getKey() + "`: '" + StringEscapeUtils.escapeJava(propVal) + "'"); + } else if( entry.getKey().startsWith("object.") && !entry.getKey().contains(".id")) { + parts.add("`"+entry.getKey().substring("object.".length()) + "`: '" + StringEscapeUtils.escapeJava(propVal) + "'"); + } + } + } + builder.append(String.join(", ", parts)); + builder.append(" }"); + return builder.toString(); + } + + /** + * getLabels. + * @param activityObject activityObject + * @return PropertyCreater string + */ + public static List<String> getLabels(ActivityObject activityObject) { + List<String> labels = Collections.singletonList(":streams"); + if ( activityObject.getAdditionalProperties().containsKey("labels") ) { + List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels"); + for ( String extraLabel : extraLabels ) { + labels.add(":" + extraLabel); + } + } + return labels; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java new file mode 100644 index 0000000..6058c66 --- /dev/null +++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java @@ -0,0 +1,151 @@ +package org.apache.streams.neo4j; + +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.neo4j.bolt.Neo4jBoltPersistWriter; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.ActivityObject; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +import org.apache.commons.lang3.StringUtils; +import org.javatuples.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Created by steve on 1/2/17. + */ +public class Neo4jPersistUtil { + + private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jBoltPersistWriter.class); + + private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + private static CypherQueryGraphHelper helper = new CypherQueryGraphHelper(); + + public static List<Pair<String, Map<String, Object>>> prepareStatements(StreamsDatum entry) throws Exception { + + List<Pair<String, Map<String, Object>>> statements = new ArrayList<>(); + + String id = entry.getId(); + Activity activity = null; + ActivityObject activityObject = null; + Object document = entry.getDocument(); + + if (document instanceof Activity) { + activity = (Activity) document; + } else if (document instanceof ActivityObject) { + activityObject = (ActivityObject) document; + } else { + ObjectNode objectNode; + if (document instanceof ObjectNode) { + objectNode = (ObjectNode) document; + } else if ( document instanceof String) { + try { + objectNode = mapper.readValue((String) document, ObjectNode.class); + } catch (IOException ex) { + LOGGER.error("Can't handle input: ", entry); + throw ex; + } + } else { + LOGGER.error("Can't handle input: ", entry); + throw new Exception("Can't create statements from datum."); + } + + if ( objectNode.get("verb") != null ) { + try { + activity = mapper.convertValue(objectNode, Activity.class); + activityObject = activity.getObject(); + } catch (Exception ex) { + activityObject = mapper.convertValue(objectNode, ActivityObject.class); + } + } else { + activityObject = mapper.convertValue(objectNode, ActivityObject.class); + } + + } + + Preconditions.checkArgument(activity != null ^ activityObject != null); + + if ( activityObject != null && !Strings.isNullOrEmpty(activityObject.getId())) { + + statements.add(vertexStatement(activityObject)); + + } else if ( activity != null && !Strings.isNullOrEmpty(activity.getId())) { + + statements.addAll(vertexStatements(activity)); + + statements.addAll(edgeStatements(activity)); + + } + + return statements; + } + + public static List<Pair<String, Map<String, Object>>> vertexStatements(Activity activity) { + List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();; + ActivityObject actor = activity.getActor(); + ActivityObject object = activity.getObject(); + ActivityObject target = activity.getTarget(); + + if (actor != null && StringUtils.isNotBlank(actor.getId())) { + Pair<String, Map<String, Object>> actorStatement = vertexStatement(actor); + statements.add(actorStatement); + } + + if (object != null && StringUtils.isNotBlank(object.getId())) { + Pair<String, Map<String, Object>> objectStatement = vertexStatement(object); + statements.add(objectStatement); + } + + if (target != null && StringUtils.isNotBlank(target.getId())) { + Pair<String, Map<String, Object>> targetStatement = vertexStatement(target); + statements.add(targetStatement); + } + + return statements; + } + + public static List<Pair<String, Map<String, Object>>> edgeStatements(Activity activity) { + List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();; + ActivityObject actor = activity.getActor(); + ActivityObject object = activity.getObject(); + ActivityObject target = activity.getTarget(); + + if (StringUtils.isNotBlank(actor.getId()) && object != null && StringUtils.isNotBlank(object.getId())) { + Pair<String, Map<String, Object>> actorObjectEdgeStatement = helper.createActorObjectEdge(activity); + Map<String, Object> props = new HashMap<>(); + props.put("props", actorObjectEdgeStatement.getValue1()); + actorObjectEdgeStatement = actorObjectEdgeStatement.setAt1(props); + statements.add(actorObjectEdgeStatement); + } + + if (StringUtils.isNotBlank(actor.getId()) && target != null && StringUtils.isNotBlank(target.getId())) { + Pair<String, Map<String, Object>> actorTargetEdgeStatement = helper.createActorTargetEdge(activity); + Map<String, Object> props = new HashMap<>(); + props.put("props", actorTargetEdgeStatement.getValue1()); + actorTargetEdgeStatement = actorTargetEdgeStatement.setAt1(props); + statements.add(actorTargetEdgeStatement); + } + + return statements; + } + + public static Pair<String, Map<String, Object>> vertexStatement(ActivityObject activityObject) { + Pair<String, Map<String, Object>> mergeVertexRequest = helper.mergeVertexRequest(activityObject); + Map<String, Object> props = new HashMap<>(); + props.put("props", mergeVertexRequest.getValue1()); + mergeVertexRequest = mergeVertexRequest.setAt1(props); + return mergeVertexRequest; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4bd22317/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java new file mode 100644 index 0000000..9bfc049 --- /dev/null +++ b/streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/bolt/Neo4jBoltClient.java @@ -0,0 +1,92 @@ +package org.apache.streams.neo4j.bolt; + +import org.apache.streams.neo4j.Neo4jConfiguration; + +import com.google.common.base.Preconditions; + +import org.apache.commons.lang3.StringUtils; +import org.neo4j.driver.v1.AuthToken; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import static org.hamcrest.MatcherAssert.assertThat; + +public class Neo4jBoltClient { + + private static final Logger LOGGER = LoggerFactory + .getLogger(Neo4jBoltClient.class); + + private Driver client; + + public Neo4jConfiguration config; + + private Neo4jBoltClient(Neo4jConfiguration neo4jConfiguration) { + this.config = neo4jConfiguration; + try { + this.start(); + } catch (Exception e) { + e.printStackTrace(); + this.client = null; + } + } + + private static Map<Neo4jConfiguration, Neo4jBoltClient> INSTANCE_MAP = new ConcurrentHashMap<Neo4jConfiguration, Neo4jBoltClient>(); + + public static Neo4jBoltClient getInstance(Neo4jConfiguration neo4jConfiguration) { + if ( INSTANCE_MAP != null && + INSTANCE_MAP.size() > 0 && + INSTANCE_MAP.containsKey(neo4jConfiguration)) { + return INSTANCE_MAP.get(neo4jConfiguration); + } else { + Neo4jBoltClient instance = new Neo4jBoltClient(neo4jConfiguration); + if( instance != null && instance.client != null ) { + INSTANCE_MAP.put(neo4jConfiguration, instance); + return instance; + } else { + return null; + } + } + } + + public void start() throws Exception { + + Objects.nonNull(config); + assertThat("config.getScheme().startsWith(\"tcp\")", config.getScheme().startsWith("tcp")); + + LOGGER.info("Neo4jConfiguration.start {}", config); + + AuthToken authToken = null; + if( StringUtils.isNotBlank(config.getUsername()) && StringUtils.isNotBlank(config.getPassword())) { + authToken = AuthTokens.basic( config.getUsername(), config.getPassword() ); + } + + if( authToken == null ) { + client = GraphDatabase.driver("bolt://" + config.getHosts().get(0) + ":" + config.getPort()); + } else { + client = GraphDatabase.driver("bolt://" + config.getHosts().get(0) + ":" + config.getPort(), authToken); + } + + Objects.nonNull(client); + + } + + public void stop() throws Exception { + this.client.session().close(); + this.client = null; + } + + public Neo4jConfiguration config() { + return config; + } + + public Driver client() { + return client; + } +}