[
https://issues.apache.org/jira/browse/STREAMS-344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15809806#comment-15809806
]
ASF GitHub Bot commented on STREAMS-344:
----------------------------------------
Github user steveblackmon commented on a diff in the pull request:
https://github.com/apache/incubator-streams/pull/348#discussion_r95085386
--- Diff:
streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java
---
@@ -0,0 +1,151 @@
+package org.apache.streams.neo4j;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.neo4j.bolt.Neo4jBoltPersistWriter;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+import org.apache.commons.lang3.StringUtils;
+import org.javatuples.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by steve on 1/2/17.
+ */
+public class Neo4jPersistUtil {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(Neo4jBoltPersistWriter.class);
+
+ private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ private static CypherQueryGraphHelper helper = new
CypherQueryGraphHelper();
+
+ public static List<Pair<String, Map<String, Object>>>
prepareStatements(StreamsDatum entry) throws Exception {
+
+ List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();
+
+ String id = entry.getId();
+ Activity activity = null;
+ ActivityObject activityObject = null;
+ Object document = entry.getDocument();
+
+ if (document instanceof Activity) {
+ activity = (Activity) document;
+ } else if (document instanceof ActivityObject) {
+ activityObject = (ActivityObject) document;
+ } else {
+ ObjectNode objectNode;
+ if (document instanceof ObjectNode) {
+ objectNode = (ObjectNode) document;
+ } else if ( document instanceof String) {
+ try {
+ objectNode = mapper.readValue((String) document,
ObjectNode.class);
+ } catch (IOException ex) {
+ LOGGER.error("Can't handle input: ", entry);
+ throw ex;
+ }
+ } else {
+ LOGGER.error("Can't handle input: ", entry);
+ throw new Exception("Can't create statements from datum.");
+ }
+
+ if ( objectNode.get("verb") != null ) {
+ try {
+ activity = mapper.convertValue(objectNode, Activity.class);
+ activityObject = activity.getObject();
+ } catch (Exception ex) {
+ activityObject = mapper.convertValue(objectNode,
ActivityObject.class);
+ }
+ } else {
+ activityObject = mapper.convertValue(objectNode,
ActivityObject.class);
+ }
+
+ }
+
+ Preconditions.checkArgument(activity != null ^ activityObject != null);
+
+ if ( activityObject != null &&
!Strings.isNullOrEmpty(activityObject.getId())) {
+
+ statements.add(vertexStatement(activityObject));
+
+ } else if ( activity != null &&
!Strings.isNullOrEmpty(activity.getId())) {
+
+ statements.addAll(vertexStatements(activity));
+
+ statements.addAll(edgeStatements(activity));
+
+ }
+
+ return statements;
+ }
+
+ public static List<Pair<String, Map<String, Object>>>
vertexStatements(Activity activity) {
+ List<Pair<String, Map<String, Object>>> statements = new
ArrayList<>();;
+ ActivityObject actor = activity.getActor();
+ ActivityObject object = activity.getObject();
+ ActivityObject target = activity.getTarget();
+
+ if (actor != null && StringUtils.isNotBlank(actor.getId())) {
+ Pair<String, Map<String, Object>> actorStatement =
vertexStatement(actor);
+ statements.add(actorStatement);
+ }
+
+ if (object != null && StringUtils.isNotBlank(object.getId())) {
+ Pair<String, Map<String, Object>> objectStatement =
vertexStatement(object);
+ statements.add(objectStatement);
+ }
+
+ if (target != null && StringUtils.isNotBlank(target.getId())) {
+ Pair<String, Map<String, Object>> targetStatement =
vertexStatement(target);
+ statements.add(targetStatement);
+ }
+
+ return statements;
+ }
+
+ public static List<Pair<String, Map<String, Object>>>
edgeStatements(Activity activity) {
+ List<Pair<String, Map<String, Object>>> statements = new
ArrayList<>();;
+ ActivityObject actor = activity.getActor();
+ ActivityObject object = activity.getObject();
+ ActivityObject target = activity.getTarget();
+
+ if (StringUtils.isNotBlank(actor.getId()) && object != null &&
StringUtils.isNotBlank(object.getId())) {
+ Pair<String, Map<String, Object>> actorObjectEdgeStatement =
helper.createActorObjectEdge(activity);
+ HashMap<String, Object> props = new HashMap<>();
+ props.put("props", actorObjectEdgeStatement.getValue1());
+ actorObjectEdgeStatement = actorObjectEdgeStatement.setAt1(props);
+ statements.add(actorObjectEdgeStatement);
+ }
+
+ if (StringUtils.isNotBlank(actor.getId()) && target != null &&
StringUtils.isNotBlank(target.getId())) {
+ Pair<String, Map<String, Object>> actorTargetEdgeStatement =
helper.createActorTargetEdge(activity);
+ HashMap<String, Object> props = new HashMap<>();
--- End diff --
✔️
> Support binary protocol in streams-persist-graph
> ------------------------------------------------
>
> Key: STREAMS-344
> URL: https://issues.apache.org/jira/browse/STREAMS-344
> Project: Streams
> Issue Type: Improvement
> Components: Persist
> Reporter: Steve Blackmon
> Assignee: Steve Blackmon
> Fix For: 0.5
>
>
> Support batch writes in GraphHttpPersistWriter.
> Using a separate HTTP Post for every datum is inefficient.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)