http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java
 
b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java
new file mode 100644
index 0000000..a53eaa5
--- /dev/null
+++ 
b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailMessageActivitySerializer.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.google.gmail.provider;
+
+import com.fasterxml.jackson.annotation.JsonBackReference;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonManagedReference;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.AnnotationIntrospector;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.googlecode.gmail4j.GmailException;
+import com.googlecode.gmail4j.GmailMessage;
+import com.googlecode.gmail4j.javamail.JavaMailGmailMessage;
+import com.sun.mail.imap.IMAPFolder;
+import com.sun.mail.imap.IMAPMessage;
+import com.sun.mail.imap.IMAPSSLStore;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.pojo.json.*;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.mail.internet.MimeMultipart;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+ * Converts gmail4j {@link GmailMessage} instances into streams {@link Activity} documents.
+ *
+ * <p>Only the {@code GmailMessage -> Activity} direction ({@link #deserialize}) is implemented:
+ * {@link #serialize} returns {@code null} and {@link #deserializeAll} throws
+ * {@link NotImplementedException}.
+ *
+ * <p>{@link #deserialize} reads the user name from the configured {@link GMailProvider}, so an
+ * instance created with the no-argument constructor must have {@code provider} assigned before
+ * it is used for conversion.
+ */
+public class GMailMessageActivitySerializer implements ActivitySerializer<GmailMessage> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GMailMessageActivitySerializer.class);
+
+    GMailProvider provider;
+
+    ObjectMapper mapper = new ObjectMapper();
+
+    /**
+     * Creates a serializer bound to the given provider and registers {@link MessageMixIn} on the
+     * JavaMail / gmail4j types handled by the mapper.
+     *
+     * @param provider provider whose configuration supplies the user name used in activity ids
+     */
+    public GMailMessageActivitySerializer(GMailProvider provider) {
+
+        this.provider = provider;
+
+        mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, Boolean.FALSE);
+
+        mapper.addMixInAnnotations(IMAPSSLStore.class, MessageMixIn.class);
+        mapper.addMixInAnnotations(IMAPFolder.class, MessageMixIn.class);
+        mapper.addMixInAnnotations(IMAPMessage.class, MessageMixIn.class);
+        mapper.addMixInAnnotations(MimeMultipart.class, MessageMixIn.class);
+        mapper.addMixInAnnotations(JavaMailGmailMessage.class, MessageMixIn.class);
+
+    }
+
+    /**
+     * Creates a serializer without a provider and without any mix-in registration.
+     */
+    public GMailMessageActivitySerializer() {
+    }
+
+    /**
+     * @return the format identifier of this serializer, {@code "gmail.v1"}
+     */
+    @Override
+    public String serializationFormat() {
+        return "gmail.v1";
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @param activity ignored
+     * @return always {@code null}
+     */
+    @Override
+    public GmailMessage serialize(Activity activity) {
+        return null;
+    }
+
+    /**
+     * Builds an {@link Activity} with verb {@code "email"} from a gmail message.
+     *
+     * <p>The sender becomes the actor, the first "To" recipient (when there is one) becomes the
+     * object, the subject becomes the title and the text content becomes the content. Failures
+     * reported by gmail4j while reading recipients or content are logged and leave the
+     * corresponding fields unset.
+     *
+     * @param gmailMessage message to convert
+     * @return the populated activity
+     */
+    @Override
+    public Activity deserialize(GmailMessage gmailMessage) {
+
+        Activity activity = new Activity();
+        activity.setId(formatId(this.provider.getConfig().getUserName(), String.valueOf(gmailMessage.getMessageNumber())));
+        activity.setPublished(new DateTime(gmailMessage.getSendDate()));
+
+        // Named so that it does not shadow the GMailProvider field of this class.
+        Provider activityProvider = new Provider();
+        activityProvider.setId("http://gmail.com");
+        activityProvider.setDisplayName("GMail");
+        activity.setProvider(activityProvider);
+
+        Actor actor = new Actor();
+        actor.setId(gmailMessage.getFrom().getEmail());
+        actor.setDisplayName(gmailMessage.getFrom().getName());
+        activity.setActor(actor);
+        activity.setVerb("email");
+
+        ActivityObject object = new ActivityObject();
+        try {
+            // An empty recipient list leaves the object unaddressed instead of failing on get(0).
+            if (!gmailMessage.getTo().isEmpty()) {
+                object.setId(gmailMessage.getTo().get(0).getEmail());
+                object.setDisplayName(gmailMessage.getTo().get(0).getName());
+            }
+        } catch( GmailException e ) {
+            LOGGER.warn("Unable to read the recipients of the gmail message", e);
+        }
+        activity.setTitle(gmailMessage.getSubject());
+        try {
+            activity.setContent(gmailMessage.getContentText());
+        } catch( GmailException e ) {
+            LOGGER.warn("Unable to read the content of the gmail message", e);
+        }
+        activity.setObject(object);
+
+//        try {
+//            // if jackson can't serialize the object, find out now
+//            String jsonString = mapper.writeValueAsString(gmailMessage);
+//            ObjectNode jsonObject = mapper.valueToTree(gmailMessage);
+//            // since it can, write the entire source object to extensions.gmail
+//            Map<String, Object> extensions = Maps.newHashMap();
+//            extensions.put("gmail", gmailMessage);
+//            activity.setAdditionalProperty("extensions", extensions);
+//        } catch (JsonProcessingException e) {
+//            LOGGER.debug("Failed Json Deserialization");
+//            e.printStackTrace();
+//        }
+
+        return activity;
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @throws NotImplementedException always
+     */
+    @Override
+    public List<Activity> deserializeAll(List<GmailMessage> serializedList) {
+        throw new NotImplementedException("Not currently implemented");
+    }
+
+    /** Not implemented; always returns {@code null}. */
+    public Activity convert(ObjectNode event) {
+        return null;
+    }
+
+    /** Not implemented; always returns {@code null}. */
+    public static Generator buildGenerator(ObjectNode event) {
+        return null;
+    }
+
+    /** Not implemented; always returns {@code null}. */
+    public static Icon getIcon(ObjectNode event) {
+        return null;
+    }
+
+    /**
+     * Builds a provider whose id is {@code "id:providers:gmail"}.
+     *
+     * @param event not read
+     * @return a new provider carrying only that id
+     */
+    public static Provider buildProvider(ObjectNode event) {
+        Provider provider = new Provider();
+        provider.setId("id:providers:gmail");
+        return provider;
+    }
+
+    /** Not implemented; always returns {@code null}. */
+    public static List<Object> getLinks(ObjectNode event) {
+        return null;
+    }
+
+    /** Not implemented; always returns {@code null}. */
+    public static String getUrls(ObjectNode event) {
+        return null;
+    }
+
+    /**
+     * Stores the source message under the {@code "gmail"} key of the activity's extensions.
+     *
+     * @param activity activity to extend
+     * @param gmailMessage message to attach
+     */
+    public static void addGMailExtension(Activity activity, GmailMessage gmailMessage) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        extensions.put("gmail", gmailMessage);
+    }
+
+    /**
+     * Joins the given parts with {@code ":"} behind the {@code "id:gmail"} prefix.
+     *
+     * @param idparts parts to append after the prefix
+     * @return the joined identifier, e.g. {@code id:gmail:user:42}
+     */
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:gmail", idparts));
+    }
+
+    /**
+     * Jackson mix-in registered by the provider constructor on the mail store, folder, message
+     * and multipart types; every accessor declared here is annotated with {@code @JsonIgnore}.
+     */
+    interface MessageMixIn {
+        @JsonManagedReference
+        @JsonIgnore
+        IMAPSSLStore getDefaultFolder(); // we don't need it!
+        @JsonManagedReference
+        @JsonIgnore
+        IMAPSSLStore getPersonalNamespaces(); // we don't need it!
+        @JsonManagedReference
+        @JsonIgnore
+        IMAPFolder getStore(); // we don't need it!
+        //        @JsonManagedReference
+//        @JsonIgnore
+//        @JsonBackReference
+        //IMAPFolder getParent(); // we don't need it!
+        @JsonManagedReference
+        @JsonIgnore
+        @JsonBackReference
+        IMAPMessage getFolder(); // we don't need it!
+        @JsonManagedReference
+        @JsonIgnore
+        @JsonProperty("parent")
+        @JsonBackReference
+        MimeMultipart getParent();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java
index 8f0f491..583c741 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java
@@ -32,9 +32,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
-/**
- * Consider replacing this with a simpler Processer which extends 
SimpleHTTPGetProcessor
- */
 public class GooglePlusCommentProcessor implements StreamsProcessor {
     private final static String STREAMS_ID = "GooglePlusCommentProcessor";
     private final static Logger LOGGER = 
LoggerFactory.getLogger(GooglePlusCommentProcessor.class);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
index 52b29dd..73e261f 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
@@ -38,10 +38,6 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.Queue;
 
-@Deprecated
-/*
- * Modules and streams should adopt TypeConverterProcessor and 
ActivityConverterProcessor
- */
 public class GooglePlusTypeConverter implements StreamsProcessor {
     public final static String STREAMS_ID = "GooglePlusTypeConverter";
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivityConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivityConverter.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivityConverter.java
deleted file mode 100644
index e4a1f5d..0000000
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivityConverter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.provider;
-
-import com.google.gplus.serializer.util.GooglePlusActivityUtil;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivityConverter;
-import org.apache.streams.pojo.json.Activity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-
-public class GPlusActivityConverter implements 
ActivityConverter<com.google.api.services.plus.model.Activity> {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(GPlusActivityConverter.class);
-
-    AbstractGPlusProvider provider;
-
-    public GPlusActivityConverter(AbstractGPlusProvider provider) {
-
-        this.provider = provider;
-    }
-
-    public GPlusActivityConverter() {
-    }
-
-    @Override
-    public String serializationFormat() {
-        return "gplus.v1";
-    }
-
-    @Override
-    public com.google.api.services.plus.model.Activity serialize(Activity 
deserialized) {
-        throw new NotImplementedException("Not currently implemented");
-    }
-
-    @Override
-    public Activity deserialize(com.google.api.services.plus.model.Activity 
gplusActivity) {
-        Activity activity = new Activity();
-
-        GooglePlusActivityUtil.updateActivity(gplusActivity, activity);
-        return activity;
-    }
-
-    @Override
-    public List<Activity> 
deserializeAll(List<com.google.api.services.plus.model.Activity> 
serializedList) {
-        throw new NotImplementedException("Not currently implemented");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
new file mode 100644
index 0000000..4991e94
--- /dev/null
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.google.gplus.provider;
+
+import com.google.gplus.serializer.util.GooglePlusActivityUtil;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+
+/**
+ * {@link ActivitySerializer} for Google+ API activities. Only the conversion from a Google+
+ * activity to a streams {@link Activity} is implemented; the remaining operations throw
+ * {@link NotImplementedException}.
+ */
+public class GPlusActivitySerializer implements ActivitySerializer<com.google.api.services.plus.model.Activity> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GPlusActivitySerializer.class);
+
+    AbstractGPlusProvider provider;
+
+    /**
+     * Creates a serializer that keeps a reference to the given provider.
+     *
+     * @param provider provider to remember
+     */
+    public GPlusActivitySerializer(AbstractGPlusProvider provider) {
+        this.provider = provider;
+    }
+
+    /** Creates a serializer with no provider attached. */
+    public GPlusActivitySerializer() {
+    }
+
+    /**
+     * @return the format identifier of this serializer, {@code "gplus.v1"}
+     */
+    @Override
+    public String serializationFormat() {
+        return "gplus.v1";
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @throws NotImplementedException always
+     */
+    @Override
+    public com.google.api.services.plus.model.Activity serialize(Activity deserialized) {
+        throw new NotImplementedException("Not currently implemented");
+    }
+
+    /**
+     * Creates a new streams activity and lets {@link GooglePlusActivityUtil#updateActivity}
+     * fill it from the Google+ activity.
+     *
+     * @param gplusActivity Google+ activity to convert
+     * @return the newly created streams activity
+     */
+    @Override
+    public Activity deserialize(com.google.api.services.plus.model.Activity gplusActivity) {
+        final Activity converted = new Activity();
+        GooglePlusActivityUtil.updateActivity(gplusActivity, converted);
+        return converted;
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @throws NotImplementedException always
+     */
+    @Override
+    public List<Activity> deserializeAll(List<com.google.api.services.plus.model.Activity> serializedList) {
+        throw new NotImplementedException("Not currently implemented");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java
new file mode 100644
index 0000000..6ed2ae1
--- /dev/null
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.google.gplus.provider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Worker that takes JSON strings from an input queue and offers them to an output queue as
+ * {@link StreamsDatum}s, either unchanged (when the target class is {@code String}) or converted
+ * to a streams {@link Activity}.
+ *
+ * <p>The loop ends when the {@link #TERMINATE} sentinel is taken from the input queue or when
+ * the running thread is interrupted. Items that fail to parse or convert are logged and skipped.
+ */
+public class GPlusEventProcessor implements Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(GPlusEventProcessor.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    // One generator per processor instead of a new Random for every item.
+    private final Random random = new Random();
+
+    private BlockingQueue<String> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    private Class<?> inClass;
+    private Class<?> outClass;
+
+    private GPlusActivitySerializer gPlusActivitySerializer = new GPlusActivitySerializer();
+
+    /**
+     * Shutdown sentinel. It is compared by identity, so producers must enqueue this exact
+     * instance; a merely equal string does not stop the loop.
+     */
+    public final static String TERMINATE = new String("TERMINATE");
+
+    /**
+     * @param inQueue queue of JSON strings to consume
+     * @param outQueue queue receiving the produced datums
+     * @param inClass stored but not read by {@link #run()}
+     * @param outClass target type; {@code String.class} selects pass-through
+     */
+    public GPlusEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class<?> inClass, Class<?> outClass) {
+        this.inQueue = inQueue;
+        this.outQueue = outQueue;
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+
+    /**
+     * @param inQueue queue of JSON strings to consume
+     * @param outQueue queue receiving the produced datums
+     * @param outClass target type; {@code String.class} selects pass-through
+     */
+    public GPlusEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class<?> outClass) {
+        this.inQueue = inQueue;
+        this.outQueue = outQueue;
+        this.outClass = outClass;
+    }
+
+    /**
+     * Processes items until {@link #TERMINATE} is received or the thread is interrupted.
+     */
+    @Override
+    public void run() {
+
+        while(true) {
+            try {
+                String item = inQueue.take();
+
+                // Identity comparison is intentional: TERMINATE is a dedicated sentinel instance.
+                // Checked before the delay so shutdown is not held up by the random sleep.
+                if(item==TERMINATE) {
+                    LOGGER.info("Terminating!");
+                    break;
+                }
+                Thread.sleep(random.nextInt(100));
+
+                // first check for valid json; anything that is not a JSON object fails here and
+                // is reported by the generic handler below
+                ObjectNode node = (ObjectNode)mapper.readTree(item);
+
+                // if the target is string, just pass-through
+                if( String.class.equals(outClass)) {
+                    outQueue.offer(new StreamsDatum(item));
+                } else {
+                    // convert to desired format
+                    com.google.api.services.plus.model.Activity gplusActivity =
+                            mapper.readValue(item, com.google.api.services.plus.model.Activity.class);
+
+                    Activity streamsActivity = gPlusActivitySerializer.deserialize(gplusActivity);
+
+                    outQueue.offer(new StreamsDatum(streamsActivity));
+                }
+
+            } catch (InterruptedException e) {
+                // Restore the flag and stop: swallowing the interrupt would send the loop back
+                // into take() and make the worker impossible to cancel.
+                Thread.currentThread().interrupt();
+                LOGGER.info("Interrupted, terminating");
+                break;
+            } catch (Exception e) {
+                // A single bad item must not kill the worker; report it and keep going.
+                LOGGER.warn("Failed to process item", e);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
index 6acbbdb..f0101fd 100644
--- 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
+++ 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
@@ -21,7 +21,7 @@ package org.apache.streams.instagram.processor;
 import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.instagram.serializer.InstagramUserInfoConverter;
+import org.apache.streams.instagram.serializer.InstagramUserInfoSerializer;
 import org.apache.streams.instagram.serializer.util.InstagramActivityUtil;
 import org.apache.streams.pojo.json.Activity;
 import org.jinstagram.entity.users.basicinfo.UserInfoData;
@@ -32,10 +32,6 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.Queue;
 
-@Deprecated
-/*
- * Modules and streams should adopt TypeConverterProcessor and 
ActivityConverterProcessor
- */
 public class InstagramTypeConverter implements StreamsProcessor {
 
     public final static String STREAMS_ID = "InstagramTypeConverter";
@@ -46,7 +42,7 @@ public class InstagramTypeConverter implements 
StreamsProcessor {
     private Queue<StreamsDatum> outQueue;
 
     private InstagramActivityUtil instagramActivityUtil;
-    private InstagramUserInfoConverter userInfoSerializer;
+    private InstagramUserInfoSerializer userInfoSerializer;
 
     private int count = 0;
 
@@ -103,7 +99,7 @@ public class InstagramTypeConverter implements 
StreamsProcessor {
     @Override
     public void prepare(Object o) {
         instagramActivityUtil = new InstagramActivityUtil();
-        this.userInfoSerializer = new InstagramUserInfoConverter();
+        this.userInfoSerializer = new InstagramUserInfoSerializer();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivityConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivityConverter.java
 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivityConverter.java
deleted file mode 100644
index 7b9dd09..0000000
--- 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivityConverter.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.instagram.serializer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivityConverter;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.jinstagram.entity.users.feed.MediaFeedData;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import static 
org.apache.streams.instagram.serializer.util.InstagramActivityUtil.updateActivity;
-
-public class InstagramJsonActivityConverter implements 
ActivityConverter<String>, Serializable
-{
-
-    public InstagramJsonActivityConverter() {
-
-    }
-
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
-
-    @Override
-    public String serialize(Activity deserialized) throws 
ActivitySerializerException {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public Activity deserialize(String serialized) throws 
ActivitySerializerException {
-
-        ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-        MediaFeedData mediaFeedData = null;
-
-        try {
-            mediaFeedData = mapper.readValue(serialized, MediaFeedData.class);
-        } catch (JsonProcessingException e) {
-            e.printStackTrace();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
-        Activity activity = new Activity();
-
-        updateActivity(mediaFeedData, activity);
-
-        return activity;
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<String> serializedList) {
-        throw new NotImplementedException();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java
 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java
new file mode 100644
index 0000000..c5bbdf1
--- /dev/null
+++ 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.instagram.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static 
org.apache.streams.instagram.serializer.util.InstagramActivityUtil.updateActivity;
+
+/**
+ * {@link ActivitySerializer} that turns the JSON form of an Instagram {@link MediaFeedData}
+ * into a streams {@link Activity}. The reverse direction and bulk conversion are not
+ * implemented.
+ */
+public class InstagramJsonActivitySerializer implements ActivitySerializer<String>, Serializable
+{
+
+    public InstagramJsonActivitySerializer() {
+
+    }
+
+    /**
+     * @return always {@code null}; no format identifier is defined for this serializer
+     */
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @throws NotImplementedException always
+     */
+    @Override
+    public String serialize(Activity deserialized) throws ActivitySerializerException {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Parses the JSON into a {@link MediaFeedData} and maps it onto a new {@link Activity}.
+     *
+     * @param serialized JSON representation of an Instagram media item
+     * @return the populated activity
+     * @throws ActivitySerializerException if the JSON cannot be read as a {@link MediaFeedData}
+     */
+    @Override
+    public Activity deserialize(String serialized) throws ActivitySerializerException {
+
+        ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+        MediaFeedData mediaFeedData;
+
+        try {
+            mediaFeedData = mapper.readValue(serialized, MediaFeedData.class);
+        } catch (IOException e) {
+            // JsonProcessingException is an IOException, so malformed JSON lands here as well.
+            // Report it to the caller instead of continuing with a null MediaFeedData.
+            throw new ActivitySerializerException("Unable to parse Instagram media JSON", e);
+        }
+
+        Activity activity = new Activity();
+
+        updateActivity(mediaFeedData, activity);
+
+        return activity;
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @throws NotImplementedException always
+     */
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        throw new NotImplementedException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoConverter.java
 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoConverter.java
deleted file mode 100644
index 95456c1..0000000
--- 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoConverter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package org.apache.streams.instagram.serializer;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.streams.data.ActivityConverter;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.pojo.json.Image;
-import org.apache.streams.pojo.json.Provider;
-import org.jinstagram.entity.users.basicinfo.UserInfoData;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- *
- */
-public class InstagramUserInfoConverter implements 
ActivityConverter<UserInfoData> {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(InstagramUserInfoConverter.class);
-
-    private static final String STREAMS_ID_PREFIX = "id:instagram:";
-    private static final String PROVIDER_ID = "id:provider:instagram";
-    private static final String DISPLAY_NAME = "Instagram";
-
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
-
-    @Override
-    public UserInfoData serialize(Activity deserialized) throws 
ActivitySerializerException {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public Activity deserialize(UserInfoData serialized) throws 
ActivitySerializerException {
-        Activity activity = new Activity();
-        Provider provider = new Provider();
-        provider.setId(PROVIDER_ID);
-        provider.setDisplayName(DISPLAY_NAME);
-        activity.setProvider(provider);
-        activity.setPublished(DateTime.now().withZone(DateTimeZone.UTC));
-        Actor actor = new Actor();
-        Image image = new Image();
-        image.setUrl(serialized.getProfile_picture());
-        actor.setImage(image);
-        actor.setId(STREAMS_ID_PREFIX+serialized.getId());
-        actor.setSummary(serialized.getBio());
-        actor.setAdditionalProperty("handle", serialized.getUsername());
-        actor.setDisplayName(serialized.getFullName());
-        Map<String, Object> extensions = Maps.newHashMap();
-        actor.setAdditionalProperty("extensions", extensions);
-        extensions.put("screenName", serialized.getUsername());
-        extensions.put("posts", serialized.getCounts().getMedia());
-        extensions.put("followers", serialized.getCounts().getFollwed_by());
-        extensions.put("website", serialized.getWebsite());
-        extensions.put("following", serialized.getCounts().getFollows());
-        return activity;
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<UserInfoData> serializedList) {
-        List<Activity> result = Lists.newLinkedList();
-        for(UserInfoData data : serializedList) {
-            try {
-                result.add(deserialize(data));
-            } catch (ActivitySerializerException ase) {
-                LOGGER.error("Caught ActivitySerializerException, dropping 
user info data : {}", data.getId());
-                LOGGER.error("Exception : {}", ase);
-            }
-        }
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java
 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java
new file mode 100644
index 0000000..055169b
--- /dev/null
+++ 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java
@@ -0,0 +1,83 @@
+package org.apache.streams.instagram.serializer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.instagram.UsersInfo;
+import 
org.apache.streams.instagram.provider.userinfo.InstagramUserInfoProvider;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Image;
+import org.apache.streams.pojo.json.Provider;
+import org.jinstagram.entity.users.basicinfo.UserInfoData;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Converts a jInstagram {@link UserInfoData} profile into an {@link Activity}
+ * whose actor describes the Instagram user. Only deserialization is supported.
+ */
+public class InstagramUserInfoSerializer implements ActivitySerializer<UserInfoData> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramUserInfoSerializer.class);
+
+    private static final String STREAMS_ID_PREFIX = "id:instagram:";
+    private static final String PROVIDER_ID = "id:provider:instagram";
+    private static final String DISPLAY_NAME = "Instagram";
+
+    /**
+     * @return always {@code null}; this serializer does not name a format
+     */
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    /**
+     * Not supported: an Activity cannot be turned back into {@link UserInfoData}.
+     *
+     * @throws NotImplementedException always
+     */
+    @Override
+    public UserInfoData serialize(Activity deserialized) throws ActivitySerializerException {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Builds an Activity for one Instagram user profile.
+     *
+     * <p>The provider is fixed to Instagram and {@code published} is the current
+     * time in UTC. The actor carries the profile picture, prefixed id, bio,
+     * handle and full name, plus an {@code extensions} map holding the screen
+     * name, website and the media / followed-by / follows counts.
+     *
+     * @param serialized the user profile to convert
+     * @return an Activity whose actor is populated from {@code serialized}
+     * @throws ActivitySerializerException declared by the interface; not thrown by this body
+     */
+    @Override
+    public Activity deserialize(UserInfoData serialized) throws ActivitySerializerException {
+        Activity activity = new Activity();
+        Provider provider = new Provider();
+        provider.setId(PROVIDER_ID);
+        provider.setDisplayName(DISPLAY_NAME);
+        activity.setProvider(provider);
+        activity.setPublished(DateTime.now().withZone(DateTimeZone.UTC));
+        Actor actor = new Actor();
+        Image image = new Image();
+        image.setUrl(serialized.getProfile_picture());
+        actor.setImage(image);
+        actor.setId(STREAMS_ID_PREFIX+serialized.getId());
+        actor.setSummary(serialized.getBio());
+        actor.setAdditionalProperty("handle", serialized.getUsername());
+        actor.setDisplayName(serialized.getFullName());
+        Map<String, Object> extensions = Maps.newHashMap();
+        actor.setAdditionalProperty("extensions", extensions);
+        extensions.put("screenName", serialized.getUsername());
+        extensions.put("posts", serialized.getCounts().getMedia());
+        extensions.put("followers", serialized.getCounts().getFollwed_by());
+        extensions.put("website", serialized.getWebsite());
+        extensions.put("following", serialized.getCounts().getFollows());
+        // Without this the fully populated actor was silently discarded.
+        activity.setActor(actor);
+        return activity;
+    }
+
+    /**
+     * Converts every profile in {@code serializedList}, skipping (and logging)
+     * any entry whose conversion throws {@link ActivitySerializerException}.
+     *
+     * @param serializedList profiles to convert
+     * @return the successfully converted Activities, in input order
+     */
+    @Override
+    public List<Activity> deserializeAll(List<UserInfoData> serializedList) {
+        List<Activity> result = Lists.newLinkedList();
+        for(UserInfoData data : serializedList) {
+            try {
+                result.add(deserialize(data));
+            } catch (ActivitySerializerException ase) {
+                // Trailing throwable: SLF4J logs it with its stack trace rather than as a format argument.
+                LOGGER.error("Caught ActivitySerializerException, dropping user info data : {}", data.getId(), ase);
+            }
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
new file mode 100644
index 0000000..2f2d677
--- /dev/null
+++ 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.data;
+
+import com.fasterxml.jackson.databind.AnnotationIntrospector;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import com.moreover.api.Article;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.util.MoreoverUtils;
+import org.apache.streams.pojo.json.Activity;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Deserializes Moreover JSON format into Activities
+ */
+public class MoreoverJsonActivitySerializer implements ActivitySerializer<String> {
+
+    /** Shared mapper: configured once in {@link #createMapper()} and only read from afterwards. */
+    private static final ObjectMapper MAPPER = createMapper();
+
+    public MoreoverJsonActivitySerializer() {
+    }
+
+    /**
+     * @return the media type this serializer understands
+     */
+    @Override
+    public String serializationFormat() {
+        return "application/json+vnd.moreover.com.v1";
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public String serialize(Activity deserialized) {
+        throw new UnsupportedOperationException("Cannot currently serialize to Moreover JSON");
+    }
+
+    /**
+     * Parses one Moreover article in JSON form and converts it to an Activity.
+     *
+     * <p>Empty arrays are rewritten to {@code null} before parsing, and a fixed
+     * set of top-level and {@code source.feed} fields is removed before the tree
+     * is bound to {@link Article}.
+     *
+     * @param serialized JSON text of a single article
+     * @return the Activity produced by {@link MoreoverUtils#convert}
+     * @throws IllegalArgumentException if the text cannot be read as JSON
+     */
+    @Override
+    public Activity deserialize(String serialized) {
+        serialized = serialized.replaceAll("\\[[ ]*\\]", "null");
+
+        Article article;
+        try {
+            ObjectNode node = (ObjectNode) MAPPER.readTree(serialized);
+            node.remove("tags");
+            node.remove("locations");
+            node.remove("companies");
+            node.remove("topics");
+            node.remove("media");
+            node.remove("outboundUrls");
+            // source/feed may be absent, or may have become a JSON null through the
+            // empty-array rewrite above; only prune it when it really is an object.
+            if (node.path("source").path("feed") instanceof ObjectNode) {
+                ObjectNode feed = (ObjectNode) node.path("source").path("feed");
+                feed.remove("editorialTopics");
+                feed.remove("tags");
+                feed.remove("autoTopics");
+            }
+            article = MAPPER.convertValue(node, Article.class);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Unable to deserialize", e);
+        }
+        return MoreoverUtils.convert(article);
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws NotImplementedException always
+     */
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        throw new NotImplementedException("Not currently implemented");
+    }
+
+    /** Builds the JAXB-annotation-aware, lenient mapper used for every article. */
+    private static ObjectMapper createMapper() {
+        ObjectMapper mapper = new ObjectMapper();
+        AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
+        mapper.setAnnotationIntrospector(introspector);
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
+        mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.FALSE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE);
+        return mapper;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
new file mode 100644
index 0000000..d60bcb8
--- /dev/null
+++ 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.data;
+
+import com.moreover.api.Article;
+import com.moreover.api.ArticlesResponse;
+import com.moreover.api.ObjectFactory;
+import org.apache.commons.lang.SerializationException;
+import org.apache.streams.data.util.MoreoverUtils;
+import org.apache.streams.pojo.json.Activity;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity}
+ */
+public class MoreoverXmlActivitySerializer implements ActivitySerializer<String> {
+
+    //JAXBContext is threadsafe (supposedly)
+    private final JAXBContext articleContext;
+    private final JAXBContext articlesContext;
+
+    /**
+     * Creates the JAXB contexts used for single articles and for article lists.
+     *
+     * @throws IllegalStateException if either context cannot be created
+     */
+    public MoreoverXmlActivitySerializer() {
+        articleContext = createContext(Article.class);
+        articlesContext = createContext(ArticlesResponse.class);
+    }
+
+    /**
+     * @return the media type this serializer understands
+     */
+    @Override
+    public String serializationFormat() {
+        return "application/xml+vnd.moreover.com.v1";
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public String serialize(Activity deserialized) {
+        throw new UnsupportedOperationException("Cannot currently serialize to Moreover");
+    }
+
+    /**
+     * Unmarshals a single article document and converts it.
+     *
+     * @param serialized XML text of one article
+     * @return the Activity produced by {@link MoreoverUtils#convert}
+     * @throws SerializationException if the XML cannot be unmarshalled
+     */
+    @Override
+    public Activity deserialize(String serialized) {
+        Article article = deserializeMoreover(serialized);
+        return MoreoverUtils.convert(article);
+    }
+
+    /**
+     * Treats every list entry as a full articles response and converts each
+     * article it contains.
+     *
+     * @param serializedList XML text of zero or more articles responses
+     * @return the converted Activities, in document order
+     * @throws SerializationException if any entry cannot be unmarshalled
+     */
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        List<Activity> activities = new LinkedList<Activity>();
+        for(String item : serializedList) {
+            ArticlesResponse response = deserializeMoreoverResponse(item);
+            // NOTE(review): presumably a response with no articles container yields
+            // null here - confirm against the generated ArticlesResponse class.
+            if (response == null || response.getArticles() == null) {
+                continue;
+            }
+            for(Article article : response.getArticles().getArticle()) {
+                activities.add(MoreoverUtils.convert(article));
+            }
+        }
+        return activities;
+    }
+
+    // NOTE(review): both unmarshal helpers read straight from a Reader, so the JAXB
+    // provider's default XML parser settings apply. If the XML can come from an
+    // untrusted source, confirm that external entities / DTDs are disabled.
+
+    /** Unmarshals {@code serialized} with the single-article context. */
+    private Article deserializeMoreover(String serialized){
+        try {
+            Unmarshaller unmarshaller = articleContext.createUnmarshaller();
+            return (Article) unmarshaller.unmarshal(new StringReader(serialized));
+        } catch (JAXBException e) {
+            throw new SerializationException("Unable to deserialize Moreover data", e);
+        }
+    }
+
+    /** Unmarshals {@code serialized} with the articles-response context and unwraps the element. */
+    private ArticlesResponse deserializeMoreoverResponse(String serialized){
+        try {
+            Unmarshaller unmarshaller = articlesContext.createUnmarshaller();
+            // The cast cannot be checked at runtime because of erasure; it is kept
+            // exactly as before, only with the warning scoped to this one statement.
+            @SuppressWarnings("unchecked")
+            JAXBElement<ArticlesResponse> element =
+                    (JAXBElement<ArticlesResponse>) unmarshaller.unmarshal(new StringReader(serialized));
+            return element.getValue();
+        } catch (JAXBException e) {
+            throw new SerializationException("Unable to deserialize Moreover data", e);
+        }
+    }
+
+    /**
+     * Creates a JAXB context for the package that {@code articleClass} lives in,
+     * resolved through the class loader of {@link ObjectFactory}.
+     */
+    private JAXBContext createContext(Class<?> articleClass) {
+        JAXBContext context;
+        try {
+            context = JAXBContext.newInstance(articleClass.getPackage().getName(), ObjectFactory.class.getClassLoader());
+        } catch (JAXBException e) {
+            throw new IllegalStateException("Unable to create JAXB Context for Moreover data", e);
+        }
+        return context;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverConverterResolver.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverConverterResolver.java
 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverConverterResolver.java
deleted file mode 100644
index 5c038ac..0000000
--- 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverConverterResolver.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.streams.data.moreover.conversion;
-
-import com.moreover.Moreover;
-import com.moreover.api.Article;
-import org.apache.streams.data.ActivityConverterResolver;
-import org.apache.streams.exceptions.ActivitySerializerException;
-
-/**
- * Ensures moreover documents can be converted to Activity
- */
-public class MoreoverConverterResolver implements ActivityConverterResolver {
-    @Override
-    public Class bestSerializer(Class documentClass) throws 
ActivitySerializerException {
-        if( documentClass == Moreover.class )
-            return MoreoverJsonActivityConverter.class;
-        else if( documentClass == Article.class )
-            return MoreoverJsonActivityConverter.class;
-        else return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverDocumentClassifier.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverDocumentClassifier.java
 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverDocumentClassifier.java
deleted file mode 100644
index 88ca5db..0000000
--- 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverDocumentClassifier.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.streams.data.moreover.conversion;
-
-import com.google.common.base.Preconditions;
-import com.moreover.Moreover;
-import com.moreover.api.Article;
-import org.apache.streams.data.DocumentClassifier;
-
-/**
- * Ensures moreover documents can be converted to Activity
- */
-public class MoreoverDocumentClassifier implements DocumentClassifier {
-    @Override
-    public Class detectClass(Object document) {
-        Preconditions.checkArgument(document instanceof String);
-        String string = (String) document;
-        if( string.startsWith("{") && string.endsWith("}") )
-            return Moreover.class;
-        else if( string.startsWith("<") && string.endsWith(">") )
-            return Article.class;
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverJsonActivityConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverJsonActivityConverter.java
 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverJsonActivityConverter.java
deleted file mode 100644
index a3070ae..0000000
--- 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverJsonActivityConverter.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.moreover.conversion;
-
-import com.fasterxml.jackson.databind.AnnotationIntrospector;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
-import com.moreover.api.Article;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivityConverter;
-import org.apache.streams.data.util.MoreoverUtils;
-import org.apache.streams.pojo.json.Activity;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Deserializes Moreover JSON format into Activities
- */
-public class MoreoverJsonActivityConverter implements 
ActivityConverter<String> {
-
-    public MoreoverJsonActivityConverter() {
-    }
-
-    @Override
-    public String serializationFormat() {
-        return "application/json+vnd.moreover.com.v1";
-    }
-
-    @Override
-    public String serialize(Activity deserialized) {
-        throw new UnsupportedOperationException("Cannot currently serialize to 
Moreover JSON");
-    }
-
-    @Override
-    public Activity deserialize(String serialized) {
-        serialized = serialized.replaceAll("\\[[ ]*\\]", "null");
-
-        System.out.println(serialized);
-
-        ObjectMapper mapper = new ObjectMapper();
-        AnnotationIntrospector introspector = new 
JaxbAnnotationIntrospector(mapper.getTypeFactory());
-        mapper.setAnnotationIntrospector(introspector);
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
Boolean.FALSE);
-        mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, 
Boolean.FALSE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, 
Boolean.TRUE);
-        
mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, 
Boolean.TRUE);
-        mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE);
-
-        Article article;
-        try {
-            ObjectNode node = (ObjectNode)mapper.readTree(serialized);
-            node.remove("tags");
-            node.remove("locations");
-            node.remove("companies");
-            node.remove("topics");
-            node.remove("media");
-            node.remove("outboundUrls");
-            ObjectNode jsonNodes = (ObjectNode) node.get("source").get("feed");
-            jsonNodes.remove("editorialTopics");
-            jsonNodes.remove("tags");
-            jsonNodes.remove("autoTopics");
-            article = mapper.convertValue(node, Article.class);
-        } catch (IOException e) {
-            throw new IllegalArgumentException("Unable to deserialize", e);
-        }
-        return MoreoverUtils.convert(article);
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<String> serializedList) {
-        throw new NotImplementedException("Not currently implemented");
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverXmlActivityConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverXmlActivityConverter.java
 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverXmlActivityConverter.java
deleted file mode 100644
index 1f92a9d..0000000
--- 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/conversion/MoreoverXmlActivityConverter.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.moreover.conversion;
-
-import com.moreover.api.Article;
-import com.moreover.api.ArticlesResponse;
-import com.moreover.api.ObjectFactory;
-import org.apache.commons.lang.SerializationException;
-import org.apache.streams.data.ActivityConverter;
-import org.apache.streams.data.util.MoreoverUtils;
-import org.apache.streams.pojo.json.Activity;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import java.io.StringReader;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Deserializes the Moreover Article XML and converts it to an instance of 
{@link Activity}
- */
-public class MoreoverXmlActivityConverter implements ActivityConverter<String> 
{
-
-    //JAXBContext is threadsafe (supposedly)
-    private final JAXBContext articleContext;
-    private final JAXBContext articlesContext;
-
-    public MoreoverXmlActivityConverter() {
-        articleContext = createContext(Article.class);
-        articlesContext = createContext(ArticlesResponse.class);
-    }
-
-    @Override
-    public String serializationFormat() {
-        return "application/xml+vnd.moreover.com.v1";
-    }
-
-    @Override
-    public String serialize(Activity deserialized) {
-        throw new UnsupportedOperationException("Cannot currently serialize to 
Moreover");
-    }
-
-    @Override
-    public Activity deserialize(String serialized) {
-        Article article = deserializeMoreover(serialized);
-        return MoreoverUtils.convert(article);
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<String> serializedList) {
-        List<Activity> activities = new LinkedList<Activity>();
-        for(String item : serializedList) {
-            ArticlesResponse response = deserializeMoreoverResponse(item);
-            for(Article article : response.getArticles().getArticle()) {
-                activities.add(MoreoverUtils.convert(article));
-            }
-        }
-        return activities;
-    }
-
-    private Article deserializeMoreover(String serialized){
-        try {
-            Unmarshaller unmarshaller = articleContext.createUnmarshaller();
-            return (Article) unmarshaller.unmarshal(new 
StringReader(serialized));
-        } catch (JAXBException e) {
-            throw new SerializationException("Unable to deserialize Moreover 
data", e);
-        }
-    }
-
-    private ArticlesResponse deserializeMoreoverResponse(String serialized){
-        try {
-            Unmarshaller unmarshaller = articlesContext.createUnmarshaller();
-            return ((JAXBElement<ArticlesResponse>) unmarshaller.unmarshal(new 
StringReader(serialized))).getValue();
-        } catch (JAXBException e) {
-            throw new SerializationException("Unable to deserialize Moreover 
data", e);
-        }
-    }
-
-    private JAXBContext createContext(Class articleClass) {
-        JAXBContext context;
-        try {
-            context = 
JAXBContext.newInstance(articleClass.getPackage().getName(), 
ObjectFactory.class.getClassLoader());
-        } catch (JAXBException e) {
-            throw new IllegalStateException("Unable to create JAXB Context for 
Moreover data", e);
-        }
-        return context;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivityConverterTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivityConverterTest.java
 
b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivityConverterTest.java
deleted file mode 100644
index 01340f6..0000000
--- 
a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivityConverterTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import 
org.apache.streams.data.moreover.conversion.MoreoverJsonActivityConverter;
-import org.apache.streams.data.util.JsonUtil;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.apache.streams.data.util.MoreoverTestUtil.test;
-import static org.junit.Assert.assertThat;
-
-public class MoreoverJsonActivityConverterTest {
-    JsonNode json;
-    ActivityConverter serializer = new MoreoverJsonActivityConverter();
-    ObjectMapper mapper;
-
-    @Before
-    public void setup() throws IOException {
-        json = 
JsonUtil.getFromFile("classpath:org/apache/streams/data/moreover.json");
-
-        mapper = new ObjectMapper();
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
Boolean.FALSE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, 
Boolean.TRUE);
-        
mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, 
Boolean.TRUE);
-    }
-
-    @Test
-    public void loadData() throws Exception {
-        for (JsonNode item : json) {
-            test(serializer.deserialize(getString(item)));
-        }
-    }
-
-
-    private String getString(JsonNode jsonNode)  {
-        try {
-            return new ObjectMapper().writeValueAsString(jsonNode);
-        } catch (JsonProcessingException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerTest.java
 
b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerTest.java
new file mode 100644
index 0000000..f5d66b1
--- /dev/null
+++ 
b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.data;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.data.util.JsonUtil;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.regex.Pattern;
+
+import static java.util.regex.Pattern.matches;
+import static org.apache.streams.data.util.MoreoverTestUtil.test;
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Smoke test for {@link MoreoverJsonActivitySerializer}: every element of the bundled
+ * {@code moreover.json} fixture is rendered back to a JSON string, deserialized, and the
+ * result is handed to {@code MoreoverTestUtil.test}.
+ */
+public class MoreoverJsonActivitySerializerTest {
+    JsonNode json;
+    ActivitySerializer serializer = new MoreoverJsonActivitySerializer();
+    ObjectMapper mapper;
+
+    /** Loads the JSON fixture and prepares a leniently configured {@link ObjectMapper}. */
+    @Before
+    public void setup() throws IOException {
+        json = JsonUtil.getFromFile("classpath:org/apache/streams/data/moreover.json");
+
+        ObjectMapper lenient = new ObjectMapper();
+        lenient.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        lenient.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
+        lenient.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true);
+        mapper = lenient;
+    }
+
+    /** Runs each fixture element through the serializer and the shared checks. */
+    @Test
+    public void loadData() throws Exception {
+        for (JsonNode entry : json) {
+            String rawJson = getString(entry);
+            test(serializer.deserialize(rawJson));
+        }
+    }
+
+
+    /**
+     * Renders a JSON node as a string using a fresh default {@link ObjectMapper}.
+     *
+     * @param jsonNode node to render
+     * @return the JSON text of {@code jsonNode}
+     * @throws RuntimeException wrapping any {@link JsonProcessingException}
+     */
+    private String getString(JsonNode jsonNode)  {
+        final String rendered;
+        try {
+            rendered = new ObjectMapper().writeValueAsString(jsonNode);
+        } catch (JsonProcessingException failure) {
+            throw new RuntimeException(failure);
+        }
+        return rendered;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivityConverterTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivityConverterTest.java
 
b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivityConverterTest.java
deleted file mode 100644
index e8afc5d..0000000
--- 
a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivityConverterTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data;
-
-
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import 
org.apache.streams.data.moreover.conversion.MoreoverXmlActivityConverter;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import static org.apache.streams.data.util.MoreoverTestUtil.test;
-
-public class MoreoverXmlActivityConverterTest {
-    ActivityConverter serializer;
-    private String xml;
-
-    @Before
-    public void setup() throws IOException {
-        serializer = new MoreoverXmlActivityConverter();
-        xml = loadXml();
-    }
-
-    @Test
-    public void loadData() throws Exception {
-        List<Activity> activities = 
serializer.deserializeAll(Lists.newArrayList(xml));
-        for (Activity activity : activities) {
-            test(activity);
-        }
-    }
-
-    private String loadXml() throws IOException {
-        StringWriter writer = new StringWriter();
-        InputStream resourceAsStream = 
this.getClass().getResourceAsStream("moreover.xml");
-        IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
-        return writer.toString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java
 
b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java
new file mode 100644
index 0000000..dbebee2
--- /dev/null
+++ 
b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.data;
+
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import static org.apache.streams.data.util.MoreoverTestUtil.test;
+
+/**
+ * Smoke test for {@link MoreoverXmlActivitySerializer}: the bundled {@code moreover.xml}
+ * fixture is deserialized in one call and every resulting activity is handed to
+ * {@code MoreoverTestUtil.test}.
+ */
+public class MoreoverXmlActivitySerializerTest {
+    ActivitySerializer serializer;
+    private String xml;
+
+    /** Creates the serializer under test and reads the XML fixture into memory. */
+    @Before
+    public void setup() throws IOException {
+        serializer = new MoreoverXmlActivitySerializer();
+        xml = loadXml();
+    }
+
+    /** Deserializes the whole fixture and runs the shared checks on each activity. */
+    @Test
+    public void loadData() throws Exception {
+        List<Activity> activities = serializer.deserializeAll(Lists.newArrayList(xml));
+        for (Activity activity : activities) {
+            test(activity);
+        }
+    }
+
+    /**
+     * Reads {@code moreover.xml}, resolved relative to this class, as UTF-8 text.
+     *
+     * @return the full content of the fixture
+     * @throws IOException if the resource is missing or cannot be read
+     */
+    private String loadXml() throws IOException {
+        InputStream resourceAsStream = this.getClass().getResourceAsStream("moreover.xml");
+        if (resourceAsStream == null) {
+            // Fail with a clear message instead of an NPE from inside IOUtils.copy.
+            throw new IOException("Test resource moreover.xml not found relative to "
+                    + this.getClass().getName());
+        }
+        try {
+            StringWriter writer = new StringWriter();
+            IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
+            return writer.toString();
+        } finally {
+            // The stream was previously never closed.
+            IOUtils.closeQuietly(resourceAsStream);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
index fbd16f4..339b922 100644
--- 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
+++ 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
@@ -24,7 +24,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.rss.serializer.SyndEntryActivityConverter;
+import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
@@ -32,18 +32,14 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
-@Deprecated
 /**
  * Converts ObjectNode representations of Rome SyndEntries to activities.
- * Deprecated: Modules and streams should adopt TypeConverterProcessor and 
ActivityConverterProcessor
- * TODO: Have RSS Provider always output ObjectNode, place 
ActivityConverterProcessor afterward with RssDocumentClassifier and 
RssConverterResolver available
- * TODO: Refactor tests and examples
  */
 public class RssTypeConverter implements StreamsProcessor{
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RssTypeConverter.class);
 
-    private SyndEntryActivityConverter serializer;
+    private SyndEntryActivitySerializer serializer;
     private int successCount = 0;
     private int failCount = 0;
 
@@ -66,7 +62,7 @@ public class RssTypeConverter implements StreamsProcessor{
 
     @Override
     public void prepare(Object o) {
-        this.serializer = new SyndEntryActivityConverter();
+        this.serializer = new SyndEntryActivitySerializer();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssDocumentClassifier.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssDocumentClassifier.java
 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssDocumentClassifier.java
deleted file mode 100644
index 1e19eb4..0000000
--- 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssDocumentClassifier.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.rss.provider;
-
-import com.sun.syndication.feed.synd.SyndEntry;
-import org.apache.streams.data.DocumentClassifier;
-
-/**
- * Ensures rss documents can be converted to Activity
- */
-public class RssDocumentClassifier implements DocumentClassifier {
-
-    @Override
-    public Class detectClass(Object document) {
-        if( document instanceof SyndEntry )
-            return SyndEntry.class;
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventClassifier.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventClassifier.java
 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventClassifier.java
new file mode 100644
index 0000000..4e6efee
--- /dev/null
+++ 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventClassifier.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.rss.provider;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.sun.syndication.feed.synd.SyndEntry;
+
+/**
+ * Reports the document class for an RSS event.
+ *
+ * {@link #detectClass(ObjectNode)} does not inspect its argument: it always
+ * returns {@code SyndEntry.class}, whatever the content of {@code bean}.
+ *
+ * Created by sblackmon on 12/13/13.
+ */
+public class RssEventClassifier {
+
+    public static Class detectClass( ObjectNode bean ) {
+        return SyndEntry.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
new file mode 100644
index 0000000..75d275d
--- /dev/null
+++ 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.rss.provider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.sun.syndication.feed.synd.SyndEntry;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
+import org.apache.streams.rss.serializer.SyndEntrySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Queue;
+import java.util.Random;
+
+/**
+ * Worker that polls items from an input queue and republishes each one on an output
+ * queue wrapped in a {@link StreamsDatum}. The wrapped document depends on {@code outClass}:
+ * <ul>
+ *   <li>{@code String.class} - the item's {@code toString()} value</li>
+ *   <li>{@code SyndEntry.class} - the item itself</li>
+ *   <li>{@code Activity.class} - the item converted through {@link SyndEntrySerializer}
+ *       and then {@link SyndEntryActivitySerializer}; a {@code null} conversion result
+ *       is dropped</li>
+ * </ul>
+ * Any other {@code outClass} causes items to be consumed without output.
+ *
+ * The loop ends when it polls a String equal to {@link #TERMINATE} or when the thread is
+ * interrupted. An empty queue is not an error: the worker pauses briefly and polls again.
+ */
+public class RssEventProcessor implements Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(RssEventProcessor.class);
+
+    /** Exclusive upper bound, in milliseconds, of the random pause taken on each loop pass. */
+    private static final int MAX_PAUSE_MS = 100;
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private Queue<SyndEntry> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    private Class<?> inClass;
+    private Class<?> outClass;
+
+    private SyndEntryActivitySerializer syndEntryActivitySerializer = new SyndEntryActivitySerializer();
+    private SyndEntrySerializer syndEntrySerializer = new SyndEntrySerializer();
+
+    /** One generator per worker instead of a new one on every loop pass. */
+    private final Random random = new Random();
+
+    /** Sentinel value: polling a String equal to this stops {@link #run()}. */
+    public final static String TERMINATE = new String("TERMINATE");
+
+    /**
+     * @param inQueue  queue the worker polls items from
+     * @param outQueue queue the worker offers resulting datums to
+     * @param inClass  stored but not read by this class
+     * @param outClass selects the output representation (String, SyndEntry or Activity)
+     */
+    public RssEventProcessor(Queue<SyndEntry> inQueue, Queue<StreamsDatum> outQueue, Class<?> inClass, Class<?> outClass) {
+        this.inQueue = inQueue;
+        this.outQueue = outQueue;
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+
+    /**
+     * Same as the four-argument constructor, leaving {@code inClass} unset.
+     *
+     * @param inQueue  queue the worker polls items from
+     * @param outQueue queue the worker offers resulting datums to
+     * @param outClass selects the output representation (String, SyndEntry or Activity)
+     */
+    public RssEventProcessor(Queue<SyndEntry> inQueue, Queue<StreamsDatum> outQueue, Class<?> outClass) {
+        this.inQueue = inQueue;
+        this.outQueue = outQueue;
+        this.outClass = outClass;
+    }
+
+    /**
+     * Polls, converts and republishes items until the terminate sentinel is seen or the
+     * thread is interrupted. A failure on one item is logged and the loop continues.
+     */
+    @Override
+    public void run() {
+
+        while(true) {
+            try {
+                // Held as Object so the String sentinel check below compiles.
+                // NOTE(review): a String can only reach a Queue<SyndEntry> through a raw
+                // or unchecked reference - confirm how producers send TERMINATE.
+                Object item = inQueue.poll();
+                if(item instanceof String && item.equals(TERMINATE)) {
+                    LOGGER.info("Terminating!");
+                    break;
+                }
+
+                Thread.sleep(random.nextInt(MAX_PAUSE_MS));
+
+                // poll() returns null when the queue is empty. Previously this led to an
+                // NPE in the String branch and a StreamsDatum(null) in the SyndEntry branch.
+                if( item == null ) {
+                    continue;
+                }
+
+                // if the target is string, just pass-through
+                if( String.class.equals(outClass)) {
+                    outQueue.offer(new StreamsDatum(item.toString()));
+                }
+                else if( SyndEntry.class.equals(outClass))
+                {
+                    outQueue.offer(new StreamsDatum(item));
+                }
+                else if( Activity.class.equals(outClass))
+                {
+                    // convert to desired format
+                    SyndEntry entry = (SyndEntry)item;
+                    Activity out = syndEntryActivitySerializer.deserialize(this.syndEntrySerializer.deserialize(entry));
+
+                    if( out != null ) {
+                        outQueue.offer(new StreamsDatum(out));
+                    }
+                }
+
+            } catch (InterruptedException e) {
+                // Restore the flag and stop, so the owner can shut the worker down.
+                LOGGER.info("Interrupted, terminating");
+                Thread.currentThread().interrupt();
+                break;
+            } catch (Exception e) {
+                // Best effort: one bad item must not stop the worker.
+                LOGGER.error("Failed to process RSS entry", e);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/RssConverterResolver.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/RssConverterResolver.java
 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/RssConverterResolver.java
deleted file mode 100644
index 489ffec..0000000
--- 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/RssConverterResolver.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.streams.rss.serializer;
-
-import com.sun.syndication.feed.synd.SyndEntry;
-import org.apache.streams.data.ActivityConverterResolver;
-import org.apache.streams.exceptions.ActivitySerializerException;
-
-/**
- * Ensures rss documents can be converted to Activity
- */
-public class RssConverterResolver implements ActivityConverterResolver {
-    
-    @Override
-    public Class bestSerializer(Class documentClass) throws 
ActivitySerializerException {
-        if( documentClass == SyndEntry.class )
-            return SyndEntryActivityConverter.class;
-        return null;
-    }
-}

Reply via email to