http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
deleted file mode 100644
index 9183a72..0000000
--- 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphPersistWriter.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.util.EntityUtils;
-import org.apache.streams.components.http.HttpPersistWriterConfiguration;
-import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.graph.neo4j.CypherGraphUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class GraphPersistWriter extends SimpleHTTPPostPersistWriter {
-
-    public static final String STREAMS_ID = 
GraphPersistWriter.class.getCanonicalName();
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(GraphPersistWriter.class);
-    private final static long MAX_WRITE_LATENCY = 1000;
-
-    protected GraphWriterConfiguration configuration;
-
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-    private volatile AtomicLong lastWrite = new 
AtomicLong(System.currentTimeMillis());
-    private ScheduledExecutorService backgroundFlushTask = 
Executors.newSingleThreadScheduledExecutor();
-
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    public GraphPersistWriter() {
-        this(new 
ComponentConfigurator<GraphWriterConfiguration>(GraphWriterConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
-    }
-
-    public GraphPersistWriter(GraphWriterConfiguration configuration) {
-        super((HttpPersistWriterConfiguration)configuration);
-        if( configuration.getType().equals(GraphConfiguration.Type.NEO_4_J))
-            super.configuration.setResourcePath("/db/" + 
configuration.getGraph() + "/transaction/commit");
-        else if( 
configuration.getType().equals(GraphConfiguration.Type.REXSTER))
-            super.configuration.setResourcePath("/graphs/" + 
configuration.getGraph());
-        this.configuration = configuration;
-    }
-
-    @Override
-    protected ObjectNode preparePayload(StreamsDatum entry) {
-
-        Activity activity = null;
-
-        if (entry.getDocument() instanceof Activity) {
-            activity = (Activity) entry.getDocument();
-        } else if (entry.getDocument() instanceof ObjectNode) {
-            activity = mapper.convertValue(entry.getDocument(), 
Activity.class);
-        } else if (entry.getDocument() instanceof String) {
-            try {
-                activity = mapper.readValue((String) entry.getDocument(), 
Activity.class);
-            } catch (Throwable e) {
-                LOGGER.warn(e.getMessage());
-            }
-        }
-
-        Preconditions.checkNotNull(activity);
-
-        ObjectNode request = mapper.createObjectNode();
-        ArrayNode statements = mapper.createArrayNode();
-
-        activity.getActor().setObjectType("page");
-
-        // always add vertices first
-        // what types of verbs are relevant for adding vertices?
-        if( 
configuration.getVertices().getVerbs().contains(activity.getVerb())) {
-
-            // what objects and objectTypes are relevant for adding vertices?
-            if( configuration.getVertices().getObjects().contains("actor") &&
-                
configuration.getVertices().getObjectTypes().contains(activity.getActor().getObjectType()))
 {
-                
statements.add(CypherGraphUtil.mergeVertexRequest(activity.getActor()));
-            }
-            if( configuration.getVertices().getObjects().contains("object") &&
-                
configuration.getVertices().getObjectTypes().contains(activity.getObject().getObjectType()))
 {
-                
statements.add(CypherGraphUtil.mergeVertexRequest(activity.getObject()));
-            }
-            if( configuration.getVertices().getObjects().contains("provider") 
&&
-                
configuration.getVertices().getObjectTypes().contains(activity.getProvider().getObjectType()))
 {
-                
statements.add(CypherGraphUtil.mergeVertexRequest(activity.getProvider()));
-            }
-            if( configuration.getVertices().getObjects().contains("target") &&
-                
configuration.getVertices().getObjectTypes().contains(activity.getTarget().getObjectType()))
 {
-                
statements.add(CypherGraphUtil.mergeVertexRequest(activity.getProvider()));
-            }
-
-        }
-
-        // what types of verbs are relevant for adding edges?
-        if( configuration.getEdges().getVerbs().contains(activity.getVerb())) {
-
-            // what objects and objectTypes are relevant for adding edges?
-            if( configuration.getEdges().getObjects().contains("actor") &&
-                configuration.getEdges().getObjects().contains("object") &&
-                
configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType())
 &&
-                
configuration.getEdges().getObjectTypes().contains(activity.getObject().getObjectType()))
 {
-                statements.add(CypherGraphUtil.createEdgeRequest(activity, 
activity.getActor(), activity.getObject()));
-            }
-            if( configuration.getEdges().getObjects().contains("actor") &&
-                    configuration.getEdges().getObjects().contains("target") &&
-                    
configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType())
 &&
-                    
configuration.getEdges().getObjectTypes().contains(activity.getTarget().getObjectType()))
 {
-                statements.add(CypherGraphUtil.createEdgeRequest(activity, 
activity.getActor(), activity.getTarget()));
-            }
-            if( configuration.getEdges().getObjects().contains("provider") &&
-                configuration.getEdges().getObjects().contains("actor") &&
-                
configuration.getEdges().getObjectTypes().contains(activity.getProvider().getObjectType())
 &&
-                
configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType()))
 {
-                statements.add(CypherGraphUtil.createEdgeRequest(activity, 
activity.getProvider(), activity.getActor()));
-            }
-        }
-
-        request.put("statements", statements);
-        return request;
-
-    }
-
-    @Override
-    protected ObjectNode executePost(HttpPost httpPost) {
-
-        Preconditions.checkNotNull(httpPost);
-
-        ObjectNode result = null;
-
-        CloseableHttpResponse response = null;
-
-        String entityString = null;
-        try {
-            response = httpclient.execute(httpPost);
-            HttpEntity entity = response.getEntity();
-            if (response.getStatusLine().getStatusCode() == 200 || 
response.getStatusLine().getStatusCode() == 201 && entity != null) {
-                entityString = EntityUtils.toString(entity);
-                result = mapper.readValue(entityString, ObjectNode.class);
-            }
-            LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), 
response.getStatusLine().getStatusCode(), entityString);
-        } catch (IOException e) {
-            LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), 
response, e.getMessage());
-        } finally {
-            try {
-                response.close();
-            } catch (IOException e) {}
-        }
-        return result;
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-
-        super.prepare(configurationObject);
-
-    }
-
-    @Override
-    public void cleanUp() {
-
-        LOGGER.info("exiting");
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java
 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java
deleted file mode 100644
index 92ee12f..0000000
--- 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphUtil.java
+++ /dev/null
@@ -1,147 +0,0 @@
-package org.apache.streams.graph.neo4j;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.streams.data.util.PropertyUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.stringtemplate.v4.ST;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by steve on 11/13/14.
- */
-public class CypherGraphUtil {
-
-    private final static ObjectMapper mapper = new StreamsJacksonMapper();
-
-    public final static String statementKey = "statement";
-    public final static String paramsKey = "parameters";
-    public final static String propsKey = "props";
-
-    public final static String getVertexStatementTemplate = "MATCH (v {id: 
'<id>'} ) RETURN v";
-
-    public final static String createVertexStatementTemplate = "MATCH (x {id: 
'<id>'}) "+
-                                                                "CREATE UNIQUE 
(n:<type> { props }) "+
-                                                                "RETURN n";
-
-    public final static String mergeVertexStatementTemplate = "MERGE (v:<type> 
{id: '<id>'}) "+
-                                                               "ON CREATE SET 
v:<type>";
-
-    public final static String createEdgeStatementTemplate = "MATCH 
(s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "+
-                                                            "CREATE UNIQUE 
(s)-[r:<r_type> <r_props>]->(d) "+
-                                                            "RETURN r";
-
-    public static ObjectNode getVertexRequest(String id) {
-
-        ObjectNode request = mapper.createObjectNode();
-
-        ST getVertex = new ST(getVertexStatementTemplate);
-        getVertex.add("id", id);
-        request.put(statementKey, getVertex.render());
-
-        return request;
-    }
-
-    public static ObjectNode createVertexRequest(ActivityObject 
activityObject) {
-
-        Preconditions.checkNotNull(activityObject.getObjectType());
-
-        ObjectNode request = mapper.createObjectNode();
-
-        ST createVertex = new ST(createVertexStatementTemplate);
-        createVertex.add("id", activityObject.getId());
-        createVertex.add("type", activityObject.getObjectType());
-        request.put(statementKey, createVertex.render());
-
-        ObjectNode params = mapper.createObjectNode();
-        ObjectNode object = mapper.convertValue(activityObject, 
ObjectNode.class);
-        ObjectNode props = PropertyUtil.flattenToObjectNode(object, '_');
-        params.put(propsKey, props);
-        request.put(paramsKey, params);
-
-        return request;
-    }
-
-    public static ObjectNode mergeVertexRequest(ActivityObject activityObject) 
{
-
-        Preconditions.checkNotNull(activityObject.getObjectType());
-
-        ObjectNode request = mapper.createObjectNode();
-
-        ST mergeVertex = new ST(mergeVertexStatementTemplate);
-        mergeVertex.add("id", activityObject.getId());
-        mergeVertex.add("type", activityObject.getObjectType());
-
-        ObjectNode object = mapper.convertValue(activityObject, 
ObjectNode.class);
-        Map<String, Object> props = PropertyUtil.flattenToMap(object, '_');
-
-        String statement = mergeVertex.render();
-        statement += getPropertySetter(props, "v");
-        statement += (" RETURN v;");
-        request.put(statementKey, statement);
-
-        return request;
-    }
-
-    public static ObjectNode createEdgeRequest(Activity activity, 
ActivityObject source, ActivityObject destination) {
-
-        ObjectNode request = mapper.createObjectNode();
-
-        // set the activityObject's and extensions null, because their 
properties don't need to appear on the relationship
-        activity.setActor(null);
-        activity.setObject(null);
-        activity.setTarget(null);
-        activity.getAdditionalProperties().put("extensions", null);
-
-        ObjectNode object = mapper.convertValue(activity, ObjectNode.class);
-        Map<String, Object> props = PropertyUtil.flattenToMap(object, '_');
-
-        ST mergeEdge = new ST(createEdgeStatementTemplate);
-        mergeEdge.add("s_id", source.getId());
-        mergeEdge.add("s_type", source.getObjectType());
-        mergeEdge.add("d_id", destination.getId());
-        mergeEdge.add("d_type", destination.getObjectType());
-        mergeEdge.add("r_id", activity.getId());
-        mergeEdge.add("r_type", activity.getVerb());
-        mergeEdge.add("r_props", getPropertyCreater(props));
-
-        String statement = mergeEdge.render();
-        request.put(statementKey, statement);
-
-        return request;
-    }
-
-    public static String getPropertySetter(Map<String, Object> map, String 
symbol) {
-        StringBuilder builder = new StringBuilder();
-        for( Map.Entry<String, Object> entry : map.entrySet()) {
-            if( entry.getValue() instanceof String ) {
-                String propVal = (String)(entry.getValue());
-                builder.append("," + symbol + "." + entry.getKey() + " = '" + 
propVal + "'");
-            }
-        }
-        return builder.toString();
-    }
-
-    public static String getPropertyCreater(Map<String, Object> map) {
-        StringBuilder builder = new StringBuilder();
-        builder.append("{");
-        List<String> parts = Lists.newArrayList();
-        for( Map.Entry<String, Object> entry : map.entrySet()) {
-            if( entry.getValue() instanceof String ) {
-                String propVal = (String) (entry.getValue());
-                parts.add(entry.getKey() + ":'" + propVal + "'");
-            }
-        }
-        builder.append(Joiner.on(",").join(parts));
-        builder.append("}");
-        return builder.toString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
 
b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
deleted file mode 100644
index 1e1fac4..0000000
--- 
a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphConfiguration.json
+++ /dev/null
@@ -1,19 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "id": "#",
-    "javaType" : "org.apache.streams.graph.GraphConfiguration",
-    "extends" : 
{"$ref":"../../../../../../../../../streams-components/streams-http/src/main/jsonschema/org/apache/streams/components/http/HttpPersistWriterConfiguration.json"},
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "type": {
-            "type": "string",
-            "description": "Graph DB type",
-            "enum" : ["neo4j", "rexster"]
-        },
-        "graph": {
-            "type": "string",
-            "description": "Graph DB Graph ID"
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
 
b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
deleted file mode 100644
index f9e3868..0000000
--- 
a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphEdgeWriterConfiguration.json
+++ /dev/null
@@ -1,30 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "id": "#",
-    "javaType" : "org.apache.streams.graph.GraphEdgeWriterConfiguration",
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "objects": {
-            "type": "array",
-            "required": false,
-            "items": {
-                "type": "string"
-            }
-        },
-        "verbs": {
-            "type": "array",
-            "required": false,
-            "items": {
-                "type": "string"
-            }
-        },
-        "objectTypes": {
-            "type": "array",
-            "required": false,
-            "items": {
-                "type": "string"
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
 
b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
deleted file mode 100644
index 798f4f6..0000000
--- 
a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphVertexWriterConfiguration.json
+++ /dev/null
@@ -1,30 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "id": "#",
-    "javaType" : "org.apache.streams.graph.GraphVertexWriterConfiguration",
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "objects": {
-            "type": "array",
-            "required": false,
-            "items": {
-                "type": "string"
-            }
-        },
-        "verbs": {
-            "type": "array",
-            "required": false,
-            "items": {
-                "type": "string"
-            }
-        },
-        "objectTypes": {
-            "type": "array",
-            "required": false,
-            "items": {
-                "type": "string"
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json
 
b/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json
deleted file mode 100644
index 1e059d8..0000000
--- 
a/streams-contrib/streams-persist-graph/src/main/jsonschema/org/apache/streams/graph/GraphWriterConfiguration.json
+++ /dev/null
@@ -1,18 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "id": "#",
-    "javaType" : "org.apache.streams.graph.GraphWriterConfiguration",
-    "extends" : {"$ref":"GraphConfiguration.json"},
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "vertices": {
-            "type": "object",
-            "javaType": 
"org.apache.streams.graph.GraphVertexWriterConfiguration"
-        },
-        "edges": {
-            "type": "object",
-            "javaType": "org.apache.streams.graph.GraphEdgeWriterConfiguration"
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-processor-jackson/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-jackson/pom.xml 
b/streams-contrib/streams-processor-jackson/pom.xml
new file mode 100644
index 0000000..8bb44fd
--- /dev/null
+++ b/streams-contrib/streams-processor-jackson/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>streams-processor-jackson</artifactId>
+    <version>0.1-SNAPSHOT</version>
+
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-contrib</artifactId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path-assert</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java
 
b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java
new file mode 100644
index 0000000..56b0c5c
--- /dev/null
+++ 
b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This processor walks an input ObjectNode and corrects any artifacts
+ * that may have occurred from improper serialization of jsonschema2pojo beans.
+ *
+ * The logic is also available for inclusion in other modules via static import.
+ */
+public class CleanAdditionalPropertiesProcessor implements StreamsProcessor {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CleanAdditionalPropertiesProcessor.class);
+
+    private ObjectMapper mapper;
+
+    /**
+     * Converts the datum's document to an ObjectNode, hoists the fields of its
+     * "additionalProperties" object (if any) onto the document root, and stores
+     * the result back on the same datum.
+     *
+     * {@link #prepare(Object)} must be called first; it is what creates the mapper.
+     *
+     * @param datum datum whose document is cleaned in place
+     * @return a one-element list holding the input datum
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum datum) {
+        List<StreamsDatum> result = Lists.newLinkedList();
+        ObjectNode activity = this.mapper.convertValue(datum.getDocument(), ObjectNode.class);
+        cleanAdditionalProperties(activity);
+        datum.setDocument(activity);
+        result.add(datum);
+        return result;
+    }
+
+    /**
+     * Obtains the mapper from {@link StreamsJacksonMapper} and registers the json.org module on it.
+     *
+     * @param o not used
+     */
+    @Override
+    public void prepare(Object o) {
+        this.mapper = StreamsJacksonMapper.getInstance();
+        this.mapper.registerModule(new JsonOrgModule());
+    }
+
+    @Override
+    public void cleanUp() {
+        // nothing to release
+    }
+
+    /**
+     * Copies every field found under the node's "additionalProperties" object
+     * onto the node itself, after first applying the same treatment to that
+     * object. Fields already present on the node are overwritten by the hoisted
+     * value; the "additionalProperties" field itself is not removed.
+     *
+     * A missing node, or an "additionalProperties" value that is absent, JSON
+     * null, or not an object, is left untouched.
+     *
+     * @param node node to clean in place, may be null
+     */
+    public static void cleanAdditionalProperties(ObjectNode node) {
+        if (node == null) {
+            return;
+        }
+        JsonNode candidate = node.get("additionalProperties");
+        if (!(candidate instanceof ObjectNode)) {
+            // absent, JSON null, or a scalar/array: nothing to hoist
+            return;
+        }
+        ObjectNode additionalProperties = (ObjectNode) candidate;
+        cleanAdditionalProperties(additionalProperties);
+        Iterator<Map.Entry<String, JsonNode>> fields = additionalProperties.fields();
+        while (fields.hasNext()) {
+            Map.Entry<String, JsonNode> entry = fields.next();
+            node.put(entry.getKey(), entry.getValue());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java
 
b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java
new file mode 100644
index 0000000..63d03e9
--- /dev/null
+++ 
b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java
@@ -0,0 +1,160 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Converts the document of each {@link StreamsDatum} to a target class by way
+ * of an intermediate Jackson {@link ObjectNode}.
+ *
+ * Documents that cannot be parsed or converted are logged and dropped, so
+ * {@link #process(StreamsDatum)} returns an empty list for them.
+ */
+public class TypeConverterProcessor implements StreamsProcessor {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterProcessor.class);
+
+    private List<String> formats = Lists.newArrayList();
+
+    private ObjectMapper mapper;
+
+    private Class<?> inClass;
+    private Class<?> outClass;
+
+    /**
+     * @param inClass  declared class of incoming documents
+     * @param outClass class outgoing documents are converted to
+     * @param mapper   mapper to use for every conversion
+     */
+    public TypeConverterProcessor(Class<?> inClass, Class<?> outClass, ObjectMapper mapper) {
+        this.inClass = inClass;
+        this.outClass = outClass;
+        this.mapper = mapper;
+    }
+
+    /**
+     * @param inClass  declared class of incoming documents
+     * @param outClass class outgoing documents are converted to
+     * @param formats  passed to {@link StreamsJacksonMapper#getInstance(List)} when the mapper is created
+     */
+    public TypeConverterProcessor(Class<?> inClass, Class<?> outClass, List<String> formats) {
+        this.inClass = inClass;
+        this.outClass = outClass;
+        this.formats = formats;
+    }
+
+    /**
+     * @param inClass  declared class of incoming documents
+     * @param outClass class outgoing documents are converted to
+     */
+    public TypeConverterProcessor(Class<?> inClass, Class<?> outClass) {
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+
+    /**
+     * Converts the entry's document to {@code outClass}.
+     *
+     * {@link #prepare(Object)} must be called first unless a mapper was passed to the constructor.
+     *
+     * @param entry datum to convert; id, timestamp, sequence id and metadata are carried over
+     * @return a one-element list with the converted datum, or an empty list when conversion fails
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        List<StreamsDatum> result = Lists.newLinkedList();
+        ObjectNode node = toObjectNode(entry.getDocument());
+        if (node == null) {
+            return result;
+        }
+
+        try {
+            Object outDoc;
+            if (outClass == String.class) {
+                outDoc = this.mapper.writeValueAsString(node);
+            } else {
+                outDoc = this.mapper.convertValue(node, outClass);
+            }
+
+            StreamsDatum outDatum = new StreamsDatum(outDoc, entry.getId(), entry.getTimestamp(), entry.getSequenceid());
+            outDatum.setMetadata(entry.getMetadata());
+            result.add(outDatum);
+        } catch (Exception e) {
+            // keep the stream alive: report the bad document and drop it
+            LOGGER.warn("Unable to convert document to " + outClass + ": " + node, e);
+        }
+
+        return result;
+    }
+
+    /**
+     * Parses String documents as JSON and converts anything else with the mapper.
+     * The runtime type of the document decides which path is taken, so a
+     * non-String document is never cast to String.
+     *
+     * @return the document as an ObjectNode, or null when it is null or cannot be read
+     */
+    private ObjectNode toObjectNode(Object inDoc) {
+        try {
+            if (inDoc instanceof String) {
+                return this.mapper.readValue((String) inDoc, ObjectNode.class);
+            }
+            return this.mapper.convertValue(inDoc, ObjectNode.class);
+        } catch (IOException e) {
+            LOGGER.warn("Unable to parse document as JSON: " + inDoc, e);
+        } catch (IllegalArgumentException e) {
+            LOGGER.warn("Unable to convert document to ObjectNode: " + inDoc, e);
+        }
+        return null;
+    }
+
+    /**
+     * Creates the mapper unless one was supplied to the constructor.
+     *
+     * @param configurationObject not used
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+        if (this.mapper != null) {
+            // honour the caller's mapper instead of silently replacing it
+            return;
+        }
+        if (formats != null && !formats.isEmpty()) {
+            this.mapper = StreamsJacksonMapper.getInstance(formats);
+        } else {
+            this.mapper = StreamsJacksonMapper.getInstance();
+        }
+    }
+
+    @Override
+    public void cleanUp() {
+        // nothing to release
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java
 
b/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java
new file mode 100644
index 0000000..1316d5c
--- /dev/null
+++ 
b/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.jackson.test;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.jackson.TypeConverterProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static junit.framework.Assert.*;
+
+/**
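+ * Tests for TypeConverterProcessor: each case converts a Datasift JSON document
+ * between String and ObjectNode and checks the document type and the datum id.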
+ */
+public class TypeConverterProcessorTest {
+
+    private static final String DATASIFT_JSON = 
"{\"demographic\":{\"gender\":\"female\"},\"interaction\":{\"schema\":{\"version\":3},\"source\":\"Twitter
 for Android\",\"author\":{\"username\":\"ViiOLeee\",\"name\":\"Violeta 
Anguita\",\"id\":70931384,\"avatar\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"link\":\"http://twitter.com/ViiOLeee\",\"language\":\"en\"},\"type\":\"twitter\",\"created_at\":\"Tue,
 27 May 2014 22:38:15 +0000\",\"received_at\":1.401230295658E9,\"content\":\"RT 
@AliiAnguita: \\\"@Pharrell: Loved working with @edsheeran on Sing. He's a 
genius. https://t.co/wB2qKyJMRw\\\"; @ViiOLeee  look at 
this!\",\"id\":\"1e3e5ef97532a580e0741841f5746728\",\"link\":\"http://twitter.com/ViiOLeee/status/471420141989666817\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384]},\"klout\":{\"score\":34},\"language\":{\"tag\":\"en\",\"tag_extended\":\"en\",\
 "confidence\":98},\"links\":{\"code\":[200],\"created_at\":[\"Tue, 27 May 2014 
14:28:06 
+0000\"],\"meta\":{\"charset\":[\"UTF-8\"],\"content_type\":[\"text/html\"],\"description\":[\"Official
 Video for Ed Sheeran&#39;s track SING Get this track on iTunes: 
http://smarturl.it/EdSing Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; 
instantly: http://smartu...\"],\"keywords\":[[\"ed sheeran\",\"ed sheeran 
sing\",\"ed sheeran new album\",\"Ed Sheeran (Musical Artist)\",\"ed sheeran 
one\",\"ed sheeran fault in our stars\",\"ed sheeran all of the 
stars\",\"s...\"]],\"lang\":[\"en\"],\"opengraph\":[{\"site_name\":\"YouTube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed
 Sheeran - SING [Official 
Video]\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"description\":\"Official
 Video for Ed Sheeran&#39;s track SING Get this track on iTunes: 
http://smarturl.it/EdSing Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; 
instantly: http://smartu
 
...\",\"type\":\"video\"}],\"twitter\":[{\"card\":\"player\",\"site\":\"@youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed
 Sheeran - SING [Official Video]\",\"description\":\"Official Video for Ed 
Sheeran&#39;s track SING Get this track on iTunes: http://smarturl.it/EdSing 
Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; instantly: 
http://smartu...\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"app\":{\"iphone\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"ipad\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"googleplay\":{\"name\":\"YouTube\",\"id\":\"com.google.android.youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\"}},\"player\":\"https://www.youtube.com/embed/tlYcUqEPN58\",\"player_width\":\"1280\",\"player_height\":\"720\"}]},\"normalized_url\":[\"https://youtube.com/watch?v=tlYcUqEPN58\"],\"retweet_count\":[0],\"tit
 le\":[\"Ed Sheeran - SING [Official Video] - 
YouTube\"],\"url\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"]},\"twitter\":{\"id\":\"471420141989666817\",\"retweet\":{\"text\":\"\\\"@Pharrell:
 Loved working with @edsheeran on Sing. He's a genius. 
https://t.co/wB2qKyJMRw\\\"; @ViiOLeee  look at 
this!\",\"id\":\"471420141989666817\",\"user\":{\"name\":\"Violeta 
Anguita\",\"description\":\"La vida no seria la fiesta que todos esperamos, 
pero mientras estemos aqui debemos BAILAR!!! 
#ErasmusOnceErasmusForever\",\"location\":\"Espanhaa..Olaa!\",\"statuses_count\":5882,\"followers_count\":249,\"friends_count\":1090,\"screen_name\":\"ViiOLeee\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"lang\":\"en\",\"time_zone\":\"Madrid\",\"utc_offset\":7200,\"listed_count\":1,\"id\":709
 
31384,\"id_str\":\"70931384\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":275,\"created_at\":\"Wed,
 02 Sep 2009 10:19:59 +0000\"},\"source\":\"<a 
href=\\\"http://twitter.com/download/android\\\"; rel=\\\"nofollow\\\">Twitter 
for Android</a>\",\"count\":1,\"created_at\":\"Tue, 27 May 2014 22:38:15 
+0000\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384],\"links\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"],\"display_urls\":[\"youtube.com/watch?v=tlYcUq…\"],\"domains\":[\"www.youtube.com\"],\"lang\":\"en\"},\"retweeted\":{\"id\":\"471419867078209536\",\"user\":{\"name\":\"Alicia
 Anguita \",\"description\":\"Estudiante de Ingenieria de la Edificación 
en 
Granada.\",\"statuses_count\":371,\"followers_count\":185,\"friends_count\":404,\"screen_name\":\"AliiAnguita\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"profile_image_url_
 
https\":\"https://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"lang\":\"es\",\"listed_count\":0,\"id\":561201891,\"id_str\":\"561201891\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":17,\"created_at\":\"Mon,
 23 Apr 2012 13:11:44 +0000\"},\"source\":\"<a 
href=\\\"http://twitter.com/download/android\\\"; rel=\\\"nofollow\\\">Twitter 
for Android</a>\",\"created_at\":\"Tue, 27 May 2014 22:37:09 +0000\"}}}";
+
+    public static final String DATASIFT_FORMAT = "EEE, dd MMM yyyy HH:mm:ss Z";
+
+    @Test
+    public void testTypeConverterStringToString() {
+        final String ID = "1";
+        StreamsProcessor processor = new TypeConverterProcessor(String.class, 
String.class, Lists.newArrayList(DATASIFT_FORMAT));
+        processor.prepare(null);
+        StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID);
+        List<StreamsDatum> result = processor.process(datum);
+        assertNotNull(result);
+        assertEquals(1, result.size());
+        StreamsDatum resultDatum = result.get(0);
+        assertNotNull(resultDatum);
+        assertNotNull(resultDatum.getDocument());
+        assertTrue(resultDatum.getDocument() instanceof String);
+        assertEquals(ID, resultDatum.getId());
+    }
+
+    @Test
+    public void testTypeConverterStringToObjectNode() {
+        final String ID = "1";
+        StreamsProcessor processor = new TypeConverterProcessor(String.class, 
ObjectNode.class, Lists.newArrayList(DATASIFT_FORMAT));
+        processor.prepare(null);
+        StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID);
+        List<StreamsDatum> result = processor.process(datum);
+        assertNotNull(result);
+        assertEquals(1, result.size());
+        StreamsDatum resultDatum = result.get(0);
+        assertNotNull(resultDatum);
+        assertNotNull(resultDatum.getDocument());
+        assertTrue(resultDatum.getDocument() instanceof ObjectNode);
+        assertEquals(ID, resultDatum.getId());
+    }
+
+    @Test
+    public void testTypeConverterObjectNodeToString() throws IOException {
+        final String ID = "1";
+        StreamsProcessor processor = new 
TypeConverterProcessor(ObjectNode.class, String.class, 
Lists.newArrayList(DATASIFT_FORMAT));
+        processor.prepare(null);
+        ObjectMapper mapper = 
StreamsJacksonMapper.getInstance(Lists.newArrayList(DATASIFT_FORMAT));
+        ObjectNode node = mapper.readValue(DATASIFT_JSON, ObjectNode.class);
+        StreamsDatum datum = new StreamsDatum(node, ID);
+        List<StreamsDatum> result = processor.process(datum);
+        assertNotNull(result);
+        assertEquals(1, result.size());
+        StreamsDatum resultDatum = result.get(0);
+        assertNotNull(resultDatum);
+        assertNotNull(resultDatum.getDocument());
+        assertTrue(resultDatum.getDocument() instanceof String);
+        assertEquals(ID, resultDatum.getId());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/pom.xml 
b/streams-contrib/streams-provider-datasift/pom.xml
index 23165af..ae96d4e 100644
--- a/streams-contrib/streams-provider-datasift/pom.xml
+++ b/streams-contrib/streams-provider-datasift/pom.xml
@@ -37,7 +37,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
-            <artifactId>streams-converters</artifactId>
+            <artifactId>streams-processor-jackson</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java
 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java
index bc982e9..a4a4b5a 100644
--- 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java
+++ 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java
@@ -23,7 +23,8 @@ import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.serializer.DatasiftActivityConverter;
+import org.apache.streams.datasift.provider.DatasiftConverter;
+import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
 import org.apache.streams.datasift.util.StreamsDatasiftMapper;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
@@ -41,7 +42,7 @@ public class DatasiftActivitySerializerProcessor implements 
StreamsProcessor {
 
     private ObjectMapper mapper;
     private Class outClass;
-    private DatasiftActivityConverter datasiftActivitySerializer;
+    private DatasiftActivitySerializer datasiftActivitySerializer;
 
     public final static String TERMINATE = new String("TERMINATE");
 
@@ -76,8 +77,8 @@ public class DatasiftActivitySerializerProcessor implements 
StreamsProcessor {
 
     @Override
     public void prepare(Object configurationObject) {
-        this.mapper = 
StreamsJacksonMapper.getInstance(StreamsDatasiftMapper.DATASIFT_FORMAT);
-        this.datasiftActivitySerializer = new DatasiftActivityConverter();
+        this.mapper = 
StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT));
+        this.datasiftActivitySerializer = new DatasiftActivitySerializer();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
index a0d2fc3..1166b2e 100644
--- 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
+++ 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
@@ -25,10 +25,10 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.datasift.Datasift;
 import org.apache.streams.datasift.provider.DatasiftConverter;
-import org.apache.streams.datasift.serializer.DatasiftActivityConverter;
+import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
 import org.apache.streams.datasift.util.StreamsDatasiftMapper;
 import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.converter.CleanAdditionalPropertiesProcessor;
+import org.apache.streams.jackson.CleanAdditionalPropertiesProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +43,7 @@ public class DatasiftTypeConverterProcessor implements 
StreamsProcessor {
 
     private ObjectMapper mapper;
     private Class outClass;
-    private DatasiftActivityConverter datasiftInteractionActivitySerializer;
+    private DatasiftActivitySerializer datasiftInteractionActivitySerializer;
     private DatasiftConverter converter;
 
     public final static String TERMINATE = new String("TERMINATE");
@@ -75,7 +75,7 @@ public class DatasiftTypeConverterProcessor implements 
StreamsProcessor {
     @Override
     public void prepare(Object configurationObject) {
         this.mapper = StreamsDatasiftMapper.getInstance();
-        this.datasiftInteractionActivitySerializer = new 
DatasiftActivityConverter();
+        this.datasiftInteractionActivitySerializer = new 
DatasiftActivitySerializer();
         if(this.outClass.equals(Activity.class)) {
             this.converter = new ActivityConverter();
         } else if (this.outClass.equals(String.class)) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivityConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivityConverter.java
 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivityConverter.java
deleted file mode 100644
index 65bebce..0000000
--- 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivityConverter.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.datasift.serializer;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.streams.data.ActivityConverter;
-import org.apache.streams.data.ActivityConverterFactory;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.twitter.Twitter;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-
-import java.util.List;
-
-/**
- *  We should be able to @Deprecate this soon and adopt 
ActivityConverterProcessor
- */
-public class DatasiftActivityConverter implements ActivityConverter<Datasift> {
-
-    private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance(StreamsDatasiftMapper.DATASIFT_FORMAT);
-
-    private static DatasiftActivityConverter instance = new 
DatasiftActivityConverter();
-
-    public static DatasiftActivityConverter getInstance() {
-        return instance;
-    }
-
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
-
-    @Override
-    public Datasift serialize(Activity deserialized) throws 
ActivitySerializerException {
-        return null;
-    }
-
-    @Override
-    public Activity deserialize(Datasift serialized) throws 
ActivitySerializerException {
-        Class detectedClass = 
DatasiftEventClassifier.getInstance().detectClass(serialized);
-        Class converterClass = 
DatasiftConverterResolver.getInstance().bestSerializer(detectedClass);
-        ActivityConverter serializer = 
ActivityConverterFactory.getInstance(converterClass);
-        return serializer.deserialize(serialized);
-    }
-
-    public Activity deserialize(String json) throws 
ActivitySerializerException {
-        try {
-            return deserialize(MAPPER.readValue(json, Datasift.class));
-        } catch (Exception e) {
-            throw new ActivitySerializerException(e);
-        }
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<Datasift> serializedList) {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
new file mode 100644
index 0000000..b587cd6
--- /dev/null
+++ 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+
+import java.util.List;
+
+/**
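+ * Deserializes Datasift events (or their JSON form) into Activity objects by
+ * delegating to the serializer chosen by DatasiftEventClassifier.bestSerializer.
+ * serializationFormat, serialize and deserializeAll currently return null.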
+ */
+public class DatasiftActivitySerializer implements 
ActivitySerializer<Datasift> {
+
+    private static final ObjectMapper MAPPER = 
StreamsDatasiftMapper.getInstance();
+
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    @Override
+    public Datasift serialize(Activity deserialized) throws 
ActivitySerializerException {
+        return null;
+    }
+
+    @Override
+    public Activity deserialize(Datasift serialized) throws 
ActivitySerializerException {
+        ActivitySerializer serializer = 
DatasiftEventClassifier.bestSerializer(serialized);
+        return serializer.deserialize(serialized);
+    }
+
+    public Activity deserialize(String json) throws 
ActivitySerializerException {
+        try {
+            return deserialize(MAPPER.readValue(json, Datasift.class));
+        } catch (Exception e) {
+            throw new ActivitySerializerException(e);
+        }
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<Datasift> serializedList) {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftConverterResolver.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftConverterResolver.java
 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftConverterResolver.java
deleted file mode 100644
index edeab34..0000000
--- 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftConverterResolver.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.datasift.serializer;
-
-import org.apache.streams.data.ActivityConverterResolver;
-import org.apache.streams.datasift.instagram.Instagram;
-import org.apache.streams.datasift.twitter.Twitter;
-
-/**
- * Ensures datasift documents can be converted to Activity
- */
-public class DatasiftConverterResolver implements ActivityConverterResolver {
-
-    public DatasiftConverterResolver() {
-
-    }
-
-    private static DatasiftConverterResolver instance = new 
DatasiftConverterResolver();
-
-    public static DatasiftConverterResolver getInstance() {
-        return instance;
-    }
-
-    public Class bestSerializer(Class documentClass) {
-
-        if(documentClass == Twitter.class) {
-            return DatasiftTwitterActivityConverter.class;
-        } else if(documentClass == Instagram.class) {
-            return DatasiftInstagramActivityConverter.class;
-        } else {
-            return DatasiftInteractionActivityConverter.class;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
index 226c3f6..7d7d547 100644
--- 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
+++ 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
@@ -18,39 +18,36 @@
 
 package org.apache.streams.datasift.serializer;
 
-import com.google.common.base.Preconditions;
-import org.apache.streams.data.DocumentClassifier;
+import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.datasift.Datasift;
 import org.apache.streams.datasift.instagram.Instagram;
 import org.apache.streams.datasift.interaction.Interaction;
 import org.apache.streams.datasift.twitter.Twitter;
 
 /**
- * Ensures datasift documents can be converted to Activity
+ * Created by sblackmon on 11/6/14.
  */
-public class DatasiftEventClassifier implements DocumentClassifier {
+public class DatasiftEventClassifier {
 
-    public DatasiftEventClassifier() {
+    public static Class detectClass(Datasift event) {
 
-    }
-
-    private static DatasiftEventClassifier instance = new 
DatasiftEventClassifier();
-
-    public static DatasiftEventClassifier getInstance() {
-        return instance;
-    }
-
-    public Class detectClass(Object document) {
-
-        Preconditions.checkArgument(document instanceof Datasift);
-        Datasift datasift = (Datasift)document;
-        if(datasift.getTwitter() != null) {
+        if(event.getTwitter() != null) {
             return Twitter.class;
-        } else if(datasift.getInstagram() != null) {
+        } else if(event.getInstagram() != null) {
             return Instagram.class;
         } else {
             return Interaction.class;
         }
     }
 
+    public static ActivitySerializer bestSerializer(Datasift event) {
+
+        if(event.getTwitter() != null) {
+            return DatasiftTwitterActivitySerializer.getInstance();
+        } else if(event.getInstagram() != null) {
+            return DatasiftInstagramActivitySerializer.getInstance();
+        } else {
+            return DatasiftInteractionActivitySerializer.getInstance();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivityConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivityConverter.java
 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivityConverter.java
deleted file mode 100644
index eade439..0000000
--- 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivityConverter.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.streams.datasift.serializer;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.instagram.From;
-import org.apache.streams.datasift.instagram.Instagram;
-import org.apache.streams.instagram.serializer.util.InstagramActivityUtil;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.pojo.json.Image;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
-
-/**
- *
- */
-public class DatasiftInstagramActivityConverter extends 
DatasiftInteractionActivityConverter {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(DatasiftInstagramActivityConverter.class);
-
-    private static DatasiftInstagramActivityConverter instance = new 
DatasiftInstagramActivityConverter();
-
-    public static DatasiftInstagramActivityConverter getInstance() {
-        return instance;
-    }
-
-    @Override
-    public Activity convert(Datasift event) {
-        Activity activity = super.convert(event);
-
-        Instagram instagram = event.getInstagram();
-
-        activity.setActor(buildActor(event, instagram));
-        activity.setId(formatId(activity.getVerb(), 
event.getInteraction().getId()));
-        activity.setProvider(InstagramActivityUtil.getProvider());
-        activity.setLinks(getLinks(event.getInstagram()));
-
-        activity.setVerb(selectVerb(event));
-        addInstagramExtensions(activity, instagram);
-        return activity;
-    }
-
-    /**
-     * Gets links from the object
-     * @return
-     */
-    private List<String> getLinks(Instagram instagram) {
-        List<String> result = Lists.newLinkedList();
-        if( instagram.getMedia() != null ) {
-            result.add(instagram.getMedia().getImage());
-            result.add(instagram.getMedia().getVideo());
-        }
-        return result;
-    }
-
-    public Actor buildActor(Datasift event, Instagram instagram) {
-        Actor actor = super.buildActor(event.getInteraction());
-        From user = instagram.getFrom();
-
-        actor.setDisplayName(firstStringIfNotNull(user.getFullName()));
-        actor.setId(formatId(Optional.fromNullable(
-                user.getId())
-                .or(actor.getId())));
-
-        Image profileImage = new Image();
-        String profileUrl = null;
-        profileUrl = user.getProfilePicture();
-        profileImage.setUrl(profileUrl);
-        actor.setImage(profileImage);
-
-        return actor;
-    }
-
-    public void addInstagramExtensions(Activity activity, Instagram instagram) 
{
-        Map<String, Object> extensions = ensureExtensions(activity);
-        List<String> hashTags;
-        if(instagram.getMedia() != null) {
-            hashTags = instagram.getMedia().getTags();
-            extensions.put("hashtags", hashTags);
-            extensions.put("keywords", activity.getContent());
-        } else {
-            extensions.put("keywords", activity.getContent());
-
-        }
-
-    }
-
-    private String selectVerb(Datasift event) {
-        if( event.getInteraction().getSubtype().equals("like"))
-            return "like";
-        else
-            return "post";
-    }
-
-    public static String formatId(String... idparts) {
-        return Joiner.on(":").join(Lists.asList("id:instagram", idparts));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
new file mode 100644
index 0000000..d121d65
--- /dev/null
+++ 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
@@ -0,0 +1,124 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.streams.datasift.serializer;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.instagram.From;
+import org.apache.streams.datasift.instagram.Instagram;
+import org.apache.streams.instagram.serializer.util.InstagramActivityUtil;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Image;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+ *
+ */
+public class DatasiftInstagramActivitySerializer extends DatasiftInteractionActivitySerializer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftInstagramActivitySerializer.class);
+
+    private static DatasiftInstagramActivitySerializer instance = new DatasiftInstagramActivitySerializer();
+
+    public static DatasiftInstagramActivitySerializer getInstance() {
+        return instance;
+    }
+
+    @Override
+    public Activity convert(Datasift event) {
+        Activity activity = super.convert(event);
+
+        Instagram instagram = event.getInstagram();
+
+        activity.setActor(buildActor(event, instagram));
+        activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
+        activity.setProvider(InstagramActivityUtil.getProvider());
+        activity.setLinks(getLinks(event.getInstagram()));
+
+        activity.setVerb(selectVerb(event));
+        addInstagramExtensions(activity, instagram);
+        return activity;
+    }
+
+    /**
+     * Gets links from the object
+     * @return
+     */
+    private List<String> getLinks(Instagram instagram) {
+        List<String> result = Lists.newLinkedList();
+        if( instagram.getMedia() != null ) {
+            result.add(instagram.getMedia().getImage());
+            result.add(instagram.getMedia().getVideo());
+        }
+        return result;
+    }
+
+    public Actor buildActor(Datasift event, Instagram instagram) {
+        Actor actor = super.buildActor(event.getInteraction());
+        From user = instagram.getFrom();
+
+        actor.setDisplayName(firstStringIfNotNull(user.getFullName()));
+        actor.setId(formatId(Optional.fromNullable(
+                user.getId())
+                .or(actor.getId())));
+
+        Image profileImage = new Image();
+        String profileUrl = null;
+        profileUrl = user.getProfilePicture();
+        profileImage.setUrl(profileUrl);
+        actor.setImage(profileImage);
+
+        return actor;
+    }
+
+    public void addInstagramExtensions(Activity activity, Instagram instagram) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        List<String> hashTags;
+        if(instagram.getMedia() != null) {
+            hashTags = instagram.getMedia().getTags();
+            extensions.put("hashtags", hashTags);
+            extensions.put("keywords", activity.getContent());
+        } else {
+            extensions.put("keywords", activity.getContent());
+
+        }
+
+    }
+
+    private String selectVerb(Datasift event) {
+        if( event.getInteraction().getSubtype().equals("like"))
+            return "like";
+        else
+            return "post";
+    }
+
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:instagram", idparts));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivityConverter.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivityConverter.java
deleted file mode 100644
index da21006..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivityConverter.java
+++ /dev/null
@@ -1,222 +0,0 @@
-package org.apache.streams.datasift.serializer;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.streams.data.ActivityConverter;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.interaction.Interaction;
-import org.apache.streams.datasift.links.Links;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.apache.streams.pojo.json.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
-
-/**
- *
- */
-public class DatasiftInteractionActivityConverter implements ActivityConverter<Datasift> {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftInteractionActivityConverter.class);
-
-    private static DatasiftInteractionActivityConverter instance = new DatasiftInteractionActivityConverter();
-
-    public static DatasiftInteractionActivityConverter getInstance() {
-        return instance;
-    }
-
-    ObjectMapper mapper = StreamsDatasiftMapper.getInstance();
-
-    @Override
-    public String serializationFormat() {
-        return "application/json+datasift.com.v1.1";
-    }
-
-    @Override
-    public Datasift serialize(Activity deserialized) {
-        throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON");
-    }
-
-    public Activity deserialize(String datasiftJson) {
-        try {
-            return deserialize(this.mapper.readValue(datasiftJson, Datasift.class));
-        } catch (Exception e) {
-            LOGGER.error("Exception while trying convert,\n {},\n to a Datasift object.", datasiftJson);
-            LOGGER.error("Exception : {}", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public Activity deserialize(Datasift serialized) {
-
-        try {
-
-            Activity activity = convert(serialized);
-
-            return activity;
-
-        } catch (Exception e) {
-            throw new IllegalArgumentException("Unable to deserialize", e);
-        }
-
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<Datasift> datasifts) {
-        List<Activity> activities = Lists.newArrayList();
-        for( Datasift datasift : datasifts ) {
-            activities.add(deserialize(datasift));
-        }
-        return activities;
-    }
-
-    public static Generator buildGenerator(Interaction interaction) {
-        Generator generator = new Generator();
-        generator.setDisplayName(interaction.getSource());
-        generator.setId(interaction.getSource());
-        return generator;
-    }
-
-    public static Icon getIcon(Interaction interaction) {
-        return null;
-    }
-
-    public static Provider buildProvider(Interaction interaction) {
-        Provider provider = new Provider();
-        provider.setId("id:providers:"+interaction.getType());
-        provider.setDisplayName(interaction.getType());
-        return provider;
-    }
-
-    public static String getUrls(Interaction interaction) {
-        return null;
-    }
-
-    public static void addDatasiftExtension(Activity activity, Datasift datasift) {
-        Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
-        extensions.put("datasift", datasift);
-    }
-
-    public static String formatId(String... idparts) {
-        return Joiner.on(":").join(Lists.asList("id:datasift", idparts));
-    }
-
-    public Activity convert(Datasift event) {
-
-        Preconditions.checkNotNull(event);
-        Preconditions.checkNotNull(event.getInteraction());
-
-        Activity activity = new Activity();
-        activity.setActor(buildActor(event.getInteraction()));
-        activity.setVerb(selectVerb(event));
-        activity.setObject(buildActivityObject(event.getInteraction()));
-        activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
-        activity.setTarget(buildTarget(event.getInteraction()));
-        activity.setPublished(event.getInteraction().getCreatedAt());
-        activity.setGenerator(buildGenerator(event.getInteraction()));
-        activity.setIcon(getIcon(event.getInteraction()));
-        activity.setProvider(buildProvider(event.getInteraction()));
-        activity.setTitle(event.getInteraction().getTitle());
-        activity.setContent(event.getInteraction().getContent());
-        activity.setUrl(event.getInteraction().getLink());
-        activity.setLinks(getLinks(event));
-        addDatasiftExtension(activity, event);
-        if( event.getInteraction().getGeo() != null) {
-            addLocationExtension(activity, event.getInteraction());
-        }
-        return activity;
-    }
-
-    private String selectVerb(Datasift event) {
-        return "post";
-    }
-
-    public Actor buildActor(Interaction interaction) {
-        Actor actor = new Actor();
-        org.apache.streams.datasift.interaction.Author author = interaction.getAuthor();
-        if(author == null) {
-            LOGGER.warn("Interaction does not contain author information.");
-            return actor;
-        }
-        String userName = author.getUsername();
-        String name = author.getName();
-        Long id  = author.getId();
-        if(userName != null) {
-            actor.setDisplayName(userName);
-        } else {
-            actor.setDisplayName(name);
-        }
-
-        if(id != null) {
-            actor.setId(id.toString());
-        } else {
-            if(userName != null)
-                actor.setId(userName);
-            else
-                actor.setId(name);
-        }
-        Image image = new Image();
-        image.setUrl(interaction.getAuthor().getAvatar());
-        actor.setImage(image);
-        if (interaction.getAuthor().getLink()!=null){
-            actor.setUrl(interaction.getAuthor().getLink());
-        }
-        return actor;
-    }
-
-    public static ActivityObject buildActivityObject(Interaction interaction) {
-        ActivityObject actObj = new ActivityObject();
-        actObj.setObjectType(interaction.getContenttype());
-        actObj.setUrl(interaction.getLink());
-        actObj.setId(formatId("post",interaction.getId()));
-        actObj.setContent(interaction.getContent());
-
-        return actObj;
-    }
-
-    public static List<String> getLinks(Datasift event) {
-        List<String> result = Lists.newArrayList();
-        Links links = event.getLinks();
-        if(links == null)
-            return null;
-        for(Object link : links.getNormalizedUrl()) {
-            if(link != null) {
-                if(link instanceof String) {
-                    result.add((String) link);
-                } else {
-                    LOGGER.warn("link is not of type String : {}", link.getClass().getName());
-                }
-            }
-        }
-        return result;
-    }
-
-    public static ActivityObject buildTarget(Interaction interaction) {
-        return null;
-    }
-
-    public static void addLocationExtension(Activity activity, Interaction interaction) {
-        Map<String, Object> extensions = ensureExtensions(activity);
-        Map<String, Object> location = new HashMap<String, Object>();
-        Map<String, Double> coordinates = new HashMap<String, Double>();
-        coordinates.put("latitude", interaction.getGeo().getLatitude());
-        coordinates.put("longitude", interaction.getGeo().getLongitude());
-        location.put("coordinates", coordinates);
-        extensions.put("location", location);
-    }
-
-    public static String firstStringIfNotNull(List<Object> list) {
-        if( list != null && list.size() > 0) {
-            return (String) list.get(0);
-        } else return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
new file mode 100644
index 0000000..c856dc2
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
@@ -0,0 +1,222 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.datasift.links.Links;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.pojo.json.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+ *
+ */
+public class DatasiftInteractionActivitySerializer implements ActivitySerializer<Datasift>, Serializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftInteractionActivitySerializer.class);
+
+    private static DatasiftInteractionActivitySerializer instance = new DatasiftInteractionActivitySerializer();
+
+    public static DatasiftInteractionActivitySerializer getInstance() {
+        return instance;
+    }
+
+    ObjectMapper mapper = StreamsDatasiftMapper.getInstance();
+
+    @Override
+    public String serializationFormat() {
+        return "application/json+datasift.com.v1.1";
+    }
+
+    @Override
+    public Datasift serialize(Activity deserialized) {
+        throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON");
+    }
+
+    public Activity deserialize(String datasiftJson) {
+        try {
+            return deserialize(this.mapper.readValue(datasiftJson, Datasift.class));
+        } catch (Exception e) {
+            LOGGER.error("Exception while trying convert,\n {},\n to a Datasift object.", datasiftJson);
+            LOGGER.error("Exception : {}", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Activity deserialize(Datasift serialized) {
+
+        try {
+
+            Activity activity = convert(serialized);
+
+            return activity;
+
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Unable to deserialize", e);
+        }
+
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<Datasift> datasifts) {
+        List<Activity> activities = Lists.newArrayList();
+        for( Datasift datasift : datasifts ) {
+            activities.add(deserialize(datasift));
+        }
+        return activities;
+    }
+
+    public static Generator buildGenerator(Interaction interaction) {
+        Generator generator = new Generator();
+        generator.setDisplayName(interaction.getSource());
+        generator.setId(interaction.getSource());
+        return generator;
+    }
+
+    public static Icon getIcon(Interaction interaction) {
+        return null;
+    }
+
+    public static Provider buildProvider(Interaction interaction) {
+        Provider provider = new Provider();
+        provider.setId("id:providers:"+interaction.getType());
+        provider.setDisplayName(interaction.getType());
+        return provider;
+    }
+
+    public static String getUrls(Interaction interaction) {
+        return null;
+    }
+
+    public static void addDatasiftExtension(Activity activity, Datasift datasift) {
+        Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
+        extensions.put("datasift", datasift);
+    }
+
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:datasift", idparts));
+    }
+
+    public Activity convert(Datasift event) {
+
+        Preconditions.checkNotNull(event);
+        Preconditions.checkNotNull(event.getInteraction());
+
+        Activity activity = new Activity();
+        activity.setActor(buildActor(event.getInteraction()));
+        activity.setVerb(selectVerb(event));
+        activity.setObject(buildActivityObject(event.getInteraction()));
+        activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
+        activity.setTarget(buildTarget(event.getInteraction()));
+        activity.setPublished(event.getInteraction().getCreatedAt());
+        activity.setGenerator(buildGenerator(event.getInteraction()));
+        activity.setIcon(getIcon(event.getInteraction()));
+        activity.setProvider(buildProvider(event.getInteraction()));
+        activity.setTitle(event.getInteraction().getTitle());
+        activity.setContent(event.getInteraction().getContent());
+        activity.setUrl(event.getInteraction().getLink());
+        activity.setLinks(getLinks(event));
+        addDatasiftExtension(activity, event);
+        if( event.getInteraction().getGeo() != null) {
+            addLocationExtension(activity, event.getInteraction());
+        }
+        return activity;
+    }
+
+    private String selectVerb(Datasift event) {
+        return "post";
+    }
+
+    public Actor buildActor(Interaction interaction) {
+        Actor actor = new Actor();
+        org.apache.streams.datasift.interaction.Author author = interaction.getAuthor();
+        if(author == null) {
+            LOGGER.warn("Interaction does not contain author information.");
+            return actor;
+        }
+        String userName = author.getUsername();
+        String name = author.getName();
+        Long id  = author.getId();
+        if(userName != null) {
+            actor.setDisplayName(userName);
+        } else {
+            actor.setDisplayName(name);
+        }
+
+        if(id != null) {
+            actor.setId(id.toString());
+        } else {
+            if(userName != null)
+                actor.setId(userName);
+            else
+                actor.setId(name);
+        }
+        Image image = new Image();
+        image.setUrl(interaction.getAuthor().getAvatar());
+        actor.setImage(image);
+        if (interaction.getAuthor().getLink()!=null){
+            actor.setUrl(interaction.getAuthor().getLink());
+        }
+        return actor;
+    }
+
+    public static ActivityObject buildActivityObject(Interaction interaction) {
+        ActivityObject actObj = new ActivityObject();
+        actObj.setObjectType(interaction.getContenttype());
+        actObj.setUrl(interaction.getLink());
+        actObj.setId(formatId("post",interaction.getId()));
+        actObj.setContent(interaction.getContent());
+
+        return actObj;
+    }
+
+    public static List<String> getLinks(Datasift event) {
+        List<String> result = Lists.newArrayList();
+        Links links = event.getLinks();
+        if(links == null)
+            return null;
+        for(Object link : links.getNormalizedUrl()) {
+            if(link != null) {
+                if(link instanceof String) {
+                    result.add((String) link);
+                } else {
+                    LOGGER.warn("link is not of type String : {}", link.getClass().getName());
+                }
+            }
+        }
+        return result;
+    }
+
+    public static ActivityObject buildTarget(Interaction interaction) {
+        return null;
+    }
+
+    public static void addLocationExtension(Activity activity, Interaction interaction) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        Map<String, Object> location = new HashMap<String, Object>();
+        Map<String, Double> coordinates = new HashMap<String, Double>();
+        coordinates.put("latitude", interaction.getGeo().getLatitude());
+        coordinates.put("longitude", interaction.getGeo().getLongitude());
+        location.put("coordinates", coordinates);
+        extensions.put("location", location);
+    }
+
+    public static String firstStringIfNotNull(List<Object> list) {
+        if( list != null && list.size() > 0) {
+            return (String) list.get(0);
+        } else return null;
+    }
+}

Reply via email to