STREAMS-279 | Youtube Channel data. Youtubes version of biohistory
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/850ae97d Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/850ae97d Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/850ae97d Branch: refs/heads/asf-master Commit: 850ae97dd2ccf15455778f8c95e49afc35e103de Parents: 1ac8c83 Author: Ryan Ebanks <ryaneba...@gmail.com> Authored: Tue Feb 17 10:56:48 2015 -0600 Committer: Ryan Ebanks <ryaneba...@gmail.com> Committed: Tue Feb 17 10:56:48 2015 -0600 ---------------------------------------------------------------------- .../youtube/processor/YoutubeTypeConverter.java | 15 ++- .../provider/YoutubeChannelDataCollector.java | 79 +++++++++++ .../provider/YoutubeChannelProvider.java | 19 +++ .../youtube/serializer/YoutubeActivityUtil.java | 37 ++++++ .../serializer/YoutubeChannelDeserializer.java | 131 +++++++++++++++++++ .../serializer/YoutubeEventClassifier.java | 5 +- .../YoutubeChannelDataCollectorTest.java | 82 ++++++++++++ 7 files changed, 366 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/850ae97d/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java index a99fef1..35a3aec 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java @@ -19,11 +19,14 @@ package com.youtube.processor; import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.api.services.youtube.model.Channel; import com.google.api.services.youtube.model.Video; import com.google.common.collect.Lists; import com.youtube.serializer.YoutubeActivityUtil; +import com.youtube.serializer.YoutubeChannelDeserializer; import com.youtube.serializer.YoutubeEventClassifier; import com.youtube.serializer.YoutubeVideoDeserializer; +import org.apache.commons.lang.NotImplementedException; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.jackson.StreamsJacksonMapper; @@ -64,6 +67,11 @@ public class YoutubeTypeConverter implements StreamsProcessor { if(item instanceof Video) { activity = new Activity(); youtubeActivityUtil.updateActivity((Video)item, activity, streamsDatum.getId()); + } else if(item instanceof Channel) { + activity = new Activity(); + this.youtubeActivityUtil.updateActivity((Channel)item, activity, null); + } else { + throw new NotImplementedException("Type conversion not implement for type : "+item.getClass().getName()); } if(activity != null) { @@ -83,9 +91,11 @@ public class YoutubeTypeConverter implements StreamsProcessor { private Object deserializeItem(Object item) { try { Class klass = YoutubeEventClassifier.detectClass((String) item); - + System.out.println(klass.getName()); if (klass.equals(Video.class)) { item = mapper.readValue((String) item, Video.class); + } else if(klass.equals(Channel.class)) { + item = mapper.readValue((String) item, Channel.class); } } catch (Exception e) { LOGGER.error("Exception while trying to deserializeItem: {}", e); @@ -102,6 +112,9 @@ public class YoutubeTypeConverter implements StreamsProcessor { SimpleModule simpleModule = new SimpleModule(); simpleModule.addDeserializer(Video.class, new YoutubeVideoDeserializer()); mapper.registerModule(simpleModule); + simpleModule = new SimpleModule(); + simpleModule.addDeserializer(Channel.class, new YoutubeChannelDeserializer()); + mapper.registerModule(simpleModule); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/850ae97d/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java new file mode 100644 index 0000000..0ab7445 --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java @@ -0,0 +1,79 @@ +package com.youtube.provider; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.http.HttpRequest; +import com.google.api.services.youtube.YouTube; +import com.google.api.services.youtube.model.Channel; +import com.google.api.services.youtube.model.ChannelListResponse; +import org.apache.commons.lang3.StringUtils; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; +import org.apache.youtube.pojo.YoutubeConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.BlockingQueue; + +/** + * + */ +public class YoutubeChannelDataCollector extends YoutubeDataCollector{ + + private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeChannelDataCollector.class); + private static final String CONTENT = "snippet,contentDetails,statistics,topicDetails"; + private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + private static final int MAX_ATTEMPTS= 5; + + private YouTube youTube; + private BlockingQueue<StreamsDatum> queue; + private BackOffStrategy strategy; + private UserInfo userInfo; + private YoutubeConfiguration youtubeConfig; + + public YoutubeChannelDataCollector(YouTube youTube, BlockingQueue<StreamsDatum> queue, BackOffStrategy strategy, UserInfo userInfo, YoutubeConfiguration youtubeConfig) { + this.youTube = youTube; + this.queue = queue; + this.strategy = strategy; + this.userInfo = userInfo; + this.youtubeConfig = youtubeConfig; + } + + @Override + public void run() { + try { + int attempt = 0; + YouTube.Channels.List channelLists = this.youTube.channels().list(CONTENT).setId(this.userInfo.getUserId()).setKey(this.youtubeConfig.getApiKey()); + boolean tryAgain = false; + do { + try { + List<Channel> channels = channelLists.execute().getItems(); + for (Channel channel : channels) { + this.queue.put(new StreamsDatum(MAPPER.writeValueAsString(channel), channel.getId())); + } + if (StringUtils.isEmpty(channelLists.getPageToken())) { + channelLists = null; + } else { + channelLists = this.youTube.channels().list(CONTENT).setId(this.userInfo.getUserId()).setOauthToken(this.youtubeConfig.getApiKey()) + .setPageToken(channelLists.getPageToken()); + } + } catch (GoogleJsonResponseException gjre) { + LOGGER.warn("GoogleJsonResposneException caught : {}", gjre); + tryAgain = backoffAndIdentifyIfRetry(gjre, this.strategy); + ++attempt; + } catch (Throwable t) { + LOGGER.warn("Unable to get channel info for id : {}", this.userInfo.getUserId()); + LOGGER.warn("Excpection thrown while trying to get channel info : {}", t); + } + } while((tryAgain && attempt < MAX_ATTEMPTS) || channelLists != null); + + } catch (Throwable t) { + LOGGER.warn(t.getMessage()); + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/850ae97d/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java new file mode 100644 index 0000000..807edab --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java @@ -0,0 +1,19 @@ +package com.youtube.provider; + +import com.google.api.services.youtube.YouTube; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; + +import java.util.concurrent.BlockingQueue; + +/** + * + */ +public class YoutubeChannelProvider extends YoutubeProvider { + + @Override + protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) { + return new YoutubeChannelDataCollector(youtube, queue, strategy, userInfo, this.config); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/850ae97d/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java index bd10db7..cf57e55 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java @@ -19,6 +19,8 @@ package com.youtube.serializer; +import com.google.api.client.util.Maps; +import com.google.api.services.youtube.model.Channel; import com.google.api.services.youtube.model.Thumbnail; import com.google.api.services.youtube.model.ThumbnailDetails; import com.google.api.services.youtube.model.Video; @@ -28,6 +30,7 @@ import com.google.common.collect.Lists; import org.apache.streams.exceptions.ActivitySerializerException; import org.apache.streams.pojo.extensions.ExtensionUtil; import org.apache.streams.pojo.json.*; +import org.apache.streams.util.ComponentUtils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +70,40 @@ public class YoutubeActivityUtil { addYoutubeExtensions(activity, video); } + + /** + * Given a {@link com.google.api.services.youtube.model.Channel} object and an + * {@link org.apache.streams.pojo.json.Activity} object, fill out the appropriate details + * + * @param channel + * @param activity + * @throws org.apache.streams.exceptions.ActivitySerializerException + */ + public static void updateActivity(Channel channel, Activity activity, String channelId) throws ActivitySerializerException { + try { + activity.setProvider(getProvider()); + Actor actor = new Actor(); + activity.setVerb("post"); + actor.setId("id:youtube:"+channel.getId()); + actor.setSummary(channel.getSnippet().getDescription()); + actor.setDisplayName(channel.getSnippet().getTitle()); + Image image = new Image(); + image.setUrl(channel.getSnippet().getThumbnails().getHigh().getUrl()); + actor.setImage(image); + actor.setUrl("https://youtube.com/user/" + channel.getId()); + Map<String, Object> actorExtensions = Maps.newHashMap(); + actorExtensions.put("followers", channel.getStatistics().getSubscriberCount()); + actorExtensions.put("posts", channel.getStatistics().getVideoCount()); + actor.setAdditionalProperty("extensions", actorExtensions); + activity.setActor(actor); + Map<String, Object> extensions = Maps.newHashMap(); + extensions.put("youtube", channel); + activity.setAdditionalProperty("extensions", extensions); + } catch (Throwable t) { + throw new ActivitySerializerException(t); + } + } + /** * Given a video object, create the appropriate activity object with a valid image * (thumbnail) and video URL http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/850ae97d/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java new file mode 100644 index 0000000..2a70594 --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java @@ -0,0 +1,131 @@ +package com.youtube.serializer; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.api.client.util.DateTime; +import com.google.api.services.youtube.model.Channel; +import com.google.api.services.youtube.model.ChannelContentDetails; +import com.google.api.services.youtube.model.ChannelLocalization; +import com.google.api.services.youtube.model.ChannelSnippet; +import com.google.api.services.youtube.model.ChannelStatistics; +import com.google.api.services.youtube.model.ChannelTopicDetails; +import com.google.api.services.youtube.model.Thumbnail; +import com.google.api.services.youtube.model.ThumbnailDetails; +import com.google.common.collect.Lists; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +/** + * + */ +public class YoutubeChannelDeserializer extends JsonDeserializer<Channel> { + + + @Override + public Channel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException { + JsonNode node = jp.getCodec().readTree(jp); + try { + Channel channel = new Channel(); + if(node.findPath("etag") != null) + channel.setEtag(node.get("etag").asText()); + if(node.findPath("kind") != null) + channel.setKind(node.get("kind").asText()); + channel.setId(node.get("id").asText()); + channel.setTopicDetails(setTopicDetails(node.findValue("topicDetails"))); + channel.setStatistics(setChannelStatistics(node.findValue("statistics"))); + channel.setContentDetails(setContentDetails(node.findValue("contentDetails"))); + channel.setSnippet(setChannelSnippet(node.findValue("snippet"))); + return channel; + } catch (Throwable t) { + throw new IOException(t); + } + } + + protected ChannelSnippet setChannelSnippet(JsonNode node) { + ChannelSnippet snippet = new ChannelSnippet(); + snippet.setTitle(node.get("title").asText()); + snippet.setDescription(node.get("description").asText()); + snippet.setPublishedAt(new DateTime(node.get("publishedAt").get("value").longValue())); + snippet.setLocalized(setLocalized(node.findValue("localized"))); + snippet.setThumbnails(setThumbnails(node.findValue("thumbnails"))); + return snippet; + } + + protected ThumbnailDetails setThumbnails(JsonNode node) { + ThumbnailDetails details = new ThumbnailDetails(); + if(node == null) { + return details; + } + details.setDefault(new Thumbnail().setUrl(node.get("default").get("url").asText())); + details.setHigh(new Thumbnail().setUrl(node.get("high").get("url").asText())); + details.setMedium(new Thumbnail().setUrl(node.get("medium").get("url").asText())); + return details; + } + + protected ChannelLocalization setLocalized(JsonNode node) { + if(node == null) { + return new ChannelLocalization(); + } + ChannelLocalization localization = new ChannelLocalization(); + localization.setDescription(node.get("description").asText()); + localization.setTitle(node.get("title").asText()); + return localization; + } + + protected ChannelContentDetails setContentDetails(JsonNode node) { + ChannelContentDetails contentDetails = new ChannelContentDetails(); + if(node == null) { + return contentDetails; + } + if(node.findValue("googlePlusUserId") != null) + contentDetails.setGooglePlusUserId(node.get("googlePlusUserId").asText()); + contentDetails.setRelatedPlaylists(setRelatedPlaylists(node.findValue("relatedPlaylists"))); + return contentDetails; + } + + protected ChannelContentDetails.RelatedPlaylists setRelatedPlaylists(JsonNode node) { + ChannelContentDetails.RelatedPlaylists playlists = new ChannelContentDetails.RelatedPlaylists(); + if(node == null) { + return playlists; + } + if(node.findValue("favorites") != null) + playlists.setFavorites(node.get("favorites").asText()); + if(node.findValue("likes") != null) + playlists.setLikes(node.get("likes").asText()); + if(node.findValue("uploads") != null) + playlists.setUploads(node.get("uploads").asText()); + return playlists; + } + + protected ChannelStatistics setChannelStatistics(JsonNode node) { + ChannelStatistics stats = new ChannelStatistics(); + if(node == null) { + return stats; + } + stats.setCommentCount(node.get("commentCount").bigIntegerValue()); + stats.setHiddenSubscriberCount(node.get("hiddenSubscriberCount").asBoolean()); + stats.setSubscriberCount(node.get("subscriberCount").bigIntegerValue()); + stats.setVideoCount(node.get("videoCount").bigIntegerValue()); + stats.setViewCount(node.get("viewCount").bigIntegerValue()); + return stats; + } + + protected ChannelTopicDetails setTopicDetails(JsonNode node) { + ChannelTopicDetails details = new ChannelTopicDetails(); + if(node == null) { + return details; + } + List<String> topicIds = Lists.newLinkedList(); + Iterator<JsonNode> it = node.get("topicIds").iterator(); + while(it.hasNext()) { + topicIds.add(it.next().asText()); + } + details.setTopicIds(topicIds); + return details; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/850ae97d/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java index 08c5b09..c3860f4 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java @@ -30,6 +30,7 @@ import java.io.IOException; public class YoutubeEventClassifier { private static ObjectMapper mapper = new StreamsJacksonMapper(); private static final String VIDEO_IDENTIFIER = "\"youtube#video\""; + private static final String CHANNEL_IDENTIFIER = "youtube#channel"; public static Class detectClass(String json) { Preconditions.checkNotNull(json); @@ -45,7 +46,9 @@ public class YoutubeEventClassifier { if (objectNode.findValue("kind") != null && objectNode.get("kind").toString().equals(VIDEO_IDENTIFIER)) { return Video.class; - } else { + } else if (objectNode.findValue("kind") != null && objectNode.get("kind").toString().contains(CHANNEL_IDENTIFIER)){ + return com.google.api.services.youtube.model.Channel.class; + } else { return ObjectNode.class; } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/850ae97d/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java new file mode 100644 index 0000000..2f256e2 --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java @@ -0,0 +1,82 @@ +package com.youtube.provider; + +import com.google.api.services.youtube.YouTube; +import com.google.api.services.youtube.model.Channel; +import com.google.api.services.youtube.model.ChannelListResponse; +import com.google.common.collect.Lists; +import com.google.common.collect.Queues; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; +import org.apache.streams.util.api.requests.backoff.impl.LinearTimeBackOffStrategy; +import org.apache.youtube.pojo.YoutubeConfiguration; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.BlockingQueue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Created by rebanks on 2/17/15. + */ +public class YoutubeChannelDataCollectorTest { + + private static final String ID = "12345"; + + @Test + public void testDataCollector() throws Exception { + YouTube youTube = createMockYoutube(); + BlockingQueue<StreamsDatum> queue = Queues.newLinkedBlockingQueue(); + BackOffStrategy strategy = new LinearTimeBackOffStrategy(1); + UserInfo userInfo = new UserInfo(); + userInfo.setUserId(ID); + YoutubeConfiguration config = new YoutubeConfiguration(); + config.setApiKey(ID); + YoutubeChannelDataCollector collector = new YoutubeChannelDataCollector(youTube, queue, strategy, userInfo, config); + collector.run(); + assertEquals(1, queue.size()); + StreamsDatum datum = queue.take(); + assertNotNull(datum); + String document = (String) datum.getDocument(); + assertNotNull(document); + } + + private YouTube createMockYoutube() throws Exception { + YouTube mockYouTube = mock(YouTube.class); + YouTube.Channels channels = createMockChannels(); + when(mockYouTube.channels()).thenReturn(channels); + return mockYouTube; + } + + private YouTube.Channels createMockChannels() throws Exception { + YouTube.Channels mockChannels = mock(YouTube.Channels.class); + YouTube.Channels.List channelLists = createMockChannelsList(); + when(mockChannels.list(anyString())).thenReturn(channelLists); + return mockChannels; + } + + private YouTube.Channels.List createMockChannelsList() throws Exception { + YouTube.Channels.List mockList = mock(YouTube.Channels.List.class); + when(mockList.setId(anyString())).thenReturn(mockList); + when(mockList.setKey(anyString())).thenReturn(mockList); + ChannelListResponse response = createMockResponse(); + when(mockList.execute()).thenReturn(response); + return mockList; + } + + private ChannelListResponse createMockResponse() { + ChannelListResponse response = new ChannelListResponse(); + List<Channel> channelList = Lists.newLinkedList(); + response.setItems(channelList); + Channel channel = new Channel(); + channel.setId(ID); + channelList.add(channel); + return response; + } + +}