Basic implementation, confirmed working (although errors are not uncommon) using https://github.com/w2ogroup/streams-examples.git/twitter-userstream-neo4j (STREAMS-231 branch)
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d471f34d Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d471f34d Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d471f34d Branch: refs/heads/STREAMS-212 Commit: d471f34dc2a251963efec8a4130df0608ed70b4f Parents: 6c32ce2 Author: Steve Blackmon <sblack...@w2odigital.com> Authored: Sat Nov 22 21:00:42 2014 -0600 Committer: Steve Blackmon <sblack...@w2odigital.com> Committed: Sat Nov 22 21:01:12 2014 -0600 ---------------------------------------------------------------------- streams-contrib/pom.xml | 1 + streams-contrib/streams-persist-graph/README.md | 44 +++++ streams-contrib/streams-persist-graph/pom.xml | 96 +++++++++ .../streams/graph/GraphPersistWriter.java | 193 +++++++++++++++++++ .../streams/graph/neo4j/CypherGraphUtil.java | 147 ++++++++++++++ .../streams/graph/GraphConfiguration.json | 19 ++ .../graph/GraphEdgeWriterConfiguration.json | 30 +++ .../graph/GraphVertexWriterConfiguration.json | 30 +++ .../serializer/util/TwitterActivityUtil.java | 7 +- .../apache/streams/data/util/PropertyUtil.java | 89 +++++++++ 10 files changed, 655 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml index fcec297..cc45923 100644 --- a/streams-contrib/pom.xml +++ b/streams-contrib/pom.xml @@ -40,6 +40,7 @@ <module>streams-persist-cassandra</module> <module>streams-persist-console</module> <module>streams-persist-elasticsearch</module> + <module>streams-persist-graph</module> <module>streams-persist-hbase</module> <module>streams-persist-hdfs</module> <module>streams-persist-kafka</module> 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/README.md ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/README.md b/streams-contrib/streams-persist-graph/README.md new file mode 100644 index 0000000..086f5b5 --- /dev/null +++ b/streams-contrib/streams-persist-graph/README.md @@ -0,0 +1,44 @@ +streams-persist-graph +===================== + +Build graph index of stream + +Example Neo4J configuration: + + { + "graph": { + "type": "neo4j", + "protocol": "http", + "hostname": "localhost", + "port": 7474, + "graph": "data" + "vertices": { + "verbs": [ + "post", + "share", + "tweet" + ], + "objectType": "page" + } + }, + } + +Example Rexster configuration: + + { + "graph": { + "type": "rexster", + "protocol": "http", + "hostname": "localhost", + "port": 8182, + "graph": "data", + "vertices": { + "verbs": [ + "post", + "share", + "tweet" + ], + "objectType": "page" + } + }, + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/pom.xml b/streams-contrib/streams-persist-graph/pom.xml new file mode 100644 index 0000000..86458a5 --- /dev/null +++ b/streams-contrib/streams-persist-graph/pom.xml @@ -0,0 +1,96 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>streams-contrib</artifactId> + <groupId>org.apache.streams</groupId> + <version>0.1-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>streams-persist-graph</artifactId> + <name>streams-persist-graph</name> + + 
<dependencies> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-config</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.typesafe</groupId> + <artifactId>config</artifactId> + <version>1.2.0</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-pojo</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-util</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-http</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.antlr</groupId> + <artifactId>stringtemplate</artifactId> + <version>4.0.2</version> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/jsonschema2pojo</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.jsonschema2pojo</groupId> + <artifactId>jsonschema2pojo-maven-plugin</artifactId> + <configuration> + <addCompileSourceRoot>true</addCompileSourceRoot> + <generateBuilders>true</generateBuilders> + <sourcePaths> + <sourcePath>src/main/jsonschema</sourcePath> + </sourcePaths> + <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory> + <targetPackage>org.apache.streams.graph.pojo</targetPackage> + <useLongIntegers>true</useLongIntegers> + <useJodaDates>true</useJodaDates> + 
</configuration> + <executions> + <execution> + <goals> + <goal>generate</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java new file mode 100644 index 0000000..352bc68 --- /dev/null +++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.graph; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Preconditions; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.util.EntityUtils; +import org.apache.streams.components.http.HttpPersistWriterConfiguration; +import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.graph.neo4j.CypherGraphUtil; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class GraphPersistWriter extends SimpleHTTPPostPersistWriter { + + public static final String STREAMS_ID = GraphPersistWriter.class.getCanonicalName(); + + private final static Logger LOGGER = LoggerFactory.getLogger(GraphPersistWriter.class); + private final static long MAX_WRITE_LATENCY = 1000; + + protected GraphWriterConfiguration configuration; + + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis()); + private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor(); + + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + + public GraphPersistWriter() { + 
this(GraphConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("blueprints"))); + } + + public GraphPersistWriter(GraphWriterConfiguration configuration) { + super((HttpPersistWriterConfiguration)configuration); + if( configuration.getType().equals(GraphConfiguration.Type.NEO_4_J)) + super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit"); + else if( configuration.getType().equals(GraphConfiguration.Type.REXSTER)) + super.configuration.setResourcePath("/graphs/" + configuration.getGraph()); + this.configuration = configuration; + } + + @Override + protected ObjectNode preparePayload(StreamsDatum entry) { + + Activity activity = null; + + if (entry.getDocument() instanceof Activity) { + activity = (Activity) entry.getDocument(); + } else if (entry.getDocument() instanceof ObjectNode) { + activity = mapper.convertValue(entry.getDocument(), Activity.class); + } else if (entry.getDocument() instanceof String) { + try { + activity = mapper.readValue((String) entry.getDocument(), Activity.class); + } catch (Throwable e) { + LOGGER.warn(e.getMessage()); + } + } + + Preconditions.checkNotNull(activity); + + ObjectNode request = mapper.createObjectNode(); + ArrayNode statements = mapper.createArrayNode(); + + activity.getActor().setObjectType("page"); + + // always add vertices first + // what types of verbs are relevant for adding vertices? + if( configuration.getVertices().getVerbs().contains(activity.getVerb())) { + + // what objects and objectTypes are relevant for adding vertices? 
+ if( configuration.getVertices().getObjects().contains("actor") && + configuration.getVertices().getObjectTypes().contains(activity.getActor().getObjectType())) { + statements.add(CypherGraphUtil.mergeVertexRequest(activity.getActor())); + } + if( configuration.getVertices().getObjects().contains("object") && + configuration.getVertices().getObjectTypes().contains(activity.getObject().getObjectType())) { + statements.add(CypherGraphUtil.mergeVertexRequest(activity.getObject())); + } + if( configuration.getVertices().getObjects().contains("provider") && + configuration.getVertices().getObjectTypes().contains(activity.getProvider().getObjectType())) { + statements.add(CypherGraphUtil.mergeVertexRequest(activity.getProvider())); + } + if( configuration.getVertices().getObjects().contains("target") && + configuration.getVertices().getObjectTypes().contains(activity.getTarget().getObjectType())) { + statements.add(CypherGraphUtil.mergeVertexRequest(activity.getTarget())); // merge the target itself; this branch previously re-merged the provider, so the target vertex was never written + } + + } + + // what types of verbs are relevant for adding edges? + if( configuration.getEdges().getVerbs().contains(activity.getVerb())) { + + // what objects and objectTypes are relevant for adding edges? 
+ if( configuration.getEdges().getObjects().contains("actor") && + configuration.getEdges().getObjects().contains("object") && + configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType()) && + configuration.getEdges().getObjectTypes().contains(activity.getObject().getObjectType())) { + statements.add(CypherGraphUtil.createEdgeRequest(activity, activity.getActor(), activity.getObject())); + } + if( configuration.getEdges().getObjects().contains("actor") && + configuration.getEdges().getObjects().contains("target") && + configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType()) && + configuration.getEdges().getObjectTypes().contains(activity.getTarget().getObjectType())) { + statements.add(CypherGraphUtil.createEdgeRequest(activity, activity.getActor(), activity.getTarget())); + } + if( configuration.getEdges().getObjects().contains("provider") && + configuration.getEdges().getObjects().contains("actor") && + configuration.getEdges().getObjectTypes().contains(activity.getProvider().getObjectType()) && + configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType())) { + statements.add(CypherGraphUtil.createEdgeRequest(activity, activity.getProvider(), activity.getActor())); + } + } + + request.put("statements", statements); + return request; + + } + + @Override + protected ObjectNode executePost(HttpPost httpPost) { + + Preconditions.checkNotNull(httpPost); + + ObjectNode result = null; + + CloseableHttpResponse response = null; + + String entityString = null; + try { + response = httpclient.execute(httpPost); + HttpEntity entity = response.getEntity(); + if ((response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 201) && entity != null) { // parenthesized: && binds tighter than ||, so a 200 with no entity used to fall through to EntityUtils.toString(null) + entityString = EntityUtils.toString(entity); + result = mapper.readValue(entityString, ObjectNode.class); + } + LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), 
+ response.getStatusLine().getStatusCode(), entityString); + } catch (IOException e) { + LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage()); + } finally { + try { + if (response != null) response.close(); // response is still null when httpclient.execute() itself threw + } catch (IOException e) {} + } + return result; + } + + @Override + public void prepare(Object configurationObject) { + + super.prepare(configurationObject); + + } + + @Override + public void cleanUp() { + + LOGGER.info("exiting"); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java new file mode 100644 index 0000000..92ee12f --- /dev/null +++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java @@ -0,0 +1,147 @@ +package org.apache.streams.graph.neo4j; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.streams.data.util.PropertyUtil; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.ActivityObject; +import org.stringtemplate.v4.ST; + +import java.util.List; +import java.util.Map; + +/** + * Created by steve on 11/13/14. 
+ */ +public class CypherGraphUtil { + + private final static ObjectMapper mapper = new StreamsJacksonMapper(); + + public final static String statementKey = "statement"; + public final static String paramsKey = "parameters"; + public final static String propsKey = "props"; + + public final static String getVertexStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v"; + + public final static String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) "+ + "CREATE UNIQUE (n:<type> { props }) "+ + "RETURN n"; + + public final static String mergeVertexStatementTemplate = "MERGE (v:<type> {id: '<id>'}) "+ + "ON CREATE SET v:<type>"; + + public final static String createEdgeStatementTemplate = "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "+ + "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "+ + "RETURN r"; + + public static ObjectNode getVertexRequest(String id) { + + ObjectNode request = mapper.createObjectNode(); + + ST getVertex = new ST(getVertexStatementTemplate); + getVertex.add("id", id); + request.put(statementKey, getVertex.render()); + + return request; + } + + public static ObjectNode createVertexRequest(ActivityObject activityObject) { + + Preconditions.checkNotNull(activityObject.getObjectType()); + + ObjectNode request = mapper.createObjectNode(); + + ST createVertex = new ST(createVertexStatementTemplate); + createVertex.add("id", activityObject.getId()); + createVertex.add("type", activityObject.getObjectType()); + request.put(statementKey, createVertex.render()); + + ObjectNode params = mapper.createObjectNode(); + ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class); + ObjectNode props = PropertyUtil.flattenToObjectNode(object, '_'); + params.put(propsKey, props); + request.put(paramsKey, params); + + return request; + } + + public static ObjectNode mergeVertexRequest(ActivityObject activityObject) { + + Preconditions.checkNotNull(activityObject.getObjectType()); + + ObjectNode request = 
mapper.createObjectNode(); + + ST mergeVertex = new ST(mergeVertexStatementTemplate); + mergeVertex.add("id", activityObject.getId()); + mergeVertex.add("type", activityObject.getObjectType()); + + ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class); + Map<String, Object> props = PropertyUtil.flattenToMap(object, '_'); + + String statement = mergeVertex.render(); + statement += getPropertySetter(props, "v"); + statement += (" RETURN v;"); + request.put(statementKey, statement); + + return request; + } + + public static ObjectNode createEdgeRequest(Activity activity, ActivityObject source, ActivityObject destination) { + + ObjectNode request = mapper.createObjectNode(); + + // serialize first, then drop the activityObjects and extensions from the serialized copy: their properties don't need to appear on the relationship, and the caller's activity must stay intact because preparePayload reads its actor again for the next edge + ObjectNode object = mapper.convertValue(activity, ObjectNode.class); + object.remove("actor"); + object.remove("object"); + object.remove("target"); + object.remove("extensions"); + + Map<String, Object> props = PropertyUtil.flattenToMap(object, '_'); + + ST mergeEdge = new ST(createEdgeStatementTemplate); + mergeEdge.add("s_id", source.getId()); + mergeEdge.add("s_type", source.getObjectType()); + mergeEdge.add("d_id", destination.getId()); + mergeEdge.add("d_type", destination.getObjectType()); + mergeEdge.add("r_id", activity.getId()); + mergeEdge.add("r_type", activity.getVerb()); + mergeEdge.add("r_props", getPropertyCreater(props)); + + String statement = mergeEdge.render(); + request.put(statementKey, statement); + + return request; + } + + public static String getPropertySetter(Map<String, Object> map, String symbol) { + StringBuilder builder = new StringBuilder(); + for( Map.Entry<String, Object> entry : map.entrySet()) { + if( entry.getValue() instanceof String ) { + String propVal = ((String)(entry.getValue())).replace("\\", "\\\\").replace("'", "\\'"); // escape backslashes, then single quotes, so the value cannot terminate the Cypher string literal + builder.append("," + symbol + "." 
+ entry.getKey() + " = '" + propVal + "'"); + } + } + return builder.toString(); + } + + public static String getPropertyCreater(Map<String, Object> map) { + StringBuilder builder = new StringBuilder(); + builder.append("{"); + List<String> parts = Lists.newArrayList(); + for( Map.Entry<String, Object> entry : map.entrySet()) { + if( entry.getValue() instanceof String ) { + String propVal = ((String) (entry.getValue())).replace("\\", "\\\\").replace("'", "\\'"); // escape backslashes, then single quotes, so the value cannot terminate the Cypher string literal + parts.add(entry.getKey() + ":'" + propVal + "'"); + } + } + builder.append(Joiner.on(",").join(parts)); + builder.append("}"); + return builder.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json new file mode 100644 index 0000000..1e1fac4 --- /dev/null +++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json @@ -0,0 +1,19 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.graph.GraphConfiguration", + "extends" : {"$ref":"../../../../../../../../../streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpPersistWriterConfiguration.json"}, + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "type": { + "type": "string", + "description": "Graph DB type", + "enum" : ["neo4j", "rexster"] + }, + "graph": { + "type": "string", + "description": "Graph DB Graph ID" + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json new file mode 100644 index 0000000..f9e3868 --- /dev/null +++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json @@ -0,0 +1,30 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.graph.GraphEdgeWriterConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "objects": { + "type": "array", + "required": false, + "items": { + "type": "string" + } + }, + "verbs": { + "type": "array", + "required": false, + "items": { + "type": "string" + } + }, + "objectTypes": { + "type": "array", + "required": false, + "items": { + "type": "string" + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json new file mode 100644 index 0000000..798f4f6 --- /dev/null +++ b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json @@ -0,0 +1,30 @@ +{ + "type": "object", + 
"$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.graph.GraphVertexWriterConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "objects": { + "type": "array", + "required": false, + "items": { + "type": "string" + } + }, + "verbs": { + "type": "array", + "required": false, + "items": { + "type": "string" + } + }, + "objectTypes": { + "type": "array", + "required": false, + "items": { + "type": "string" + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java index 56b7005..3407da7 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java @@ -87,6 +87,7 @@ public class TwitterActivityUtil { public static void updateActivity(User user, Activity activity) throws ActivitySerializerException { activity.setActor(buildActor(user)); activity.setId(null); + activity.setVerb(null); } /** @@ -114,6 +115,7 @@ public class TwitterActivityUtil { public static Actor buildActor(Delete delete) { Actor actor = new Actor(); actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr())); + actor.setObjectType("page"); return actor; } @@ -161,7 +163,7 @@ public class TwitterActivityUtil { .orNull(); if( id != null ) actObj.setId(id); - actObj.setObjectType("tweet"); + actObj.setObjectType("post"); 
actObj.setContent(tweet.getText()); return actObj; } @@ -191,6 +193,7 @@ public class TwitterActivityUtil { .or(Optional.of(user.getId().toString())) .orNull() )); + actor.setObjectType("page"); actor.setDisplayName(user.getName()); actor.setAdditionalProperty("handle", user.getScreenName()); @@ -267,7 +270,9 @@ public class TwitterActivityUtil { public static Provider getProvider() { Provider provider = new Provider(); provider.setId("id:providers:twitter"); + provider.setObjectType("application"); provider.setDisplayName("Twitter"); + return provider; } /** http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d471f34d/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java ---------------------------------------------------------------------- diff --git a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java new file mode 100644 index 0000000..dbdff3b --- /dev/null +++ b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/PropertyUtil.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.data.util; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.ValueNode; +import com.google.common.collect.Maps; +import org.apache.streams.jackson.StreamsJacksonMapper; + +import java.util.Iterator; +import java.util.Map; + +/** + * Class transforms nested properties of activities, actors, objects, etc... + */ +public class PropertyUtil { + + /** + * Property on the activity object to use for extensions + */ + public static final String EXTENSION_PROPERTY = "extensions"; + + private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + public static Map<String, Object> flattenToMap(ObjectNode object) { + Map<String, Object> flatObject = Maps.newHashMap(); + addKeys(new String(), object, flatObject, '.'); + return flatObject; + } + + public static ObjectNode flattenToObjectNode(ObjectNode object) { + Map<String, Object> flatObject = flattenToMap(object, '.'); + addKeys(new String(), object, flatObject, '.'); + return mapper.convertValue(flatObject, ObjectNode.class); + } + + public static Map<String, Object> flattenToMap(ObjectNode object, char seperator) { + Map<String, Object> flatObject = Maps.newHashMap(); + addKeys(new String(), object, flatObject, seperator); + return flatObject; + } + + public static ObjectNode flattenToObjectNode(ObjectNode object, char seperator) { + Map<String, Object> flatObject = flattenToMap(object, seperator); + addKeys(new String(), object, flatObject, seperator); + return mapper.convertValue(flatObject, ObjectNode.class); + } + + private static void addKeys(String currentPath, JsonNode jsonNode, Map<String, Object> map, char seperator) { + if (jsonNode.isObject()) { + ObjectNode objectNode = (ObjectNode) jsonNode; + Iterator<Map.Entry<String, JsonNode>> iter = 
objectNode.fields(); + String pathPrefix = currentPath.isEmpty() ? "" : currentPath + seperator; + + while (iter.hasNext()) { + Map.Entry<String, JsonNode> entry = iter.next(); + addKeys(pathPrefix + entry.getKey(), entry.getValue(), map, seperator); + } + } else if (jsonNode.isArray()) { + ArrayNode arrayNode = (ArrayNode) jsonNode; + map.put(currentPath, arrayNode); + } else if (jsonNode.isValueNode()) { + ValueNode valueNode = (ValueNode) jsonNode; + if( valueNode.isTextual() ) + map.put(currentPath, valueNode.asText()); + else if ( valueNode.isNumber() ) + map.put(currentPath, valueNode); + } + } +}