[ 
https://issues.apache.org/jira/browse/STREAMS-344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15809807#comment-15809807
 ] 

ASF GitHub Bot commented on STREAMS-344:
----------------------------------------

Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/348#discussion_r95085389
  
    --- Diff: 
streams-contrib/streams-persist-neo4j/src/main/java/org/apache/streams/neo4j/Neo4jPersistUtil.java
 ---
    @@ -0,0 +1,151 @@
    +package org.apache.streams.neo4j;
    +
    +import org.apache.streams.core.StreamsDatum;
    +import org.apache.streams.jackson.StreamsJacksonMapper;
    +import org.apache.streams.neo4j.bolt.Neo4jBoltPersistWriter;
    +import org.apache.streams.pojo.json.Activity;
    +import org.apache.streams.pojo.json.ActivityObject;
    +
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Strings;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.javatuples.Pair;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Created by steve on 1/2/17.
    + */
    +public class Neo4jPersistUtil {
    +
    +  private static final Logger LOGGER = 
LoggerFactory.getLogger(Neo4jBoltPersistWriter.class);
    +
    +  private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
    +
    +  private static CypherQueryGraphHelper helper = new 
CypherQueryGraphHelper();
    +
    +  public static List<Pair<String, Map<String, Object>>> 
prepareStatements(StreamsDatum entry) throws Exception {
    +
    +    List<Pair<String, Map<String, Object>>> statements = new ArrayList<>();
    +
    +    String id = entry.getId();
    +    Activity activity = null;
    +    ActivityObject activityObject = null;
    +    Object document = entry.getDocument();
    +
    +    if (document instanceof Activity) {
    +      activity = (Activity) document;
    +    } else if (document instanceof ActivityObject) {
    +      activityObject = (ActivityObject) document;
    +    } else {
    +      ObjectNode objectNode;
    +      if (document instanceof ObjectNode) {
    +        objectNode = (ObjectNode) document;
    +      } else if ( document instanceof String) {
    +        try {
    +          objectNode = mapper.readValue((String) document, 
ObjectNode.class);
    +        } catch (IOException ex) {
    +          LOGGER.error("Can't handle input: ", entry);
    +          throw ex;
    +        }
    +      } else {
    +        LOGGER.error("Can't handle input: ", entry);
    +        throw new Exception("Can't create statements from datum.");
    +      }
    +
    +      if ( objectNode.get("verb") != null ) {
    +        try {
    +          activity = mapper.convertValue(objectNode, Activity.class);
    +          activityObject = activity.getObject();
    +        } catch (Exception ex) {
    +          activityObject = mapper.convertValue(objectNode, 
ActivityObject.class);
    +        }
    +      } else {
    +        activityObject = mapper.convertValue(objectNode, 
ActivityObject.class);
    +      }
    +
    +    }
    +
    +    Preconditions.checkArgument(activity != null ^ activityObject != null);
    +
    +    if ( activityObject != null && 
!Strings.isNullOrEmpty(activityObject.getId())) {
    +
    +      statements.add(vertexStatement(activityObject));
    +
    +    } else if ( activity != null && 
!Strings.isNullOrEmpty(activity.getId())) {
    +
    +      statements.addAll(vertexStatements(activity));
    +
    +      statements.addAll(edgeStatements(activity));
    +
    +    }
    +
    +    return statements;
    +  }
    +
    +  public static List<Pair<String, Map<String, Object>>> 
vertexStatements(Activity activity) {
    +    List<Pair<String, Map<String, Object>>> statements = new 
ArrayList<>();;
    +    ActivityObject actor = activity.getActor();
    +    ActivityObject object = activity.getObject();
    +    ActivityObject target = activity.getTarget();
    +
    +    if (actor != null && StringUtils.isNotBlank(actor.getId())) {
    +      Pair<String, Map<String, Object>> actorStatement = 
vertexStatement(actor);
    +      statements.add(actorStatement);
    +    }
    +
    +    if (object != null && StringUtils.isNotBlank(object.getId())) {
    +      Pair<String, Map<String, Object>> objectStatement = 
vertexStatement(object);
    +      statements.add(objectStatement);
    +    }
    +
    +    if (target != null && StringUtils.isNotBlank(target.getId())) {
    +      Pair<String, Map<String, Object>> targetStatement = 
vertexStatement(target);
    +      statements.add(targetStatement);
    +    }
    +
    +    return statements;
    +  }
    +
    +  public static List<Pair<String, Map<String, Object>>> 
edgeStatements(Activity activity) {
    +    List<Pair<String, Map<String, Object>>> statements = new 
ArrayList<>();;
    +    ActivityObject actor = activity.getActor();
    +    ActivityObject object = activity.getObject();
    +    ActivityObject target = activity.getTarget();
    +
    +    if (StringUtils.isNotBlank(actor.getId()) && object != null && 
StringUtils.isNotBlank(object.getId())) {
    +      Pair<String, Map<String, Object>> actorObjectEdgeStatement = 
helper.createActorObjectEdge(activity);
    +      HashMap<String, Object> props = new HashMap<>();
    +      props.put("props", actorObjectEdgeStatement.getValue1());
    +      actorObjectEdgeStatement = actorObjectEdgeStatement.setAt1(props);
    +      statements.add(actorObjectEdgeStatement);
    +    }
    +
    +    if (StringUtils.isNotBlank(actor.getId()) && target != null && 
StringUtils.isNotBlank(target.getId())) {
    +      Pair<String, Map<String, Object>> actorTargetEdgeStatement = 
helper.createActorTargetEdge(activity);
    +      HashMap<String, Object> props = new HashMap<>();
    +      props.put("props", actorTargetEdgeStatement.getValue1());
    +      actorTargetEdgeStatement = actorTargetEdgeStatement.setAt1(props);
    +      statements.add(actorTargetEdgeStatement);
    +    }
    +
    +    return statements;
    +  }
    +
    +  public static Pair<String, Map<String, Object>> 
vertexStatement(ActivityObject activityObject) {
    +    Pair<String, Map<String, Object>> mergeVertexRequest = 
helper.mergeVertexRequest(activityObject);
    +    HashMap<String, Object> props = new HashMap<>();
    --- End diff --
    
    ✔️ 


> Support binary protocol in streams-persist-graph
> ------------------------------------------------
>
>                 Key: STREAMS-344
>                 URL: https://issues.apache.org/jira/browse/STREAMS-344
>             Project: Streams
>          Issue Type: Improvement
>          Components: Persist
>            Reporter: Steve Blackmon
>            Assignee: Steve Blackmon
>             Fix For: 0.5
>
>
> Support batch writes in GraphHttpPersistWriter.
> Using a separate HTTP Post for every datum is inefficient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to