http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java
index ab2f55c..4754353 100644
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java
@@ -19,6 +19,13 @@
 
 package com.youtube.serializer;
 
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.extensions.ExtensionUtil;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Image;
+import org.apache.streams.pojo.json.Provider;
+
 import com.google.api.client.util.Maps;
 import com.google.api.services.youtube.model.Channel;
 import com.google.api.services.youtube.model.Thumbnail;
@@ -27,12 +34,6 @@ import com.google.api.services.youtube.model.Video;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.pojo.extensions.ExtensionUtil;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Image;
-import org.apache.streams.pojo.json.Provider;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,153 +42,160 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class YoutubeActivityUtil {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeActivityUtil.class);
-
-    /**
-     * Given a {@link com.google.api.services.youtube.YouTube.Videos} object 
and an
-     * {@link org.apache.streams.pojo.json.Activity} object, fill out the 
appropriate details
-     *
-     * @param video
-     * @param activity
-     * @throws org.apache.streams.exceptions.ActivitySerializerException
-     */
-    public static void updateActivity(Video video, Activity activity, String 
channelId) throws ActivitySerializerException {
-        activity.setActor(buildActor(video, 
video.getSnippet().getChannelId()));
-        activity.setVerb("post");
-
-        activity.setId(formatId(activity.getVerb(),
-                Optional.fromNullable(
-                        video.getId())
-                        .orNull()));
-
-        activity.setPublished(new 
DateTime(video.getSnippet().getPublishedAt().getValue()));
-        activity.setTitle(video.getSnippet().getTitle());
-        activity.setContent(video.getSnippet().getDescription());
-        activity.setUrl("https://www.youtube.com/watch?v=" + video.getId());
-
-        activity.setProvider(getProvider());
-
-        activity.setObject(buildActivityObject(video));
-
-        addYoutubeExtensions(activity, video);
-    }
-
 
-    /**
-     * Given a {@link com.google.api.services.youtube.model.Channel} object 
and an
-     * {@link org.apache.streams.pojo.json.Activity} object, fill out the 
appropriate details
-     *
-     * @param channel
-     * @param activity
-     * @throws org.apache.streams.exceptions.ActivitySerializerException
-     */
-    public static void updateActivity(Channel channel, Activity activity, 
String channelId) throws ActivitySerializerException {
-        try {
-            activity.setProvider(getProvider());
-            activity.setVerb("post");
-            activity.setActor(createActorForChannel(channel));
-            Map<String, Object> extensions = Maps.newHashMap();
-            extensions.put("youtube", channel);
-            activity.setAdditionalProperty("extensions", extensions);
-        } catch (Throwable t) {
-            throw new ActivitySerializerException(t);
-        }
+  private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeActivityUtil.class);
+
+  /**
+   * Given a {@link com.google.api.services.youtube.YouTube.Videos} object and 
an
+   * {@link org.apache.streams.pojo.json.Activity} object, fill out the 
appropriate details
+   *
+   * @param video Video
+   * @param activity Activity
+   * @throws ActivitySerializerException ActivitySerializerException
+   */
+  public static void updateActivity(Video video, Activity activity, String channelId) throws ActivitySerializerException {
+    activity.setActor(buildActor(video, video.getSnippet().getChannelId()));
+    activity.setVerb("post");
+
+    activity.setId(formatId(activity.getVerb(),
+        Optional.fromNullable(
+            video.getId())
+            .orNull()));
+
+    activity.setPublished(new DateTime(video.getSnippet().getPublishedAt().getValue()));
+    activity.setTitle(video.getSnippet().getTitle());
+    activity.setContent(video.getSnippet().getDescription());
+    activity.setUrl("https://www.youtube.com/watch?v=" + video.getId());
+
+    activity.setProvider(getProvider());
+
+    activity.setObject(buildActivityObject(video));
+
+    addYoutubeExtensions(activity, video);
+  }
+
+
+  /**
+   * Given a {@link com.google.api.services.youtube.model.Channel} object and 
an
+   * {@link org.apache.streams.pojo.json.Activity} object, fill out the 
appropriate details
+   *
+   * @param channel Channel
+   * @param activity Activity
+   * @throws ActivitySerializerException ActivitySerializerException
+   */
+  public static void updateActivity(Channel channel, Activity activity, String channelId) throws ActivitySerializerException {
+    try {
+      activity.setProvider(getProvider());
+      activity.setVerb("post");
+      activity.setActor(createActorForChannel(channel));
+      Map<String, Object> extensions = Maps.newHashMap();
+      extensions.put("youtube", channel);
+      activity.setAdditionalProperty("extensions", extensions);
+    } catch (Throwable throwable) {
+      throw new ActivitySerializerException(throwable);
     }
-
-    public static ActivityObject createActorForChannel(Channel channel) {
-        ActivityObject actor = new ActivityObject();
-        actor.setId("id:youtube:"+channel.getId());
-        actor.setSummary(channel.getSnippet().getDescription());
-        actor.setDisplayName(channel.getSnippet().getTitle());
-        Image image = new Image();
-        image.setUrl(channel.getSnippet().getThumbnails().getHigh().getUrl());
-        actor.setImage(image);
-        actor.setUrl("https://youtube.com/user/" + channel.getId());
-        Map<String, Object> actorExtensions = Maps.newHashMap();
-        actorExtensions.put("followers", 
channel.getStatistics().getSubscriberCount());
-        actorExtensions.put("posts", channel.getStatistics().getVideoCount());
-        actor.setAdditionalProperty("extensions", actorExtensions);
-        return actor;
+  }
+
+  /**
+   * createActorForChannel.
+   * @param channel Channel
+   * @return $.actor
+   */
+  public static ActivityObject createActorForChannel(Channel channel) {
+    ActivityObject actor = new ActivityObject();
+    // TODO: use generic provider id concatenator
+    actor.setId("id:youtube:" + channel.getId());
+    actor.setSummary(channel.getSnippet().getDescription());
+    actor.setDisplayName(channel.getSnippet().getTitle());
+    Image image = new Image();
+    image.setUrl(channel.getSnippet().getThumbnails().getHigh().getUrl());
+    actor.setImage(image);
+    actor.setUrl("https://youtube.com/user/" + channel.getId());
+    Map<String, Object> actorExtensions = Maps.newHashMap();
+    actorExtensions.put("followers", channel.getStatistics().getSubscriberCount());
+    actorExtensions.put("posts", channel.getStatistics().getVideoCount());
+    actor.setAdditionalProperty("extensions", actorExtensions);
+    return actor;
+  }
+
+  /**
+   * Given a video object, create the appropriate activity object with a valid 
image
+   * (thumbnail) and video URL.
+   * @param video Video
+   * @return Activity Object with Video URL and a thumbnail image
+   */
+  private static ActivityObject buildActivityObject(Video video) {
+    ActivityObject activityObject = new ActivityObject();
+
+    ThumbnailDetails thumbnailDetails = video.getSnippet().getThumbnails();
+    Thumbnail thumbnail = thumbnailDetails.getDefault();
+
+    if (thumbnail != null) {
+      Image image = new Image();
+      image.setUrl(thumbnail.getUrl());
+      image.setHeight(thumbnail.getHeight());
+      image.setWidth(thumbnail.getWidth());
+
+      activityObject.setImage(image);
     }
 
-    /**
-     * Given a video object, create the appropriate activity object with a 
valid image
-     * (thumbnail) and video URL
-     * @param video
-     * @return Activity Object with Video URL and a thumbnail image
-     */
-    private static ActivityObject buildActivityObject(Video video) {
-        ActivityObject activityObject = new ActivityObject();
-
-        ThumbnailDetails thumbnailDetails = video.getSnippet().getThumbnails();
-        Thumbnail thumbnail = thumbnailDetails.getDefault();
-
-        if(thumbnail != null) {
-            Image image = new Image();
-            image.setUrl(thumbnail.getUrl());
-            image.setHeight(thumbnail.getHeight());
-            image.setWidth(thumbnail.getWidth());
-
-            activityObject.setImage(image);
-        }
+    activityObject.setUrl("https://www.youtube.com/watch?v=" + video.getId());
+    activityObject.setObjectType("video");
 
-        activityObject.setUrl("https://www.youtube.com/watch?v=" + video.getId());
-        activityObject.setObjectType("video");
+    return activityObject;
+  }
 
-        return activityObject;
-    }
-
-    /**
-     * Add the Youtube extensions to the Activity object that we're building
-     * @param activity
-     * @param video
-     */
-    private static void addYoutubeExtensions(Activity activity, Video video) {
-        Map<String, Object> extensions = 
ExtensionUtil.getInstance().ensureExtensions(activity);
-
-        extensions.put("youtube", video);
-
-        if(video.getStatistics() != null) {
-            Map<String, Object> likes = new HashMap<>();
-            likes.put("count", video.getStatistics().getCommentCount());
-            extensions.put("likes", likes);
-        }
-    }
+  /**
+   * Add the Youtube extensions to the Activity object that we're building.
+   * @param activity Activity
+   * @param video Video
+   */
+  private static void addYoutubeExtensions(Activity activity, Video video) {
+    Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
 
-    /**
-     * Build an {@link org.apache.streams.pojo.json.ActivityObject} actor 
given the video object
-     * @param video
-     * @param id
-     * @return Actor object
-     */
-    private static ActivityObject buildActor(Video video, String id) {
-        ActivityObject actor = new ActivityObject();
-
-        actor.setId("id:youtube:" + id);
-        actor.setDisplayName(video.getSnippet().getChannelTitle());
-        actor.setSummary(video.getSnippet().getDescription());
-        actor.setAdditionalProperty("handle", 
video.getSnippet().getChannelTitle());
-
-        return actor;
-    }
-
-    /**
-     * Gets the common youtube {@link org.apache.streams.pojo.json.Provider} 
object
-     * @return a provider object representing YouTube
-     */
-    public static Provider getProvider() {
-        Provider provider = new Provider();
-        provider.setId("id:providers:youtube");
-        provider.setDisplayName("YouTube");
-        return provider;
-    }
+    extensions.put("youtube", video);
 
-    /**
-     * Formats the ID to conform with the Apache Streams activity ID convention
-     * @param idparts the parts of the ID to join
-     * @return a valid Activity ID in format "id:youtube:part1:part2:...partN"
-     */
-    public static String formatId(String... idparts) {
-        return Joiner.on(":").join(Lists.asList("id:youtube", idparts));
+    if (video.getStatistics() != null) {
+      Map<String, Object> likes = new HashMap<>();
+      likes.put("count", video.getStatistics().getCommentCount());
+      extensions.put("likes", likes);
     }
+  }
+
+  /**
+   * Build an {@link org.apache.streams.pojo.json.ActivityObject} actor given 
the video object
+   * @param video Video
+   * @param id id
+   * @return Actor object
+   */
+  private static ActivityObject buildActor(Video video, String id) {
+    ActivityObject actor = new ActivityObject();
+
+    actor.setId("id:youtube:" + id);
+    actor.setDisplayName(video.getSnippet().getChannelTitle());
+    actor.setSummary(video.getSnippet().getDescription());
+    actor.setAdditionalProperty("handle", video.getSnippet().getChannelTitle());
+
+    return actor;
+  }
+
+  /**
+   * Gets the common youtube {@link org.apache.streams.pojo.json.Provider} 
object
+   * @return a provider object representing YouTube
+   */
+  public static Provider getProvider() {
+    Provider provider = new Provider();
+    provider.setId("id:providers:youtube");
+    provider.setDisplayName("YouTube");
+    return provider;
+  }
+
+  /**
+   * Formats the ID to conform with the Apache Streams activity ID convention
+   * @param idparts the parts of the ID to join
+   * @return a valid Activity ID in format "id:youtube:part1:part2:...partN"
+   */
+  public static String formatId(String... idparts) {
+    return Joiner.on(":").join(Lists.asList("id:youtube", idparts));
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java
index 019c7c5..ea9a49d 100644
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java
@@ -40,111 +40,116 @@ import java.util.Iterator;
 import java.util.List;
 
 /**
- *
+ * YoutubeChannelDeserializer is a JsonDeserializer for Channel.
  */
 public class YoutubeChannelDeserializer extends JsonDeserializer<Channel> {
 
-
-    @Override
-    public Channel deserialize(JsonParser jp, DeserializationContext ctxt) 
throws IOException, JsonProcessingException {
-        JsonNode node = jp.getCodec().readTree(jp);
-        try {
-            Channel channel = new Channel();
-            if(node.findPath("etag") != null)
-                channel.setEtag(node.get("etag").asText());
-            if(node.findPath("kind") != null)
-                channel.setKind(node.get("kind").asText());
-            channel.setId(node.get("id").asText());
-            
channel.setTopicDetails(setTopicDetails(node.findValue("topicDetails")));
-            
channel.setStatistics(setChannelStatistics(node.findValue("statistics")));
-            
channel.setContentDetails(setContentDetails(node.findValue("contentDetails")));
-            channel.setSnippet(setChannelSnippet(node.findValue("snippet")));
-            return channel;
-        } catch (Throwable t) {
-            throw new IOException(t);
-        }
+  @Override
+  public Channel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
+    JsonNode node = jp.getCodec().readTree(jp);
+    try {
+      Channel channel = new Channel();
+      if (node.findPath("etag") != null) {
+        channel.setEtag(node.get("etag").asText());
+      }
+      if (node.findPath("kind") != null) {
+        channel.setKind(node.get("kind").asText());
+      }
+      channel.setId(node.get("id").asText());
+      channel.setTopicDetails(setTopicDetails(node.findValue("topicDetails")));
+      channel.setStatistics(setChannelStatistics(node.findValue("statistics")));
+      channel.setContentDetails(setContentDetails(node.findValue("contentDetails")));
+      channel.setSnippet(setChannelSnippet(node.findValue("snippet")));
+      return channel;
+    } catch (Throwable throwable) {
+      throw new IOException(throwable);
     }
+  }
 
-    protected ChannelSnippet setChannelSnippet(JsonNode node) {
-        ChannelSnippet snippet = new ChannelSnippet();
-        snippet.setTitle(node.get("title").asText());
-        snippet.setDescription(node.get("description").asText());
-        snippet.setPublishedAt(new 
DateTime(node.get("publishedAt").get("value").longValue()));
-        snippet.setLocalized(setLocalized(node.findValue("localized")));
-        snippet.setThumbnails(setThumbnails(node.findValue("thumbnails")));
-        return snippet;
-    }
+  protected ChannelSnippet setChannelSnippet(JsonNode node) {
+    ChannelSnippet snippet = new ChannelSnippet();
+    snippet.setTitle(node.get("title").asText());
+    snippet.setDescription(node.get("description").asText());
+    snippet.setPublishedAt(new DateTime(node.get("publishedAt").get("value").longValue()));
+    snippet.setLocalized(setLocalized(node.findValue("localized")));
+    snippet.setThumbnails(setThumbnails(node.findValue("thumbnails")));
+    return snippet;
+  }
 
-    protected ThumbnailDetails setThumbnails(JsonNode node) {
-        ThumbnailDetails details = new ThumbnailDetails();
-        if(node == null) {
-            return details;
-        }
-        details.setDefault(new 
Thumbnail().setUrl(node.get("default").get("url").asText()));
-        details.setHigh(new 
Thumbnail().setUrl(node.get("high").get("url").asText()));
-        details.setMedium(new 
Thumbnail().setUrl(node.get("medium").get("url").asText()));
-        return details;
+  protected ThumbnailDetails setThumbnails(JsonNode node) {
+    ThumbnailDetails details = new ThumbnailDetails();
+    if (node == null) {
+      return details;
     }
+    details.setDefault(new Thumbnail().setUrl(node.get("default").get("url").asText()));
+    details.setHigh(new Thumbnail().setUrl(node.get("high").get("url").asText()));
+    details.setMedium(new Thumbnail().setUrl(node.get("medium").get("url").asText()));
+    return details;
+  }
 
-    protected ChannelLocalization setLocalized(JsonNode node) {
-        if(node == null) {
-            return new ChannelLocalization();
-        }
-        ChannelLocalization localization = new ChannelLocalization();
-        localization.setDescription(node.get("description").asText());
-        localization.setTitle(node.get("title").asText());
-        return localization;
+  protected ChannelLocalization setLocalized(JsonNode node) {
+    if (node == null) {
+      return new ChannelLocalization();
     }
+    ChannelLocalization localization = new ChannelLocalization();
+    localization.setDescription(node.get("description").asText());
+    localization.setTitle(node.get("title").asText());
+    return localization;
+  }
 
-    protected ChannelContentDetails setContentDetails(JsonNode node) {
-        ChannelContentDetails contentDetails = new ChannelContentDetails();
-        if(node == null) {
-            return contentDetails;
-        }
-        if(node.findValue("googlePlusUserId") != null)
-            
contentDetails.setGooglePlusUserId(node.get("googlePlusUserId").asText());
-        
contentDetails.setRelatedPlaylists(setRelatedPlaylists(node.findValue("relatedPlaylists")));
-        return contentDetails;
+  protected ChannelContentDetails setContentDetails(JsonNode node) {
+    ChannelContentDetails contentDetails = new ChannelContentDetails();
+    if (node == null) {
+      return contentDetails;
+    }
+    if (node.findValue("googlePlusUserId") != null) {
+      contentDetails.setGooglePlusUserId(node.get("googlePlusUserId").asText());
     }
+    contentDetails.setRelatedPlaylists(setRelatedPlaylists(node.findValue("relatedPlaylists")));
+    return contentDetails;
+  }
 
-    protected ChannelContentDetails.RelatedPlaylists 
setRelatedPlaylists(JsonNode node) {
-        ChannelContentDetails.RelatedPlaylists playlists = new 
ChannelContentDetails.RelatedPlaylists();
-        if(node == null) {
-            return playlists;
-        }
-        if(node.findValue("favorites") != null)
-            playlists.setFavorites(node.get("favorites").asText());
-        if(node.findValue("likes") != null)
-            playlists.setLikes(node.get("likes").asText());
-        if(node.findValue("uploads") != null)
-            playlists.setUploads(node.get("uploads").asText());
-        return playlists;
+  protected ChannelContentDetails.RelatedPlaylists setRelatedPlaylists(JsonNode node) {
+    ChannelContentDetails.RelatedPlaylists playlists = new ChannelContentDetails.RelatedPlaylists();
+    if (node == null) {
+      return playlists;
     }
+    if (node.findValue("favorites") != null) {
+      playlists.setFavorites(node.get("favorites").asText());
+    }
+    if (node.findValue("likes") != null) {
+      playlists.setLikes(node.get("likes").asText());
+    }
+    if (node.findValue("uploads") != null) {
+      playlists.setUploads(node.get("uploads").asText());
+    }
+    return playlists;
+  }
 
-    protected ChannelStatistics setChannelStatistics(JsonNode node) {
-        ChannelStatistics stats = new ChannelStatistics();
-        if(node == null) {
-            return stats;
-        }
-        stats.setCommentCount(node.get("commentCount").bigIntegerValue());
-        
stats.setHiddenSubscriberCount(node.get("hiddenSubscriberCount").asBoolean());
-        
stats.setSubscriberCount(node.get("subscriberCount").bigIntegerValue());
-        stats.setVideoCount(node.get("videoCount").bigIntegerValue());
-        stats.setViewCount(node.get("viewCount").bigIntegerValue());
-        return stats;
+  protected ChannelStatistics setChannelStatistics(JsonNode node) {
+    ChannelStatistics stats = new ChannelStatistics();
+    if (node == null) {
+      return stats;
     }
+    stats.setCommentCount(node.get("commentCount").bigIntegerValue());
+    stats.setHiddenSubscriberCount(node.get("hiddenSubscriberCount").asBoolean());
+    stats.setSubscriberCount(node.get("subscriberCount").bigIntegerValue());
+    stats.setVideoCount(node.get("videoCount").bigIntegerValue());
+    stats.setViewCount(node.get("viewCount").bigIntegerValue());
+    return stats;
+  }
 
-    protected ChannelTopicDetails setTopicDetails(JsonNode node) {
-        ChannelTopicDetails details = new ChannelTopicDetails();
-        if(node == null) {
-            return details;
-        }
-        List<String> topicIds = Lists.newLinkedList();
-        Iterator<JsonNode> it = node.get("topicIds").iterator();
-        while(it.hasNext()) {
-            topicIds.add(it.next().asText());
-        }
-        details.setTopicIds(topicIds);
-        return  details;
+  protected ChannelTopicDetails setTopicDetails(JsonNode node) {
+    ChannelTopicDetails details = new ChannelTopicDetails();
+    if (node == null) {
+      return details;
+    }
+    List<String> topicIds = Lists.newLinkedList();
+    Iterator<JsonNode> it = node.get("topicIds").iterator();
+    while (it.hasNext()) {
+      topicIds.add(it.next().asText());
     }
+    details.setTopicIds(topicIds);
+    return  details;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java
index de7bc3a..65a454c 100644
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java
@@ -18,38 +18,44 @@
 
 package com.youtube.serializer;
 
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.api.services.youtube.model.Video;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 
 import java.io.IOException;
 
 public class YoutubeEventClassifier {
-    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-    private static final String VIDEO_IDENTIFIER = "\"youtube#video\"";
-    private static final String CHANNEL_IDENTIFIER = "youtube#channel";
-
-    public static Class detectClass(String json) {
-        Preconditions.checkNotNull(json);
-        Preconditions.checkArgument(StringUtils.isNotEmpty(json));
-
-        ObjectNode objectNode;
-        try {
-            objectNode = (ObjectNode) mapper.readTree(json);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return null;
-        }
-
-        if (objectNode.findValue("kind") != null && 
objectNode.get("kind").toString().equals(VIDEO_IDENTIFIER)) {
-            return Video.class;
-        }  else if (objectNode.findValue("kind") != null && 
objectNode.get("kind").toString().contains(CHANNEL_IDENTIFIER)){
-            return com.google.api.services.youtube.model.Channel.class;
-        }  else {
-            return ObjectNode.class;
-        }
+  private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  private static final String VIDEO_IDENTIFIER = "\"youtube#video\"";
+  private static final String CHANNEL_IDENTIFIER = "youtube#channel";
+
+  /**
+   * detect probable Class of a json String from YouTube.
+   * @param json json
+   * @return Class
+   */
+  public static Class detectClass(String json) {
+    Preconditions.checkNotNull(json);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(json));
+
+    ObjectNode objectNode;
+    try {
+      objectNode = (ObjectNode) mapper.readTree(json);
+    } catch (IOException ex) {
+      ex.printStackTrace();
+      return null;
+    }
+
+    if (objectNode.findValue("kind") != null && objectNode.get("kind").toString().equals(VIDEO_IDENTIFIER)) {
+      return Video.class;
+    } else if (objectNode.findValue("kind") != null && objectNode.get("kind").toString().contains(CHANNEL_IDENTIFIER)) {
+      return com.google.api.services.youtube.model.Channel.class;
+    } else {
+      return ObjectNode.class;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeVideoDeserializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeVideoDeserializer.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeVideoDeserializer.java
index dbe3303..43fe8c6 100644
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeVideoDeserializer.java
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeVideoDeserializer.java
@@ -24,89 +24,95 @@ import 
com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JsonDeserializer;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.api.client.util.DateTime;
-import com.google.api.services.youtube.model.*;
+import com.google.api.services.youtube.model.Thumbnail;
+import com.google.api.services.youtube.model.ThumbnailDetails;
+import com.google.api.services.youtube.model.Video;
+import com.google.api.services.youtube.model.VideoSnippet;
+import com.google.api.services.youtube.model.VideoStatistics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-public class YoutubeVideoDeserializer extends JsonDeserializer<Video> {
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(YoutubeVideoDeserializer.class);
-
-    /**
-     * Because the Youtube Video object contains complex objects within its 
hierarchy, we have to use
-     * a custom deserializer
-     *
-     * @param jsonParser
-     * @param deserializationContext
-     * @return The deserialized {@link 
com.google.api.services.youtube.YouTube.Videos} object
-     * @throws java.io.IOException
-     * @throws com.fasterxml.jackson.core.JsonProcessingException
-     */
-    @Override
-    public Video deserialize(JsonParser jsonParser, DeserializationContext 
deserializationContext) throws IOException, JsonProcessingException {
-        JsonNode node = jsonParser.getCodec().readTree(jsonParser);
-        Video video = new Video();
-
-        try {
-            video.setId(node.get("id").asText());
-            video.setEtag(node.get("etag").asText());
-            video.setKind(node.get("kind").asText());
-
-            video.setSnippet(buildSnippet(node));
-            video.setStatistics(buildStatistics(node));
-        } catch (Exception e) {
-            LOGGER.error("Exception while trying to deserialize a Video 
object: {}", e);
-        }
-
-        return video;
-    }
 
-    /**
-     * Given the raw JsonNode, construct a video snippet object
-     * @param node
-     * @return VideoSnippet
-     */
-    private VideoSnippet buildSnippet(JsonNode node) {
-        VideoSnippet snippet = new VideoSnippet();
-        JsonNode snippetNode = node.get("snippet");
+public class YoutubeVideoDeserializer extends JsonDeserializer<Video> {
 
-        snippet.setChannelId(snippetNode.get("channelId").asText());
-        snippet.setChannelTitle(snippetNode.get("channelTitle").asText());
-        snippet.setDescription(snippetNode.get("description").asText());
-        snippet.setTitle(snippetNode.get("title").asText());
-        snippet.setPublishedAt(new 
DateTime(snippetNode.get("publishedAt").get("value").asLong()));
+  private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeVideoDeserializer.class);
+
+  /**
+   * Because the Youtube Video object contains complex objects within its 
hierarchy, we have to use
+   * a custom deserializer
+   *
+   * @param jsonParser jsonParser
+   * @param deserializationContext deserializationContext
+   * @return The deserialized {@link 
com.google.api.services.youtube.YouTube.Videos} object
+   * @throws java.io.IOException IOException
+   * @throws com.fasterxml.jackson.core.JsonProcessingException 
JsonProcessingException
+   */
+  @Override
+  public Video deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+    JsonNode node = jsonParser.getCodec().readTree(jsonParser);
+    Video video = new Video();
+
+    try {
+      video.setId(node.get("id").asText());
+      video.setEtag(node.get("etag").asText());
+      video.setKind(node.get("kind").asText());
+
+      video.setSnippet(buildSnippet(node));
+      video.setStatistics(buildStatistics(node));
+    } catch (Exception ex) {
+      LOGGER.error("Exception while trying to deserialize a Video object: {}", ex);
+    }
 
-        ThumbnailDetails thumbnailDetails = new ThumbnailDetails();
-        for(JsonNode t : snippetNode.get("thumbnails")) {
-            Thumbnail thumbnail = new Thumbnail();
+    return video;
+  }
+
+  /**
+   * Given the raw JsonNode, construct a video snippet object.
+   * @param node JsonNode
+   * @return VideoSnippet
+   */
+  private VideoSnippet buildSnippet(JsonNode node) {
+    VideoSnippet snippet = new VideoSnippet();
+    JsonNode snippetNode = node.get("snippet");
+
+    snippet.setChannelId(snippetNode.get("channelId").asText());
+    snippet.setChannelTitle(snippetNode.get("channelTitle").asText());
+    snippet.setDescription(snippetNode.get("description").asText());
+    snippet.setTitle(snippetNode.get("title").asText());
+    snippet.setPublishedAt(new DateTime(snippetNode.get("publishedAt").get("value").asLong()));
+
+    ThumbnailDetails thumbnailDetails = new ThumbnailDetails();
+    for (JsonNode t : snippetNode.get("thumbnails")) {
+      Thumbnail thumbnail = new Thumbnail();
+
+      thumbnail.setHeight(t.get("height").asLong());
+      thumbnail.setUrl(t.get("url").asText());
+      thumbnail.setWidth(t.get("width").asLong());
+
+      thumbnailDetails.setDefault(thumbnail);
+    }
 
-            thumbnail.setHeight(t.get("height").asLong());
-            thumbnail.setUrl(t.get("url").asText());
-            thumbnail.setWidth(t.get("width").asLong());
+    snippet.setThumbnails(thumbnailDetails);
 
-            thumbnailDetails.setDefault(thumbnail);
-        }
+    return snippet;
+  }
 
-        snippet.setThumbnails(thumbnailDetails);
+  /**
+   * Given the raw JsonNode, construct a statistics object.
+   * @param node JsonNode
+   * @return VideoStatistics
+   */
+  private VideoStatistics buildStatistics(JsonNode node) {
+    VideoStatistics statistics = new VideoStatistics();
+    JsonNode statisticsNode = node.get("statistics");
 
-        return snippet;
-    }
+    statistics.setCommentCount(statisticsNode.get("commentCount").bigIntegerValue());
+    statistics.setDislikeCount(statisticsNode.get("dislikeCount").bigIntegerValue());
+    statistics.setFavoriteCount(statisticsNode.get("favoriteCount").bigIntegerValue());
+    statistics.setLikeCount(statisticsNode.get("likeCount").bigIntegerValue());
+    statistics.setViewCount(statisticsNode.get("viewCount").bigIntegerValue());
 
-    /**
-     * Given the raw JsonNode, construct a statistics object
-     * @param node
-     * @return VideoStatistics
-     */
-    private VideoStatistics buildStatistics(JsonNode node) {
-        VideoStatistics statistics = new VideoStatistics();
-        JsonNode statisticsNode = node.get("statistics");
-
-        
statistics.setCommentCount(statisticsNode.get("commentCount").bigIntegerValue());
-        
statistics.setDislikeCount(statisticsNode.get("dislikeCount").bigIntegerValue());
-        
statistics.setFavoriteCount(statisticsNode.get("favoriteCount").bigIntegerValue());
-        
statistics.setLikeCount(statisticsNode.get("likeCount").bigIntegerValue());
-        
statistics.setViewCount(statisticsNode.get("viewCount").bigIntegerValue());
-
-        return statistics;
-    }
+    return statistics;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/processor/YoutubeTypeConverterTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/processor/YoutubeTypeConverterTest.java
 
b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/processor/YoutubeTypeConverterTest.java
index 8b53776..469b8d0 100644
--- 
a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/processor/YoutubeTypeConverterTest.java
+++ 
b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/processor/YoutubeTypeConverterTest.java
@@ -15,17 +15,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package com.youtube.processor;
 
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.api.services.youtube.model.Video;
 import com.youtube.serializer.YoutubeActivityUtil;
 import com.youtube.serializer.YoutubeVideoDeserializer;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -36,67 +38,74 @@ import java.util.List;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+/**
+ * Test for YoutubeTypeConverter.
+ */
 public class YoutubeTypeConverterTest {
-    private final static Logger LOGGER = LoggerFactory.getLogger(YoutubeTypeConverterTest.class);
-    private final String testVideo = "{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#video\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart failure, treatment options, and lifestyle changes. Learn more: https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be Heart Smart: Congestive Heart Failure LIVE Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}";
-
-    private YoutubeTypeConverter youtubeTypeConverter;
-    private ObjectMapper objectMapper;
-
-    @Before
-    public void setup() {
-        objectMapper = StreamsJacksonMapper.getInstance();
-        SimpleModule simpleModule = new SimpleModule();
-        simpleModule.addDeserializer(Video.class, new YoutubeVideoDeserializer());
-        objectMapper.registerModule(simpleModule);
-        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
-        youtubeTypeConverter = new YoutubeTypeConverter();
-        youtubeTypeConverter.prepare(null);
-    }
-
-    @Test
-    public void testVideoConversion() {
-        try {
-            LOGGER.info("raw: {}", testVideo);
-            Activity activity = new Activity();
-
-            Video video = objectMapper.readValue(testVideo, Video.class);
-            StreamsDatum streamsDatum = new StreamsDatum(video);
-
-            assertNotNull(streamsDatum.getDocument());
-
-            List<StreamsDatum> retList = youtubeTypeConverter.process(streamsDatum);
-            YoutubeActivityUtil.updateActivity(video, activity, "testChannelId");
 
-            assertEquals(retList.size(), 1);
-            assert (retList.get(0).getDocument() instanceof Activity);
-            assertEquals(activity, retList.get(0).getDocument());
-        } catch (Exception e) {
-            LOGGER.error("Exception while trying to convert video to activity: {}", e);
-        }
+  private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeTypeConverterTest.class);
+  private final String testVideo = "{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#video\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart failure, treatment options, and lifestyle changes. Learn more: https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be Heart Smart: Congestive Heart Failure LIVE Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}";
+
+  private YoutubeTypeConverter youtubeTypeConverter;
+  private ObjectMapper objectMapper;
+
+  /**
+   * setup for test.
+   */
+  @Before
+  public void setup() {
+    objectMapper = StreamsJacksonMapper.getInstance();
+    SimpleModule simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(Video.class, new YoutubeVideoDeserializer());
+    objectMapper.registerModule(simpleModule);
+    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+    youtubeTypeConverter = new YoutubeTypeConverter();
+    youtubeTypeConverter.prepare(null);
+  }
+
+  @Test
+  public void testVideoConversion() {
+    try {
+      LOGGER.info("raw: {}", testVideo);
+      Activity activity = new Activity();
+
+      Video video = objectMapper.readValue(testVideo, Video.class);
+      StreamsDatum streamsDatum = new StreamsDatum(video);
+
+      assertNotNull(streamsDatum.getDocument());
+
+      List<StreamsDatum> retList = youtubeTypeConverter.process(streamsDatum);
+      YoutubeActivityUtil.updateActivity(video, activity, "testChannelId");
+
+      assertEquals(retList.size(), 1);
+      assert (retList.get(0).getDocument() instanceof Activity);
+      assertEquals(activity, retList.get(0).getDocument());
+    } catch (Exception ex) {
+      LOGGER.error("Exception while trying to convert video to activity: {}", ex);
     }
+  }
 
-    @Test
-    public void testStringVideoConversion() {
-        try {
-            LOGGER.info("raw: {}", testVideo);
-            Activity activity = new Activity();
+  @Test
+  public void testStringVideoConversion() {
+    try {
+      LOGGER.info("raw: {}", testVideo);
+      Activity activity = new Activity();
 
-            Video video = objectMapper.readValue(testVideo, Video.class);
-            StreamsDatum streamsDatum = new StreamsDatum(testVideo);
+      Video video = objectMapper.readValue(testVideo, Video.class);
+      StreamsDatum streamsDatum = new StreamsDatum(testVideo);
 
-            assertNotNull(streamsDatum.getDocument());
+      assertNotNull(streamsDatum.getDocument());
 
-            List<StreamsDatum> retList = youtubeTypeConverter.process(streamsDatum);
-            YoutubeActivityUtil.updateActivity(video, activity, "testChannelId");
+      List<StreamsDatum> retList = youtubeTypeConverter.process(streamsDatum);
+      YoutubeActivityUtil.updateActivity(video, activity, "testChannelId");
 
-            assertEquals(retList.size(), 1);
-            assert (retList.get(0).getDocument() instanceof Activity);
-            assertEquals(activity, retList.get(0).getDocument());
-        } catch (Exception e) {
-            LOGGER.error("Exception while trying to convert video to activity: {}", e);
-        }
+      assertEquals(retList.size(), 1);
+      assert (retList.get(0).getDocument() instanceof Activity);
+      assertEquals(activity, retList.get(0).getDocument());
+    } catch (Exception ex) {
+      LOGGER.error("Exception while trying to convert video to activity: {}", ex);
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java
index 04c9456..4751f00 100644
--- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java
+++ b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java
@@ -19,15 +19,16 @@
 
 package com.youtube.provider;
 
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.impl.LinearTimeBackOffStrategy;
+
 import com.google.api.services.youtube.YouTube;
 import com.google.api.services.youtube.model.Channel;
 import com.google.api.services.youtube.model.ChannelListResponse;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-import org.apache.streams.util.api.requests.backoff.impl.LinearTimeBackOffStrategy;
 import org.apache.youtube.pojo.YoutubeConfiguration;
 import org.junit.Test;
 
@@ -41,61 +42,61 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
- * Created by rebanks on 2/17/15.
+ * YoutubeChannelDataCollectorTest tests YoutubeChannelDataCollector.
  */
 public class YoutubeChannelDataCollectorTest {
 
-    private static final String ID = "12345";
+  private static final String ID = "12345";
 
-    @Test
-    public void testDataCollector() throws Exception {
-        YouTube youTube = createMockYoutube();
-        BlockingQueue<StreamsDatum> queue = Queues.newLinkedBlockingQueue();
-        BackOffStrategy strategy = new LinearTimeBackOffStrategy(1);
-        UserInfo userInfo = new UserInfo();
-        userInfo.setUserId(ID);
-        YoutubeConfiguration config = new YoutubeConfiguration();
-        config.setApiKey(ID);
-        YoutubeChannelDataCollector collector = new YoutubeChannelDataCollector(youTube, queue, strategy, userInfo, config);
-        collector.run();
-        assertEquals(1, queue.size());
-        StreamsDatum datum = queue.take();
-        assertNotNull(datum);
-        String document = (String) datum.getDocument();
-        assertNotNull(document);
-    }
+  @Test
+  public void testDataCollector() throws Exception {
+    YouTube youTube = createMockYoutube();
+    BlockingQueue<StreamsDatum> queue = Queues.newLinkedBlockingQueue();
+    BackOffStrategy strategy = new LinearTimeBackOffStrategy(1);
+    UserInfo userInfo = new UserInfo();
+    userInfo.setUserId(ID);
+    YoutubeConfiguration config = new YoutubeConfiguration();
+    config.setApiKey(ID);
+    YoutubeChannelDataCollector collector = new YoutubeChannelDataCollector(youTube, queue, strategy, userInfo, config);
+    collector.run();
+    assertEquals(1, queue.size());
+    StreamsDatum datum = queue.take();
+    assertNotNull(datum);
+    String document = (String) datum.getDocument();
+    assertNotNull(document);
+  }
 
-    private YouTube createMockYoutube() throws Exception {
-        YouTube mockYouTube = mock(YouTube.class);
-        YouTube.Channels channels = createMockChannels();
-        when(mockYouTube.channels()).thenReturn(channels);
-        return mockYouTube;
-    }
+  private YouTube createMockYoutube() throws Exception {
+    YouTube mockYouTube = mock(YouTube.class);
+    YouTube.Channels channels = createMockChannels();
+    when(mockYouTube.channels()).thenReturn(channels);
+    return mockYouTube;
+  }
 
-    private YouTube.Channels createMockChannels() throws Exception {
-        YouTube.Channels mockChannels = mock(YouTube.Channels.class);
-        YouTube.Channels.List channelLists = createMockChannelsList();
-        when(mockChannels.list(anyString())).thenReturn(channelLists);
-        return mockChannels;
-    }
+  private YouTube.Channels createMockChannels() throws Exception {
+    YouTube.Channels mockChannels = mock(YouTube.Channels.class);
+    YouTube.Channels.List channelLists = createMockChannelsList();
+    when(mockChannels.list(anyString())).thenReturn(channelLists);
+    return mockChannels;
+  }
 
-    private YouTube.Channels.List createMockChannelsList() throws Exception {
-        YouTube.Channels.List mockList = mock(YouTube.Channels.List.class);
-        when(mockList.setId(anyString())).thenReturn(mockList);
-        when(mockList.setKey(anyString())).thenReturn(mockList);
-        ChannelListResponse response = createMockResponse();
-        when(mockList.execute()).thenReturn(response);
-        return mockList;
-    }
+  private YouTube.Channels.List createMockChannelsList() throws Exception {
+    YouTube.Channels.List mockList = mock(YouTube.Channels.List.class);
+    when(mockList.setId(anyString())).thenReturn(mockList);
+    when(mockList.setKey(anyString())).thenReturn(mockList);
+    ChannelListResponse response = createMockResponse();
+    when(mockList.execute()).thenReturn(response);
+    return mockList;
+  }
 
-    private ChannelListResponse createMockResponse() {
-        ChannelListResponse response = new ChannelListResponse();
-        List<Channel> channelList = Lists.newLinkedList();
-        response.setItems(channelList);
-        Channel channel = new Channel();
-        channel.setId(ID);
-        channelList.add(channel);
-        return response;
-    }
+  private ChannelListResponse createMockResponse() {
+    ChannelListResponse response = new ChannelListResponse();
+    List<Channel> channelList = Lists.newLinkedList();
+    response.setItems(channelList);
+    Channel channel = new Channel();
+    channel.setId(ID);
+    channelList.add(channel);
+    return response;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
index 02f1d52..5a2af8a 100644
--- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
+++ b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
@@ -19,13 +19,14 @@
 
 package com.youtube.provider;
 
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
 import com.google.api.client.util.Maps;
 import com.google.api.client.util.Sets;
 import com.google.api.services.youtube.YouTube;
 import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 import org.apache.youtube.pojo.YoutubeConfiguration;
 import org.joda.time.DateTime;
 import org.junit.Test;
@@ -41,125 +42,128 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Test for YoutubeProvider.
+ */
 public class YoutubeProviderTest {
 
-    /**
-     * Test that every collector will be run and that data queued from the collectors will be processed.
-     */
-    @Test
-    public void testDataCollectorRunsPerUser() {
-        Random r = new Random(System.currentTimeMillis());
-        int numUsers = r.nextInt(1000);
-        List<UserInfo> userList = Lists.newLinkedList();
-
-        for(int i=0; i < numUsers; ++i) {
-            userList.add(new UserInfo());
-        }
-
-        YoutubeConfiguration config = new YoutubeConfiguration();
-        config.setYoutubeUsers(userList);
-        config.setApiKey("API_KEY");
-        YoutubeProvider provider = buildProvider(config);
-
-        try {
-            provider.prepare(null);
-            provider.startStream();
-            int datumCount = 0;
-            while(provider.isRunning()) {
-                datumCount += provider.readCurrent().size();
-            }
-            assertEquals(numUsers, datumCount);
-        } finally {
-            provider.cleanUp();
-        }
+  /**
+   * Test that every collector will be run and that data queued from the collectors will be processed.
+   */
+  @Test
+  public void testDataCollectorRunsPerUser() {
+    Random random = new Random(System.currentTimeMillis());
+    int numUsers = random.nextInt(1000);
+    List<UserInfo> userList = Lists.newLinkedList();
+
+    for ( int i = 0; i < numUsers; ++i ) {
+      userList.add(new UserInfo());
     }
 
-    @Test
-    public void testConfigSetterGetter() {
-        YoutubeConfiguration config = new YoutubeConfiguration();
-        config.setApiKey("API_KEY");
-        config.setVersion("fake_version_1");
-        YoutubeConfiguration newConfig = new YoutubeConfiguration();
-        newConfig.setApiKey("API_KEY");
-        config.setVersion("fake_version_2");
+    YoutubeConfiguration config = new YoutubeConfiguration();
+    config.setYoutubeUsers(userList);
+    config.setApiKey("API_KEY");
+    YoutubeProvider provider = buildProvider(config);
+
+    try {
+      provider.prepare(null);
+      provider.startStream();
+      int datumCount = 0;
+      while (provider.isRunning()) {
+        datumCount += provider.readCurrent().size();
+      }
+      assertEquals(numUsers, datumCount);
+    } finally {
+      provider.cleanUp();
+    }
+  }
 
-        YoutubeProvider provider = buildProvider(config);
+  @Test
+  public void testConfigSetterGetter() {
+    YoutubeConfiguration config = new YoutubeConfiguration();
+    config.setApiKey("API_KEY");
+    config.setVersion("fake_version_1");
+    YoutubeConfiguration newConfig = new YoutubeConfiguration();
+    newConfig.setApiKey("API_KEY");
+    config.setVersion("fake_version_2");
 
-        assertEquals(provider.getConfig(), config);
+    YoutubeProvider provider = buildProvider(config);
 
-        provider.setConfig(newConfig);
-        assertEquals(provider.getConfig(), newConfig);
-    }
+    assertEquals(provider.getConfig(), config);
 
-    @Test
-    public void testUserInfoWithDefaultDates() {
-        YoutubeConfiguration config = new YoutubeConfiguration();
-        config.setApiKey("API_KEY");
-        YoutubeProvider provider = buildProvider(config);
+    provider.setConfig(newConfig);
+    assertEquals(provider.getConfig(), newConfig);
+  }
 
-        DateTime afterDate = new DateTime(System.currentTimeMillis());
-        DateTime beforeDate = afterDate.minus(10000);
+  @Test
+  public void testUserInfoWithDefaultDates() {
+    YoutubeConfiguration config = new YoutubeConfiguration();
+    config.setApiKey("API_KEY");
+    YoutubeProvider provider = buildProvider(config);
 
-        provider.setDefaultAfterDate(afterDate);
-        provider.setDefaultBeforeDate(beforeDate);
+    DateTime afterDate = new DateTime(System.currentTimeMillis());
+    DateTime beforeDate = afterDate.minus(10000);
 
-        Set<String> users = Sets.newHashSet();
-        users.add("test_user_1");
-        users.add("test_user_2");
-        users.add("test_user_3");
+    provider.setDefaultAfterDate(afterDate);
+    provider.setDefaultBeforeDate(beforeDate);
 
-        provider.setUserInfoWithDefaultDates(users);
+    Set<String> users = Sets.newHashSet();
+    users.add("test_user_1");
+    users.add("test_user_2");
+    users.add("test_user_3");
 
-        List<UserInfo> youtubeUsers = provider.getConfig().getYoutubeUsers();
+    provider.setUserInfoWithDefaultDates(users);
 
-        for(UserInfo user : youtubeUsers) {
-            assert(user.getAfterDate().equals(afterDate));
-            assert(user.getBeforeDate().equals(beforeDate));
-        }
+    List<UserInfo> youtubeUsers = provider.getConfig().getYoutubeUsers();
+
+    for (UserInfo user : youtubeUsers) {
+      assert (user.getAfterDate().equals(afterDate));
+      assert (user.getBeforeDate().equals(beforeDate));
     }
+  }
 
-    @Test
-    public void testUserInfoWithAfterDate() {
-        YoutubeConfiguration config = new YoutubeConfiguration();
-        config.setApiKey("API_KEY");
-        YoutubeProvider provider = buildProvider(config);
+  @Test
+  public void testUserInfoWithAfterDate() {
+    YoutubeConfiguration config = new YoutubeConfiguration();
+    config.setApiKey("API_KEY");
+    YoutubeProvider provider = buildProvider(config);
 
-        Map<String, DateTime> users = Maps.newHashMap();
-        users.put("user1", new DateTime(System.currentTimeMillis()));
-        users.put("user3", new DateTime(System.currentTimeMillis()));
-        users.put("user4", new DateTime(System.currentTimeMillis()));
+    Map<String, DateTime> users = Maps.newHashMap();
+    users.put("user1", new DateTime(System.currentTimeMillis()));
+    users.put("user3", new DateTime(System.currentTimeMillis()));
+    users.put("user4", new DateTime(System.currentTimeMillis()));
 
-        provider.setUserInfoWithAfterDate(users);
+    provider.setUserInfoWithAfterDate(users);
 
-        List<UserInfo> youtubeUsers = provider.getConfig().getYoutubeUsers();
+    List<UserInfo> youtubeUsers = provider.getConfig().getYoutubeUsers();
 
-        for(UserInfo user : youtubeUsers) {
-            assert(user.getAfterDate().equals(users.get(user.getUserId())));
-        }
+    for (UserInfo user : youtubeUsers) {
+      assert (user.getAfterDate().equals(users.get(user.getUserId())));
     }
-
-    private YoutubeProvider buildProvider(YoutubeConfiguration config) {
-        return new YoutubeProvider(config) {
-
-            @Override
-            protected YouTube createYouTubeClient() throws IOException {
-                return mock(YouTube.class);
-            }
-
-            @Override
-            protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) {
-                final BlockingQueue<StreamsDatum> q = queue;
-                return new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            q.put(new StreamsDatum(null));
-                        } catch (InterruptedException ie) {
-                            fail("Test was interrupted");
-                        }
-                    }
-                };
+  }
+
+  private YoutubeProvider buildProvider(YoutubeConfiguration config) {
+    return new YoutubeProvider(config) {
+
+      @Override
+      protected YouTube createYouTubeClient() throws IOException {
+        return mock(YouTube.class);
+      }
+
+      @Override
+      protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) {
+        final BlockingQueue<StreamsDatum> q = queue;
+        return new Runnable() {
+          @Override
+          public void run() {
+            try {
+              q.put(new StreamsDatum(null));
+            } catch (InterruptedException ie) {
+              fail("Test was interrupted");
             }
+          }
         };
-    }
+      }
+    };
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java
index 7d46274..1870c14 100644
--- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java
+++ b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java
@@ -19,13 +19,20 @@
 
 package com.youtube.provider;
 
-import com.google.api.client.util.Lists;
-import com.google.api.services.youtube.YouTube;
-import com.google.api.services.youtube.model.*;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.google.gplus.configuration.UserInfo;
 import org.apache.streams.local.queues.ThroughputQueue;
 import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+
+import com.google.api.client.util.Lists;
+import com.google.api.services.youtube.YouTube;
+import com.google.api.services.youtube.model.Activity;
+import com.google.api.services.youtube.model.ActivityContentDetails;
+import com.google.api.services.youtube.model.ActivityContentDetailsUpload;
+import com.google.api.services.youtube.model.ActivityListResponse;
+import com.google.api.services.youtube.model.Video;
+import com.google.api.services.youtube.model.VideoListResponse;
+import com.google.api.services.youtube.model.VideoSnippet;
 import org.apache.youtube.pojo.YoutubeConfiguration;
 import org.joda.time.DateTime;
 import org.junit.Before;
@@ -37,308 +44,316 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+/**
+ * Test for YoutubeUserActivityCollector.
+ */
 public class YoutubeUserActivityCollectorTest {
-    private final String USER_ID = "fake_user_id";
-    private static final String IN_RANGE_IDENTIFIER = "data in range";
-    private YoutubeConfiguration config;
-
-    @Before
-    public void setup() {
-        this.config = new YoutubeConfiguration();
-        this.config.setApiKey("API_KEY");
-    }
+  private static final String USER_ID = "fake_user_id";
+  private static final String IN_RANGE_IDENTIFIER = "data in range";
+  private YoutubeConfiguration config;
 
-    @Test
-    public void testGetVideos() throws IOException {
-        DateTime now = new DateTime(System.currentTimeMillis());
-        YouTube youtube = buildYouTube(0, 1, 0, now, now.minus(10000));
+  @Before
+  public void setup() {
+    this.config = new YoutubeConfiguration();
+    this.config.setApiKey("API_KEY");
+  }
 
-        BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
-        YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
+  @Test
+  public void testGetVideos() throws IOException {
+    DateTime now = new DateTime(System.currentTimeMillis());
+    YouTube youtube = buildYouTube(0, 1, 0, now, now.minus(10000));
 
-        List<Video> video = collector.getVideoList("fake_video_id");
+    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
+    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
 
-        assertNotNull(video.get(0));
-    }
+    List<Video> video = collector.getVideoList("fake_video_id");
 
-    @Test
-    public void testGetVideosNull() throws IOException {
-        DateTime now = new DateTime(System.currentTimeMillis());
-        YouTube youtube = buildYouTube(0, 0, 0, now.plus(10000), now.minus(10000));
+    assertNotNull(video.get(0));
+  }
 
-        BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
-        YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
+  @Test
+  public void testGetVideosNull() throws IOException {
+    DateTime now = new DateTime(System.currentTimeMillis());
+    YouTube youtube = buildYouTube(0, 0, 0, now.plus(10000), now.minus(10000));
 
-        List<Video> video = collector.getVideoList("fake_video_id");
+    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
+    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
 
-        assertEquals(video.size(), 0);
-    }
+    List<Video> video = collector.getVideoList("fake_video_id");
 
-    @Test
-    public void testProcessActivityFeed() throws IOException, InterruptedException {
-        DateTime now = new DateTime(System.currentTimeMillis());
-        YouTube youtube = buildYouTube(0, 0, 5, now.plus(3000000), now.minus(1000000));
+    assertEquals(video.size(), 0);
+  }
 
-        BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
-        YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
+  @Test
+  public void testProcessActivityFeed() throws IOException, InterruptedException {
+    DateTime now = new DateTime(System.currentTimeMillis());
+    YouTube youtube = buildYouTube(0, 0, 5, now.plus(3000000), now.minus(1000000));
 
-        ActivityListResponse feed = buildActivityListResponse(1);
+    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
+    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
 
-        collector.processActivityFeed(feed, new DateTime(System.currentTimeMillis()), null);
+    ActivityListResponse feed = buildActivityListResponse(1);
 
-        assertEquals(collector.getDatumQueue().size(), 5);
-    }
+    collector.processActivityFeed(feed, new DateTime(System.currentTimeMillis()), null);
 
-    @Test
-    public void testProcessActivityFeedBefore() throws IOException, InterruptedException {
-        DateTime now = new DateTime(System.currentTimeMillis());
-        YouTube youtube = buildYouTube(5, 0, 0, now, now);
+    assertEquals(collector.getDatumQueue().size(), 5);
+  }
 
-        BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
-        YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
+  @Test
+  public void testProcessActivityFeedBefore() throws IOException, InterruptedException {
+    DateTime now = new DateTime(System.currentTimeMillis());
+    YouTube youtube = buildYouTube(5, 0, 0, now, now);
 
-        ActivityListResponse feed = buildActivityListResponse(1);
+    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
+    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
 
-        collector.processActivityFeed(feed, new DateTime(System.currentTimeMillis()), null);
+    ActivityListResponse feed = buildActivityListResponse(1);
 
-        assertEquals(collector.getDatumQueue().size(), 0);
-    }
+    collector.processActivityFeed(feed, new DateTime(System.currentTimeMillis()), null);
 
-    @Test
-    public void testProcessActivityFeedAfter() throws IOException, InterruptedException {
-        DateTime now = new DateTime(System.currentTimeMillis());
-        YouTube youtube = buildYouTube(0, 5, 0, now, now);
+    assertEquals(collector.getDatumQueue().size(), 0);
+  }
 
-        BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
-        YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
+  @Test
+  public void testProcessActivityFeedAfter() throws IOException, InterruptedException {
+    DateTime now = new DateTime(System.currentTimeMillis());
+    YouTube youtube = buildYouTube(0, 5, 0, now, now);
 
-        ActivityListResponse feed = buildActivityListResponse(1);
+    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
+    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
 
-        collector.processActivityFeed(feed, new DateTime(now.getMillis()-100000), null);
+    ActivityListResponse feed = buildActivityListResponse(1);
 
-        assertEquals(collector.getDatumQueue().size(), 5);
-    }
+    collector.processActivityFeed(feed, new DateTime(now.getMillis() - 100000), null);
 
-    @Test
-    public void testProcessActivityFeedMismatchCount() throws IOException, InterruptedException {
-        DateTime now = new DateTime(System.currentTimeMillis());
-        YouTube youtube = buildYouTube(5, 5, 5, now, now.minus(100000));
+    assertEquals(collector.getDatumQueue().size(), 5);
+  }
 
-        BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
-        YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
+  @Test
+  public void testProcessActivityFeedMismatchCount() throws IOException, InterruptedException {
+    DateTime now = new DateTime(System.currentTimeMillis());
+    YouTube youtube = buildYouTube(5, 5, 5, now, now.minus(100000));
 
-        ActivityListResponse feed = buildActivityListResponse(1);
+    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
+    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
 
-        collector.processActivityFeed(feed, new DateTime(now), null);
+    ActivityListResponse feed = buildActivityListResponse(1);
 
-        assertEquals(collector.getDatumQueue().size(), 5);
-    }
+    collector.processActivityFeed(feed, new DateTime(now), null);
 
-    @Test
-    public void testProcessActivityFeedMismatchCountInRange() throws IOException, InterruptedException {
-        DateTime now = new DateTime(System.currentTimeMillis());
-        YouTube youtube = buildYouTube(5, 5, 5, now, now.minus(100000));
+    assertEquals(collector.getDatumQueue().size(), 5);
+  }
 
-        BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
-        YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
+  @Test
+  public void testProcessActivityFeedMismatchCountInRange() throws IOException, InterruptedException {
+    DateTime now = new DateTime(System.currentTimeMillis());
+    YouTube youtube = buildYouTube(5, 5, 5, now, now.minus(100000));
 
-        ActivityListResponse feed = buildActivityListResponse(1);
+    BlockingQueue<StreamsDatum> datumQueue = new ThroughputQueue<>();
+    YoutubeUserActivityCollector collector = new YoutubeUserActivityCollector(youtube, datumQueue, new ExponentialBackOffStrategy(2), new UserInfo().withUserId(USER_ID), this.config);
 
-        collector.processActivityFeed(feed, new DateTime(now), new DateTime(now).minus(100000));
+    ActivityListResponse feed = buildActivityListResponse(1);
 
-        assertEquals(collector.getDatumQueue().size(), 5);
-    }
+    collector.processActivityFeed(feed, new DateTime(now), new DateTime(now).minus(100000));
 
-    private ActivityListResponse buildActivityListResponse(int num) {
-        ActivityListResponse activityListResponse = new ActivityListResponse();
-        List<Activity> items = Lists.newArrayList();
+    assertEquals(collector.getDatumQueue().size(), 5);
+  }
 
-        for(int x = 0; x < num; x ++ ) {
-            Activity activity = new Activity();
+  private ActivityListResponse buildActivityListResponse(int num) {
+    ActivityListResponse activityListResponse = new ActivityListResponse();
+    List<Activity> items = Lists.newArrayList();
 
-            ActivityContentDetails contentDetails = new ActivityContentDetails();
-            ActivityContentDetailsUpload upload = new ActivityContentDetailsUpload();
-            upload.setVideoId("video_id_" + x);
-            contentDetails.setUpload(upload);
+    for ( int x = 0; x < num; x++ ) {
+      Activity activity = new Activity();
 
-            activity.setId("id_" + x);
-            activity.setContentDetails(contentDetails);
+      ActivityContentDetails contentDetails = new ActivityContentDetails();
+      ActivityContentDetailsUpload upload = new ActivityContentDetailsUpload();
+      upload.setVideoId("video_id_" + x);
+      contentDetails.setUpload(upload);
 
-            items.add(activity);
-        }
+      activity.setId("id_" + x);
+      activity.setContentDetails(contentDetails);
 
-        activityListResponse.setItems(items);
-
-        return activityListResponse;
+      items.add(activity);
     }
 
-    private YouTube buildYouTube(int numBeforeRange, int numAfterRange, int numInRange, DateTime afterDate, DateTime beforeDate) {
+    activityListResponse.setItems(items);
 
-        YouTube youtube = createYoutubeMock(numBeforeRange, numAfterRange, numInRange, afterDate, beforeDate);
+    return activityListResponse;
+  }
 
-        return youtube;
-    }
+  private YouTube buildYouTube(int numBeforeRange, int numAfterRange, int numInRange, DateTime afterDate, DateTime beforeDate) {
 
-    private YouTube createYoutubeMock(int numBefore, int numAfter, int numInRange,  DateTime after, DateTime before) {
-        YouTube youtube = mock(YouTube.class);
+    YouTube youtube = createYoutubeMock(numBeforeRange, numAfterRange, numInRange, afterDate, beforeDate);
 
-        final YouTube.Videos videos = createMockVideos(numBefore, numAfter, numInRange, after, before);
-        doAnswer(new Answer() {
-            @Override
-            public YouTube.Videos answer(InvocationOnMock invocationOnMock) throws Throwable {
-                return videos;
-            }
-        }).when(youtube).videos();
+    return youtube;
+  }
 
-        return youtube;
-    }
+  private YouTube createYoutubeMock(int numBefore, int numAfter, int numInRange,  DateTime after, DateTime before) {
+    YouTube youtube = mock(YouTube.class);
+
+    final YouTube.Videos videos = createMockVideos(numBefore, numAfter, numInRange, after, before);
+    doAnswer(new Answer() {
+      @Override
+      public YouTube.Videos answer(InvocationOnMock invocationOnMock) throws Throwable {
+        return videos;
+      }
+    }).when(youtube).videos();
 
-    private YouTube.Videos createMockVideos(int numBefore, int numAfter, int numInRange,  DateTime after, DateTime before) {
-        YouTube.Videos videos = mock(YouTube.Videos.class);
+    return youtube;
+  }
 
-        try {
-            YouTube.Videos.List list = createMockVideosList(numBefore, numAfter, numInRange, after, before);
-            when(videos.list(anyString())).thenReturn(list);
-        } catch (IOException e) {
-            fail("Exception thrown while creating mock");
-        }
+  private YouTube.Videos createMockVideos(int numBefore, int numAfter, int numInRange,  DateTime after, DateTime before) {
+    YouTube.Videos videos = mock(YouTube.Videos.class);
 
-        return videos;
+    try {
+      YouTube.Videos.List list = createMockVideosList(numBefore, numAfter, numInRange, after, before);
+      when(videos.list(anyString())).thenReturn(list);
+    } catch (IOException ex) {
+      fail("Exception thrown while creating mock");
     }
 
-    private YouTube.Videos.List createMockVideosList(int numBefore, int numAfter, int numInRange,  DateTime after, DateTime before) {
-        YouTube.Videos.List list = mock(YouTube.Videos.List.class);
+    return videos;
+  }
 
-        when(list.setMaxResults(anyLong())).thenReturn(list);
-        when(list.setPageToken(anyString())).thenReturn(list);
-        when(list.setId(anyString())).thenReturn(list);
-        when(list.setKey(anyString())).thenReturn(list);
+  private YouTube.Videos.List createMockVideosList(int numBefore, int numAfter, int numInRange,  DateTime after, DateTime before) {
+    YouTube.Videos.List list = mock(YouTube.Videos.List.class);
 
-        VideoListResponseAnswer answer = new VideoListResponseAnswer(numBefore, numAfter, numInRange, after, before);
-        try {
-            doAnswer(answer).when(list).execute();
-        } catch (IOException ioe) {
-            fail("Should not have thrown exception while creating mock. : "+ioe.getMessage());
-        }
+    when(list.setMaxResults(anyLong())).thenReturn(list);
+    when(list.setPageToken(anyString())).thenReturn(list);
+    when(list.setId(anyString())).thenReturn(list);
+    when(list.setKey(anyString())).thenReturn(list);
 
-        return list;
+    VideoListResponseAnswer answer = new VideoListResponseAnswer(numBefore, numAfter, numInRange, after, before);
+    try {
+      doAnswer(answer).when(list).execute();
+    } catch (IOException ioe) {
+      fail("Should not have thrown exception while creating mock. : " + ioe.getMessage());
     }
 
-    private static VideoListResponse createMockVideoListResponse(int numBefore, int numAfter, int numInRange,  DateTime after, DateTime before, boolean page) {
-        VideoListResponse feed = new VideoListResponse();
-        List<Video> list = com.google.common.collect.Lists.newLinkedList();
+    return list;
+  }
 
-        for(int i=0; i < numAfter; ++i) {
-            com.google.api.client.util.DateTime published = new com.google.api.client.util.DateTime(after.getMillis() + 1000000);
-            Video video = new Video();
-            video.setSnippet(new VideoSnippet());
-            video.getSnippet().setPublishedAt(published);
-            list.add(video);
-        }
-        for(int i=0; i < numInRange; ++i) {
-            DateTime published = null;
-            if((before == null && after == null) || before == null) {
-                published = DateTime.now(); // no date range or end time date range so just make the time now.
-            } else if(after == null) {
-                published = before.minusMillis(100000); //no beginning to range
-            } else { // has to be in range
-                long range = before.getMillis() - after.getMillis();
-                published = after.plus(range / 2); //in the middle
-            }
-            com.google.api.client.util.DateTime gPublished = new com.google.api.client.util.DateTime(published.getMillis());
-            Video video = new Video();
-            video.setSnippet(new VideoSnippet());
-            video.getSnippet().setPublishedAt(gPublished);
-            video.getSnippet().setTitle(IN_RANGE_IDENTIFIER);
-            list.add(video);
-        }
-        for(int i=0; i < numBefore; ++i) {
-            com.google.api.client.util.DateTime published = new com.google.api.client.util.DateTime((after.minusMillis(100000)).getMillis());
-            Video video = new Video();
-            video.setSnippet(new VideoSnippet());
-            video.getSnippet().setPublishedAt(published);
-            list.add(video);
-        }
-        if(page) {
-            feed.setNextPageToken("A");
-        } else {
-            feed.setNextPageToken(null);
-        }
+  private static VideoListResponse createMockVideoListResponse(int numBefore, int numAfter, int numInRange,  DateTime after, DateTime before, boolean page) {
+    VideoListResponse feed = new VideoListResponse();
+    List<Video> list = com.google.common.collect.Lists.newLinkedList();
 
-        feed.setItems(list);
+    for (int i = 0; i < numAfter; ++i) {
+      com.google.api.client.util.DateTime published = new com.google.api.client.util.DateTime(after.getMillis() + 1000000);
+      Video video = new Video();
+      video.setSnippet(new VideoSnippet());
+      video.getSnippet().setPublishedAt(published);
+      list.add(video);
+    }
+    for (int i = 0; i < numInRange; ++i) {
+      DateTime published = null;
+      if ((before == null && after == null) || before == null) {
+        published = DateTime.now(); // no date range or end time date range so just make the time now.
+      } else if (after == null) {
+        published = before.minusMillis(100000); //no beginning to range
+      } else { // has to be in range
+        long range = before.getMillis() - after.getMillis();
+        published = after.plus(range / 2); //in the middle
+      }
+      com.google.api.client.util.DateTime ytPublished = new com.google.api.client.util.DateTime(published.getMillis());
+      Video video = new Video();
+      video.setSnippet(new VideoSnippet());
+      video.getSnippet().setPublishedAt(ytPublished);
+      video.getSnippet().setTitle(IN_RANGE_IDENTIFIER);
+      list.add(video);
+    }
+    for (int i = 0; i < numBefore; ++i) {
+      com.google.api.client.util.DateTime published = new com.google.api.client.util.DateTime((after.minusMillis(100000)).getMillis());
+      Video video = new Video();
+      video.setSnippet(new VideoSnippet());
+      video.getSnippet().setPublishedAt(published);
+      list.add(video);
+    }
+    if (page) {
+      feed.setNextPageToken("A");
+    } else {
+      feed.setNextPageToken(null);
+    }
 
-        return feed;
+    feed.setItems(list);
+
+    return feed;
+  }
+
+  private static class VideoListResponseAnswer implements Answer<VideoListResponse> {
+    private int afterCount = 0;
+    private int beforeCount = 0;
+    private int inCount = 0;
+    private int maxBatch = 100;
+
+    private int numAfter;
+    private int numInRange;
+    private int numBefore;
+    private DateTime after;
+    private DateTime before;
+
+    private VideoListResponseAnswer(int numBefore, int numAfter, int numInRange, DateTime after, DateTime before) {
+      this.numBefore = numBefore;
+      this.numAfter = numAfter;
+      this.numInRange = numInRange;
+      this.after = after;
+      this.before = before;
     }
 
-    private static class VideoListResponseAnswer implements Answer<VideoListResponse> {
-        private int afterCount = 0;
-        private int beforeCount = 0;
-        private int inCount = 0;
-        private int maxBatch = 100;
-
-        private int numAfter;
-        private int numInRange;
-        private int numBefore;
-        private DateTime after;
-        private DateTime before;
-
-        private VideoListResponseAnswer(int numBefore, int numAfter, int numInRange, DateTime after, DateTime before) {
-            this.numBefore = numBefore;
-            this.numAfter = numAfter;
-            this.numInRange = numInRange;
-            this.after = after;
-            this.before = before;
+    @Override
+    public VideoListResponse answer(InvocationOnMock invocationOnMock) throws Throwable {
+      int totalCount = 0;
+      int batchAfter = 0;
+      int batchBefore = 0;
+      int batchIn = 0;
+      inCount = 0;
+      afterCount = 0;
+      beforeCount = 0;
+
+      if (afterCount != numAfter) {
+        if (numAfter - afterCount >= maxBatch) {
+          afterCount += maxBatch;
+          batchAfter += maxBatch;
+          totalCount += batchAfter;
+        } else {
+          batchAfter += numAfter - afterCount;
+          totalCount += numAfter - afterCount;
+          afterCount = numAfter;
         }
-
-        @Override
-        public VideoListResponse answer(InvocationOnMock invocationOnMock) throws Throwable {
-            int totalCount = 0;
-            int batchAfter = 0;
-            int batchBefore = 0;
-            int batchIn = 0;
-            inCount = 0;
-            afterCount = 0;
-            beforeCount = 0;
-
-            if(afterCount != numAfter) {
-                if(numAfter - afterCount >= maxBatch) {
-                    afterCount += maxBatch;
-                    batchAfter += maxBatch;
-                    totalCount += batchAfter;
-                } else {
-                    batchAfter += numAfter - afterCount;
-                    totalCount += numAfter - afterCount;
-                    afterCount = numAfter;
-                }
-            }
-            if(totalCount < maxBatch && inCount != numInRange) {
-                if(numInRange - inCount >= maxBatch - totalCount) {
-                    inCount += maxBatch - totalCount;
-                    batchIn += maxBatch - totalCount;
-                    totalCount += batchIn;
-                } else {
-                    batchIn += numInRange - inCount;
-                    totalCount += numInRange - inCount;
-                    inCount = numInRange;
-                }
-            }
-            if(totalCount < maxBatch && beforeCount != numBefore) {
-                if(numBefore - batchBefore >= maxBatch - totalCount) {
-                    batchBefore += maxBatch - totalCount;
-                    totalCount = maxBatch;
-                    beforeCount +=batchBefore;
-                } else {
-                    batchBefore += numBefore - beforeCount;
-                    totalCount += numBefore - beforeCount;
-                    beforeCount = numBefore;
-                }
-            }
-
-            return createMockVideoListResponse(batchBefore, batchAfter, batchIn, after, before, numAfter != afterCount || inCount != numInRange || beforeCount != numBefore);
+      }
+      if (totalCount < maxBatch && inCount != numInRange) {
+        if (numInRange - inCount >= maxBatch - totalCount) {
+          inCount += maxBatch - totalCount;
+          batchIn += maxBatch - totalCount;
+          totalCount += batchIn;
+        } else {
+          batchIn += numInRange - inCount;
+          totalCount += numInRange - inCount;
+          inCount = numInRange;
+        }
+      }
+      if (totalCount < maxBatch && beforeCount != numBefore) {
+        if (numBefore - batchBefore >= maxBatch - totalCount) {
+          batchBefore += maxBatch - totalCount;
+          totalCount = maxBatch;
+          beforeCount += batchBefore;
+        } else {
+          batchBefore += numBefore - beforeCount;
+          totalCount += numBefore - beforeCount;
+          beforeCount = numBefore;
         }
+      }
+
+      return createMockVideoListResponse(batchBefore, batchAfter, batchIn, after, before, numAfter != afterCount || inCount != numInRange || beforeCount != numBefore);
     }
+  }
 }
\ No newline at end of file

Reply via email to