http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java deleted file mode 100644 index 9183a72..0000000 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.graph; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.util.EntityUtils; -import org.apache.streams.components.http.HttpPersistWriterConfiguration; -import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.graph.neo4j.CypherGraphUtil; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -public class GraphPersistWriter extends SimpleHTTPPostPersistWriter { - - public static final String STREAMS_ID = GraphPersistWriter.class.getCanonicalName(); - - private final static Logger LOGGER = LoggerFactory.getLogger(GraphPersistWriter.class); - private final static long MAX_WRITE_LATENCY = 1000; - - protected GraphWriterConfiguration configuration; - - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis()); - private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor(); - - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - - public GraphPersistWriter() { - this(new ComponentConfigurator<GraphWriterConfiguration>(GraphWriterConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph"))); - } - - public GraphPersistWriter(GraphWriterConfiguration configuration) { - super((HttpPersistWriterConfiguration)configuration); - if( configuration.getType().equals(GraphConfiguration.Type.NEO_4_J)) - super.configuration.setResourcePath("/db/" + configuration.getGraph() + "/transaction/commit"); - else if( configuration.getType().equals(GraphConfiguration.Type.REXSTER)) - super.configuration.setResourcePath("/graphs/" + configuration.getGraph()); - this.configuration = configuration; - } - - @Override - protected ObjectNode preparePayload(StreamsDatum entry) { - - Activity activity = null; - - if (entry.getDocument() instanceof Activity) { - activity = (Activity) entry.getDocument(); - } else if (entry.getDocument() instanceof ObjectNode) { - activity = mapper.convertValue(entry.getDocument(), Activity.class); - } else if (entry.getDocument() instanceof String) { - try { - activity = mapper.readValue((String) entry.getDocument(), Activity.class); - } catch (Throwable e) { - LOGGER.warn(e.getMessage()); - } - } - - Preconditions.checkNotNull(activity); - - ObjectNode request = mapper.createObjectNode(); - ArrayNode statements = mapper.createArrayNode(); - - activity.getActor().setObjectType("page"); - - // always add vertices first - // what types of verbs are relevant for adding vertices? - if( configuration.getVertices().getVerbs().contains(activity.getVerb())) { - - // what objects and objectTypes are relevant for adding vertices? - if( configuration.getVertices().getObjects().contains("actor") && - configuration.getVertices().getObjectTypes().contains(activity.getActor().getObjectType())) { - statements.add(CypherGraphUtil.mergeVertexRequest(activity.getActor())); - } - if( configuration.getVertices().getObjects().contains("object") && - configuration.getVertices().getObjectTypes().contains(activity.getObject().getObjectType())) { - statements.add(CypherGraphUtil.mergeVertexRequest(activity.getObject())); - } - if( configuration.getVertices().getObjects().contains("provider") && - configuration.getVertices().getObjectTypes().contains(activity.getProvider().getObjectType())) { - statements.add(CypherGraphUtil.mergeVertexRequest(activity.getProvider())); - } - if( configuration.getVertices().getObjects().contains("target") && - configuration.getVertices().getObjectTypes().contains(activity.getTarget().getObjectType())) { - statements.add(CypherGraphUtil.mergeVertexRequest(activity.getProvider())); - } - - } - - // what types of verbs are relevant for adding edges? - if( configuration.getEdges().getVerbs().contains(activity.getVerb())) { - - // what objects and objectTypes are relevant for adding edges? - if( configuration.getEdges().getObjects().contains("actor") && - configuration.getEdges().getObjects().contains("object") && - configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType()) && - configuration.getEdges().getObjectTypes().contains(activity.getObject().getObjectType())) { - statements.add(CypherGraphUtil.createEdgeRequest(activity, activity.getActor(), activity.getObject())); - } - if( configuration.getEdges().getObjects().contains("actor") && - configuration.getEdges().getObjects().contains("target") && - configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType()) && - configuration.getEdges().getObjectTypes().contains(activity.getTarget().getObjectType())) { - statements.add(CypherGraphUtil.createEdgeRequest(activity, activity.getActor(), activity.getTarget())); - } - if( configuration.getEdges().getObjects().contains("provider") && - configuration.getEdges().getObjects().contains("actor") && - configuration.getEdges().getObjectTypes().contains(activity.getProvider().getObjectType()) && - configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType())) { - statements.add(CypherGraphUtil.createEdgeRequest(activity, activity.getProvider(), activity.getActor())); - } - } - - request.put("statements", statements); - return request; - - } - - @Override - protected ObjectNode executePost(HttpPost httpPost) { - - Preconditions.checkNotNull(httpPost); - - ObjectNode result = null; - - CloseableHttpResponse response = null; - - String entityString = null; - try { - response = httpclient.execute(httpPost); - HttpEntity entity = response.getEntity(); - if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 201 && entity != null) { - entityString = EntityUtils.toString(entity); - result = mapper.readValue(entityString, ObjectNode.class); - } - LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), response.getStatusLine().getStatusCode(), entityString); - } catch (IOException e) { - LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, e.getMessage()); - } finally { - try { - response.close(); - } catch (IOException e) {} - } - return result; - } - - @Override - public void prepare(Object configurationObject) { - - super.prepare(configurationObject); - - } - - @Override - public void cleanUp() { - - LOGGER.info("exiting"); - - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java deleted file mode 100644 index 92ee12f..0000000 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java +++ /dev/null @@ -1,147 +0,0 @@ -package org.apache.streams.graph.neo4j; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.apache.streams.data.util.PropertyUtil; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.ActivityObject; -import org.stringtemplate.v4.ST; - -import java.util.List; -import java.util.Map; - -/** - * Created by steve on 11/13/14. - */ -public class CypherGraphUtil { - - private final static ObjectMapper mapper = new StreamsJacksonMapper(); - - public final static String statementKey = "statement"; - public final static String paramsKey = "parameters"; - public final static String propsKey = "props"; - - public final static String getVertexStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v"; - - public final static String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) "+ - "CREATE UNIQUE (n:<type> { props }) "+ - "RETURN n"; - - public final static String mergeVertexStatementTemplate = "MERGE (v:<type> {id: '<id>'}) "+ - "ON CREATE SET v:<type>"; - - public final static String createEdgeStatementTemplate = "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "+ - "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "+ - "RETURN r"; - - public static ObjectNode getVertexRequest(String id) { - - ObjectNode request = mapper.createObjectNode(); - - ST getVertex = new ST(getVertexStatementTemplate); - getVertex.add("id", id); - request.put(statementKey, getVertex.render()); - - return request; - } - - public static ObjectNode createVertexRequest(ActivityObject activityObject) { - - Preconditions.checkNotNull(activityObject.getObjectType()); - - ObjectNode request = mapper.createObjectNode(); - - ST createVertex = new ST(createVertexStatementTemplate); - createVertex.add("id", activityObject.getId()); - createVertex.add("type", activityObject.getObjectType()); - request.put(statementKey, createVertex.render()); - - ObjectNode params = mapper.createObjectNode(); - ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class); - ObjectNode props = PropertyUtil.flattenToObjectNode(object, '_'); - params.put(propsKey, props); - request.put(paramsKey, params); - - return request; - } - - public static ObjectNode mergeVertexRequest(ActivityObject activityObject) { - - Preconditions.checkNotNull(activityObject.getObjectType()); - - ObjectNode request = mapper.createObjectNode(); - - ST mergeVertex = new ST(mergeVertexStatementTemplate); - mergeVertex.add("id", activityObject.getId()); - mergeVertex.add("type", activityObject.getObjectType()); - - ObjectNode object = mapper.convertValue(activityObject, ObjectNode.class); - Map<String, Object> props = PropertyUtil.flattenToMap(object, '_'); - - String statement = mergeVertex.render(); - statement += getPropertySetter(props, "v"); - statement += (" RETURN v;"); - request.put(statementKey, statement); - - return request; - } - - public static ObjectNode createEdgeRequest(Activity activity, ActivityObject source, ActivityObject destination) { - - ObjectNode request = mapper.createObjectNode(); - - // set the activityObject's and extensions null, because their properties don't need to appear on the relationship - activity.setActor(null); - activity.setObject(null); - activity.setTarget(null); - activity.getAdditionalProperties().put("extensions", null); - - ObjectNode object = mapper.convertValue(activity, ObjectNode.class); - Map<String, Object> props = PropertyUtil.flattenToMap(object, '_'); - - ST mergeEdge = new ST(createEdgeStatementTemplate); - mergeEdge.add("s_id", source.getId()); - mergeEdge.add("s_type", source.getObjectType()); - mergeEdge.add("d_id", destination.getId()); - mergeEdge.add("d_type", destination.getObjectType()); - mergeEdge.add("r_id", activity.getId()); - mergeEdge.add("r_type", activity.getVerb()); - mergeEdge.add("r_props", getPropertyCreater(props)); - - String statement = mergeEdge.render(); - request.put(statementKey, statement); - - return request; - } - - public static String getPropertySetter(Map<String, Object> map, String symbol) { - StringBuilder builder = new StringBuilder(); - for( Map.Entry<String, Object> entry : map.entrySet()) { - if( entry.getValue() instanceof String ) { - String propVal = (String)(entry.getValue()); - builder.append("," + symbol + "." + entry.getKey() + " = '" + propVal + "'"); - } - } - return builder.toString(); - } - - public static String getPropertyCreater(Map<String, Object> map) { - StringBuilder builder = new StringBuilder(); - builder.append("{"); - List<String> parts = Lists.newArrayList(); - for( Map.Entry<String, Object> entry : map.entrySet()) { - if( entry.getValue() instanceof String ) { - String propVal = (String) (entry.getValue()); - parts.add(entry.getKey() + ":'" + propVal + "'"); - } - } - builder.append(Joiner.on(",").join(parts)); - builder.append("}"); - return builder.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json deleted file mode 100644 index 1e1fac4..0000000 --- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "type": "object", - "$schema": "http://json-schema.org/draft-03/schema", - "id": "#", - "javaType" : "org.apache.streams.graph.GraphConfiguration", - "extends" : {"$ref":"../../../../../../../../../streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpPersistWriterConfiguration.json"}, - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "type": { - "type": "string", - "description": "Graph DB type", - "enum" : ["neo4j", "rexster"] - }, - "graph": { - "type": "string", - "description": "Graph DB Graph ID" - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json deleted file mode 100644 index f9e3868..0000000 --- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json +++ /dev/null @@ -1,30 +0,0 @@ -{ - "type": "object", - "$schema": "http://json-schema.org/draft-03/schema", - "id": "#", - "javaType" : "org.apache.streams.graph.GraphEdgeWriterConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "objects": { - "type": "array", - "required": false, - "items": { - "type": "string" - } - }, - "verbs": { - "type": "array", - "required": false, - "items": { - "type": "string" - } - }, - "objectTypes": { - "type": "array", - "required": false, - "items": { - "type": "string" - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json deleted file mode 100644 index 798f4f6..0000000 --- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json +++ /dev/null @@ -1,30 +0,0 @@ -{ - "type": "object", - "$schema": "http://json-schema.org/draft-03/schema", - "id": "#", - "javaType" : "org.apache.streams.graph.GraphVertexWriterConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "objects": { - "type": "array", - "required": false, - "items": { - "type": "string" - } - }, - "verbs": { - "type": "array", - "required": false, - "items": { - "type": "string" - } - }, - "objectTypes": { - "type": "array", - "required": false, - "items": { - "type": "string" - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json deleted file mode 100644 index 1e059d8..0000000 --- a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "type": "object", - "$schema": "http://json-schema.org/draft-03/schema", - "id": "#", - "javaType" : "org.apache.streams.graph.GraphWriterConfiguration", - "extends" : {"$ref":"GraphConfiguration.json"}, - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "vertices": { - "type": "object", - "javaType": "org.apache.streams.graph.GraphVertexWriterConfiguration" - }, - "edges": { - "type": "object", - "javaType": "org.apache.streams.graph.GraphEdgeWriterConfiguration" - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-processor-jackson/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-jackson/pom.xml b/streams-contrib/streams-processor-jackson/pom.xml new file mode 100644 index 0000000..8bb44fd --- /dev/null +++ b/streams-contrib/streams-processor-jackson/pom.xml @@ -0,0 +1,86 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <artifactId>streams-processor-jackson</artifactId> + <version>0.1-SNAPSHOT</version> + + <parent> + <groupId>org.apache.streams</groupId> + <artifactId>streams-contrib</artifactId> + <version>0.1-SNAPSHOT</version> + </parent> + + <dependencies> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-config</artifactId> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-core</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + <dependency> + <groupId>com.jayway.jsonpath</groupId> + <artifactId>json-path</artifactId> + </dependency> + <dependency> + <groupId>com.jayway.jsonpath</groupId> + <artifactId>json-path-assert</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + </dependencies> + + <build> + <sourceDirectory>src/main/java</sourceDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + <testResources> + <testResource> + <directory>src/test/resources</directory> + </testResource> + </testResources> + <plugins> + + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java new file mode 100644 index 0000000..56b0c5c --- /dev/null +++ b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java @@ -0,0 +1,62 @@ +package org.apache.streams.jackson; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; +import com.google.common.collect.Lists; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * This processor walks an input objectnode and corrects any artifacts + * that may have occured from improper serialization of jsonschema2pojo beans. + * + * The logic is also available for inclusion in other module via static import. + */ +public class CleanAdditionalPropertiesProcessor implements StreamsProcessor { + + private static final Logger LOGGER = LoggerFactory.getLogger(CleanAdditionalPropertiesProcessor.class); + + private ObjectMapper mapper; + + @Override + public List<StreamsDatum> process(StreamsDatum datum) { + List<StreamsDatum> result = Lists.newLinkedList(); + ObjectNode activity = this.mapper.convertValue(datum.getDocument(), ObjectNode.class); + cleanAdditionalProperties(activity); + datum.setDocument(activity); + result.add(datum); + return result; + } + + @Override + public void prepare(Object o) { + this.mapper = StreamsJacksonMapper.getInstance(); + this.mapper.registerModule(new JsonOrgModule()); + } + + @Override + public void cleanUp() { + + } + + public static void cleanAdditionalProperties(ObjectNode node) { + if( node.get("additionalProperties") != null ) { + ObjectNode additionalProperties = (ObjectNode) node.get("additionalProperties"); + cleanAdditionalProperties(additionalProperties); + Iterator<Map.Entry<String, JsonNode>> jsonNodeIterator = additionalProperties.fields(); + while( jsonNodeIterator.hasNext() ) { + Map.Entry<String, JsonNode> entry = jsonNodeIterator.next(); + node.put(entry.getKey(), entry.getValue()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java new file mode 100644 index 0000000..63d03e9 --- /dev/null +++ b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java @@ -0,0 +1,112 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.streams.jackson; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +/** + * + */ +public class TypeConverterProcessor implements StreamsProcessor { + + private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterProcessor.class); + + private List<String> formats = Lists.newArrayList(); + + private ObjectMapper mapper; + + private Class inClass; + private Class outClass; + + public TypeConverterProcessor(Class inClass, Class outClass, ObjectMapper mapper) { + this.inClass = inClass; + this.outClass = outClass; + this.mapper = mapper; + } + + public TypeConverterProcessor(Class inClass, Class outClass, List<String> formats) { + this.inClass = inClass; + this.outClass = outClass; + this.formats = formats; + } + + public TypeConverterProcessor(Class inClass, Class outClass) { + this.inClass = inClass; + this.outClass = outClass; + } + + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + List<StreamsDatum> result = Lists.newLinkedList(); + Object inDoc = entry.getDocument(); + ObjectNode node = null; + if( inClass == String.class || + inDoc instanceof String ) { + try { + node = this.mapper.readValue((String)entry.getDocument(), ObjectNode.class); + } catch (IOException e) { + e.printStackTrace(); + } + } else { + node = this.mapper.convertValue(inDoc, ObjectNode.class); + } + + if(node != null) { + Object outDoc; + try { + if( outClass == String.class ) + outDoc = this.mapper.writeValueAsString(node); + else + outDoc = this.mapper.convertValue(node, outClass); + + StreamsDatum outDatum = new StreamsDatum(outDoc, entry.getId(), entry.getTimestamp(), entry.getSequenceid()); + outDatum.setMetadata(entry.getMetadata()); + result.add(outDatum); + } catch (Throwable e) { + LOGGER.warn(e.getMessage()); + LOGGER.warn(node.toString()); + } + } + + return result; + } + + @Override + public void prepare(Object configurationObject) { + if( formats.size() > 0 ) + this.mapper = StreamsJacksonMapper.getInstance(formats); + else + this.mapper = StreamsJacksonMapper.getInstance(); + } + + @Override + public void cleanUp() { + + } +}; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java b/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java new file mode 100644 index 0000000..1316d5c --- /dev/null +++ b/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.jackson.test; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.jackson.TypeConverterProcessor; +import org.apache.streams.pojo.json.Activity; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static junit.framework.Assert.*; + +/** + * + */ +public class TypeConverterProcessorTest { + + private static final String DATASIFT_JSON = "{\"demographic\":{\"gender\":\"female\"},\"interaction\":{\"schema\":{\"version\":3},\"source\":\"Twitter for Android\",\"author\":{\"username\":\"ViiOLeee\",\"name\":\"Violeta Anguita\",\"id\":70931384,\"avatar\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"link\":\"http://twitter.com/ViiOLeee\",\"language\":\"en\"},\"type\":\"twitter\",\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"received_at\":1.401230295658E9,\"content\":\"RT @AliiAnguita: \\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. https://t.co/wB2qKyJMRw\\\" @ViiOLeee look at this!\",\"id\":\"1e3e5ef97532a580e0741841f5746728\",\"link\":\"http://twitter.com/ViiOLeee/status/471420141989666817\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384]},\"klout\":{\"score\":34},\"language\":{\"tag\":\"en\",\"tag_extended\":\"en\",\ "confidence\":98},\"links\":{\"code\":[200],\"created_at\":[\"Tue, 27 May 2014 14:28:06 +0000\"],\"meta\":{\"charset\":[\"UTF-8\"],\"content_type\":[\"text/html\"],\"description\":[\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: http://smartu...\"],\"keywords\":[[\"ed sheeran\",\"ed sheeran sing\",\"ed sheeran new album\",\"Ed Sheeran (Musical Artist)\",\"ed sheeran one\",\"ed sheeran fault in our stars\",\"ed sheeran all of the stars\",\"s...\"]],\"lang\":[\"en\"],\"opengraph\":[{\"site_name\":\"YouTube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"description\":\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: http://smartu ...\",\"type\":\"video\"}],\"twitter\":[{\"card\":\"player\",\"site\":\"@youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"description\":\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: http://smartu...\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"app\":{\"iphone\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"ipad\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"googleplay\":{\"name\":\"YouTube\",\"id\":\"com.google.android.youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\"}},\"player\":\"https://www.youtube.com/embed/tlYcUqEPN58\",\"player_width\":\"1280\",\"player_height\":\"720\"}]},\"normalized_url\":[\"https://youtube.com/watch?v=tlYcUqEPN58\"],\"retweet_count\":[0],\"tit le\":[\"Ed Sheeran - SING [Official Video] - YouTube\"],\"url\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"]},\"twitter\":{\"id\":\"471420141989666817\",\"retweet\":{\"text\":\"\\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. https://t.co/wB2qKyJMRw\\\" @ViiOLeee look at this!\",\"id\":\"471420141989666817\",\"user\":{\"name\":\"Violeta Anguita\",\"description\":\"La vida no seria la fiesta que todos esperamos, pero mientras estemos aqui debemos BAILAR!!! #ErasmusOnceErasmusForever\",\"location\":\"Espanhaa..Olaa!\",\"statuses_count\":5882,\"followers_count\":249,\"friends_count\":1090,\"screen_name\":\"ViiOLeee\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"lang\":\"en\",\"time_zone\":\"Madrid\",\"utc_offset\":7200,\"listed_count\":1,\"id\":709 31384,\"id_str\":\"70931384\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":275,\"created_at\":\"Wed, 02 Sep 2009 10:19:59 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"count\":1,\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384],\"links\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"],\"display_urls\":[\"youtube.com/watch?v=tlYcUq���\"],\"domains\":[\"www.youtube.com\"],\"lang\":\"en\"},\"retweeted\":{\"id\":\"471419867078209536\",\"user\":{\"name\":\"Alicia Anguita \",\"description\":\"Estudiante de Ingenieria de la Edificaci��n en Granada.\",\"statuses_count\":371,\"followers_count\":185,\"friends_count\":404,\"screen_name\":\"AliiAnguita\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"profile_image_url_ https\":\"https://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"lang\":\"es\",\"listed_count\":0,\"id\":561201891,\"id_str\":\"561201891\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":17,\"created_at\":\"Mon, 23 Apr 2012 13:11:44 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"created_at\":\"Tue, 27 May 2014 22:37:09 +0000\"}}}"; + + public static final String DATASIFT_FORMAT = "EEE, dd MMM yyyy HH:mm:ss Z"; + + @Test + public void testTypeConverterStringToString() { + final String ID = "1"; + StreamsProcessor processor = new TypeConverterProcessor(String.class, String.class, Lists.newArrayList(DATASIFT_FORMAT)); + processor.prepare(null); + StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID); + List<StreamsDatum> result = processor.process(datum); + assertNotNull(result); + assertEquals(1, result.size()); + StreamsDatum resultDatum = result.get(0); + assertNotNull(resultDatum); + assertNotNull(resultDatum.getDocument()); + assertTrue(resultDatum.getDocument() instanceof String); + assertEquals(ID, resultDatum.getId()); + } + + @Test + public void testTypeConverterStringToObjectNode() { + final String ID = "1"; + StreamsProcessor processor = new TypeConverterProcessor(String.class, ObjectNode.class, Lists.newArrayList(DATASIFT_FORMAT)); + processor.prepare(null); + StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID); + List<StreamsDatum> result = processor.process(datum); + assertNotNull(result); + assertEquals(1, result.size()); + StreamsDatum resultDatum = result.get(0); + assertNotNull(resultDatum); + assertNotNull(resultDatum.getDocument()); + assertTrue(resultDatum.getDocument() instanceof ObjectNode); + assertEquals(ID, resultDatum.getId()); + } + + @Test + public void testTypeConverterObjectNodeToString() throws IOException { + final String ID = "1"; + StreamsProcessor processor = new TypeConverterProcessor(ObjectNode.class, String.class, Lists.newArrayList(DATASIFT_FORMAT)); + processor.prepare(null); + ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(DATASIFT_FORMAT)); + ObjectNode node = mapper.readValue(DATASIFT_JSON, ObjectNode.class); + StreamsDatum datum = new StreamsDatum(node, ID); + List<StreamsDatum> result = processor.process(datum); + assertNotNull(result); + assertEquals(1, result.size()); + StreamsDatum resultDatum = result.get(0); + assertNotNull(resultDatum); + assertNotNull(resultDatum.getDocument()); + assertTrue(resultDatum.getDocument() instanceof String); + assertEquals(ID, resultDatum.getId()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/pom.xml b/streams-contrib/streams-provider-datasift/pom.xml index 23165af..ae96d4e 100644 --- a/streams-contrib/streams-provider-datasift/pom.xml +++ b/streams-contrib/streams-provider-datasift/pom.xml @@ -37,7 +37,7 @@ </dependency> <dependency> <groupId>org.apache.streams</groupId> - <artifactId>streams-converters</artifactId> + <artifactId>streams-processor-jackson</artifactId> <version>${project.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java index bc982e9..a4a4b5a 100644 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java @@ -23,7 +23,8 @@ import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.datasift.Datasift; -import org.apache.streams.datasift.serializer.DatasiftActivityConverter; +import org.apache.streams.datasift.provider.DatasiftConverter; +import org.apache.streams.datasift.serializer.DatasiftActivitySerializer; import org.apache.streams.datasift.util.StreamsDatasiftMapper; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; @@ -41,7 +42,7 @@ public class DatasiftActivitySerializerProcessor implements StreamsProcessor { private ObjectMapper mapper; private Class outClass; - private DatasiftActivityConverter datasiftActivitySerializer; + private DatasiftActivitySerializer datasiftActivitySerializer; public final static String TERMINATE = new String("TERMINATE"); @@ -76,8 +77,8 @@ public class DatasiftActivitySerializerProcessor implements StreamsProcessor { @Override public void prepare(Object configurationObject) { - this.mapper = StreamsJacksonMapper.getInstance(StreamsDatasiftMapper.DATASIFT_FORMAT); - this.datasiftActivitySerializer = new DatasiftActivityConverter(); + this.mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT)); + this.datasiftActivitySerializer = new DatasiftActivitySerializer(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java index a0d2fc3..1166b2e 100644 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java @@ -25,10 +25,10 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.datasift.Datasift; import org.apache.streams.datasift.provider.DatasiftConverter; -import org.apache.streams.datasift.serializer.DatasiftActivityConverter; +import org.apache.streams.datasift.serializer.DatasiftActivitySerializer; import org.apache.streams.datasift.util.StreamsDatasiftMapper; import org.apache.streams.pojo.json.Activity; -import org.apache.streams.converter.CleanAdditionalPropertiesProcessor; +import org.apache.streams.jackson.CleanAdditionalPropertiesProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +43,7 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor { private ObjectMapper mapper; private Class outClass; - private DatasiftActivityConverter datasiftInteractionActivitySerializer; + private DatasiftActivitySerializer datasiftInteractionActivitySerializer; private DatasiftConverter converter; public final static String TERMINATE = new String("TERMINATE"); @@ -75,7 +75,7 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor { @Override public void prepare(Object configurationObject) { this.mapper = StreamsDatasiftMapper.getInstance(); - this.datasiftInteractionActivitySerializer = new DatasiftActivityConverter(); + this.datasiftInteractionActivitySerializer = new DatasiftActivitySerializer(); if(this.outClass.equals(Activity.class)) { this.converter = new ActivityConverter(); } else if (this.outClass.equals(String.class)) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivityConverter.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivityConverter.java deleted file mode 100644 index 65bebce..0000000 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivityConverter.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.datasift.serializer; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import org.apache.streams.data.ActivityConverter; -import org.apache.streams.data.ActivityConverterFactory; -import org.apache.streams.datasift.Datasift; -import org.apache.streams.datasift.twitter.Twitter; -import org.apache.streams.datasift.util.StreamsDatasiftMapper; -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; - -import java.util.List; - -/** - * We should be able to @Deprecate this soon and adopt ActivityConverterProcessor - */ -public class DatasiftActivityConverter implements ActivityConverter<Datasift> { - - private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(StreamsDatasiftMapper.DATASIFT_FORMAT); - - private static DatasiftActivityConverter instance = new DatasiftActivityConverter(); - - public static DatasiftActivityConverter getInstance() { - return instance; - } - - @Override - public String serializationFormat() { - return null; - } - - @Override - public Datasift serialize(Activity deserialized) throws ActivitySerializerException { - return null; - } - - @Override - public Activity deserialize(Datasift serialized) throws ActivitySerializerException { - Class detectedClass = DatasiftEventClassifier.getInstance().detectClass(serialized); - Class converterClass = DatasiftConverterResolver.getInstance().bestSerializer(detectedClass); - ActivityConverter serializer = ActivityConverterFactory.getInstance(converterClass); - return serializer.deserialize(serialized); - } - - public Activity deserialize(String json) throws ActivitySerializerException { - try { - return deserialize(MAPPER.readValue(json, Datasift.class)); - } catch (Exception e) { - throw new ActivitySerializerException(e); - } - } - - @Override - public List<Activity> deserializeAll(List<Datasift> serializedList) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java new file mode 100644 index 0000000..b587cd6 --- /dev/null +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.datasift.serializer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.datasift.Datasift; +import org.apache.streams.datasift.util.StreamsDatasiftMapper; +import org.apache.streams.exceptions.ActivitySerializerException; +import org.apache.streams.pojo.json.Activity; + +import java.util.List; + +/** + * + */ +public class DatasiftActivitySerializer implements ActivitySerializer<Datasift> { + + private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance(); + + @Override + public String serializationFormat() { + return null; + } + + @Override + public Datasift serialize(Activity deserialized) throws ActivitySerializerException { + return null; + } + + @Override + public Activity deserialize(Datasift serialized) throws ActivitySerializerException { + ActivitySerializer serializer = DatasiftEventClassifier.bestSerializer(serialized); + return serializer.deserialize(serialized); + } + + public Activity deserialize(String json) throws ActivitySerializerException { + try { + return deserialize(MAPPER.readValue(json, Datasift.class)); + } catch (Exception e) { + throw new ActivitySerializerException(e); + } + } + + @Override + public List<Activity> deserializeAll(List<Datasift> serializedList) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftConverterResolver.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftConverterResolver.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftConverterResolver.java deleted file mode 100644 index edeab34..0000000 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftConverterResolver.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.datasift.serializer; - -import org.apache.streams.data.ActivityConverterResolver; -import org.apache.streams.datasift.instagram.Instagram; -import org.apache.streams.datasift.twitter.Twitter; - -/** - * Ensures datasift documents can be converted to Activity - */ -public class DatasiftConverterResolver implements ActivityConverterResolver { - - public DatasiftConverterResolver() { - - } - - private static DatasiftConverterResolver instance = new DatasiftConverterResolver(); - - public static DatasiftConverterResolver getInstance() { - return instance; - } - - public Class bestSerializer(Class documentClass) { - - if(documentClass == Twitter.class) { - return DatasiftTwitterActivityConverter.class; - } else if(documentClass == Instagram.class) { - return DatasiftInstagramActivityConverter.class; - } else { - return DatasiftInteractionActivityConverter.class; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java index 226c3f6..7d7d547 100644 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java @@ -18,39 +18,36 @@ package org.apache.streams.datasift.serializer; -import com.google.common.base.Preconditions; -import org.apache.streams.data.DocumentClassifier; +import org.apache.streams.data.ActivitySerializer; import org.apache.streams.datasift.Datasift; import org.apache.streams.datasift.instagram.Instagram; import org.apache.streams.datasift.interaction.Interaction; import org.apache.streams.datasift.twitter.Twitter; /** - * Ensures datasift documents can be converted to Activity + * Created by sblackmon on 11/6/14. */ -public class DatasiftEventClassifier implements DocumentClassifier { +public class DatasiftEventClassifier { - public DatasiftEventClassifier() { + public static Class detectClass(Datasift event) { - } - - private static DatasiftEventClassifier instance = new DatasiftEventClassifier(); - - public static DatasiftEventClassifier getInstance() { - return instance; - } - - public Class detectClass(Object document) { - - Preconditions.checkArgument(document instanceof Datasift); - Datasift datasift = (Datasift)document; - if(datasift.getTwitter() != null) { + if(event.getTwitter() != null) { return Twitter.class; - } else if(datasift.getInstagram() != null) { + } else if(event.getInstagram() != null) { return Instagram.class; } else { return Interaction.class; } } + public static ActivitySerializer bestSerializer(Datasift event) { + + if(event.getTwitter() != null) { + return DatasiftTwitterActivitySerializer.getInstance(); + } else if(event.getInstagram() != null) { + return DatasiftInstagramActivitySerializer.getInstance(); + } else { + return DatasiftInteractionActivitySerializer.getInstance(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivityConverter.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivityConverter.java deleted file mode 100644 index eade439..0000000 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivityConverter.java +++ /dev/null @@ -1,124 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.streams.datasift.serializer; - -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import org.apache.streams.datasift.Datasift; -import org.apache.streams.datasift.instagram.From; -import org.apache.streams.datasift.instagram.Instagram; -import org.apache.streams.instagram.serializer.util.InstagramActivityUtil; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.Actor; -import org.apache.streams.pojo.json.Image; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; - -import static org.apache.streams.data.util.ActivityUtil.ensureExtensions; - -/** - * - */ -public class DatasiftInstagramActivityConverter extends DatasiftInteractionActivityConverter { - - private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftInstagramActivityConverter.class); - - private static DatasiftInstagramActivityConverter instance = new DatasiftInstagramActivityConverter(); - - public static DatasiftInstagramActivityConverter getInstance() { - return instance; - } - - @Override - public Activity convert(Datasift event) { - Activity activity = super.convert(event); - - Instagram instagram = event.getInstagram(); - - activity.setActor(buildActor(event, instagram)); - activity.setId(formatId(activity.getVerb(), event.getInteraction().getId())); - activity.setProvider(InstagramActivityUtil.getProvider()); - activity.setLinks(getLinks(event.getInstagram())); - - activity.setVerb(selectVerb(event)); - addInstagramExtensions(activity, instagram); - return activity; - } - - /** - * Gets links from the object - * @return - */ - private List<String> getLinks(Instagram instagram) { - List<String> result = Lists.newLinkedList(); - if( instagram.getMedia() != null ) { - result.add(instagram.getMedia().getImage()); - result.add(instagram.getMedia().getVideo()); - } - return result; - } - - public Actor buildActor(Datasift event, Instagram instagram) { - Actor actor = super.buildActor(event.getInteraction()); - From user = instagram.getFrom(); - - actor.setDisplayName(firstStringIfNotNull(user.getFullName())); - actor.setId(formatId(Optional.fromNullable( - user.getId()) - .or(actor.getId()))); - - Image profileImage = new Image(); - String profileUrl = null; - profileUrl = user.getProfilePicture(); - profileImage.setUrl(profileUrl); - actor.setImage(profileImage); - - return actor; - } - - public void addInstagramExtensions(Activity activity, Instagram instagram) { - Map<String, Object> extensions = ensureExtensions(activity); - List<String> hashTags; - if(instagram.getMedia() != null) { - hashTags = instagram.getMedia().getTags(); - extensions.put("hashtags", hashTags); - extensions.put("keywords", activity.getContent()); - } else { - extensions.put("keywords", activity.getContent()); - - } - - } - - private String selectVerb(Datasift event) { - if( event.getInteraction().getSubtype().equals("like")) - return "like"; - else - return "post"; - } - - public static String formatId(String... idparts) { - return Joiner.on(":").join(Lists.asList("id:instagram", idparts)); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java new file mode 100644 index 0000000..d121d65 --- /dev/null +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java @@ -0,0 +1,124 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.streams.datasift.serializer; + +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import org.apache.streams.datasift.Datasift; +import org.apache.streams.datasift.instagram.From; +import org.apache.streams.datasift.instagram.Instagram; +import org.apache.streams.instagram.serializer.util.InstagramActivityUtil; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.Actor; +import org.apache.streams.pojo.json.Image; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +import static org.apache.streams.data.util.ActivityUtil.ensureExtensions; + +/** + * + */ +public class DatasiftInstagramActivitySerializer extends DatasiftInteractionActivitySerializer { + + private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftInstagramActivitySerializer.class); + + private static DatasiftInstagramActivitySerializer instance = new DatasiftInstagramActivitySerializer(); + + public static DatasiftInstagramActivitySerializer getInstance() { + return instance; + } + + @Override + public Activity convert(Datasift event) { + Activity activity = super.convert(event); + + Instagram instagram = event.getInstagram(); + + activity.setActor(buildActor(event, instagram)); + activity.setId(formatId(activity.getVerb(), event.getInteraction().getId())); + activity.setProvider(InstagramActivityUtil.getProvider()); + activity.setLinks(getLinks(event.getInstagram())); + + activity.setVerb(selectVerb(event)); + addInstagramExtensions(activity, instagram); + return activity; + } + + /** + * Gets links from the object + * @return + */ + private List<String> getLinks(Instagram instagram) { + List<String> result = Lists.newLinkedList(); + if( instagram.getMedia() != null ) { + result.add(instagram.getMedia().getImage()); + result.add(instagram.getMedia().getVideo()); + } + return result; + } + + public Actor buildActor(Datasift event, Instagram instagram) { + Actor actor = super.buildActor(event.getInteraction()); + From user = instagram.getFrom(); + + actor.setDisplayName(firstStringIfNotNull(user.getFullName())); + actor.setId(formatId(Optional.fromNullable( + user.getId()) + .or(actor.getId()))); + + Image profileImage = new Image(); + String profileUrl = null; + profileUrl = user.getProfilePicture(); + profileImage.setUrl(profileUrl); + actor.setImage(profileImage); + + return actor; + } + + public void addInstagramExtensions(Activity activity, Instagram instagram) { + Map<String, Object> extensions = ensureExtensions(activity); + List<String> hashTags; + if(instagram.getMedia() != null) { + hashTags = instagram.getMedia().getTags(); + extensions.put("hashtags", hashTags); + extensions.put("keywords", activity.getContent()); + } else { + extensions.put("keywords", activity.getContent()); + + } + + } + + private String selectVerb(Datasift event) { + if( event.getInteraction().getSubtype().equals("like")) + return "like"; + else + return "post"; + } + + public static String formatId(String... idparts) { + return Joiner.on(":").join(Lists.asList("id:instagram", idparts)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivityConverter.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivityConverter.java deleted file mode 100644 index da21006..0000000 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivityConverter.java +++ /dev/null @@ -1,222 +0,0 @@ -package org.apache.streams.datasift.serializer; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.apache.streams.data.ActivityConverter; -import org.apache.streams.datasift.Datasift; -import org.apache.streams.datasift.interaction.Interaction; -import org.apache.streams.datasift.links.Links; -import org.apache.streams.datasift.util.StreamsDatasiftMapper; -import org.apache.streams.pojo.json.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.streams.data.util.ActivityUtil.ensureExtensions; - -/** - * - */ -public class DatasiftInteractionActivityConverter implements ActivityConverter<Datasift> { - - private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftInteractionActivityConverter.class); - - private static DatasiftInteractionActivityConverter instance = new DatasiftInteractionActivityConverter(); - - public static DatasiftInteractionActivityConverter getInstance() { - return instance; - } - - ObjectMapper mapper = StreamsDatasiftMapper.getInstance(); - - @Override - public String serializationFormat() { - return "application/json+datasift.com.v1.1"; - } - - @Override - public Datasift serialize(Activity deserialized) { - throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON"); - } - - public Activity deserialize(String datasiftJson) { - try { - return deserialize(this.mapper.readValue(datasiftJson, Datasift.class)); - } catch (Exception e) { - LOGGER.error("Exception while trying convert,\n {},\n to a Datasift object.", datasiftJson); - LOGGER.error("Exception : {}", e); - throw new RuntimeException(e); - } - } - - @Override - public Activity deserialize(Datasift serialized) { - - try { - - Activity activity = convert(serialized); - - return activity; - - } catch (Exception e) { - throw new IllegalArgumentException("Unable to deserialize", e); - } - - } - - @Override - public List<Activity> deserializeAll(List<Datasift> datasifts) { - List<Activity> activities = Lists.newArrayList(); - for( Datasift datasift : datasifts ) { - activities.add(deserialize(datasift)); - } - return activities; - } - - public static Generator buildGenerator(Interaction interaction) { - Generator generator = new Generator(); - generator.setDisplayName(interaction.getSource()); - generator.setId(interaction.getSource()); - return generator; - } - - public static Icon getIcon(Interaction interaction) { - return null; - } - - public static Provider buildProvider(Interaction interaction) { - Provider provider = new Provider(); - provider.setId("id:providers:"+interaction.getType()); - provider.setDisplayName(interaction.getType()); - return provider; - } - - public static String getUrls(Interaction interaction) { - return null; - } - - public static void addDatasiftExtension(Activity activity, Datasift datasift) { - Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity); - extensions.put("datasift", datasift); - } - - public static String formatId(String... idparts) { - return Joiner.on(":").join(Lists.asList("id:datasift", idparts)); - } - - public Activity convert(Datasift event) { - - Preconditions.checkNotNull(event); - Preconditions.checkNotNull(event.getInteraction()); - - Activity activity = new Activity(); - activity.setActor(buildActor(event.getInteraction())); - activity.setVerb(selectVerb(event)); - activity.setObject(buildActivityObject(event.getInteraction())); - activity.setId(formatId(activity.getVerb(), event.getInteraction().getId())); - activity.setTarget(buildTarget(event.getInteraction())); - activity.setPublished(event.getInteraction().getCreatedAt()); - activity.setGenerator(buildGenerator(event.getInteraction())); - activity.setIcon(getIcon(event.getInteraction())); - activity.setProvider(buildProvider(event.getInteraction())); - activity.setTitle(event.getInteraction().getTitle()); - activity.setContent(event.getInteraction().getContent()); - activity.setUrl(event.getInteraction().getLink()); - activity.setLinks(getLinks(event)); - addDatasiftExtension(activity, event); - if( event.getInteraction().getGeo() != null) { - addLocationExtension(activity, event.getInteraction()); - } - return activity; - } - - private String selectVerb(Datasift event) { - return "post"; - } - - public Actor buildActor(Interaction interaction) { - Actor actor = new Actor(); - org.apache.streams.datasift.interaction.Author author = interaction.getAuthor(); - if(author == null) { - LOGGER.warn("Interaction does not contain author information."); - return actor; - } - String userName = author.getUsername(); - String name = author.getName(); - Long id = author.getId(); - if(userName != null) { - actor.setDisplayName(userName); - } else { - actor.setDisplayName(name); - } - - if(id != null) { - actor.setId(id.toString()); - } else { - if(userName != null) - actor.setId(userName); - else - actor.setId(name); - } - Image image = new Image(); - image.setUrl(interaction.getAuthor().getAvatar()); - actor.setImage(image); - if (interaction.getAuthor().getLink()!=null){ - actor.setUrl(interaction.getAuthor().getLink()); - } - return actor; - } - - public static ActivityObject buildActivityObject(Interaction interaction) { - ActivityObject actObj = new ActivityObject(); - actObj.setObjectType(interaction.getContenttype()); - actObj.setUrl(interaction.getLink()); - actObj.setId(formatId("post",interaction.getId())); - actObj.setContent(interaction.getContent()); - - return actObj; - } - - public static List<String> getLinks(Datasift event) { - List<String> result = Lists.newArrayList(); - Links links = event.getLinks(); - if(links == null) - return null; - for(Object link : links.getNormalizedUrl()) { - if(link != null) { - if(link instanceof String) { - result.add((String) link); - } else { - LOGGER.warn("link is not of type String : {}", link.getClass().getName()); - } - } - } - return result; - } - - public static ActivityObject buildTarget(Interaction interaction) { - return null; - } - - public static void addLocationExtension(Activity activity, Interaction interaction) { - Map<String, Object> extensions = ensureExtensions(activity); - Map<String, Object> location = new HashMap<String, Object>(); - Map<String, Double> coordinates = new HashMap<String, Double>(); - coordinates.put("latitude", interaction.getGeo().getLatitude()); - coordinates.put("longitude", interaction.getGeo().getLongitude()); - location.put("coordinates", coordinates); - extensions.put("location", location); - } - - public static String firstStringIfNotNull(List<Object> list) { - if( list != null && list.size() > 0) { - return (String) list.get(0); - } else return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java new file mode 100644 index 0000000..c856dc2 --- /dev/null +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java @@ -0,0 +1,222 @@ +package org.apache.streams.datasift.serializer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.datasift.Datasift; +import org.apache.streams.datasift.interaction.Interaction; +import org.apache.streams.datasift.links.Links; +import org.apache.streams.datasift.util.StreamsDatasiftMapper; +import org.apache.streams.pojo.json.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.streams.data.util.ActivityUtil.ensureExtensions; + +/** + * + */ +public class DatasiftInteractionActivitySerializer implements ActivitySerializer<Datasift>, Serializable { + + private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftInteractionActivitySerializer.class); + + private static DatasiftInteractionActivitySerializer instance = new DatasiftInteractionActivitySerializer(); + + public static DatasiftInteractionActivitySerializer getInstance() { + return instance; + } + + ObjectMapper mapper = StreamsDatasiftMapper.getInstance(); + + @Override + public String serializationFormat() { + return "application/json+datasift.com.v1.1"; + } + + @Override + public Datasift serialize(Activity deserialized) { + throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON"); + } + + public Activity deserialize(String datasiftJson) { + try { + return deserialize(this.mapper.readValue(datasiftJson, Datasift.class)); + } catch (Exception e) { + LOGGER.error("Exception while trying convert,\n {},\n to a Datasift object.", datasiftJson); + LOGGER.error("Exception : {}", e); + throw new RuntimeException(e); + } + } + + @Override + public Activity deserialize(Datasift serialized) { + + try { + + Activity activity = convert(serialized); + + return activity; + + } catch (Exception e) { + throw new IllegalArgumentException("Unable to deserialize", e); + } + + } + + @Override + public List<Activity> deserializeAll(List<Datasift> datasifts) { + List<Activity> activities = Lists.newArrayList(); + for( Datasift datasift : datasifts ) { + activities.add(deserialize(datasift)); + } + return activities; + } + + public static Generator buildGenerator(Interaction interaction) { + Generator generator = new Generator(); + generator.setDisplayName(interaction.getSource()); + generator.setId(interaction.getSource()); + return generator; + } + + public static Icon getIcon(Interaction interaction) { + return null; + } + + public static Provider buildProvider(Interaction interaction) { + Provider provider = new Provider(); + provider.setId("id:providers:"+interaction.getType()); + provider.setDisplayName(interaction.getType()); + return provider; + } + + public static String getUrls(Interaction interaction) { + return null; + } + + public static void addDatasiftExtension(Activity activity, Datasift datasift) { + Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity); + extensions.put("datasift", datasift); + } + + public static String formatId(String... idparts) { + return Joiner.on(":").join(Lists.asList("id:datasift", idparts)); + } + + public Activity convert(Datasift event) { + + Preconditions.checkNotNull(event); + Preconditions.checkNotNull(event.getInteraction()); + + Activity activity = new Activity(); + activity.setActor(buildActor(event.getInteraction())); + activity.setVerb(selectVerb(event)); + activity.setObject(buildActivityObject(event.getInteraction())); + activity.setId(formatId(activity.getVerb(), event.getInteraction().getId())); + activity.setTarget(buildTarget(event.getInteraction())); + activity.setPublished(event.getInteraction().getCreatedAt()); + activity.setGenerator(buildGenerator(event.getInteraction())); + activity.setIcon(getIcon(event.getInteraction())); + activity.setProvider(buildProvider(event.getInteraction())); + activity.setTitle(event.getInteraction().getTitle()); + activity.setContent(event.getInteraction().getContent()); + activity.setUrl(event.getInteraction().getLink()); + activity.setLinks(getLinks(event)); + addDatasiftExtension(activity, event); + if( event.getInteraction().getGeo() != null) { + addLocationExtension(activity, event.getInteraction()); + } + return activity; + } + + private String selectVerb(Datasift event) { + return "post"; + } + + public Actor buildActor(Interaction interaction) { + Actor actor = new Actor(); + org.apache.streams.datasift.interaction.Author author = interaction.getAuthor(); + if(author == null) { + LOGGER.warn("Interaction does not contain author information."); + return actor; + } + String userName = author.getUsername(); + String name = author.getName(); + Long id = author.getId(); + if(userName != null) { + actor.setDisplayName(userName); + } else { + actor.setDisplayName(name); + } + + if(id != null) { + actor.setId(id.toString()); + } else { + if(userName != null) + actor.setId(userName); + else + actor.setId(name); + } + Image image = new Image(); + image.setUrl(interaction.getAuthor().getAvatar()); + actor.setImage(image); + if (interaction.getAuthor().getLink()!=null){ + actor.setUrl(interaction.getAuthor().getLink()); + } + return actor; + } + + public static ActivityObject buildActivityObject(Interaction interaction) { + ActivityObject actObj = new ActivityObject(); + actObj.setObjectType(interaction.getContenttype()); + actObj.setUrl(interaction.getLink()); + actObj.setId(formatId("post",interaction.getId())); + actObj.setContent(interaction.getContent()); + + return actObj; + } + + public static List<String> getLinks(Datasift event) { + List<String> result = Lists.newArrayList(); + Links links = event.getLinks(); + if(links == null) + return null; + for(Object link : links.getNormalizedUrl()) { + if(link != null) { + if(link instanceof String) { + result.add((String) link); + } else { + LOGGER.warn("link is not of type String : {}", link.getClass().getName()); + } + } + } + return result; + } + + public static ActivityObject buildTarget(Interaction interaction) { + return null; + } + + public static void addLocationExtension(Activity activity, Interaction interaction) { + Map<String, Object> extensions = ensureExtensions(activity); + Map<String, Object> location = new HashMap<String, Object>(); + Map<String, Double> coordinates = new HashMap<String, Double>(); + coordinates.put("latitude", interaction.getGeo().getLatitude()); + coordinates.put("longitude", interaction.getGeo().getLongitude()); + location.put("coordinates", coordinates); + extensions.put("location", location); + } + + public static String firstStringIfNotNull(List<Object> list) { + if( list != null && list.size() > 0) { + return (String) list.get(0); + } else return null; + } +}