http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java index a1ca7c5..418491a 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterDocumentClassifierTest.java @@ -24,6 +24,7 @@ import org.apache.streams.twitter.pojo.Follow; import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; import org.apache.streams.twitter.pojo.User; + import org.junit.Assert; import org.junit.Test; @@ -34,55 +35,60 @@ import java.util.List; */ public class TwitterDocumentClassifierTest { - private String tweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":12345,\"id_str\":\"12345\",\"text\":\"text\",\"source\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"12345\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":false,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https:\\/\\/profile_b ackground_image_url_https.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/profile_image_url.jpg\",\"profile_image_url_https\":\"https:\\/\\/profile_image_url_https.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/url\",\"expanded_url\":\"http:\\/\\/expanded_url\",\"display_url\":\"display_url\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n"; - private String retweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":23456,\"id_str\":\"23456\",\"text\":\"text\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"location\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=url\",\"description\":\"description\\u00ed\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon Jul 05 17:35:49 +0000 2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos Aires\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\ /profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/12345\\/12345\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"profile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 22:25:06 +0000 2013\",\"id\":34567,\"id_str\":\"34567\",\"text\":\"text\",\"sourc e\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":34567,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":\"http:\\/\\/www.web.com\",\"description\":\"description\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri Apr 13 19:00:11 +0000 2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/34567\\/34567.jpeg\",\"profile_background_tile\":false, \"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/34567\\/34567\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"screen_name\",\"name\ ":\"name emocional\",\"id\":45678,\"id_str\":\"45678\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n"; - private String delete = "{\"delete\":{\"status\":{\"id\":56789,\"user_id\":67890,\"id_str\":\"56789\",\"user_id_str\":\"67890\"}}}\n"; - private String follow = "{\"follower\":{\"id\":12345},\"followee\":{\"id\":56789}}\n"; - private String user = "{\"location\":\"\",\"default_profile\":true,\"profile_background_tile\":false,\"statuses_count\":1,\"lang\":\"en\",\"profile_link_color\":\"0084B4\",\"id\":67890,\"following\":false,\"protected\":false,\"favourites_count\":0,\"profile_text_color\":\"333333\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"C0DEED\",\"name\":\"name\",\"profile_background_color\":\"C0DEED\",\"created_at\":\"Fri Apr 17 12:35:56 +0000 2009\",\"is_translation_enabled\":false,\"default_profile_image\":true,\"followers_count\":2,\"profile_image_url_https\":\"https://profile_image_url_https.png\",\"geo_enabled\":false,\"status\":{\"contributors\":null,\"text\":\"Working\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[],\"hashtags\":[],\"user_mentions\":[]},\"in_reply_to_status_id_str\":null,\"id\":67890,\"source\":\"web\",\"in_repl y_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":0,\"created_at\":\"Fri Apr 17 12:37:54 +0000 2009\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"67890\",\"place\":null,\"coordinates\":null},\"profile_background_image_url\":\"http://abs.twimg.com/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https://abs.twimg.com/images/profile_background_image_url_https.png\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":null,\"time_zone\":null,\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1,\"profile_sidebar_fill_color\":\"DDEEF6\",\"screen_name\":\"screen_name\",\"id_str\":\"67890\",\"profile_image_url\":\"http://abs.twimg.com/sticky/default_profile_images/default_profile_1_normal.png\",\"listed_count\":0,\"is_translator\":false}"; + private String tweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":12345,\"id_str\":\"12345\",\"text\":\"text\",\"source\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"12345\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":false,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https:\\/\\/profile_bac kground_image_url_https.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/profile_image_url.jpg\",\"profile_image_url_https\":\"https:\\/\\/profile_image_url_https.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/url\",\"expanded_url\":\"http:\\/\\/expanded_url\",\"display_url\":\"display_url\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n"; + private String retweet = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":23456,\"id_str\":\"23456\",\"text\":\"text\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"location\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=url\",\"description\":\"description\\u00ed\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon Jul 05 17:35:49 +0000 2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos Aires\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/p rofile_background_images\\/12345\\/12345.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/12345\\/12345\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"profile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 22:25:06 +0000 2013\",\"id\":34567,\"id_str\":\"34567\",\"text\":\"text\",\"source\ ":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":34567,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":\"http:\\/\\/www.web.com\",\"description\":\"description\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri Apr 13 19:00:11 +0000 2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/34567\\/34567.jpeg\",\"profile_background_tile\":false,\" profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/34567\\/34567\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"screen_name\",\"name\": \"name emocional\",\"id\":45678,\"id_str\":\"45678\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n"; + private String delete = "{\"delete\":{\"status\":{\"id\":56789,\"user_id\":67890,\"id_str\":\"56789\",\"user_id_str\":\"67890\"}}}\n"; + private String follow = "{\"follower\":{\"id\":12345},\"followee\":{\"id\":56789}}\n"; + private String user = "{\"location\":\"\",\"default_profile\":true,\"profile_background_tile\":false,\"statuses_count\":1,\"lang\":\"en\",\"profile_link_color\":\"0084B4\",\"id\":67890,\"following\":false,\"protected\":false,\"favourites_count\":0,\"profile_text_color\":\"333333\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"C0DEED\",\"name\":\"name\",\"profile_background_color\":\"C0DEED\",\"created_at\":\"Fri Apr 17 12:35:56 +0000 2009\",\"is_translation_enabled\":false,\"default_profile_image\":true,\"followers_count\":2,\"profile_image_url_https\":\"https://profile_image_url_https.png\",\"geo_enabled\":false,\"status\":{\"contributors\":null,\"text\":\"Working\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[],\"hashtags\":[],\"user_mentions\":[]},\"in_reply_to_status_id_str\":null,\"id\":67890,\"source\":\"web\",\"in_reply_ to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":0,\"created_at\":\"Fri Apr 17 12:37:54 +0000 2009\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"67890\",\"place\":null,\"coordinates\":null},\"profile_background_image_url\":\"http://abs.twimg.com/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https://abs.twimg.com/images/profile_background_image_url_https.png\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":null,\"time_zone\":null,\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1,\"profile_sidebar_fill_color\":\"DDEEF6\",\"screen_name\":\"screen_name\",\"id_str\":\"67890\",\"profile_image_url\":\"http://abs.twimg.com/sticky/default_profile_images/default_profile_1_normal.png\",\"listed_count\":0,\"is_translator\":false}"; - @Test - public void testDetectTweet() { - List<Class> detected = new TwitterDocumentClassifier().detectClasses(tweet); - Assert.assertTrue(detected.size() == 1); - Class result = detected.get(0); - if( !result.equals(Tweet.class) ) - Assert.fail(); + @Test + public void testDetectTweet() { + List<Class> detected = new TwitterDocumentClassifier().detectClasses(tweet); + Assert.assertTrue(detected.size() == 1); + Class result = detected.get(0); + if ( !result.equals(Tweet.class) ) { + Assert.fail(); } + } - @Test - public void testDetectRetweet() { - List<Class> detected = new TwitterDocumentClassifier().detectClasses(retweet); - Assert.assertTrue(detected.size() == 1); - Class result = detected.get(0); - if( !result.equals(Retweet.class) ) - Assert.fail(); + @Test + public void testDetectRetweet() { + List<Class> detected = new TwitterDocumentClassifier().detectClasses(retweet); + Assert.assertTrue(detected.size() == 1); + Class result = detected.get(0); + if ( !result.equals(Retweet.class) ) { + Assert.fail(); } + } - @Test - public void testDetectDelete() { - List<Class> detected = new TwitterDocumentClassifier().detectClasses(delete); - Assert.assertTrue(detected.size() == 1); - Class result = detected.get(0); - if( !result.equals(Delete.class) ) - Assert.fail(); + @Test + public void testDetectDelete() { + List<Class> detected = new TwitterDocumentClassifier().detectClasses(delete); + Assert.assertTrue(detected.size() == 1); + Class result = detected.get(0); + if ( !result.equals(Delete.class) ) { + Assert.fail(); } + } - @Test - public void testDetectFollow() { - List<Class> detected = new TwitterDocumentClassifier().detectClasses(follow); - Assert.assertTrue(detected.size() == 1); - Class result = detected.get(0); - if( !result.equals(Follow.class) ) - Assert.fail(); + @Test + public void testDetectFollow() { + List<Class> detected = new TwitterDocumentClassifier().detectClasses(follow); + Assert.assertTrue(detected.size() == 1); + Class result = detected.get(0); + if ( !result.equals(Follow.class) ) { + Assert.fail(); } + } - @Test - public void testDetectUser() { - List<Class> detected = new TwitterDocumentClassifier().detectClasses(user); - Assert.assertTrue(detected.size() == 1); - Class result = detected.get(0); - if (!result.equals(User.class)) - Assert.fail(); + @Test + public void testDetectUser() { + List<Class> detected = new TwitterDocumentClassifier().detectClasses(user); + Assert.assertTrue(detected.size() == 1); + Class result = detected.get(0); + if (!result.equals(User.class)) { + Assert.fail(); } + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java index 5e24882..6e269e5 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java @@ -18,6 +18,11 @@ package com.youtube.processor; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; + import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.api.services.youtube.model.Channel; import com.google.api.services.youtube.model.Video; @@ -27,10 +32,6 @@ import com.youtube.serializer.YoutubeChannelDeserializer; import com.youtube.serializer.YoutubeEventClassifier; import com.youtube.serializer.YoutubeVideoDeserializer; import org.apache.commons.lang.NotImplementedException; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,89 +40,90 @@ import java.util.Queue; public class YoutubeTypeConverter implements StreamsProcessor { - public final static String STREAMS_ID = "YoutubeTypeConverter"; - - private final static Logger LOGGER = LoggerFactory.getLogger(YoutubeTypeConverter.class); - - private StreamsJacksonMapper mapper; - private Queue<Video> inQueue; - private Queue<StreamsDatum> outQueue; - private YoutubeActivityUtil youtubeActivityUtil; - private int count = 0; - - public YoutubeTypeConverter() {} - - @Override - public String getId() { - return STREAMS_ID; + public static final String STREAMS_ID = "YoutubeTypeConverter"; + + private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeTypeConverter.class); + + private StreamsJacksonMapper mapper; + private Queue<Video> inQueue; + private Queue<StreamsDatum> outQueue; + private YoutubeActivityUtil youtubeActivityUtil; + private int count = 0; + + public YoutubeTypeConverter() {} + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public List<StreamsDatum> process(StreamsDatum streamsDatum) { + StreamsDatum result = null; + + try { + Object item = streamsDatum.getDocument(); + + LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); + Activity activity = null; + + if (item instanceof String) { + item = deserializeItem(item); + } + + if (item instanceof Video) { + activity = new Activity(); + youtubeActivityUtil.updateActivity((Video)item, activity, streamsDatum.getId()); + } else if (item instanceof Channel) { + activity = new Activity(); + this.youtubeActivityUtil.updateActivity((Channel)item, activity, null); + } else { + throw new NotImplementedException("Type conversion not implement for type : " + item.getClass().getName()); + } + + if (activity != null) { + result = new StreamsDatum(activity); + count++; + } + } catch (Exception ex) { + LOGGER.error("Exception while converting Video to Activity: {}", ex); } - @Override - public List<StreamsDatum> process(StreamsDatum streamsDatum) { - StreamsDatum result = null; - - try { - Object item = streamsDatum.getDocument(); - - LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); - Activity activity = null; - - if(item instanceof String) { - item = deserializeItem(item); - } - - if(item instanceof Video) { - activity = new Activity(); - youtubeActivityUtil.updateActivity((Video)item, activity, streamsDatum.getId()); - } else if(item instanceof Channel) { - activity = new Activity(); - this.youtubeActivityUtil.updateActivity((Channel)item, activity, null); - } else { - throw new NotImplementedException("Type conversion not implement for type : "+item.getClass().getName()); - } - - if(activity != null) { - result = new StreamsDatum(activity); - count++; - } - } catch (Exception e) { - LOGGER.error("Exception while converting Video to Activity: {}", e); - } - - if( result != null ) - return Lists.newArrayList(result); - else - return Lists.newArrayList(); + if ( result != null ) { + return Lists.newArrayList(result); + } else { + return Lists.newArrayList(); } - - private Object deserializeItem(Object item) { - try { - Class klass = YoutubeEventClassifier.detectClass((String) item); - if (klass.equals(Video.class)) { - item = mapper.readValue((String) item, Video.class); - } else if(klass.equals(Channel.class)) { - item = mapper.readValue((String) item, Channel.class); - } - } catch (Exception e) { - LOGGER.error("Exception while trying to deserializeItem: {}", e); - } - - return item; + } + + private Object deserializeItem(Object item) { + try { + Class klass = YoutubeEventClassifier.detectClass((String) item); + if (klass.equals(Video.class)) { + item = mapper.readValue((String) item, Video.class); + } else if (klass.equals(Channel.class)) { + item = mapper.readValue((String) item, Channel.class); + } + } catch (Exception ex) { + LOGGER.error("Exception while trying to deserializeItem: {}", ex); } - @Override - public void prepare(Object o) { - youtubeActivityUtil = new YoutubeActivityUtil(); - mapper = StreamsJacksonMapper.getInstance(); - - SimpleModule simpleModule = new SimpleModule(); - simpleModule.addDeserializer(Video.class, new YoutubeVideoDeserializer()); - mapper.registerModule(simpleModule); - simpleModule = new SimpleModule(); - simpleModule.addDeserializer(Channel.class, new YoutubeChannelDeserializer()); - mapper.registerModule(simpleModule); - } + return item; + } + + @Override + public void prepare(Object configurationObject) { + youtubeActivityUtil = new YoutubeActivityUtil(); + mapper = StreamsJacksonMapper.getInstance(); + + SimpleModule simpleModule = new SimpleModule(); + simpleModule.addDeserializer(Video.class, new YoutubeVideoDeserializer()); + mapper.registerModule(simpleModule); + simpleModule = new SimpleModule(); + simpleModule.addDeserializer(Channel.class, new YoutubeChannelDeserializer()); + mapper.registerModule(simpleModule); + } - @Override - public void cleanUp() {} + @Override + public void cleanUp() {} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java index 8e980a7..fd238db 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java @@ -19,18 +19,17 @@ package com.youtube.provider; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; + import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.http.HttpRequest; import com.google.api.services.youtube.YouTube; import com.google.api.services.youtube.model.Channel; -import com.google.api.services.youtube.model.ChannelListResponse; import com.google.gson.Gson; import org.apache.commons.lang3.StringUtils; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; import org.apache.youtube.pojo.YoutubeConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,63 +38,77 @@ import java.util.List; import java.util.concurrent.BlockingQueue; /** - * + * Collects YoutubeChannelData on behalf of YoutubeChannelProvider. */ -public class YoutubeChannelDataCollector extends YoutubeDataCollector{ +public class YoutubeChannelDataCollector extends YoutubeDataCollector { - private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeChannelDataCollector.class); - private static final String CONTENT = "snippet,contentDetails,statistics,topicDetails"; - private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - private static final int MAX_ATTEMPTS= 5; + private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeChannelDataCollector.class); + private static final String CONTENT = "snippet,contentDetails,statistics,topicDetails"; + private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + private static final int MAX_ATTEMPTS = 5; - private YouTube youTube; - private BlockingQueue<StreamsDatum> queue; - private BackOffStrategy strategy; - private UserInfo userInfo; - private YoutubeConfiguration youtubeConfig; + private YouTube youTube; + private BlockingQueue<StreamsDatum> queue; + private BackOffStrategy strategy; + private UserInfo userInfo; + private YoutubeConfiguration youtubeConfig; - public YoutubeChannelDataCollector(YouTube youTube, BlockingQueue<StreamsDatum> queue, BackOffStrategy strategy, UserInfo userInfo, YoutubeConfiguration youtubeConfig) { - this.youTube = youTube; - this.queue = queue; - this.strategy = strategy; - this.userInfo = userInfo; - this.youtubeConfig = youtubeConfig; - } + /** + * YoutubeChannelDataCollector constructor. + * @param youTube YouTube + * @param queue BlockingQueue of StreamsDatum + * @param strategy BackOffStrategy + * @param userInfo UserInfo + * @param youtubeConfig YoutubeConfiguration + */ + public YoutubeChannelDataCollector( + YouTube youTube, + BlockingQueue<StreamsDatum> queue, + BackOffStrategy strategy, + UserInfo userInfo, + YoutubeConfiguration youtubeConfig) { + this.youTube = youTube; + this.queue = queue; + this.strategy = strategy; + this.userInfo = userInfo; + this.youtubeConfig = youtubeConfig; + } - @Override - public void run() { - Gson gson = new Gson(); + @Override + public void run() { + Gson gson = new Gson(); + try { + int attempt = 0; + YouTube.Channels.List channelLists = this.youTube.channels().list(CONTENT).setId(this.userInfo.getUserId()).setKey(this.youtubeConfig.getApiKey()); + boolean tryAgain = false; + do { try { - int attempt = 0; - YouTube.Channels.List channelLists = this.youTube.channels().list(CONTENT).setId(this.userInfo.getUserId()).setKey(this.youtubeConfig.getApiKey()); - boolean tryAgain = false; - do { - try { - List<Channel> channels = channelLists.execute().getItems(); - for (Channel channel : channels) { - String json = gson.toJson(channel); - this.queue.put(new StreamsDatum(json, channel.getId())); - } - if (StringUtils.isEmpty(channelLists.getPageToken())) { - channelLists = null; - } else { - channelLists = this.youTube.channels().list(CONTENT).setId(this.userInfo.getUserId()).setOauthToken(this.youtubeConfig.getApiKey()) - .setPageToken(channelLists.getPageToken()); - } - } catch (GoogleJsonResponseException gjre) { - LOGGER.warn("GoogleJsonResposneException caught : {}", gjre); - tryAgain = backoffAndIdentifyIfRetry(gjre, this.strategy); - ++attempt; - } catch (Throwable t) { - LOGGER.warn("Unable to get channel info for id : {}", this.userInfo.getUserId()); - LOGGER.warn("Excpection thrown while trying to get channel info : {}", t); - } - } while((tryAgain && attempt < MAX_ATTEMPTS) || channelLists != null); - - } catch (Throwable t) { - LOGGER.warn(t.getMessage()); + List<Channel> channels = channelLists.execute().getItems(); + for (Channel channel : channels) { + String json = gson.toJson(channel); + this.queue.put(new StreamsDatum(json, channel.getId())); + } + if (StringUtils.isEmpty(channelLists.getPageToken())) { + channelLists = null; + } else { + channelLists = this.youTube.channels().list(CONTENT).setId(this.userInfo.getUserId()).setOauthToken(this.youtubeConfig.getApiKey()) + .setPageToken(channelLists.getPageToken()); + } + } catch (GoogleJsonResponseException gjre) { + LOGGER.warn("GoogleJsonResposneException caught : {}", gjre); + tryAgain = backoffAndIdentifyIfRetry(gjre, this.strategy); + ++attempt; + } catch (Throwable throwable) { + LOGGER.warn("Unable to get channel info for id : {}", this.userInfo.getUserId()); + LOGGER.warn("Excpection thrown while trying to get channel info : {}", throwable); } + } + while ((tryAgain && attempt < MAX_ATTEMPTS) || channelLists != null); + + } catch (Throwable throwable) { + LOGGER.warn(throwable.getMessage()); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java index 817c98e..d9b1e14 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java @@ -19,22 +19,22 @@ package com.youtube.provider; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.services.youtube.YouTube; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; import org.apache.youtube.pojo.YoutubeConfiguration; import java.io.BufferedOutputStream; @@ -47,76 +47,86 @@ import java.util.concurrent.TimeUnit; /** * Retrieve recent activity from a list of channels. - * - * To use from command line: - * - * Supply (at least) the following required configuration in application.conf: - * - * youtube.oauth.pathToP12KeyFile - * youtube.oauth.serviceAccountEmailAddress - * youtube.apiKey - * youtube.youtubeUsers - * - * Launch using: - * - * mvn exec:java -Dexec.mainClass=org.apache.streams.youtube.provider.YoutubeUserActivityProvider -Dexec.args="application.conf tweets.json" */ public class YoutubeChannelProvider extends YoutubeProvider { - public YoutubeChannelProvider() { - super(); - } + public YoutubeChannelProvider() { + super(); + } - public YoutubeChannelProvider(YoutubeConfiguration config) { - super(config); - } + public YoutubeChannelProvider(YoutubeConfiguration config) { + super(config); + } - @Override - protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) { - return new YoutubeChannelDataCollector(youtube, queue, strategy, userInfo, this.config); - } + @Override + protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) { + return new YoutubeChannelDataCollector(youtube, queue, strategy, userInfo, this.config); + } + + /** + * To use from command line: + * + * <p/> + * Supply (at least) the following required configuration in application.conf: + * + * <p/> + * youtube.oauth.pathToP12KeyFile + * youtube.oauth.serviceAccountEmailAddress + * youtube.apiKey + * youtube.youtubeUsers + * + * <p/> + * Launch using: + * + * <p/> + * mvn exec:java -Dexec.mainClass=org.apache.streams.youtube.provider.YoutubeUserActivityProvider -Dexec.args="application.conf tweets.json" + * + * @param args args + * @throws Exception Exception + */ + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File file = new File(configfile); + assert (file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + YoutubeConfiguration config = new ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe, "youtube"); + YoutubeChannelProvider provider = new YoutubeChannelProvider(config); + + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - public static void main(String[] args) throws Exception { - - Preconditions.checkArgument(args.length >= 2); - - String configfile = args[0]; - String outfile = args[1]; - - Config reference = ConfigFactory.load(); - File conf_file = new File(configfile); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); - YoutubeConfiguration config = new ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe, "youtube"); - YoutubeChannelProvider provider = new YoutubeChannelProvider(config); - - ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); - while(iterator.hasNext()) { - StreamsDatum datum = iterator.next(); - String json; - try { - if( datum.getDocument() instanceof String ) - json = (String)datum.getDocument(); - else - json = mapper.writeValueAsString(datum.getDocument()); - outStream.println(json); - } catch (JsonProcessingException e) { - System.err.println(e.getMessage()); - } - } - } while( provider.isRunning()); - provider.cleanUp(); - outStream.flush(); + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while (iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + try { + if ( datum.getDocument() instanceof String ) { + json = (String) datum.getDocument(); + } else { + json = mapper.writeValueAsString(datum.getDocument()); + } + outStream.println(json); + } catch (JsonProcessingException ex) { + System.err.println(ex.getMessage()); + } + } } + while ( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeDataCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeDataCollector.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeDataCollector.java index 3a17134..3eede18 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeDataCollector.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeDataCollector.java @@ -19,47 +19,52 @@ package com.youtube.provider; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; import org.apache.streams.util.api.requests.backoff.BackOffException; import org.apache.streams.util.api.requests.backoff.BackOffStrategy; + +import com.google.api.client.googleapis.json.GoogleJsonResponseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Base Collector for Youtube Data. + */ public abstract class YoutubeDataCollector implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeDataCollector.class); - - /** - * Looks at the status code of the expception. If the code indicates that the request should be retried, - * it executes the back off strategy and returns true. - * @param gjre - * @param backOff - * @return returns true if the error code of the exception indicates the request should be retried. - */ - public boolean backoffAndIdentifyIfRetry(GoogleJsonResponseException gjre, BackOffStrategy backOff) throws BackOffException { - boolean tryAgain = false; + private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeDataCollector.class); - switch (gjre.getStatusCode()) { - case 400 : - LOGGER.warn("Bad Request : {}", gjre); - break; - case 401 : - LOGGER.warn("Invalid Credentials : {}", gjre); - case 403 : - LOGGER.warn("Possible rate limit exception. Retrying. : {}", gjre.getMessage()); - backOff.backOff(); - tryAgain = true; - break; - case 503 : - LOGGER.warn("Google Backend Service Error : {}", gjre); - break; - default: - LOGGER.warn("Google Service returned error : {}", gjre); - tryAgain = true; - backOff.backOff(); - break; - } + /** + * Looks at the status code of the expception. If the code indicates that the request should be retried, + * it executes the back off strategy and returns true. + * @param gjre + * @param backOff + * @return returns true if the error code of the exception indicates the request should be retried. + */ + public boolean backoffAndIdentifyIfRetry(GoogleJsonResponseException gjre, BackOffStrategy backOff) throws BackOffException { + boolean tryAgain = false; - return tryAgain; + switch (gjre.getStatusCode()) { + case 400 : + LOGGER.warn("Bad Request : {}", gjre); + break; + case 401 : + LOGGER.warn("Invalid Credentials : {}", gjre); + break; + case 403 : + LOGGER.warn("Possible rate limit exception. Retrying. : {}", gjre.getMessage()); + backOff.backOff(); + tryAgain = true; + break; + case 503 : + LOGGER.warn("Google Backend Service Error : {}", gjre); + break; + default: + LOGGER.warn("Google Service returned error : {}", gjre); + tryAgain = true; + backOff.backOff(); + break; } + + return tryAgain; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java index ab77467..1442f8b 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java @@ -19,6 +19,16 @@ package com.youtube.provider; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.util.ComponentUtils; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; +import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy; + import com.google.api.client.auth.oauth2.Credential; import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; import com.google.api.client.http.HttpTransport; @@ -34,15 +44,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.util.ComponentUtils; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; -import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy; import org.apache.youtube.pojo.YoutubeConfiguration; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -64,205 +65,213 @@ import java.util.concurrent.atomic.AtomicBoolean; public abstract class YoutubeProvider implements StreamsProvider { - public static final String STREAMS_ID = "YoutubeProvider"; - - private final static Logger LOGGER = LoggerFactory.getLogger(YoutubeProvider.class); - private final static int MAX_BATCH_SIZE = 1000; - - // This OAuth 2.0 access scope allows for full read/write access to the - // authenticated user's account. - private List<String> scopes = Lists.newArrayList("https://www.googleapis.com/auth/youtube"); - - /** - * Define a global instance of the HTTP transport. - */ - public static final HttpTransport HTTP_TRANSPORT = new NetHttpTransport(); - - /** - * Define a global instance of the JSON factory. - */ - public static final JsonFactory JSON_FACTORY = new JacksonFactory(); - - private static final int DEFAULT_THREAD_POOL_SIZE = 5; - - private List<ListenableFuture<Object>> futures = new ArrayList<>(); - - private ListeningExecutorService executor; - private BlockingQueue<StreamsDatum> datumQueue; - private AtomicBoolean isComplete; - private boolean previousPullWasEmpty; - - protected YouTube youtube; - protected YoutubeConfiguration config; - - public YoutubeProvider() { - this.config = new ComponentConfigurator<>(YoutubeConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("youtube")); - - Preconditions.checkNotNull(this.config.getApiKey()); - } - - public YoutubeProvider(YoutubeConfiguration config) { - this.config = config; - - Preconditions.checkNotNull(this.config.getApiKey()); - } - - @Override - public String getId() { - return STREAMS_ID; + public static final String STREAMS_ID = "YoutubeProvider"; + + private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeProvider.class); + private static final int MAX_BATCH_SIZE = 1000; + + // This OAuth 2.0 access scope allows for full read/write access to the + // authenticated user's account. + private List<String> scopes = Lists.newArrayList("https://www.googleapis.com/auth/youtube"); + + /** + * Define a global instance of the HTTP transport. + */ + public static final HttpTransport HTTP_TRANSPORT = new NetHttpTransport(); + + /** + * Define a global instance of the JSON factory. + */ + public static final JsonFactory JSON_FACTORY = new JacksonFactory(); + + private static final int DEFAULT_THREAD_POOL_SIZE = 5; + + private List<ListenableFuture<Object>> futures = new ArrayList<>(); + + private ListeningExecutorService executor; + private BlockingQueue<StreamsDatum> datumQueue; + private AtomicBoolean isComplete; + private boolean previousPullWasEmpty; + + protected YouTube youtube; + protected YoutubeConfiguration config; + + /** + * YoutubeProvider constructor. + * Resolves config from JVM 'youtube'. + */ + public YoutubeProvider() { + this.config = new ComponentConfigurator<>(YoutubeConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("youtube")); + + Preconditions.checkNotNull(this.config.getApiKey()); + } + + /** + * YoutubeProvider constructor - uses supplied YoutubeConfiguration. + * @param config YoutubeConfiguration + */ + public YoutubeProvider(YoutubeConfiguration config) { + this.config = config; + + Preconditions.checkNotNull(this.config.getApiKey()); + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void prepare(Object configurationObject) { + try { + this.youtube = createYouTubeClient(); + } catch (IOException | GeneralSecurityException ex) { + LOGGER.error("Failed to created oauth for YouTube : {}", ex); + throw new RuntimeException(ex); } - @Override - public void prepare(Object configurationObject) { - try { - this.youtube = createYouTubeClient(); - } catch (IOException |GeneralSecurityException e) { - LOGGER.error("Failed to created oauth for YouTube : {}", e); - throw new RuntimeException(e); - } - - this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE)); - this.datumQueue = new LinkedBlockingQueue<>(1000); - this.isComplete = new AtomicBoolean(false); - this.previousPullWasEmpty = false; + this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE)); + this.datumQueue = new LinkedBlockingQueue<>(1000); + this.isComplete = new AtomicBoolean(false); + this.previousPullWasEmpty = false; + } + + @Override + public void startStream() { + BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2); + + for (UserInfo user : this.config.getYoutubeUsers()) { + if (this.config.getDefaultAfterDate() != null && user.getAfterDate() == null) { + user.setAfterDate(this.config.getDefaultAfterDate()); + } + if (this.config.getDefaultBeforeDate() != null && user.getBeforeDate() == null) { + user.setBeforeDate(this.config.getDefaultBeforeDate()); + } + + ListenableFuture future = executor.submit(getDataCollector(backOffStrategy, this.datumQueue, this.youtube, user)); + futures.add(future); } - @Override - public void startStream() { - BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2); - - for(UserInfo user : this.config.getYoutubeUsers()) { - if(this.config.getDefaultAfterDate() != null && user.getAfterDate() == null) { - user.setAfterDate(this.config.getDefaultAfterDate()); - } - if(this.config.getDefaultBeforeDate() != null && user.getBeforeDate() == null) { - user.setBeforeDate(this.config.getDefaultBeforeDate()); - } - - ListenableFuture future = executor.submit(getDataCollector(backOffStrategy, this.datumQueue, this.youtube, user)); - futures.add(future); - } - - this.executor.shutdown(); + this.executor.shutdown(); + } + + protected abstract Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo); + + @Override + public StreamsResultSet readCurrent() { + BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>(); + int batchCount = 0; + while (!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) { + StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue); + if (datum != null) { + ++batchCount; + ComponentUtils.offerUntilSuccess(datum, batch); + } } - - protected abstract Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo); - - @Override - public StreamsResultSet readCurrent() { - BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>(); - int batchCount = 0; - while(!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) { - StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue); - if(datum != null) { - ++batchCount; - ComponentUtils.offerUntilSuccess(datum, batch); - } - } - return new StreamsResultSet(batch); + return new StreamsResultSet(batch); + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } + + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } + + @VisibleForTesting + protected YouTube createYouTubeClient() throws IOException, GeneralSecurityException { + GoogleCredential.Builder credentialBuilder = new GoogleCredential.Builder() + .setTransport(HTTP_TRANSPORT) + .setJsonFactory(JSON_FACTORY) + .setServiceAccountId(getConfig().getOauth().getServiceAccountEmailAddress()) + .setServiceAccountScopes(scopes); + + if ( !Strings.isNullOrEmpty(getConfig().getOauth().getPathToP12KeyFile())) { + File p12KeyFile = new File(getConfig().getOauth().getPathToP12KeyFile()); + if ( p12KeyFile.exists() && p12KeyFile.isFile() && p12KeyFile.canRead()) { + credentialBuilder = credentialBuilder.setServiceAccountPrivateKeyFromP12File(p12KeyFile); + } } - - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return null; - } - - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; + Credential credential = credentialBuilder.build(); + return new YouTube.Builder(HTTP_TRANSPORT, JSON_FACTORY, credential).setApplicationName("Streams Application").build(); + } + + @Override + public void cleanUp() { + ComponentUtils.shutdownExecutor(this.executor, 10, 10); + this.executor = null; + } + + public YoutubeConfiguration getConfig() { + return config; + } + + public void setConfig(YoutubeConfiguration config) { + this.config = config; + } + + /** + * Set and overwrite the default before date that was read from the configuration file. + * @param defaultBeforeDate defaultBeforeDate + */ + public void setDefaultBeforeDate(DateTime defaultBeforeDate) { + this.config.setDefaultBeforeDate(defaultBeforeDate); + } + + /** + * Set and overwrite the default after date that was read from teh configuration file. + * @param defaultAfterDate defaultAfterDate + */ + public void setDefaultAfterDate(DateTime defaultAfterDate) { + this.config.setDefaultAfterDate(defaultAfterDate); + } + + /** + * Sets and overwrite the user info from the configuaration file. Uses the defaults before and after dates. + * @param userIds Set of String userIds + */ + public void setUserInfoWithDefaultDates(Set<String> userIds) { + List<UserInfo> youtubeUsers = new LinkedList<>(); + + for (String userId : userIds) { + UserInfo user = new UserInfo(); + user.setUserId(userId); + user.setAfterDate(this.config.getDefaultAfterDate()); + user.setBeforeDate(this.config.getDefaultBeforeDate()); + youtubeUsers.add(user); } - @VisibleForTesting - protected YouTube createYouTubeClient() throws IOException, GeneralSecurityException { - GoogleCredential.Builder credentialBuilder = new GoogleCredential.Builder() - .setTransport(HTTP_TRANSPORT) - .setJsonFactory(JSON_FACTORY) - .setServiceAccountId(getConfig().getOauth().getServiceAccountEmailAddress()) - .setServiceAccountScopes(scopes); - - if( !Strings.isNullOrEmpty(getConfig().getOauth().getPathToP12KeyFile())) { - File p12KeyFile = new File(getConfig().getOauth().getPathToP12KeyFile()); - if( p12KeyFile.exists() && p12KeyFile.isFile() && p12KeyFile.canRead()) { - credentialBuilder = credentialBuilder.setServiceAccountPrivateKeyFromP12File(p12KeyFile); - } - } - Credential credential = credentialBuilder.build(); - return new YouTube.Builder(HTTP_TRANSPORT, JSON_FACTORY, credential).setApplicationName("Streams Application").build(); + this.config.setYoutubeUsers(youtubeUsers); + } + + /** + * Set and overwrite user into from teh configuration file. Only sets after dater. + * @param usersAndAfterDates usersAndAfterDates + */ + public void setUserInfoWithAfterDate(Map<String, DateTime> usersAndAfterDates) { + List<UserInfo> youtubeUsers = new LinkedList<>(); + + for (String userId : usersAndAfterDates.keySet()) { + UserInfo user = new UserInfo(); + user.setUserId(userId); + user.setAfterDate(usersAndAfterDates.get(userId)); + youtubeUsers.add(user); } - @Override - public void cleanUp() { - ComponentUtils.shutdownExecutor(this.executor, 10, 10); - this.executor = null; - } - - public YoutubeConfiguration getConfig() { - return config; - } - - public void setConfig(YoutubeConfiguration config) { - this.config = config; - } - - /** - * Set and overwrite the default before date that was read from the configuration file. - * @param defaultBeforeDate - */ - public void setDefaultBeforeDate(DateTime defaultBeforeDate) { - this.config.setDefaultBeforeDate(defaultBeforeDate); - } - - /** - * Set and overwrite the default after date that was read from teh configuration file. - * @param defaultAfterDate - */ - public void setDefaultAfterDate(DateTime defaultAfterDate) { - this.config.setDefaultAfterDate(defaultAfterDate); - } - - /** - * Sets and overwrite the user info from the configuaration file. Uses the defaults before and after dates. - * @param userIds - */ - public void setUserInfoWithDefaultDates(Set<String> userIds) { - List<UserInfo> youtubeUsers = new LinkedList<>(); - - for(String userId : userIds) { - UserInfo user = new UserInfo(); - user.setUserId(userId); - user.setAfterDate(this.config.getDefaultAfterDate()); - user.setBeforeDate(this.config.getDefaultBeforeDate()); - youtubeUsers.add(user); - } - - this.config.setYoutubeUsers(youtubeUsers); - } - - /** - * Set and overwrite user into from teh configuration file. Only sets after dater. - * @param usersAndAfterDates - */ - public void setUserInfoWithAfterDate(Map<String, DateTime> usersAndAfterDates) { - List<UserInfo> youtubeUsers = new LinkedList<>(); - - for(String userId : usersAndAfterDates.keySet()) { - UserInfo user = new UserInfo(); - user.setUserId(userId); - user.setAfterDate(usersAndAfterDates.get(userId)); - youtubeUsers.add(user); - } - - this.config.setYoutubeUsers(youtubeUsers); - } + this.config.setYoutubeUsers(youtubeUsers); + } - @Override - public boolean isRunning() { - if (datumQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { - LOGGER.info("Completed"); - isComplete.set(true); - LOGGER.info("Exiting"); - } - return !isComplete.get(); + @Override + public boolean isRunning() { + if (datumQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { + LOGGER.info("Completed"); + isComplete.set(true); + LOGGER.info("Exiting"); } + return !isComplete.get(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java index 76a69f3..9975dd9 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java @@ -19,6 +19,11 @@ package com.youtube.provider; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; + import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; @@ -29,10 +34,6 @@ import com.google.api.services.youtube.model.ActivityListResponse; import com.google.api.services.youtube.model.Video; import com.google.api.services.youtube.model.VideoListResponse; import com.google.gson.Gson; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; import org.apache.youtube.pojo.YoutubeConfiguration; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -42,169 +43,187 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.BlockingQueue; +/** + * YoutubeDataCollector for YoutubeUserActivityProvider. + */ public class YoutubeUserActivityCollector extends YoutubeDataCollector { - /** - * Max results allowed per request - * https://developers.google.com/+/api/latest/activities/list - */ - private static final long MAX_RESULTS = 50; - private static final int MAX_ATTEMPTS = 5; - private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeUserActivityCollector.class); - private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - - static { //set up mapper for Google Activity Object - SimpleModule simpleModule = new SimpleModule(); - MAPPER.registerModule(simpleModule); - MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } - private BlockingQueue<StreamsDatum> datumQueue; - private BackOffStrategy backOff; - private YouTube youtube; - private UserInfo userInfo; - private YoutubeConfiguration config; - - Gson gson = new Gson(); - - public YoutubeUserActivityCollector(YouTube youtube, BlockingQueue<StreamsDatum> datumQueue, BackOffStrategy backOff, UserInfo userInfo, YoutubeConfiguration config) { - this.youtube = youtube; - this.datumQueue = datumQueue; - this.backOff = backOff; - this.userInfo = userInfo; - this.config = config; - } - - @Override - public void run() { - collectActivityData(); - } - - /** - * Iterate through all users in the Youtube configuration and collect all videos - * associated with their accounts. - */ - protected void collectActivityData() { + /** + * Max results allowed per request + * https://developers.google.com/+/api/latest/activities/list + */ + private static final long MAX_RESULTS = 50; + private static final int MAX_ATTEMPTS = 5; + private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeUserActivityCollector.class); + private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + + static { //set up mapper for Google Activity Object + SimpleModule simpleModule = new SimpleModule(); + MAPPER.registerModule(simpleModule); + MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + private BlockingQueue<StreamsDatum> datumQueue; + private BackOffStrategy backOff; + private YouTube youtube; + private UserInfo userInfo; + private YoutubeConfiguration config; + + Gson gson = new Gson(); + + /** + * YoutubeUserActivityCollector constructor. + * @param youtube YouTube + * @param datumQueue BlockingQueue of StreamsDatum + * @param backOff BackOffStrategy + * @param userInfo UserInfo + * @param config YoutubeConfiguration + */ + public YoutubeUserActivityCollector( + YouTube youtube, + BlockingQueue<StreamsDatum> datumQueue, + BackOffStrategy backOff, + UserInfo userInfo, + YoutubeConfiguration config) { + this.youtube = youtube; + this.datumQueue = datumQueue; + this.backOff = backOff; + this.userInfo = userInfo; + this.config = config; + } + + @Override + public void run() { + collectActivityData(); + } + + /** + * Iterate through all users in the Youtube configuration and collect all videos + * associated with their accounts. + */ + protected void collectActivityData() { + try { + YouTube.Activities.List request = null; + ActivityListResponse feed = null; + + boolean tryAgain = false; + int attempt = 0; + DateTime afterDate = userInfo.getAfterDate(); + DateTime beforeDate = userInfo.getBeforeDate(); + + do { try { - YouTube.Activities.List request = null; - ActivityListResponse feed = null; - - boolean tryAgain = false; - int attempt = 0; - DateTime afterDate = userInfo.getAfterDate(); - DateTime beforeDate = userInfo.getBeforeDate(); - - do { - try { - if(request == null) { - request = this.youtube.activities().list("contentDetails") - .setChannelId(userInfo.getUserId()) - .setMaxResults(MAX_RESULTS) - .setKey(config.getApiKey()); - feed = request.execute(); - } else { - request = this.youtube.activities().list("contentDetails") - .setChannelId(userInfo.getUserId()) - .setMaxResults(MAX_RESULTS) - .setPageToken(feed.getNextPageToken()) - .setKey(config.getApiKey()); - feed = request.execute(); - } - this.backOff.reset(); //successful pull reset api. - - processActivityFeed(feed, afterDate, beforeDate); - } catch (GoogleJsonResponseException gjre) { - tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOff); - ++attempt; - } - } while((tryAgain || (feed != null && feed.getNextPageToken() != null)) && attempt < MAX_ATTEMPTS); - } catch (Throwable t) { - if(t instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - t.printStackTrace(); - LOGGER.warn("Unable to pull Activities for user={} : {}",this.userInfo.getUserId(), t); + if (request == null) { + request = this.youtube.activities().list("contentDetails") + .setChannelId(userInfo.getUserId()) + .setMaxResults(MAX_RESULTS) + .setKey(config.getApiKey()); + feed = request.execute(); + } else { + request = this.youtube.activities().list("contentDetails") + .setChannelId(userInfo.getUserId()) + .setMaxResults(MAX_RESULTS) + .setPageToken(feed.getNextPageToken()) + .setKey(config.getApiKey()); + feed = request.execute(); + } + this.backOff.reset(); //successful pull reset api. + + processActivityFeed(feed, afterDate, beforeDate); + } catch (GoogleJsonResponseException gjre) { + tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOff); + ++attempt; } + } + while ((tryAgain || (feed != null && feed.getNextPageToken() != null)) && attempt < MAX_ATTEMPTS); + } catch (Throwable throwable) { + if (throwable instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throwable.printStackTrace(); + LOGGER.warn("Unable to pull Activities for user={} : {}",this.userInfo.getUserId(), throwable); } - - /** - * Given a feed and an after and before date, fetch all relevant user videos - * and place them into the datumQueue for post-processing - * @param feed - * @param afterDate - * @param beforeDate - * @throws IOException - * @throws InterruptedException - */ - void processActivityFeed(ActivityListResponse feed, DateTime afterDate, DateTime beforeDate) throws IOException, InterruptedException { - for(com.google.api.services.youtube.model.Activity activity : feed.getItems()) { - try { - List<Video> videos = Lists.newArrayList(); - - if (activity.getContentDetails().getUpload() != null) { - videos.addAll(getVideoList(activity.getContentDetails().getUpload().getVideoId())); - } - if (activity.getContentDetails().getPlaylistItem() != null && activity.getContentDetails().getPlaylistItem().getResourceId() != null) { - videos.addAll(getVideoList(activity.getContentDetails().getPlaylistItem().getResourceId().getVideoId())); - } - - processVideos(videos, afterDate, beforeDate, activity, feed); - } catch (Exception e) { - LOGGER.error("Error while trying to process activity: {}, {}", activity, e); - } + } + + /** + * Given a feed and an after and before date, fetch all relevant user videos + * and place them into the datumQueue for post-processing. + * @param feed ActivityListResponse + * @param afterDate DateTime + * @param beforeDate DateTime + * @throws IOException IOException + * @throws InterruptedException InterruptedException + */ + void processActivityFeed(ActivityListResponse feed, DateTime afterDate, DateTime beforeDate) throws IOException, InterruptedException { + for (com.google.api.services.youtube.model.Activity activity : feed.getItems()) { + try { + List<Video> videos = Lists.newArrayList(); + + if (activity.getContentDetails().getUpload() != null) { + videos.addAll(getVideoList(activity.getContentDetails().getUpload().getVideoId())); } - } - - /** - * Process a list of Video objects - * @param videos - * @param afterDate - * @param beforeDate - * @param activity - * @param feed - */ - void processVideos(List<Video> videos, DateTime afterDate, DateTime beforeDate, com.google.api.services.youtube.model.Activity activity, ActivityListResponse feed) { - try { - for (Video video : videos) { - if (video != null) { - org.joda.time.DateTime published = new org.joda.time.DateTime(video.getSnippet().getPublishedAt().getValue()); - if ((afterDate == null && beforeDate == null) - || (beforeDate == null && afterDate.isBefore(published)) - || (afterDate == null && beforeDate.isAfter(published)) - || ((afterDate != null && beforeDate != null) && (afterDate.isAfter(published) && beforeDate.isBefore(published)))) { - LOGGER.debug("Providing Youtube Activity: {}", MAPPER.writeValueAsString(video)); - this.datumQueue.put(new StreamsDatum(gson.toJson(video), activity.getId())); - } else if (afterDate != null && afterDate.isAfter(published)) { - feed.setNextPageToken(null); // do not fetch next page - break; - } - } - } - } catch (Exception e) { - LOGGER.error("Exception while trying to process video list: {}, {}", videos, e); + if (activity.getContentDetails().getPlaylistItem() != null && activity.getContentDetails().getPlaylistItem().getResourceId() != null) { + videos.addAll(getVideoList(activity.getContentDetails().getPlaylistItem().getResourceId().getVideoId())); } - } - /** - * Given a Youtube videoId, return the relevant Youtube Video object - * @param videoId - * @return - * @throws IOException - */ - List<Video> getVideoList(String videoId) throws IOException { - VideoListResponse videosListResponse = this.youtube.videos().list("snippet,statistics") - .setId(videoId) - .setKey(config.getApiKey()) - .execute(); - - if(videosListResponse.getItems().size() == 0) { - LOGGER.debug("No Youtube videos found for videoId: {}", videoId); - return Lists.newArrayList(); + processVideos(videos, afterDate, beforeDate, activity, feed); + } catch (Exception ex) { + LOGGER.error("Error while trying to process activity: {}, {}", activity, ex); + } + } + } + + /** + * Process a list of Video objects. + * @param videos List of Video + * @param afterDate afterDate + * @param beforeDate beforeDate + * @param activity com.google.api.services.youtube.model.Activity + * @param feed ActivityListResponse + */ + void processVideos(List<Video> videos, DateTime afterDate, DateTime beforeDate, com.google.api.services.youtube.model.Activity activity, ActivityListResponse feed) { + try { + for (Video video : videos) { + if (video != null) { + org.joda.time.DateTime published = new org.joda.time.DateTime(video.getSnippet().getPublishedAt().getValue()); + if ((afterDate == null && beforeDate == null) + || (beforeDate == null && afterDate.isBefore(published)) + || (afterDate == null && beforeDate.isAfter(published)) + || ((afterDate != null && beforeDate != null) && (afterDate.isAfter(published) && beforeDate.isBefore(published)))) { + LOGGER.debug("Providing Youtube Activity: {}", MAPPER.writeValueAsString(video)); + this.datumQueue.put(new StreamsDatum(gson.toJson(video), activity.getId())); + } else if (afterDate != null && afterDate.isAfter(published)) { + feed.setNextPageToken(null); // do not fetch next page + break; + } } - - return videosListResponse.getItems(); + } + } catch (Exception ex) { + LOGGER.error("Exception while trying to process video list: {}, {}", videos, ex); } - - BlockingQueue<StreamsDatum> getDatumQueue() { - return this.datumQueue; + } + + /** + * Given a Youtube videoId, return the relevant Youtube Video object. + * @param videoId videoId + * @return List of Video + * @throws IOException + */ + List<Video> getVideoList(String videoId) throws IOException { + VideoListResponse videosListResponse = this.youtube.videos().list("snippet,statistics") + .setId(videoId) + .setKey(config.getApiKey()) + .execute(); + + if (videosListResponse.getItems().size() == 0) { + LOGGER.debug("No Youtube videos found for videoId: {}", videoId); + return Lists.newArrayList(); } + + return videosListResponse.getItems(); + } + + BlockingQueue<StreamsDatum> getDatumQueue() { + return this.datumQueue; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java index ed3dc63..934a0e5 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java @@ -19,6 +19,14 @@ package com.youtube.provider; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.services.youtube.YouTube; @@ -27,13 +35,6 @@ import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; import org.apache.youtube.pojo.YoutubeConfiguration; import java.io.BufferedOutputStream; @@ -46,76 +47,86 @@ import java.util.concurrent.TimeUnit; /** * Retrieve recent activity from a list of user ids or names. - * - * To use from command line: - * - * Supply (at least) the following required configuration in application.conf: - * - * youtube.oauth.pathToP12KeyFile - * youtube.oauth.serviceAccountEmailAddress - * youtube.apiKey - * youtube.youtubeUsers - * - * Launch using: - * - * mvn exec:java -Dexec.mainClass=org.apache.streams.youtube.provider.YoutubeUserActivityProvider -Dexec.args="application.conf tweets.json" */ public class YoutubeUserActivityProvider extends YoutubeProvider { - public YoutubeUserActivityProvider() { - super(); - } + public YoutubeUserActivityProvider() { + super(); + } - public YoutubeUserActivityProvider(YoutubeConfiguration config) { - super(config); - } + public YoutubeUserActivityProvider(YoutubeConfiguration config) { + super(config); + } - @Override - protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) { - return new YoutubeUserActivityCollector(youtube, queue, strategy, userInfo, config); - } + @Override + protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) { + return new YoutubeUserActivityCollector(youtube, queue, strategy, userInfo, config); + } + + /** + * To use from command line: + * + * <p/> + * Supply (at least) the following required configuration in application.conf: + * + * <p/> + * youtube.oauth.pathToP12KeyFile + * youtube.oauth.serviceAccountEmailAddress + * youtube.apiKey + * youtube.youtubeUsers + * + * <p/> + * Launch using: + * + * <p/> + * mvn exec:java -Dexec.mainClass=org.apache.streams.youtube.provider.YoutubeUserActivityProvider -Dexec.args="application.conf tweets.json" + * + * @param args args + * @throws Exception Exception + */ + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File file = new File(configfile); + assert (file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + YoutubeConfiguration config = new ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe, "youtube"); + YoutubeUserActivityProvider provider = new YoutubeUserActivityProvider(config); + + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - public static void main(String[] args) throws Exception { - - Preconditions.checkArgument(args.length >= 2); - - String configfile = args[0]; - String outfile = args[1]; - - Config reference = ConfigFactory.load(); - File conf_file = new File(configfile); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); - YoutubeConfiguration config = new ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe, "youtube"); - YoutubeUserActivityProvider provider = new YoutubeUserActivityProvider(config); - - ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); - while(iterator.hasNext()) { - StreamsDatum datum = iterator.next(); - String json; - try { - if( datum.getDocument() instanceof String ) - json = (String)datum.getDocument(); - else - json = mapper.writeValueAsString(datum.getDocument()); - outStream.println(json); - } catch (JsonProcessingException e) { - System.err.println(e.getMessage()); - } - } - } while( provider.isRunning()); - provider.cleanUp(); - outStream.flush(); + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while (iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + try { + if ( datum.getDocument() instanceof String ) { + json = (String) datum.getDocument(); + } else { + json = mapper.writeValueAsString(datum.getDocument()); + } + outStream.println(json); + } catch (JsonProcessingException ex) { + System.err.println(ex.getMessage()); + } + } } + while ( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } }
