http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java
 
b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java
index a1ca7c5..418491a 100644
--- 
a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java
+++ 
b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java
@@ -24,6 +24,7 @@ import org.apache.streams.twitter.pojo.Follow;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.pojo.User;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -34,55 +35,60 @@ import java.util.List;
  */
 public class TwitterDocumentClassifierTest {
 
-    private String tweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 
2013\",\"id\":12345,\"id_str\":\"12345\",\"text\":\"text\",\"source\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"12345\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri
 Nov 20 19:29:02 +0000 
2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":false,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https:\\/\\/profile_b
 
ackground_image_url_https.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/profile_image_url.jpg\",\"profile_image_url_https\":\"https:\\/\\/profile_image_url_https.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/url\",\"expanded_url\":\"http:\\/\\/expanded_url\",\"display_url\":\"display_url\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n";
-    private String retweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 
2013\",\"id\":23456,\"id_str\":\"23456\",\"text\":\"text\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"location\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=url\",\"description\":\"description\\u00ed\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon
 Jul 05 17:35:49 +0000 
2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos 
Aires\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\
 
/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/12345\\/12345\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"profile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed
 Dec 11 22:25:06 +0000 
2013\",\"id\":34567,\"id_str\":\"34567\",\"text\":\"text\",\"sourc
 
e\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":34567,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":\"http:\\/\\/www.web.com\",\"description\":\"description\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri
 Apr 13 19:00:11 +0000 
2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/34567\\/34567.jpeg\",\"profile_background_tile\":false,
 
\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/34567\\/34567\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"screen_name\",\"name\
 ":\"name 
emocional\",\"id\":45678,\"id_str\":\"45678\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n";
-    private String delete = 
"{\"delete\":{\"status\":{\"id\":56789,\"user_id\":67890,\"id_str\":\"56789\",\"user_id_str\":\"67890\"}}}\n";
-    private String follow = 
"{\"follower\":{\"id\":12345},\"followee\":{\"id\":56789}}\n";
-    private String user = 
"{\"location\":\"\",\"default_profile\":true,\"profile_background_tile\":false,\"statuses_count\":1,\"lang\":\"en\",\"profile_link_color\":\"0084B4\",\"id\":67890,\"following\":false,\"protected\":false,\"favourites_count\":0,\"profile_text_color\":\"333333\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"C0DEED\",\"name\":\"name\",\"profile_background_color\":\"C0DEED\",\"created_at\":\"Fri
 Apr 17 12:35:56 +0000 
2009\",\"is_translation_enabled\":false,\"default_profile_image\":true,\"followers_count\":2,\"profile_image_url_https\":\"https://profile_image_url_https.png\",\"geo_enabled\":false,\"status\":{\"contributors\":null,\"text\":\"Working\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[],\"hashtags\":[],\"user_mentions\":[]},\"in_reply_to_status_id_str\":null,\"id\":67890,\"source\":\"web\",\"in_repl
 
y_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":0,\"created_at\":\"Fri
 Apr 17 12:37:54 +0000 
2009\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"67890\",\"place\":null,\"coordinates\":null},\"profile_background_image_url\":\"http://abs.twimg.com/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https://abs.twimg.com/images/profile_background_image_url_https.png\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":null,\"time_zone\":null,\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1,\"profile_sidebar_fill_color\":\"DDEEF6\",\"screen_name\":\"screen_name\",\"id_str\":\"67890\",\"profile_image_url\":\"http://abs.twimg.com/sticky/default_profile_images/default_profile_1_normal.png\",\"listed_count\":0,\"is_translator\":false}";;
+  private String tweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 
2013\",\"id\":12345,\"id_str\":\"12345\",\"text\":\"text\",\"source\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"12345\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri
 Nov 20 19:29:02 +0000 
2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":false,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https:\\/\\/profile_bac
 
kground_image_url_https.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/profile_image_url.jpg\",\"profile_image_url_https\":\"https:\\/\\/profile_image_url_https.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/url\",\"expanded_url\":\"http:\\/\\/expanded_url\",\"display_url\":\"display_url\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n";
+  private String retweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 
2013\",\"id\":23456,\"id_str\":\"23456\",\"text\":\"text\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"location\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=url\",\"description\":\"description\\u00ed\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon
 Jul 05 17:35:49 +0000 
2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos 
Aires\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/p
 
rofile_background_images\\/12345\\/12345.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/12345\\/12345\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"profile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed
 Dec 11 22:25:06 +0000 
2013\",\"id\":34567,\"id_str\":\"34567\",\"text\":\"text\",\"source\
 
":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":34567,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":\"http:\\/\\/www.web.com\",\"description\":\"description\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri
 Apr 13 19:00:11 +0000 
2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/34567\\/34567.jpeg\",\"profile_background_tile\":false,\"
 
profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/34567\\/34567\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"screen_name\",\"name\":
 \"name 
emocional\",\"id\":45678,\"id_str\":\"45678\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n";
+  private String delete = 
"{\"delete\":{\"status\":{\"id\":56789,\"user_id\":67890,\"id_str\":\"56789\",\"user_id_str\":\"67890\"}}}\n";
+  private String follow = 
"{\"follower\":{\"id\":12345},\"followee\":{\"id\":56789}}\n";
+  private String user = 
"{\"location\":\"\",\"default_profile\":true,\"profile_background_tile\":false,\"statuses_count\":1,\"lang\":\"en\",\"profile_link_color\":\"0084B4\",\"id\":67890,\"following\":false,\"protected\":false,\"favourites_count\":0,\"profile_text_color\":\"333333\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"C0DEED\",\"name\":\"name\",\"profile_background_color\":\"C0DEED\",\"created_at\":\"Fri
 Apr 17 12:35:56 +0000 
2009\",\"is_translation_enabled\":false,\"default_profile_image\":true,\"followers_count\":2,\"profile_image_url_https\":\"https://profile_image_url_https.png\",\"geo_enabled\":false,\"status\":{\"contributors\":null,\"text\":\"Working\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[],\"hashtags\":[],\"user_mentions\":[]},\"in_reply_to_status_id_str\":null,\"id\":67890,\"source\":\"web\",\"in_reply_
 
to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":0,\"created_at\":\"Fri
 Apr 17 12:37:54 +0000 
2009\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"67890\",\"place\":null,\"coordinates\":null},\"profile_background_image_url\":\"http://abs.twimg.com/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https://abs.twimg.com/images/profile_background_image_url_https.png\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":null,\"time_zone\":null,\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1,\"profile_sidebar_fill_color\":\"DDEEF6\",\"screen_name\":\"screen_name\",\"id_str\":\"67890\",\"profile_image_url\":\"http://abs.twimg.com/sticky/default_profile_images/default_profile_1_normal.png\",\"listed_count\":0,\"is_translator\":false}";;
 
-    @Test
-    public void testDetectTweet() {
-        List<Class> detected = new 
TwitterDocumentClassifier().detectClasses(tweet);
-        Assert.assertTrue(detected.size() == 1);
-        Class result = detected.get(0);
-        if( !result.equals(Tweet.class) )
-            Assert.fail();
+  @Test
+  public void testDetectTweet() {
+    List<Class> detected = new 
TwitterDocumentClassifier().detectClasses(tweet);
+    Assert.assertTrue(detected.size() == 1);
+    Class result = detected.get(0);
+    if ( !result.equals(Tweet.class) ) {
+      Assert.fail();
     }
+  }
 
-    @Test
-    public void testDetectRetweet() {
-        List<Class> detected = new 
TwitterDocumentClassifier().detectClasses(retweet);
-        Assert.assertTrue(detected.size() == 1);
-        Class result = detected.get(0);
-        if( !result.equals(Retweet.class) )
-            Assert.fail();
+  @Test
+  public void testDetectRetweet() {
+    List<Class> detected = new 
TwitterDocumentClassifier().detectClasses(retweet);
+    Assert.assertTrue(detected.size() == 1);
+    Class result = detected.get(0);
+    if ( !result.equals(Retweet.class) ) {
+      Assert.fail();
     }
+  }
 
-    @Test
-    public void testDetectDelete() {
-        List<Class> detected = new 
TwitterDocumentClassifier().detectClasses(delete);
-        Assert.assertTrue(detected.size() == 1);
-        Class result = detected.get(0);
-        if( !result.equals(Delete.class) )
-            Assert.fail();
+  @Test
+  public void testDetectDelete() {
+    List<Class> detected = new 
TwitterDocumentClassifier().detectClasses(delete);
+    Assert.assertTrue(detected.size() == 1);
+    Class result = detected.get(0);
+    if ( !result.equals(Delete.class) ) {
+      Assert.fail();
     }
+  }
 
-    @Test
-    public void testDetectFollow() {
-        List<Class> detected = new 
TwitterDocumentClassifier().detectClasses(follow);
-        Assert.assertTrue(detected.size() == 1);
-        Class result = detected.get(0);
-        if( !result.equals(Follow.class) )
-            Assert.fail();
+  @Test
+  public void testDetectFollow() {
+    List<Class> detected = new 
TwitterDocumentClassifier().detectClasses(follow);
+    Assert.assertTrue(detected.size() == 1);
+    Class result = detected.get(0);
+    if ( !result.equals(Follow.class) ) {
+      Assert.fail();
     }
+  }
 
-    @Test
-    public void testDetectUser() {
-        List<Class> detected = new 
TwitterDocumentClassifier().detectClasses(user);
-        Assert.assertTrue(detected.size() == 1);
-        Class result = detected.get(0);
-        if (!result.equals(User.class))
-            Assert.fail();
+  @Test
+  public void testDetectUser() {
+    List<Class> detected = new TwitterDocumentClassifier().detectClasses(user);
+    Assert.assertTrue(detected.size() == 1);
+    Class result = detected.get(0);
+    if (!result.equals(User.class)) {
+      Assert.fail();
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java
index 5e24882..6e269e5 100644
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java
@@ -18,6 +18,11 @@
 
 package com.youtube.processor;
 
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.api.services.youtube.model.Channel;
 import com.google.api.services.youtube.model.Video;
@@ -27,10 +32,6 @@ import com.youtube.serializer.YoutubeChannelDeserializer;
 import com.youtube.serializer.YoutubeEventClassifier;
 import com.youtube.serializer.YoutubeVideoDeserializer;
 import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,89 +40,90 @@ import java.util.Queue;
 
 public class YoutubeTypeConverter implements StreamsProcessor {
 
-    public final static String STREAMS_ID = "YoutubeTypeConverter";
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(YoutubeTypeConverter.class);
-
-    private StreamsJacksonMapper mapper;
-    private Queue<Video> inQueue;
-    private Queue<StreamsDatum> outQueue;
-    private YoutubeActivityUtil youtubeActivityUtil;
-    private int count = 0;
-
-    public YoutubeTypeConverter() {}
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
+  public static final String STREAMS_ID = "YoutubeTypeConverter";
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeTypeConverter.class);
+
+  private StreamsJacksonMapper mapper;
+  private Queue<Video> inQueue;
+  private Queue<StreamsDatum> outQueue;
+  private YoutubeActivityUtil youtubeActivityUtil;
+  private int count = 0;
+
+  public YoutubeTypeConverter() {}
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public List<StreamsDatum> process(StreamsDatum streamsDatum) {
+    StreamsDatum result = null;
+
+    try {
+      Object item = streamsDatum.getDocument();
+
+      LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
+      Activity activity = null;
+
+      if (item instanceof String) {
+        item = deserializeItem(item);
+      }
+
+      if (item instanceof Video) {
+        activity = new Activity();
+        youtubeActivityUtil.updateActivity((Video)item, activity, 
streamsDatum.getId());
+      } else if (item instanceof Channel) {
+        activity = new Activity();
+        this.youtubeActivityUtil.updateActivity((Channel)item, activity, null);
+      } else {
+        throw new NotImplementedException("Type conversion not implement for 
type : " + item.getClass().getName());
+      }
+
+      if (activity != null) {
+        result = new StreamsDatum(activity);
+        count++;
+      }
+    } catch (Exception ex) {
+      LOGGER.error("Exception while converting Video to Activity: {}", ex);
     }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum streamsDatum) {
-        StreamsDatum result = null;
-
-        try {
-            Object item = streamsDatum.getDocument();
-
-            LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
-            Activity activity = null;
-
-            if(item instanceof String) {
-                item = deserializeItem(item);
-            }
-
-            if(item instanceof Video) {
-                activity = new Activity();
-                youtubeActivityUtil.updateActivity((Video)item, activity, 
streamsDatum.getId());
-            } else if(item instanceof Channel) {
-                activity = new Activity();
-                this.youtubeActivityUtil.updateActivity((Channel)item, 
activity, null);
-            } else {
-                throw new NotImplementedException("Type conversion not 
implement for type : "+item.getClass().getName());
-            }
-
-            if(activity != null) {
-                result = new StreamsDatum(activity);
-                count++;
-            }
-        } catch (Exception e) {
-            LOGGER.error("Exception while converting Video to Activity: {}", 
e);
-        }
-
-        if( result != null )
-            return Lists.newArrayList(result);
-        else
-            return Lists.newArrayList();
+    if ( result != null ) {
+      return Lists.newArrayList(result);
+    } else {
+      return Lists.newArrayList();
     }
-
-    private Object deserializeItem(Object item) {
-        try {
-            Class klass = YoutubeEventClassifier.detectClass((String) item);
-            if (klass.equals(Video.class)) {
-                item = mapper.readValue((String) item, Video.class);
-            } else if(klass.equals(Channel.class)) {
-                item = mapper.readValue((String) item, Channel.class);
-            }
-        } catch (Exception e) {
-            LOGGER.error("Exception while trying to deserializeItem: {}", e);
-        }
-
-        return item;
+  }
+
+  private Object deserializeItem(Object item) {
+    try {
+      Class klass = YoutubeEventClassifier.detectClass((String) item);
+      if (klass.equals(Video.class)) {
+        item = mapper.readValue((String) item, Video.class);
+      } else if (klass.equals(Channel.class)) {
+        item = mapper.readValue((String) item, Channel.class);
+      }
+    } catch (Exception ex) {
+      LOGGER.error("Exception while trying to deserializeItem: {}", ex);
     }
 
-    @Override
-    public void prepare(Object o) {
-        youtubeActivityUtil = new YoutubeActivityUtil();
-        mapper = StreamsJacksonMapper.getInstance();
-
-        SimpleModule simpleModule = new SimpleModule();
-        simpleModule.addDeserializer(Video.class, new 
YoutubeVideoDeserializer());
-        mapper.registerModule(simpleModule);
-        simpleModule = new SimpleModule();
-        simpleModule.addDeserializer(Channel.class, new 
YoutubeChannelDeserializer());
-        mapper.registerModule(simpleModule);
-    }
+    return item;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    youtubeActivityUtil = new YoutubeActivityUtil();
+    mapper = StreamsJacksonMapper.getInstance();
+
+    SimpleModule simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(Video.class, new YoutubeVideoDeserializer());
+    mapper.registerModule(simpleModule);
+    simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(Channel.class, new 
YoutubeChannelDeserializer());
+    mapper.registerModule(simpleModule);
+  }
 
-    @Override
-    public void cleanUp() {}
+  @Override
+  public void cleanUp() {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java
index 8e980a7..fd238db 100644
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java
@@ -19,18 +19,17 @@
 
 package com.youtube.provider;
 
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.http.HttpRequest;
 import com.google.api.services.youtube.YouTube;
 import com.google.api.services.youtube.model.Channel;
-import com.google.api.services.youtube.model.ChannelListResponse;
 import com.google.gson.Gson;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 import org.apache.youtube.pojo.YoutubeConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,63 +38,77 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 
 /**
- *
+ * Collects YoutubeChannelData on behalf of YoutubeChannelProvider.
  */
-public class YoutubeChannelDataCollector extends YoutubeDataCollector{
+public class YoutubeChannelDataCollector extends YoutubeDataCollector {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeChannelDataCollector.class);
-    private static final String CONTENT = 
"snippet,contentDetails,statistics,topicDetails";
-    private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
-    private static final int MAX_ATTEMPTS= 5;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeChannelDataCollector.class);
+  private static final String CONTENT = 
"snippet,contentDetails,statistics,topicDetails";
+  private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
+  private static final int MAX_ATTEMPTS = 5;
 
-    private YouTube youTube;
-    private BlockingQueue<StreamsDatum> queue;
-    private BackOffStrategy strategy;
-    private UserInfo userInfo;
-    private YoutubeConfiguration youtubeConfig;
+  private YouTube youTube;
+  private BlockingQueue<StreamsDatum> queue;
+  private BackOffStrategy strategy;
+  private UserInfo userInfo;
+  private YoutubeConfiguration youtubeConfig;
 
-    public YoutubeChannelDataCollector(YouTube youTube, 
BlockingQueue<StreamsDatum> queue, BackOffStrategy strategy, UserInfo userInfo, 
YoutubeConfiguration youtubeConfig) {
-        this.youTube = youTube;
-        this.queue = queue;
-        this.strategy = strategy;
-        this.userInfo = userInfo;
-        this.youtubeConfig = youtubeConfig;
-    }
+  /**
+   * YoutubeChannelDataCollector constructor.
+   * @param youTube YouTube
+   * @param queue BlockingQueue of StreamsDatum
+   * @param strategy BackOffStrategy
+   * @param userInfo UserInfo
+   * @param youtubeConfig YoutubeConfiguration
+   */
+  public YoutubeChannelDataCollector(
+      YouTube youTube,
+      BlockingQueue<StreamsDatum> queue,
+      BackOffStrategy strategy,
+      UserInfo userInfo,
+      YoutubeConfiguration youtubeConfig) {
+    this.youTube = youTube;
+    this.queue = queue;
+    this.strategy = strategy;
+    this.userInfo = userInfo;
+    this.youtubeConfig = youtubeConfig;
+  }
 
-    @Override
-    public void run() {
-        Gson gson = new Gson();
+  @Override
+  public void run() {
+    Gson gson = new Gson();
+    try {
+      int attempt = 0;
+      YouTube.Channels.List channelLists = 
this.youTube.channels().list(CONTENT).setId(this.userInfo.getUserId()).setKey(this.youtubeConfig.getApiKey());
+      boolean tryAgain = false;
+      do {
         try {
-            int attempt = 0;
-             YouTube.Channels.List channelLists = 
this.youTube.channels().list(CONTENT).setId(this.userInfo.getUserId()).setKey(this.youtubeConfig.getApiKey());
-            boolean tryAgain = false;
-            do {
-                try {
-                    List<Channel> channels = channelLists.execute().getItems();
-                    for (Channel channel : channels) {
-                        String json = gson.toJson(channel);
-                        this.queue.put(new StreamsDatum(json, 
channel.getId()));
-                    }
-                    if (StringUtils.isEmpty(channelLists.getPageToken())) {
-                        channelLists = null;
-                    } else {
-                        channelLists = 
this.youTube.channels().list(CONTENT).setId(this.userInfo.getUserId()).setOauthToken(this.youtubeConfig.getApiKey())
-                                .setPageToken(channelLists.getPageToken());
-                    }
-                } catch (GoogleJsonResponseException gjre) {
-                    LOGGER.warn("GoogleJsonResposneException caught : {}", 
gjre);
-                    tryAgain = backoffAndIdentifyIfRetry(gjre, this.strategy);
-                    ++attempt;
-                } catch (Throwable t) {
-                    LOGGER.warn("Unable to get channel info for id : {}", 
this.userInfo.getUserId());
-                    LOGGER.warn("Excpection thrown while trying to get channel 
info : {}", t);
-                }
-            } while((tryAgain && attempt < MAX_ATTEMPTS) || channelLists != 
null);
-
-        } catch (Throwable t) {
-            LOGGER.warn(t.getMessage());
+          List<Channel> channels = channelLists.execute().getItems();
+          for (Channel channel : channels) {
+            String json = gson.toJson(channel);
+            this.queue.put(new StreamsDatum(json, channel.getId()));
+          }
+          if (StringUtils.isEmpty(channelLists.getPageToken())) {
+            channelLists = null;
+          } else {
+            channelLists = 
this.youTube.channels().list(CONTENT).setId(this.userInfo.getUserId()).setOauthToken(this.youtubeConfig.getApiKey())
+                .setPageToken(channelLists.getPageToken());
+          }
+        } catch (GoogleJsonResponseException gjre) {
+          LOGGER.warn("GoogleJsonResposneException caught : {}", gjre);
+          tryAgain = backoffAndIdentifyIfRetry(gjre, this.strategy);
+          ++attempt;
+        } catch (Throwable throwable) {
+          LOGGER.warn("Unable to get channel info for id : {}", 
this.userInfo.getUserId());
+          LOGGER.warn("Excpection thrown while trying to get channel info : 
{}", throwable);
         }
+      }
+      while ((tryAgain && attempt < MAX_ATTEMPTS) || channelLists != null);
+
+    } catch (Throwable throwable) {
+      LOGGER.warn(throwable.getMessage());
     }
+  }
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java
index 817c98e..d9b1e14 100644
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java
@@ -19,22 +19,22 @@
 
 package com.youtube.provider;
 
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.api.services.youtube.YouTube;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 import org.apache.youtube.pojo.YoutubeConfiguration;
 
 import java.io.BufferedOutputStream;
@@ -47,76 +47,86 @@ import java.util.concurrent.TimeUnit;
 
 /**
  *  Retrieve recent activity from a list of channels.
- *
- *  To use from command line:
- *
- *  Supply (at least) the following required configuration in application.conf:
- *
- *  youtube.oauth.pathToP12KeyFile
- *  youtube.oauth.serviceAccountEmailAddress
- *  youtube.apiKey
- *  youtube.youtubeUsers
- *
- *  Launch using:
- *
- *  mvn exec:java 
-Dexec.mainClass=org.apache.streams.youtube.provider.YoutubeUserActivityProvider
 -Dexec.args="application.conf tweets.json"
  */
 public class YoutubeChannelProvider extends YoutubeProvider {
 
-    public YoutubeChannelProvider() {
-        super();
-    }
+  public YoutubeChannelProvider() {
+    super();
+  }
 
-    public YoutubeChannelProvider(YoutubeConfiguration config) {
-        super(config);
-    }
+  public YoutubeChannelProvider(YoutubeConfiguration config) {
+    super(config);
+  }
 
-    @Override
-    protected Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) {
-        return new YoutubeChannelDataCollector(youtube, queue, strategy, 
userInfo, this.config);
-    }
+  @Override
+  protected Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) {
+    return new YoutubeChannelDataCollector(youtube, queue, strategy, userInfo, 
this.config);
+  }
+
+  /**
+   * To use from command line:
+   *
+   * <p/>
+   * Supply (at least) the following required configuration in 
application.conf:
+   *
+   * <p/>
+   * youtube.oauth.pathToP12KeyFile
+   * youtube.oauth.serviceAccountEmailAddress
+   * youtube.apiKey
+   * youtube.youtubeUsers
+   *
+   * <p/>
+   * Launch using:
+   *
+   * <p/>
+   * mvn exec:java 
-Dexec.mainClass=org.apache.streams.youtube.provider.YoutubeUserActivityProvider
 -Dexec.args="application.conf tweets.json"
+   *
+   * @param args args
+   * @throws Exception Exception
+   */
+  public static void main(String[] args) throws Exception {
+
+    Preconditions.checkArgument(args.length >= 2);
+
+    String configfile = args[0];
+    String outfile = args[1];
+
+    Config reference = ConfigFactory.load();
+    File file = new File(configfile);
+    assert (file.exists());
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+    StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
+    YoutubeConfiguration config = new 
ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe,
 "youtube");
+    YoutubeChannelProvider provider = new YoutubeChannelProvider(config);
+
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    public static void main(String[] args) throws Exception {
-
-        Preconditions.checkArgument(args.length >= 2);
-
-        String configfile = args[0];
-        String outfile = args[1];
-
-        Config reference = ConfigFactory.load();
-        File conf_file = new File(configfile);
-        assert(conf_file.exists());
-        Config testResourceConfig = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
-
-        Config typesafe  = 
testResourceConfig.withFallback(reference).resolve();
-
-        StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
-        YoutubeConfiguration config = new 
ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe,
 "youtube");
-        YoutubeChannelProvider provider = new YoutubeChannelProvider(config);
-
-        ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-        PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
-        provider.prepare(config);
-        provider.startStream();
-        do {
-            
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
-            Iterator<StreamsDatum> iterator = 
provider.readCurrent().iterator();
-            while(iterator.hasNext()) {
-                StreamsDatum datum = iterator.next();
-                String json;
-                try {
-                    if( datum.getDocument() instanceof String )
-                        json = (String)datum.getDocument();
-                    else
-                        json = mapper.writeValueAsString(datum.getDocument());
-                    outStream.println(json);
-                } catch (JsonProcessingException e) {
-                    System.err.println(e.getMessage());
-                }
-            }
-        } while( provider.isRunning());
-        provider.cleanUp();
-        outStream.flush();
+    PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
+    provider.prepare(config);
+    provider.startStream();
+    do {
+      
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
+      Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+      while (iterator.hasNext()) {
+        StreamsDatum datum = iterator.next();
+        String json;
+        try {
+          if ( datum.getDocument() instanceof String ) {
+            json = (String) datum.getDocument();
+          } else {
+            json = mapper.writeValueAsString(datum.getDocument());
+          }
+          outStream.println(json);
+        } catch (JsonProcessingException ex) {
+          System.err.println(ex.getMessage());
+        }
+      }
     }
+    while ( provider.isRunning());
+    provider.cleanUp();
+    outStream.flush();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeDataCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeDataCollector.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeDataCollector.java
index 3a17134..3eede18 100644
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeDataCollector.java
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeDataCollector.java
@@ -19,47 +19,52 @@
 
 package com.youtube.provider;
 
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import org.apache.streams.util.api.requests.backoff.BackOffException;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Base Collector for Youtube Data.
+ */
 public abstract class YoutubeDataCollector implements Runnable {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeDataCollector.class);
-
-    /**
-     * Looks at the status code of the expception.  If the code indicates that 
the request should be retried,
-     * it executes the back off strategy and returns true.
-     * @param gjre
-     * @param backOff
-     * @return returns true if the error code of the exception indicates the 
request should be retried.
-     */
-    public boolean backoffAndIdentifyIfRetry(GoogleJsonResponseException gjre, 
BackOffStrategy backOff) throws BackOffException {
-        boolean tryAgain = false;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeDataCollector.class);
 
-        switch (gjre.getStatusCode()) {
-            case 400 :
-                LOGGER.warn("Bad Request  : {}",  gjre);
-                break;
-            case 401 :
-                LOGGER.warn("Invalid Credentials : {}", gjre);
-            case 403 :
-                LOGGER.warn("Possible rate limit exception. Retrying. : {}", 
gjre.getMessage());
-                backOff.backOff();
-                tryAgain = true;
-                break;
-            case 503 :
-                LOGGER.warn("Google Backend Service Error : {}", gjre);
-                break;
-            default:
-                LOGGER.warn("Google Service returned error : {}", gjre);
-                tryAgain = true;
-                backOff.backOff();
-                break;
-        }
+  /**
+   * Looks at the status code of the expception.  If the code indicates that 
the request should be retried,
+   * it executes the back off strategy and returns true.
+   * @param gjre
+   * @param backOff
+   * @return returns true if the error code of the exception indicates the 
request should be retried.
+   */
+  public boolean backoffAndIdentifyIfRetry(GoogleJsonResponseException gjre, 
BackOffStrategy backOff) throws BackOffException {
+    boolean tryAgain = false;
 
-        return tryAgain;
+    switch (gjre.getStatusCode()) {
+      case 400 :
+        LOGGER.warn("Bad Request  : {}",  gjre);
+        break;
+      case 401 :
+        LOGGER.warn("Invalid Credentials : {}", gjre);
+        break;
+      case 403 :
+        LOGGER.warn("Possible rate limit exception. Retrying. : {}", 
gjre.getMessage());
+        backOff.backOff();
+        tryAgain = true;
+        break;
+      case 503 :
+        LOGGER.warn("Google Backend Service Error : {}", gjre);
+        break;
+      default:
+        LOGGER.warn("Google Service returned error : {}", gjre);
+        tryAgain = true;
+        backOff.backOff();
+        break;
     }
+
+    return tryAgain;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
index ab77467..1442f8b 100644
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
@@ -19,6 +19,16 @@
 
 package com.youtube.provider;
 
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.util.ComponentUtils;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import 
org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+
 import com.google.api.client.auth.oauth2.Credential;
 import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
 import com.google.api.client.http.HttpTransport;
@@ -34,15 +44,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.util.ComponentUtils;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-import 
org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
 import org.apache.youtube.pojo.YoutubeConfiguration;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -64,205 +65,213 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 public abstract class YoutubeProvider implements StreamsProvider {
 
-    public static final String STREAMS_ID = "YoutubeProvider";
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(YoutubeProvider.class);
-    private final static int MAX_BATCH_SIZE = 1000;
-
-    // This OAuth 2.0 access scope allows for full read/write access to the
-    // authenticated user's account.
-    private List<String> scopes = 
Lists.newArrayList("https://www.googleapis.com/auth/youtube";);
-
-    /**
-     * Define a global instance of the HTTP transport.
-     */
-    public static final HttpTransport HTTP_TRANSPORT = new NetHttpTransport();
-
-    /**
-     * Define a global instance of the JSON factory.
-     */
-    public static final JsonFactory JSON_FACTORY = new JacksonFactory();
-
-    private static final int DEFAULT_THREAD_POOL_SIZE = 5;
-
-    private List<ListenableFuture<Object>> futures = new ArrayList<>();
-
-    private ListeningExecutorService executor;
-    private BlockingQueue<StreamsDatum> datumQueue;
-    private AtomicBoolean isComplete;
-    private boolean previousPullWasEmpty;
-
-    protected YouTube youtube;
-    protected YoutubeConfiguration config;
-
-    public YoutubeProvider() {
-        this.config = new ComponentConfigurator<>(YoutubeConfiguration.class)
-          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("youtube"));
-
-        Preconditions.checkNotNull(this.config.getApiKey());
-    }
-
-    public YoutubeProvider(YoutubeConfiguration config) {
-        this.config = config;
-
-        Preconditions.checkNotNull(this.config.getApiKey());
-    }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
+  public static final String STREAMS_ID = "YoutubeProvider";
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeProvider.class);
+  private static final int MAX_BATCH_SIZE = 1000;
+
+  // This OAuth 2.0 access scope allows for full read/write access to the
+  // authenticated user's account.
+  private List<String> scopes = 
Lists.newArrayList("https://www.googleapis.com/auth/youtube";);
+
+  /**
+   * Define a global instance of the HTTP transport.
+   */
+  public static final HttpTransport HTTP_TRANSPORT = new NetHttpTransport();
+
+  /**
+   * Define a global instance of the JSON factory.
+   */
+  public static final JsonFactory JSON_FACTORY = new JacksonFactory();
+
+  private static final int DEFAULT_THREAD_POOL_SIZE = 5;
+
+  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+
+  private ListeningExecutorService executor;
+  private BlockingQueue<StreamsDatum> datumQueue;
+  private AtomicBoolean isComplete;
+  private boolean previousPullWasEmpty;
+
+  protected YouTube youtube;
+  protected YoutubeConfiguration config;
+
+  /**
+   * YoutubeProvider constructor.
+   * Resolves config from JVM 'youtube'.
+   */
+  public YoutubeProvider() {
+    this.config = new ComponentConfigurator<>(YoutubeConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("youtube"));
+
+    Preconditions.checkNotNull(this.config.getApiKey());
+  }
+
+  /**
+   * YoutubeProvider constructor - uses supplied YoutubeConfiguration.
+   * @param config YoutubeConfiguration
+   */
+  public YoutubeProvider(YoutubeConfiguration config) {
+    this.config = config;
+
+    Preconditions.checkNotNull(this.config.getApiKey());
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    try {
+      this.youtube = createYouTubeClient();
+    } catch (IOException | GeneralSecurityException ex) {
+      LOGGER.error("Failed to created oauth for YouTube : {}", ex);
+      throw new RuntimeException(ex);
     }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        try {
-            this.youtube = createYouTubeClient();
-        } catch (IOException |GeneralSecurityException e) {
-            LOGGER.error("Failed to created oauth for YouTube : {}", e);
-            throw new RuntimeException(e);
-        }
-
-        this.executor = 
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE));
-        this.datumQueue = new LinkedBlockingQueue<>(1000);
-        this.isComplete = new AtomicBoolean(false);
-        this.previousPullWasEmpty = false;
+    this.executor = 
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE));
+    this.datumQueue = new LinkedBlockingQueue<>(1000);
+    this.isComplete = new AtomicBoolean(false);
+    this.previousPullWasEmpty = false;
+  }
+
+  @Override
+  public void startStream() {
+    BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2);
+
+    for (UserInfo user : this.config.getYoutubeUsers()) {
+      if (this.config.getDefaultAfterDate() != null && user.getAfterDate() == 
null) {
+        user.setAfterDate(this.config.getDefaultAfterDate());
+      }
+      if (this.config.getDefaultBeforeDate() != null && user.getBeforeDate() 
== null) {
+        user.setBeforeDate(this.config.getDefaultBeforeDate());
+      }
+
+      ListenableFuture future = 
executor.submit(getDataCollector(backOffStrategy, this.datumQueue, 
this.youtube, user));
+      futures.add(future);
     }
 
-    @Override
-    public void startStream() {
-        BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2);
-
-        for(UserInfo user : this.config.getYoutubeUsers()) {
-            if(this.config.getDefaultAfterDate() != null && 
user.getAfterDate() == null) {
-                user.setAfterDate(this.config.getDefaultAfterDate());
-            }
-            if(this.config.getDefaultBeforeDate() != null && 
user.getBeforeDate() == null) {
-                user.setBeforeDate(this.config.getDefaultBeforeDate());
-            }
-
-            ListenableFuture future = 
executor.submit(getDataCollector(backOffStrategy, this.datumQueue, 
this.youtube, user));
-            futures.add(future);
-        }
-
-        this.executor.shutdown();
+    this.executor.shutdown();
+  }
+
+  protected abstract Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo);
+
+  @Override
+  public StreamsResultSet readCurrent() {
+    BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>();
+    int batchCount = 0;
+    while (!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) {
+      StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue);
+      if (datum != null) {
+        ++batchCount;
+        ComponentUtils.offerUntilSuccess(datum, batch);
+      }
     }
-
-    protected abstract Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo);
-
-    @Override
-    public StreamsResultSet readCurrent() {
-        BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>();
-        int batchCount = 0;
-        while(!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) {
-            StreamsDatum datum = 
ComponentUtils.pollWhileNotEmpty(this.datumQueue);
-            if(datum != null) {
-                ++batchCount;
-                ComponentUtils.offerUntilSuccess(datum, batch);
-            }
-        }
-        return new StreamsResultSet(batch);
+    return new StreamsResultSet(batch);
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return null;
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return null;
+  }
+
+  @VisibleForTesting
+  protected YouTube createYouTubeClient() throws IOException, 
GeneralSecurityException {
+    GoogleCredential.Builder credentialBuilder = new GoogleCredential.Builder()
+        .setTransport(HTTP_TRANSPORT)
+        .setJsonFactory(JSON_FACTORY)
+        
.setServiceAccountId(getConfig().getOauth().getServiceAccountEmailAddress())
+        .setServiceAccountScopes(scopes);
+
+    if ( !Strings.isNullOrEmpty(getConfig().getOauth().getPathToP12KeyFile())) 
{
+      File p12KeyFile = new File(getConfig().getOauth().getPathToP12KeyFile());
+      if ( p12KeyFile.exists() && p12KeyFile.isFile() && p12KeyFile.canRead()) 
{
+        credentialBuilder = 
credentialBuilder.setServiceAccountPrivateKeyFromP12File(p12KeyFile);
+      }
     }
-
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
-    }
-
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
+    Credential credential = credentialBuilder.build();
+    return new YouTube.Builder(HTTP_TRANSPORT, JSON_FACTORY, 
credential).setApplicationName("Streams Application").build();
+  }
+
+  @Override
+  public void cleanUp() {
+    ComponentUtils.shutdownExecutor(this.executor, 10, 10);
+    this.executor = null;
+  }
+
+  public YoutubeConfiguration getConfig() {
+    return config;
+  }
+
+  public void setConfig(YoutubeConfiguration config) {
+    this.config = config;
+  }
+
+  /**
+   * Set and overwrite the default before date that was read from the 
configuration file.
+   * @param defaultBeforeDate defaultBeforeDate
+   */
+  public void setDefaultBeforeDate(DateTime defaultBeforeDate) {
+    this.config.setDefaultBeforeDate(defaultBeforeDate);
+  }
+
+  /**
+   * Set and overwrite the default after date that was read from teh 
configuration file.
+   * @param defaultAfterDate defaultAfterDate
+   */
+  public void setDefaultAfterDate(DateTime defaultAfterDate) {
+    this.config.setDefaultAfterDate(defaultAfterDate);
+  }
+
+  /**
+   * Sets and overwrite the user info from the configuaration file.  Uses the 
defaults before and after dates.
+   * @param userIds Set of String userIds
+   */
+  public void setUserInfoWithDefaultDates(Set<String> userIds) {
+    List<UserInfo> youtubeUsers = new LinkedList<>();
+
+    for (String userId : userIds) {
+      UserInfo user = new UserInfo();
+      user.setUserId(userId);
+      user.setAfterDate(this.config.getDefaultAfterDate());
+      user.setBeforeDate(this.config.getDefaultBeforeDate());
+      youtubeUsers.add(user);
     }
 
-    @VisibleForTesting
-    protected YouTube createYouTubeClient() throws IOException, 
GeneralSecurityException {
-        GoogleCredential.Builder credentialBuilder = new 
GoogleCredential.Builder()
-                .setTransport(HTTP_TRANSPORT)
-                .setJsonFactory(JSON_FACTORY)
-                
.setServiceAccountId(getConfig().getOauth().getServiceAccountEmailAddress())
-                .setServiceAccountScopes(scopes);
-
-        if( 
!Strings.isNullOrEmpty(getConfig().getOauth().getPathToP12KeyFile())) {
-            File p12KeyFile = new 
File(getConfig().getOauth().getPathToP12KeyFile());
-            if( p12KeyFile.exists() && p12KeyFile.isFile() && 
p12KeyFile.canRead()) {
-                credentialBuilder = 
credentialBuilder.setServiceAccountPrivateKeyFromP12File(p12KeyFile);
-            }
-        }
-        Credential credential = credentialBuilder.build();
-        return new YouTube.Builder(HTTP_TRANSPORT, JSON_FACTORY, 
credential).setApplicationName("Streams Application").build();
+    this.config.setYoutubeUsers(youtubeUsers);
+  }
+
+  /**
+   * Set and overwrite user into from teh configuration file. Only sets after 
dater.
+   * @param usersAndAfterDates usersAndAfterDates
+   */
+  public void setUserInfoWithAfterDate(Map<String, DateTime> 
usersAndAfterDates) {
+    List<UserInfo> youtubeUsers = new LinkedList<>();
+
+    for (String userId : usersAndAfterDates.keySet()) {
+      UserInfo user = new UserInfo();
+      user.setUserId(userId);
+      user.setAfterDate(usersAndAfterDates.get(userId));
+      youtubeUsers.add(user);
     }
 
-    @Override
-    public void cleanUp() {
-        ComponentUtils.shutdownExecutor(this.executor, 10, 10);
-        this.executor = null;
-    }
-
-    public YoutubeConfiguration getConfig() {
-        return config;
-    }
-
-    public void setConfig(YoutubeConfiguration config) {
-        this.config = config;
-    }
-
-    /**
-     * Set and overwrite the default before date that was read from the 
configuration file.
-     * @param defaultBeforeDate
-     */
-    public void setDefaultBeforeDate(DateTime defaultBeforeDate) {
-        this.config.setDefaultBeforeDate(defaultBeforeDate);
-    }
-
-    /**
-     * Set and overwrite the default after date that was read from teh 
configuration file.
-     * @param defaultAfterDate
-     */
-    public void setDefaultAfterDate(DateTime defaultAfterDate) {
-        this.config.setDefaultAfterDate(defaultAfterDate);
-    }
-
-    /**
-     * Sets and overwrite the user info from the configuaration file.  Uses 
the defaults before and after dates.
-     * @param userIds
-     */
-    public void setUserInfoWithDefaultDates(Set<String> userIds) {
-        List<UserInfo> youtubeUsers = new LinkedList<>();
-
-        for(String userId : userIds) {
-            UserInfo user = new UserInfo();
-            user.setUserId(userId);
-            user.setAfterDate(this.config.getDefaultAfterDate());
-            user.setBeforeDate(this.config.getDefaultBeforeDate());
-            youtubeUsers.add(user);
-        }
-
-        this.config.setYoutubeUsers(youtubeUsers);
-    }
-
-    /**
-     * Set and overwrite user into from teh configuration file. Only sets 
after dater.
-     * @param usersAndAfterDates
-     */
-    public void setUserInfoWithAfterDate(Map<String, DateTime> 
usersAndAfterDates) {
-        List<UserInfo> youtubeUsers = new LinkedList<>();
-
-        for(String userId : usersAndAfterDates.keySet()) {
-            UserInfo user = new UserInfo();
-            user.setUserId(userId);
-            user.setAfterDate(usersAndAfterDates.get(userId));
-            youtubeUsers.add(user);
-        }
-
-        this.config.setYoutubeUsers(youtubeUsers);
-    }
+    this.config.setYoutubeUsers(youtubeUsers);
+  }
 
-    @Override
-    public boolean isRunning() {
-        if (datumQueue.isEmpty() && executor.isTerminated() && 
Futures.allAsList(futures).isDone()) {
-            LOGGER.info("Completed");
-            isComplete.set(true);
-            LOGGER.info("Exiting");
-        }
-        return !isComplete.get();
+  @Override
+  public boolean isRunning() {
+    if (datumQueue.isEmpty() && executor.isTerminated() && 
Futures.allAsList(futures).isDone()) {
+      LOGGER.info("Completed");
+      isComplete.set(true);
+      LOGGER.info("Exiting");
     }
+    return !isComplete.get();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java
index 76a69f3..9975dd9 100644
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java
@@ -19,6 +19,11 @@
 
 package com.youtube.provider;
 
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
@@ -29,10 +34,6 @@ import 
com.google.api.services.youtube.model.ActivityListResponse;
 import com.google.api.services.youtube.model.Video;
 import com.google.api.services.youtube.model.VideoListResponse;
 import com.google.gson.Gson;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 import org.apache.youtube.pojo.YoutubeConfiguration;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -42,169 +43,187 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 
+/**
+ * YoutubeDataCollector for YoutubeUserActivityProvider.
+ */
 public class YoutubeUserActivityCollector extends YoutubeDataCollector {
-    /**
-     * Max results allowed per request
-     * https://developers.google.com/+/api/latest/activities/list
-     */
-    private static final long MAX_RESULTS = 50;
-    private static final int MAX_ATTEMPTS = 5;
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeUserActivityCollector.class);
-    private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
-
-    static { //set up mapper for Google Activity Object
-        SimpleModule simpleModule = new SimpleModule();
-        MAPPER.registerModule(simpleModule);
-        MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-    }
 
-    private BlockingQueue<StreamsDatum> datumQueue;
-    private BackOffStrategy backOff;
-    private YouTube youtube;
-    private UserInfo userInfo;
-    private YoutubeConfiguration config;
-
-    Gson gson = new Gson();
-
-    public YoutubeUserActivityCollector(YouTube youtube, 
BlockingQueue<StreamsDatum> datumQueue, BackOffStrategy backOff, UserInfo 
userInfo, YoutubeConfiguration config) {
-        this.youtube = youtube;
-        this.datumQueue = datumQueue;
-        this.backOff = backOff;
-        this.userInfo = userInfo;
-        this.config = config;
-    }
-
-    @Override
-    public void run() {
-        collectActivityData();
-    }
-
-    /**
-     * Iterate through all users in the Youtube configuration and collect all 
videos
-     * associated with their accounts.
-     */
-    protected void collectActivityData() {
+  /**
+   * Max results allowed per request
+   * https://developers.google.com/+/api/latest/activities/list
+   */
+  private static final long MAX_RESULTS = 50;
+  private static final int MAX_ATTEMPTS = 5;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeUserActivityCollector.class);
+  private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
+
+  static { //set up mapper for Google Activity Object
+    SimpleModule simpleModule = new SimpleModule();
+    MAPPER.registerModule(simpleModule);
+    MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  }
+
+  private BlockingQueue<StreamsDatum> datumQueue;
+  private BackOffStrategy backOff;
+  private YouTube youtube;
+  private UserInfo userInfo;
+  private YoutubeConfiguration config;
+
+  Gson gson = new Gson();
+
+  /**
+   * YoutubeUserActivityCollector constructor.
+   * @param youtube YouTube
+   * @param datumQueue BlockingQueue of StreamsDatum
+   * @param backOff BackOffStrategy
+   * @param userInfo UserInfo
+   * @param config YoutubeConfiguration
+   */
+  public YoutubeUserActivityCollector(
+      YouTube youtube,
+      BlockingQueue<StreamsDatum> datumQueue,
+      BackOffStrategy backOff,
+      UserInfo userInfo,
+      YoutubeConfiguration config) {
+    this.youtube = youtube;
+    this.datumQueue = datumQueue;
+    this.backOff = backOff;
+    this.userInfo = userInfo;
+    this.config = config;
+  }
+
+  @Override
+  public void run() {
+    collectActivityData();
+  }
+
+  /**
+   * Iterate through all users in the Youtube configuration and collect all 
videos
+   * associated with their accounts.
+   */
+  protected void collectActivityData() {
+    try {
+      YouTube.Activities.List request = null;
+      ActivityListResponse feed = null;
+
+      boolean tryAgain = false;
+      int attempt = 0;
+      DateTime afterDate = userInfo.getAfterDate();
+      DateTime beforeDate = userInfo.getBeforeDate();
+
+      do {
         try {
-            YouTube.Activities.List request = null;
-            ActivityListResponse feed = null;
-
-            boolean tryAgain = false;
-            int attempt = 0;
-            DateTime afterDate = userInfo.getAfterDate();
-            DateTime beforeDate = userInfo.getBeforeDate();
-
-            do {
-                try {
-                    if(request == null) {
-                        request = 
this.youtube.activities().list("contentDetails")
-                                                            
.setChannelId(userInfo.getUserId())
-                                                            
.setMaxResults(MAX_RESULTS)
-                                                            
.setKey(config.getApiKey());
-                        feed = request.execute();
-                    } else {
-                        request = 
this.youtube.activities().list("contentDetails")
-                                                            
.setChannelId(userInfo.getUserId())
-                                                            
.setMaxResults(MAX_RESULTS)
-                                                            
.setPageToken(feed.getNextPageToken())
-                                                            
.setKey(config.getApiKey());
-                        feed = request.execute();
-                    }
-                    this.backOff.reset(); //successful pull reset api.
-
-                    processActivityFeed(feed, afterDate, beforeDate);
-                } catch (GoogleJsonResponseException gjre) {
-                    tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOff);
-                    ++attempt;
-                }
-            } while((tryAgain || (feed != null && feed.getNextPageToken() != 
null)) && attempt < MAX_ATTEMPTS);
-        } catch (Throwable t) {
-            if(t instanceof InterruptedException) {
-                Thread.currentThread().interrupt();
-            }
-            t.printStackTrace();
-            LOGGER.warn("Unable to pull Activities for user={} : 
{}",this.userInfo.getUserId(), t);
+          if (request == null) {
+            request = this.youtube.activities().list("contentDetails")
+                .setChannelId(userInfo.getUserId())
+                .setMaxResults(MAX_RESULTS)
+                .setKey(config.getApiKey());
+            feed = request.execute();
+          } else {
+            request = this.youtube.activities().list("contentDetails")
+                .setChannelId(userInfo.getUserId())
+                .setMaxResults(MAX_RESULTS)
+                .setPageToken(feed.getNextPageToken())
+                .setKey(config.getApiKey());
+            feed = request.execute();
+          }
+          this.backOff.reset(); //successful pull reset api.
+
+          processActivityFeed(feed, afterDate, beforeDate);
+        } catch (GoogleJsonResponseException gjre) {
+          tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOff);
+          ++attempt;
         }
+      }
+      while ((tryAgain || (feed != null && feed.getNextPageToken() != null)) 
&& attempt < MAX_ATTEMPTS);
+    } catch (Throwable throwable) {
+      if (throwable instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      throwable.printStackTrace();
+      LOGGER.warn("Unable to pull Activities for user={} : 
{}",this.userInfo.getUserId(), throwable);
     }
-
-    /**
-     * Given a feed and an after and before date, fetch all relevant user 
videos
-     * and place them into the datumQueue for post-processing
-     * @param feed
-     * @param afterDate
-     * @param beforeDate
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    void processActivityFeed(ActivityListResponse feed, DateTime afterDate, 
DateTime beforeDate) throws IOException, InterruptedException {
-        for(com.google.api.services.youtube.model.Activity activity : 
feed.getItems()) {
-            try {
-                List<Video> videos = Lists.newArrayList();
-
-                if (activity.getContentDetails().getUpload() != null) {
-                    
videos.addAll(getVideoList(activity.getContentDetails().getUpload().getVideoId()));
-                }
-                if (activity.getContentDetails().getPlaylistItem() != null && 
activity.getContentDetails().getPlaylistItem().getResourceId() != null) {
-                    
videos.addAll(getVideoList(activity.getContentDetails().getPlaylistItem().getResourceId().getVideoId()));
-                }
-
-                processVideos(videos, afterDate, beforeDate, activity, feed);
-            } catch (Exception e) {
-                LOGGER.error("Error while trying to process activity: {}, {}", 
activity, e);
-            }
+  }
+
+  /**
+   * Given a feed and an after and before date, fetch all relevant user videos
+   * and place them into the datumQueue for post-processing.
+   * @param feed ActivityListResponse
+   * @param afterDate DateTime
+   * @param beforeDate DateTime
+   * @throws IOException IOException
+   * @throws InterruptedException InterruptedException
+   */
+  void processActivityFeed(ActivityListResponse feed, DateTime afterDate, 
DateTime beforeDate) throws IOException, InterruptedException {
+    for (com.google.api.services.youtube.model.Activity activity : 
feed.getItems()) {
+      try {
+        List<Video> videos = Lists.newArrayList();
+
+        if (activity.getContentDetails().getUpload() != null) {
+          
videos.addAll(getVideoList(activity.getContentDetails().getUpload().getVideoId()));
         }
-    }
-
-    /**
-     * Process a list of Video objects
-     * @param videos
-     * @param afterDate
-     * @param beforeDate
-     * @param activity
-     * @param feed
-     */
-    void processVideos(List<Video> videos, DateTime afterDate, DateTime 
beforeDate, com.google.api.services.youtube.model.Activity activity, 
ActivityListResponse feed) {
-        try {
-            for (Video video : videos) {
-                if (video != null) {
-                    org.joda.time.DateTime published = new 
org.joda.time.DateTime(video.getSnippet().getPublishedAt().getValue());
-                    if ((afterDate == null && beforeDate == null)
-                            || (beforeDate == null && 
afterDate.isBefore(published))
-                            || (afterDate == null && 
beforeDate.isAfter(published))
-                            || ((afterDate != null && beforeDate != null) && 
(afterDate.isAfter(published) && beforeDate.isBefore(published)))) {
-                        LOGGER.debug("Providing Youtube Activity: {}", 
MAPPER.writeValueAsString(video));
-                        this.datumQueue.put(new 
StreamsDatum(gson.toJson(video), activity.getId()));
-                    } else if (afterDate != null && 
afterDate.isAfter(published)) {
-                        feed.setNextPageToken(null); // do not fetch next page
-                        break;
-                    }
-                }
-            }
-        } catch (Exception e) {
-            LOGGER.error("Exception while trying to process video list: {}, 
{}", videos, e);
+        if (activity.getContentDetails().getPlaylistItem() != null && 
activity.getContentDetails().getPlaylistItem().getResourceId() != null) {
+          
videos.addAll(getVideoList(activity.getContentDetails().getPlaylistItem().getResourceId().getVideoId()));
         }
-    }
 
-    /**
-     * Given a Youtube videoId, return the relevant Youtube Video object
-     * @param videoId
-     * @return
-     * @throws IOException
-     */
-    List<Video> getVideoList(String videoId) throws IOException {
-        VideoListResponse videosListResponse = 
this.youtube.videos().list("snippet,statistics")
-                                                                    
.setId(videoId)
-                                                                    
.setKey(config.getApiKey())
-                                                                    .execute();
-
-        if(videosListResponse.getItems().size() == 0) {
-            LOGGER.debug("No Youtube videos found for videoId: {}", videoId);
-            return Lists.newArrayList();
+        processVideos(videos, afterDate, beforeDate, activity, feed);
+      } catch (Exception ex) {
+        LOGGER.error("Error while trying to process activity: {}, {}", 
activity, ex);
+      }
+    }
+  }
+
+  /**
+   * Process a list of Video objects.
+   * @param videos List of Video
+   * @param afterDate afterDate
+   * @param beforeDate beforeDate
+   * @param activity com.google.api.services.youtube.model.Activity
+   * @param feed ActivityListResponse
+   */
+  void processVideos(List<Video> videos, DateTime afterDate, DateTime 
beforeDate, com.google.api.services.youtube.model.Activity activity, 
ActivityListResponse feed) {
+    try {
+      for (Video video : videos) {
+        if (video != null) {
+          org.joda.time.DateTime published = new 
org.joda.time.DateTime(video.getSnippet().getPublishedAt().getValue());
+          if ((afterDate == null && beforeDate == null)
+              || (beforeDate == null && afterDate.isBefore(published))
+              || (afterDate == null && beforeDate.isAfter(published))
+              || ((afterDate != null && beforeDate != null) && 
(afterDate.isAfter(published) && beforeDate.isBefore(published)))) {
+            LOGGER.debug("Providing Youtube Activity: {}", 
MAPPER.writeValueAsString(video));
+            this.datumQueue.put(new StreamsDatum(gson.toJson(video), 
activity.getId()));
+          } else if (afterDate != null && afterDate.isAfter(published)) {
+            feed.setNextPageToken(null); // do not fetch next page
+            break;
+          }
         }
-
-        return videosListResponse.getItems();
+      }
+    } catch (Exception ex) {
+      LOGGER.error("Exception while trying to process video list: {}, {}", 
videos, ex);
     }
-
-    BlockingQueue<StreamsDatum> getDatumQueue() {
-        return this.datumQueue;
+  }
+
+  /**
+   * Given a Youtube videoId, return the relevant Youtube Video object.
+   * @param videoId videoId
+   * @return List of Video
+   * @throws IOException
+   */
+  List<Video> getVideoList(String videoId) throws IOException {
+    VideoListResponse videosListResponse = 
this.youtube.videos().list("snippet,statistics")
+        .setId(videoId)
+        .setKey(config.getApiKey())
+        .execute();
+
+    if (videosListResponse.getItems().size() == 0) {
+      LOGGER.debug("No Youtube videos found for videoId: {}", videoId);
+      return Lists.newArrayList();
     }
+
+    return videosListResponse.getItems();
+  }
+
+  BlockingQueue<StreamsDatum> getDatumQueue() {
+    return this.datumQueue;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java
index ed3dc63..934a0e5 100644
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java
@@ -19,6 +19,14 @@
 
 package com.youtube.provider;
 
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.api.services.youtube.YouTube;
@@ -27,13 +35,6 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 import org.apache.youtube.pojo.YoutubeConfiguration;
 
 import java.io.BufferedOutputStream;
@@ -46,76 +47,86 @@ import java.util.concurrent.TimeUnit;
 
 /**
  *  Retrieve recent activity from a list of user ids or names.
- *
- *  To use from command line:
- *
- *  Supply (at least) the following required configuration in application.conf:
- *
- *  youtube.oauth.pathToP12KeyFile
- *  youtube.oauth.serviceAccountEmailAddress
- *  youtube.apiKey
- *  youtube.youtubeUsers
- *
- *  Launch using:
- *
- *  mvn exec:java 
-Dexec.mainClass=org.apache.streams.youtube.provider.YoutubeUserActivityProvider
 -Dexec.args="application.conf tweets.json"
  */
 public class YoutubeUserActivityProvider extends YoutubeProvider {
 
-    public YoutubeUserActivityProvider() {
-        super();
-    }
+  public YoutubeUserActivityProvider() {
+    super();
+  }
 
-    public YoutubeUserActivityProvider(YoutubeConfiguration config) {
-        super(config);
-    }
+  public YoutubeUserActivityProvider(YoutubeConfiguration config) {
+    super(config);
+  }
 
-    @Override
-    protected Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) {
-        return new YoutubeUserActivityCollector(youtube, queue, strategy, 
userInfo, config);
-    }
+  @Override
+  protected Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) {
+    return new YoutubeUserActivityCollector(youtube, queue, strategy, 
userInfo, config);
+  }
+
+  /**
+   * To use from command line:
+   *
+   * <p/>
+   * Supply (at least) the following required configuration in 
application.conf:
+   *
+   * <p/>
+   * youtube.oauth.pathToP12KeyFile
+   * youtube.oauth.serviceAccountEmailAddress
+   * youtube.apiKey
+   * youtube.youtubeUsers
+   *
+   * <p/>
+   * Launch using:
+   *
+   * <p/>
+   * mvn exec:java 
-Dexec.mainClass=org.apache.streams.youtube.provider.YoutubeUserActivityProvider
 -Dexec.args="application.conf tweets.json"
+   *
+   * @param args args
+   * @throws Exception Exception
+   */
+  public static void main(String[] args) throws Exception {
+
+    Preconditions.checkArgument(args.length >= 2);
+
+    String configfile = args[0];
+    String outfile = args[1];
+
+    Config reference = ConfigFactory.load();
+    File file = new File(configfile);
+    assert (file.exists());
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+    StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
+    YoutubeConfiguration config = new 
ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe,
 "youtube");
+    YoutubeUserActivityProvider provider = new 
YoutubeUserActivityProvider(config);
+
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    public static void main(String[] args) throws Exception {
-
-        Preconditions.checkArgument(args.length >= 2);
-
-        String configfile = args[0];
-        String outfile = args[1];
-
-        Config reference = ConfigFactory.load();
-        File conf_file = new File(configfile);
-        assert(conf_file.exists());
-        Config testResourceConfig = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
-
-        Config typesafe  = 
testResourceConfig.withFallback(reference).resolve();
-
-        StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
-        YoutubeConfiguration config = new 
ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe,
 "youtube");
-        YoutubeUserActivityProvider provider = new 
YoutubeUserActivityProvider(config);
-
-        ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-        PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
-        provider.prepare(config);
-        provider.startStream();
-        do {
-            
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
-            Iterator<StreamsDatum> iterator = 
provider.readCurrent().iterator();
-            while(iterator.hasNext()) {
-                StreamsDatum datum = iterator.next();
-                String json;
-                try {
-                    if( datum.getDocument() instanceof String )
-                        json = (String)datum.getDocument();
-                    else
-                        json = mapper.writeValueAsString(datum.getDocument());
-                    outStream.println(json);
-                } catch (JsonProcessingException e) {
-                    System.err.println(e.getMessage());
-                }
-            }
-        } while( provider.isRunning());
-        provider.cleanUp();
-        outStream.flush();
+    PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
+    provider.prepare(config);
+    provider.startStream();
+    do {
+      
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
+      Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+      while (iterator.hasNext()) {
+        StreamsDatum datum = iterator.next();
+        String json;
+        try {
+          if ( datum.getDocument() instanceof String ) {
+            json = (String) datum.getDocument();
+          } else {
+            json = mapper.writeValueAsString(datum.getDocument());
+          }
+          outStream.println(json);
+        } catch (JsonProcessingException ex) {
+          System.err.println(ex.getMessage());
+        }
+      }
     }
+    while ( provider.isRunning());
+    provider.cleanUp();
+    outStream.flush();
+  }
 }


Reply via email to