http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java deleted file mode 100644 index 401b836..0000000 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package com.youtube.provider; - -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.util.ComponentUtils; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; -import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy; - -import com.google.api.client.auth.oauth2.Credential; -import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.http.javanet.NetHttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.client.repackaged.com.google.common.base.Strings; -import com.google.api.services.youtube.YouTube; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import org.apache.youtube.pojo.YoutubeConfiguration; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.math.BigInteger; -import java.security.GeneralSecurityException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -public abstract class 
YoutubeProvider implements StreamsProvider { - - private static final String STREAMS_ID = "YoutubeProvider"; - - private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeProvider.class); - private static final int MAX_BATCH_SIZE = 1000; - - // This OAuth 2.0 access scope allows for full read/write access to the - // authenticated user's account. - private List<String> scopes = Collections.singletonList("https://www.googleapis.com/auth/youtube"); - - /** - * Define a global instance of the HTTP transport. - */ - private static final HttpTransport HTTP_TRANSPORT = new NetHttpTransport(); - - /** - * Define a global instance of the JSON factory. - */ - private static final JsonFactory JSON_FACTORY = new JacksonFactory(); - - private static final int DEFAULT_THREAD_POOL_SIZE = 5; - - private List<ListenableFuture<Object>> futures = new ArrayList<>(); - - private ListeningExecutorService executor; - private BlockingQueue<StreamsDatum> datumQueue; - private AtomicBoolean isComplete; - private boolean previousPullWasEmpty; - - protected YouTube youtube; - protected YoutubeConfiguration config; - - /** - * YoutubeProvider constructor. - * Resolves config from JVM 'youtube'. - */ - public YoutubeProvider() { - this.config = new ComponentConfigurator<>(YoutubeConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("youtube")); - - Objects.requireNonNull(this.config.getApiKey()); - } - - /** - * YoutubeProvider constructor - uses supplied YoutubeConfiguration. 
- * @param config YoutubeConfiguration - */ - public YoutubeProvider(YoutubeConfiguration config) { - this.config = config; - - Objects.requireNonNull(this.config.getApiKey()); - } - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - public void prepare(Object configurationObject) { - try { - this.youtube = createYouTubeClient(); - } catch (IOException | GeneralSecurityException ex) { - LOGGER.error("Failed to created oauth for YouTube : {}", ex); - throw new RuntimeException(ex); - } - - this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE)); - this.datumQueue = new LinkedBlockingQueue<>(1000); - this.isComplete = new AtomicBoolean(false); - this.previousPullWasEmpty = false; - } - - @Override - public void startStream() { - BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2); - - for (UserInfo user : this.config.getYoutubeUsers()) { - if (this.config.getDefaultAfterDate() != null && user.getAfterDate() == null) { - user.setAfterDate(this.config.getDefaultAfterDate()); - } - if (this.config.getDefaultBeforeDate() != null && user.getBeforeDate() == null) { - user.setBeforeDate(this.config.getDefaultBeforeDate()); - } - - ListenableFuture future = executor.submit(getDataCollector(backOffStrategy, this.datumQueue, this.youtube, user)); - futures.add(future); - } - - this.executor.shutdown(); - } - - protected abstract Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo); - - @Override - public StreamsResultSet readCurrent() { - BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>(); - int batchCount = 0; - while (!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) { - StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue); - if (datum != null) { - ++batchCount; - ComponentUtils.offerUntilSuccess(datum, batch); - } - } - return new StreamsResultSet(batch); - } - - @Override - 
public StreamsResultSet readNew(BigInteger sequence) { - return null; - } - - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; - } - - @VisibleForTesting - protected YouTube createYouTubeClient() throws IOException, GeneralSecurityException { - GoogleCredential.Builder credentialBuilder = new GoogleCredential.Builder() - .setTransport(HTTP_TRANSPORT) - .setJsonFactory(JSON_FACTORY) - .setServiceAccountId(getConfig().getOauth().getServiceAccountEmailAddress()) - .setServiceAccountScopes(scopes); - - if ( !Strings.isNullOrEmpty(getConfig().getOauth().getPathToP12KeyFile())) { - File p12KeyFile = new File(getConfig().getOauth().getPathToP12KeyFile()); - if ( p12KeyFile.exists() && p12KeyFile.isFile() && p12KeyFile.canRead()) { - credentialBuilder = credentialBuilder.setServiceAccountPrivateKeyFromP12File(p12KeyFile); - } - } - Credential credential = credentialBuilder.build(); - return new YouTube.Builder(HTTP_TRANSPORT, JSON_FACTORY, credential).setApplicationName("Streams Application").build(); - } - - @Override - public void cleanUp() { - ComponentUtils.shutdownExecutor(this.executor, 10, 10); - this.executor = null; - } - - public YoutubeConfiguration getConfig() { - return config; - } - - public void setConfig(YoutubeConfiguration config) { - this.config = config; - } - - /** - * Set and overwrite the default before date that was read from the configuration file. - * @param defaultBeforeDate defaultBeforeDate - */ - public void setDefaultBeforeDate(DateTime defaultBeforeDate) { - this.config.setDefaultBeforeDate(defaultBeforeDate); - } - - /** - * Set and overwrite the default after date that was read from teh configuration file. - * @param defaultAfterDate defaultAfterDate - */ - public void setDefaultAfterDate(DateTime defaultAfterDate) { - this.config.setDefaultAfterDate(defaultAfterDate); - } - - /** - * Sets and overwrite the user info from the configuaration file. Uses the defaults before and after dates. 
- * @param userIds Set of String userIds - */ - public void setUserInfoWithDefaultDates(Set<String> userIds) { - List<UserInfo> youtubeUsers = new LinkedList<>(); - - for (String userId : userIds) { - UserInfo user = new UserInfo(); - user.setUserId(userId); - user.setAfterDate(this.config.getDefaultAfterDate()); - user.setBeforeDate(this.config.getDefaultBeforeDate()); - youtubeUsers.add(user); - } - - this.config.setYoutubeUsers(youtubeUsers); - } - - /** - * Set and overwrite user into from teh configuration file. Only sets after dater. - * @param usersAndAfterDates usersAndAfterDates - */ - public void setUserInfoWithAfterDate(Map<String, DateTime> usersAndAfterDates) { - List<UserInfo> youtubeUsers = new LinkedList<>(); - - for (String userId : usersAndAfterDates.keySet()) { - UserInfo user = new UserInfo(); - user.setUserId(userId); - user.setAfterDate(usersAndAfterDates.get(userId)); - youtubeUsers.add(user); - } - - this.config.setYoutubeUsers(youtubeUsers); - } - - @Override - public boolean isRunning() { - if (datumQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { - LOGGER.info("Completed"); - isComplete.set(true); - LOGGER.info("Exiting"); - } - return !isComplete.get(); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java deleted file mode 100644 index 518a762..0000000 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package com.youtube.provider; - -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.services.youtube.YouTube; -import com.google.api.services.youtube.model.ActivityListResponse; -import com.google.api.services.youtube.model.Video; -import com.google.api.services.youtube.model.VideoListResponse; -import com.google.gson.Gson; -import org.apache.youtube.pojo.YoutubeConfiguration; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; - -/** - * YoutubeDataCollector for YoutubeUserActivityProvider. 
- */ -public class YoutubeUserActivityCollector extends YoutubeDataCollector { - - /** - * Max results allowed per request - * https://developers.google.com/+/api/latest/activities/list - */ - private static final long MAX_RESULTS = 50; - private static final int MAX_ATTEMPTS = 5; - private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeUserActivityCollector.class); - private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - - static { //set up mapper for Google Activity Object - SimpleModule simpleModule = new SimpleModule(); - MAPPER.registerModule(simpleModule); - MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } - - private BlockingQueue<StreamsDatum> datumQueue; - private BackOffStrategy backOff; - private YouTube youtube; - private UserInfo userInfo; - private YoutubeConfiguration config; - - Gson gson = new Gson(); - - /** - * YoutubeUserActivityCollector constructor. - * @param youtube YouTube - * @param datumQueue BlockingQueue of StreamsDatum - * @param backOff BackOffStrategy - * @param userInfo UserInfo - * @param config YoutubeConfiguration - */ - public YoutubeUserActivityCollector( - YouTube youtube, - BlockingQueue<StreamsDatum> datumQueue, - BackOffStrategy backOff, - UserInfo userInfo, - YoutubeConfiguration config) { - this.youtube = youtube; - this.datumQueue = datumQueue; - this.backOff = backOff; - this.userInfo = userInfo; - this.config = config; - } - - @Override - public void run() { - collectActivityData(); - } - - /** - * Iterate through all users in the Youtube configuration and collect all videos - * associated with their accounts. 
- */ - protected void collectActivityData() { - try { - YouTube.Activities.List request = null; - ActivityListResponse feed = null; - - boolean tryAgain = false; - int attempt = 0; - DateTime afterDate = userInfo.getAfterDate(); - DateTime beforeDate = userInfo.getBeforeDate(); - - do { - try { - if (request == null) { - request = this.youtube.activities().list("contentDetails") - .setChannelId(userInfo.getUserId()) - .setMaxResults(MAX_RESULTS) - .setKey(config.getApiKey()); - feed = request.execute(); - } else { - request = this.youtube.activities().list("contentDetails") - .setChannelId(userInfo.getUserId()) - .setMaxResults(MAX_RESULTS) - .setPageToken(feed.getNextPageToken()) - .setKey(config.getApiKey()); - feed = request.execute(); - } - this.backOff.reset(); //successful pull reset api. - - processActivityFeed(feed, afterDate, beforeDate); - } catch (GoogleJsonResponseException gjre) { - tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOff); - ++attempt; - } - } - while ((tryAgain || (feed != null && feed.getNextPageToken() != null)) && attempt < MAX_ATTEMPTS); - } catch (Throwable throwable) { - if (throwable instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throwable.printStackTrace(); - LOGGER.warn("Unable to pull Activities for user={} : {}",this.userInfo.getUserId(), throwable); - } - } - - /** - * Given a feed and an after and before date, fetch all relevant user videos - * and place them into the datumQueue for post-processing. 
- * @param feed ActivityListResponse - * @param afterDate DateTime - * @param beforeDate DateTime - * @throws IOException IOException - * @throws InterruptedException InterruptedException - */ - void processActivityFeed(ActivityListResponse feed, DateTime afterDate, DateTime beforeDate) throws IOException, InterruptedException { - for (com.google.api.services.youtube.model.Activity activity : feed.getItems()) { - try { - List<Video> videos = new ArrayList<>(); - - if (activity.getContentDetails().getUpload() != null) { - videos.addAll(getVideoList(activity.getContentDetails().getUpload().getVideoId())); - } - if (activity.getContentDetails().getPlaylistItem() != null && activity.getContentDetails().getPlaylistItem().getResourceId() != null) { - videos.addAll(getVideoList(activity.getContentDetails().getPlaylistItem().getResourceId().getVideoId())); - } - - processVideos(videos, afterDate, beforeDate, activity, feed); - } catch (Exception ex) { - LOGGER.error("Error while trying to process activity: {}, {}", activity, ex); - } - } - } - - /** - * Process a list of Video objects. 
- * @param videos List of Video - * @param afterDate afterDate - * @param beforeDate beforeDate - * @param activity com.google.api.services.youtube.model.Activity - * @param feed ActivityListResponse - */ - void processVideos(List<Video> videos, DateTime afterDate, DateTime beforeDate, com.google.api.services.youtube.model.Activity activity, ActivityListResponse feed) { - try { - for (Video video : videos) { - if (video != null) { - org.joda.time.DateTime published = new org.joda.time.DateTime(video.getSnippet().getPublishedAt().getValue()); - if ((afterDate == null && beforeDate == null) - || (beforeDate == null && afterDate.isBefore(published)) - || (afterDate == null && beforeDate.isAfter(published)) - || ((afterDate != null && beforeDate != null) && (afterDate.isAfter(published) && beforeDate.isBefore(published)))) { - LOGGER.debug("Providing Youtube Activity: {}", MAPPER.writeValueAsString(video)); - this.datumQueue.put(new StreamsDatum(gson.toJson(video), activity.getId())); - } else if (afterDate != null && afterDate.isAfter(published)) { - feed.setNextPageToken(null); // do not fetch next page - break; - } - } - } - } catch (Exception ex) { - LOGGER.error("Exception while trying to process video list: {}, {}", videos, ex); - } - } - - /** - * Given a Youtube videoId, return the relevant Youtube Video object. 
- * @param videoId videoId - * @return List of Video - * @throws IOException - */ - List<Video> getVideoList(String videoId) throws IOException { - VideoListResponse videosListResponse = this.youtube.videos().list("snippet,statistics") - .setId(videoId) - .setKey(config.getApiKey()) - .execute(); - - if (videosListResponse.getItems().size() == 0) { - LOGGER.debug("No Youtube videos found for videoId: {}", videoId); - return new ArrayList<>(); - } - - return videosListResponse.getItems(); - } - - BlockingQueue<StreamsDatum> getDatumQueue() { - return this.datumQueue; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java deleted file mode 100644 index 934a0e5..0000000 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.youtube.provider; - -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.services.youtube.YouTube; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Uninterruptibles; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import org.apache.youtube.pojo.YoutubeConfiguration; - -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.PrintStream; -import java.util.Iterator; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * Retrieve recent activity from a list of user ids or names. 
- */
public class YoutubeUserActivityProvider extends YoutubeProvider {

  public YoutubeUserActivityProvider() {
    super();
  }

  public YoutubeUserActivityProvider(YoutubeConfiguration config) {
    super(config);
  }

  @Override
  protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) {
    return new YoutubeUserActivityCollector(youtube, queue, strategy, userInfo, config);
  }

  /**
   * To use from command line:
   *
   * <p/>
   * Supply (at least) the following required configuration in application.conf:
   *
   * <p/>
   * youtube.oauth.pathToP12KeyFile
   * youtube.oauth.serviceAccountEmailAddress
   * youtube.apiKey
   * youtube.youtubeUsers
   *
   * <p/>
   * Launch using:
   *
   * <p/>
   * mvn exec:java -Dexec.mainClass=com.youtube.provider.YoutubeUserActivityProvider -Dexec.args="application.conf activities.json"
   *
   * <p/>
   * Each datum is written to the output file as one JSON document per line (UTF-8).
   *
   * @param args args[0] is the configuration file, args[1] is the output file
   * @throws Exception Exception
   */
  public static void main(String[] args) throws Exception {

    Preconditions.checkArgument(args.length >= 2);

    String configfile = args[0];
    String outfile = args[1];

    Config reference = ConfigFactory.load();
    File file = new File(configfile);
    // Validate explicitly: an assert is a no-op unless the JVM runs with -ea.
    Preconditions.checkArgument(file.exists(), "configuration file does not exist: %s", configfile);
    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));

    Config typesafe = testResourceConfig.withFallback(reference).resolve();

    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
    YoutubeConfiguration config = new ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe, "youtube");
    YoutubeUserActivityProvider provider = new YoutubeUserActivityProvider(config);

    ObjectMapper mapper = StreamsJacksonMapper.getInstance();

    // try-with-resources closes (and thereby flushes) the file even when the loop throws.
    try (PrintStream outStream =
             new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)), false, "UTF-8")) {
      provider.prepare(config);
      try {
        provider.startStream();
        do {
          Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
          Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
          while (iterator.hasNext()) {
            StreamsDatum datum = iterator.next();
            String json;
            try {
              if ( datum.getDocument() instanceof String ) {
                json = (String) datum.getDocument();
              } else {
                json = mapper.writeValueAsString(datum.getDocument());
              }
              outStream.println(json);
            } catch (JsonProcessingException ex) {
              System.err.println(ex.getMessage());
            }
          }
        }
        while ( provider.isRunning());
      } finally {
        // Always release the collector threads, otherwise the JVM may not exit.
        provider.cleanUp();
      }
      outStream.flush();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java deleted file mode 100644 index 32a011f..0000000 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.youtube.serializer; - -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.pojo.extensions.ExtensionUtil; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.ActivityObject; -import org.apache.streams.pojo.json.Image; -import org.apache.streams.pojo.json.Provider; - -import com.google.api.services.youtube.YouTube; -import com.google.api.services.youtube.model.Channel; -import com.google.api.services.youtube.model.Thumbnail; -import com.google.api.services.youtube.model.ThumbnailDetails; -import com.google.api.services.youtube.model.Video; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -public class YoutubeActivityUtil { - - private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeActivityUtil.class); - - /** - * Given a {@link YouTube.Videos} object and an - * {@link Activity} object, fill out the appropriate details - * - * @param video Video - * @param activity Activity - * @throws ActivitySerializerException ActivitySerializerException - */ - public static void updateActivity(Video video, Activity activity, String channelId) throws ActivitySerializerException { - activity.setActor(buildActor(video, video.getSnippet().getChannelId())); - activity.setVerb("post"); - - activity.setId(formatId(activity.getVerb(), Optional.ofNullable(video.getId()).orElse(null))); - - activity.setPublished(new DateTime(video.getSnippet().getPublishedAt().getValue())); - activity.setTitle(video.getSnippet().getTitle()); - activity.setContent(video.getSnippet().getDescription()); - activity.setUrl("https://www.youtube.com/watch?v=" + video.getId()); - - 
activity.setProvider(getProvider()); - - activity.setObject(buildActivityObject(video)); - - addYoutubeExtensions(activity, video); - } - - - /** - * Given a {@link Channel} object and an - * {@link Activity} object, fill out the appropriate details - * - * @param channel Channel - * @param activity Activity - * @throws ActivitySerializerException ActivitySerializerException - */ - public static void updateActivity(Channel channel, Activity activity, String channelId) throws ActivitySerializerException { - try { - activity.setProvider(getProvider()); - activity.setVerb("post"); - activity.setActor(createActorForChannel(channel)); - Map<String, Object> extensions = new HashMap<>(); - extensions.put("youtube", channel); - activity.setAdditionalProperty("extensions", extensions); - } catch (Throwable throwable) { - throw new ActivitySerializerException(throwable); - } - } - - /** - * createActorForChannel. - * @param channel Channel - * @return $.actor - */ - public static ActivityObject createActorForChannel(Channel channel) { - ActivityObject actor = new ActivityObject(); - // TODO: use generic provider id concatenator - actor.setId("id:youtube:" + channel.getId()); - actor.setSummary(channel.getSnippet().getDescription()); - actor.setDisplayName(channel.getSnippet().getTitle()); - Image image = new Image(); - image.setUrl(channel.getSnippet().getThumbnails().getHigh().getUrl()); - actor.setImage(image); - actor.setUrl("https://youtube.com/user/" + channel.getId()); - Map<String, Object> actorExtensions = new HashMap<>(); - actorExtensions.put("followers", channel.getStatistics().getSubscriberCount()); - actorExtensions.put("posts", channel.getStatistics().getVideoCount()); - actor.setAdditionalProperty("extensions", actorExtensions); - return actor; - } - - /** - * Given a video object, create the appropriate activity object with a valid image - * (thumbnail) and video URL. 
- * @param video Video - * @return Activity Object with Video URL and a thumbnail image - */ - private static ActivityObject buildActivityObject(Video video) { - ActivityObject activityObject = new ActivityObject(); - - ThumbnailDetails thumbnailDetails = video.getSnippet().getThumbnails(); - Thumbnail thumbnail = thumbnailDetails.getDefault(); - - if (thumbnail != null) { - Image image = new Image(); - image.setUrl(thumbnail.getUrl()); - image.setHeight(thumbnail.getHeight()); - image.setWidth(thumbnail.getWidth()); - - activityObject.setImage(image); - } - - activityObject.setUrl("https://www.youtube.com/watch?v=" + video.getId()); - activityObject.setObjectType("video"); - - return activityObject; - } - - /** - * Add the Youtube extensions to the Activity object that we're building. - * @param activity Activity - * @param video Video - */ - private static void addYoutubeExtensions(Activity activity, Video video) { - Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); - - extensions.put("youtube", video); - - if (video.getStatistics() != null) { - Map<String, Object> likes = new HashMap<>(); - likes.put("count", video.getStatistics().getCommentCount()); - extensions.put("likes", likes); - } - } - - /** - * Build an {@link ActivityObject} actor given the video object - * @param video Video - * @param id id - * @return Actor object - */ - private static ActivityObject buildActor(Video video, String id) { - ActivityObject actor = new ActivityObject(); - - actor.setId("id:youtube:" + id); - actor.setDisplayName(video.getSnippet().getChannelTitle()); - actor.setSummary(video.getSnippet().getDescription()); - actor.setAdditionalProperty("handle", video.getSnippet().getChannelTitle()); - - return actor; - } - - /** - * Gets the common youtube {@link Provider} object - * @return a provider object representing YouTube - */ - public static Provider getProvider() { - Provider provider = new Provider(); - 
provider.setId("id:providers:youtube"); - provider.setDisplayName("YouTube"); - return provider; - } - - /** - * Formats the ID to conform with the Apache Streams activity ID convention - * @param idparts the parts of the ID to join - * @return a valid Activity ID in format "id:youtube:part1:part2:...partN" - */ - public static String formatId(String... idparts) { - return String.join(":", - Stream.concat(Arrays.stream(new String[]{"id:youtube"}), Arrays.stream(idparts)).collect(Collectors.toList())); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java deleted file mode 100644 index e28b4a1..0000000 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.youtube.serializer; - -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonNode; -import com.google.api.client.util.DateTime; -import com.google.api.services.youtube.model.Channel; -import com.google.api.services.youtube.model.ChannelContentDetails; -import com.google.api.services.youtube.model.ChannelLocalization; -import com.google.api.services.youtube.model.ChannelSnippet; -import com.google.api.services.youtube.model.ChannelStatistics; -import com.google.api.services.youtube.model.ChannelTopicDetails; -import com.google.api.services.youtube.model.Thumbnail; -import com.google.api.services.youtube.model.ThumbnailDetails; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; - -/** - * YoutubeChannelDeserializer is a JsonDeserializer for Channel. 
- */ -public class YoutubeChannelDeserializer extends JsonDeserializer<Channel> { - - @Override - public Channel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException { - JsonNode node = jp.getCodec().readTree(jp); - try { - Channel channel = new Channel(); - if (node.findPath("etag") != null) { - channel.setEtag(node.get("etag").asText()); - } - if (node.findPath("kind") != null) { - channel.setKind(node.get("kind").asText()); - } - channel.setId(node.get("id").asText()); - channel.setTopicDetails(setTopicDetails(node.findValue("topicDetails"))); - channel.setStatistics(setChannelStatistics(node.findValue("statistics"))); - channel.setContentDetails(setContentDetails(node.findValue("contentDetails"))); - channel.setSnippet(setChannelSnippet(node.findValue("snippet"))); - return channel; - } catch (Throwable throwable) { - throw new IOException(throwable); - } - } - - protected ChannelSnippet setChannelSnippet(JsonNode node) { - ChannelSnippet snippet = new ChannelSnippet(); - snippet.setTitle(node.get("title").asText()); - snippet.setDescription(node.get("description").asText()); - snippet.setPublishedAt(new DateTime(node.get("publishedAt").get("value").longValue())); - snippet.setLocalized(setLocalized(node.findValue("localized"))); - snippet.setThumbnails(setThumbnails(node.findValue("thumbnails"))); - return snippet; - } - - protected ThumbnailDetails setThumbnails(JsonNode node) { - ThumbnailDetails details = new ThumbnailDetails(); - if (node == null) { - return details; - } - details.setDefault(new Thumbnail().setUrl(node.get("default").get("url").asText())); - details.setHigh(new Thumbnail().setUrl(node.get("high").get("url").asText())); - details.setMedium(new Thumbnail().setUrl(node.get("medium").get("url").asText())); - return details; - } - - protected ChannelLocalization setLocalized(JsonNode node) { - if (node == null) { - return new ChannelLocalization(); - } - ChannelLocalization localization = new ChannelLocalization(); - 
localization.setDescription(node.get("description").asText()); - localization.setTitle(node.get("title").asText()); - return localization; - } - - protected ChannelContentDetails setContentDetails(JsonNode node) { - ChannelContentDetails contentDetails = new ChannelContentDetails(); - if (node == null) { - return contentDetails; - } - if (node.findValue("googlePlusUserId") != null) { - contentDetails.setGooglePlusUserId(node.get("googlePlusUserId").asText()); - } - contentDetails.setRelatedPlaylists(setRelatedPlaylists(node.findValue("relatedPlaylists"))); - return contentDetails; - } - - protected ChannelContentDetails.RelatedPlaylists setRelatedPlaylists(JsonNode node) { - ChannelContentDetails.RelatedPlaylists playlists = new ChannelContentDetails.RelatedPlaylists(); - if (node == null) { - return playlists; - } - if (node.findValue("favorites") != null) { - playlists.setFavorites(node.get("favorites").asText()); - } - if (node.findValue("likes") != null) { - playlists.setLikes(node.get("likes").asText()); - } - if (node.findValue("uploads") != null) { - playlists.setUploads(node.get("uploads").asText()); - } - return playlists; - } - - protected ChannelStatistics setChannelStatistics(JsonNode node) { - ChannelStatistics stats = new ChannelStatistics(); - if (node == null) { - return stats; - } - stats.setCommentCount(node.get("commentCount").bigIntegerValue()); - stats.setHiddenSubscriberCount(node.get("hiddenSubscriberCount").asBoolean()); - stats.setSubscriberCount(node.get("subscriberCount").bigIntegerValue()); - stats.setVideoCount(node.get("videoCount").bigIntegerValue()); - stats.setViewCount(node.get("viewCount").bigIntegerValue()); - return stats; - } - - protected ChannelTopicDetails setTopicDetails(JsonNode node) { - ChannelTopicDetails details = new ChannelTopicDetails(); - if (node == null) { - return details; - } - List<String> topicIds = new LinkedList<>(); - for (JsonNode jsonNode : node.get("topicIds")) { - topicIds.add(jsonNode.asText()); - } - 
details.setTopicIds(topicIds); - return details; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java deleted file mode 100644 index e7645bd..0000000 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package com.youtube.serializer; - -import org.apache.streams.jackson.StreamsJacksonMapper; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.api.services.youtube.model.Video; -import com.google.common.base.Preconditions; -import org.apache.commons.lang.StringUtils; - -import java.io.IOException; -import java.util.Objects; - -public class YoutubeEventClassifier { - private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - private static final String VIDEO_IDENTIFIER = "\"youtube#video\""; - private static final String CHANNEL_IDENTIFIER = "youtube#channel"; - - /** - * detect probable Class of a json String from YouTube. - * @param json json - * @return Class - */ - public static Class detectClass(String json) { - Objects.requireNonNull(json); - Preconditions.checkArgument(StringUtils.isNotEmpty(json)); - - ObjectNode objectNode; - try { - objectNode = (ObjectNode) mapper.readTree(json); - } catch (IOException ex) { - ex.printStackTrace(); - return null; - } - - if (objectNode.findValue("kind") != null && objectNode.get("kind").toString().equals(VIDEO_IDENTIFIER)) { - return Video.class; - } else if (objectNode.findValue("kind") != null && objectNode.get("kind").toString().contains(CHANNEL_IDENTIFIER)) { - return com.google.api.services.youtube.model.Channel.class; - } else { - return ObjectNode.class; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeVideoDeserializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeVideoDeserializer.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeVideoDeserializer.java deleted file mode 100644 index 43fe8c6..0000000 --- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeVideoDeserializer.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.youtube.serializer; - -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonNode; -import com.google.api.client.util.DateTime; -import com.google.api.services.youtube.model.Thumbnail; -import com.google.api.services.youtube.model.ThumbnailDetails; -import com.google.api.services.youtube.model.Video; -import com.google.api.services.youtube.model.VideoSnippet; -import com.google.api.services.youtube.model.VideoStatistics; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class YoutubeVideoDeserializer extends JsonDeserializer<Video> { - - private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeVideoDeserializer.class); - - /** - * Because the Youtube Video object contains complex objects within its hierarchy, we have to use - * a custom deserializer - * - * @param jsonParser jsonParser - * @param 
deserializationContext deserializationContext - * @return The deserialized {@link com.google.api.services.youtube.YouTube.Videos} object - * @throws java.io.IOException IOException - * @throws com.fasterxml.jackson.core.JsonProcessingException JsonProcessingException - */ - @Override - public Video deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException { - JsonNode node = jsonParser.getCodec().readTree(jsonParser); - Video video = new Video(); - - try { - video.setId(node.get("id").asText()); - video.setEtag(node.get("etag").asText()); - video.setKind(node.get("kind").asText()); - - video.setSnippet(buildSnippet(node)); - video.setStatistics(buildStatistics(node)); - } catch (Exception ex) { - LOGGER.error("Exception while trying to deserialize a Video object: {}", ex); - } - - return video; - } - - /** - * Given the raw JsonNode, construct a video snippet object. - * @param node JsonNode - * @return VideoSnippet - */ - private VideoSnippet buildSnippet(JsonNode node) { - VideoSnippet snippet = new VideoSnippet(); - JsonNode snippetNode = node.get("snippet"); - - snippet.setChannelId(snippetNode.get("channelId").asText()); - snippet.setChannelTitle(snippetNode.get("channelTitle").asText()); - snippet.setDescription(snippetNode.get("description").asText()); - snippet.setTitle(snippetNode.get("title").asText()); - snippet.setPublishedAt(new DateTime(snippetNode.get("publishedAt").get("value").asLong())); - - ThumbnailDetails thumbnailDetails = new ThumbnailDetails(); - for (JsonNode t : snippetNode.get("thumbnails")) { - Thumbnail thumbnail = new Thumbnail(); - - thumbnail.setHeight(t.get("height").asLong()); - thumbnail.setUrl(t.get("url").asText()); - thumbnail.setWidth(t.get("width").asLong()); - - thumbnailDetails.setDefault(thumbnail); - } - - snippet.setThumbnails(thumbnailDetails); - - return snippet; - } - - /** - * Given the raw JsonNode, construct a statistics object. 
- * @param node JsonNode - * @return VideoStatistics - */ - private VideoStatistics buildStatistics(JsonNode node) { - VideoStatistics statistics = new VideoStatistics(); - JsonNode statisticsNode = node.get("statistics"); - - statistics.setCommentCount(statisticsNode.get("commentCount").bigIntegerValue()); - statistics.setDislikeCount(statisticsNode.get("dislikeCount").bigIntegerValue()); - statistics.setFavoriteCount(statisticsNode.get("favoriteCount").bigIntegerValue()); - statistics.setLikeCount(statisticsNode.get("likeCount").bigIntegerValue()); - statistics.setViewCount(statisticsNode.get("viewCount").bigIntegerValue()); - - return statistics; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/processor/YoutubeTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/processor/YoutubeTypeConverter.java b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/processor/YoutubeTypeConverter.java new file mode 100644 index 0000000..ee9ca18 --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/processor/YoutubeTypeConverter.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.streams.youtube.processor;

import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.youtube.serializer.YoutubeActivityUtil;
import org.apache.streams.youtube.serializer.YoutubeChannelDeserializer;
import org.apache.streams.youtube.serializer.YoutubeEventClassifier;
import org.apache.streams.youtube.serializer.YoutubeVideoDeserializer;

import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.api.services.youtube.model.Channel;
import com.google.api.services.youtube.model.Video;
import org.apache.commons.lang.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/**
 * Converts YouTube documents into {@link Activity} objects.
 *
 * <p>A datum's document may be a {@link Video}, a {@link Channel}, or a JSON String that
 * {@link YoutubeEventClassifier} recognises as one of those two types. Anything else is
 * logged and dropped.
 *
 * <p>{@link #prepare(Object)} must be called before {@link #process(StreamsDatum)} is asked
 * to handle String documents, because it sets up the mapper used to deserialize them.
 */
public class YoutubeTypeConverter implements StreamsProcessor {

  public static final String STREAMS_ID = "YoutubeTypeConverter";

  private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeTypeConverter.class);

  private StreamsJacksonMapper mapper;
  // NOTE(review): inQueue and outQueue are never referenced inside this class.
  private Queue<Video> inQueue;
  private Queue<StreamsDatum> outQueue;
  // Number of documents successfully converted by this instance.
  private int count = 0;

  public YoutubeTypeConverter() {}

  @Override
  public String getId() {
    return STREAMS_ID;
  }

  /**
   * Converts the document carried by the datum into an Activity.
   *
   * @param streamsDatum datum whose document is a Video, a Channel, or their JSON form
   * @return a mutable list holding one datum wrapping the Activity, or an empty list when
   *     the document is missing, of an unsupported type, or conversion fails
   */
  @Override
  public List<StreamsDatum> process(StreamsDatum streamsDatum) {
    List<StreamsDatum> results = new ArrayList<>();

    try {
      Object item = streamsDatum.getDocument();
      if (item == null) {
        // Nothing to convert; report it plainly instead of tripping an NPE below.
        LOGGER.warn("{} received a datum without a document, skipping", STREAMS_ID);
        return results;
      }

      LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());

      if (item instanceof String) {
        item = deserializeItem(item);
      }

      Activity activity = new Activity();
      if (item instanceof Video) {
        YoutubeActivityUtil.updateActivity((Video) item, activity, streamsDatum.getId());
      } else if (item instanceof Channel) {
        YoutubeActivityUtil.updateActivity((Channel) item, activity, null);
      } else {
        throw new NotImplementedException("Type conversion not implement for type : " + item.getClass().getName());
      }

      results.add(new StreamsDatum(activity));
      count++;
    } catch (Exception ex) {
      // Best effort: a document that cannot be converted is dropped, not propagated.
      // The exception is passed as the trailing argument so SLF4J logs the stack trace.
      LOGGER.error("Exception while converting item to Activity", ex);
    }

    return results;
  }

  /**
   * Turns a JSON String into a Video or Channel when the classifier recognises it.
   *
   * @param item the document, expected to be a JSON String
   * @return the deserialized Video or Channel, or {@code item} unchanged when the type is
   *     not recognised or deserialization fails
   */
  private Object deserializeItem(Object item) {
    String json = (String) item;
    try {
      // Comparing from the constant side keeps a null classification from throwing.
      Class<?> klass = YoutubeEventClassifier.detectClass(json);
      if (Video.class.equals(klass)) {
        return mapper.readValue(json, Video.class);
      } else if (Channel.class.equals(klass)) {
        return mapper.readValue(json, Channel.class);
      }
    } catch (Exception ex) {
      LOGGER.error("Exception while trying to deserializeItem", ex);
    }

    return item;
  }

  /**
   * Obtains the shared mapper and registers the Video and Channel deserializers on it.
   *
   * @param configurationObject not used by this processor
   */
  @Override
  public void prepare(Object configurationObject) {
    mapper = StreamsJacksonMapper.getInstance();

    SimpleModule youtubeModule = new SimpleModule();
    youtubeModule.addDeserializer(Video.class, new YoutubeVideoDeserializer());
    youtubeModule.addDeserializer(Channel.class, new YoutubeChannelDeserializer());
    mapper.registerModule(youtubeModule);
  }

  @Override
  public void cleanUp() {}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeChannelDataCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeChannelDataCollector.java b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeChannelDataCollector.java new file mode 100644 index 0000000..332b1a7 --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeChannelDataCollector.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.youtube.provider; + +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; +import org.apache.streams.youtube.YoutubeConfiguration; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.services.youtube.YouTube; +import com.google.api.services.youtube.model.Channel; +import com.google.gson.Gson; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.BlockingQueue; + +/** + * Collects YoutubeChannelData on behalf of YoutubeChannelProvider. + */ +public class YoutubeChannelDataCollector extends YoutubeDataCollector { + + private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeChannelDataCollector.class); + private static final String CONTENT = "snippet,contentDetails,statistics,topicDetails"; + private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + private static final int MAX_ATTEMPTS = 5; + + private YouTube youTube; + private BlockingQueue<StreamsDatum> queue; + private BackOffStrategy strategy; + private UserInfo userInfo; + private YoutubeConfiguration youtubeConfig; + + /** + * YoutubeChannelDataCollector constructor. 
+ * @param youTube YouTube + * @param queue BlockingQueue of StreamsDatum + * @param strategy BackOffStrategy + * @param userInfo UserInfo + * @param youtubeConfig YoutubeConfiguration + */ + public YoutubeChannelDataCollector( + YouTube youTube, + BlockingQueue<StreamsDatum> queue, + BackOffStrategy strategy, + UserInfo userInfo, + YoutubeConfiguration youtubeConfig) { + this.youTube = youTube; + this.queue = queue; + this.strategy = strategy; + this.userInfo = userInfo; + this.youtubeConfig = youtubeConfig; + } + + @Override + public void run() { + Gson gson = new Gson(); + try { + int attempt = 0; + YouTube.Channels.List channelLists = this.youTube.channels().list(CONTENT).setId(this.userInfo.getUserId()).setKey(this.youtubeConfig.getApiKey()); + boolean tryAgain = false; + do { + try { + List<Channel> channels = channelLists.execute().getItems(); + for (Channel channel : channels) { + String json = gson.toJson(channel); + this.queue.put(new StreamsDatum(json, channel.getId())); + } + if (StringUtils.isEmpty(channelLists.getPageToken())) { + channelLists = null; + } else { + channelLists = this.youTube.channels().list(CONTENT).setId(this.userInfo.getUserId()).setOauthToken(this.youtubeConfig.getApiKey()) + .setPageToken(channelLists.getPageToken()); + } + } catch (GoogleJsonResponseException gjre) { + LOGGER.warn("GoogleJsonResposneException caught : {}", gjre); + tryAgain = backoffAndIdentifyIfRetry(gjre, this.strategy); + ++attempt; + } catch (Throwable throwable) { + LOGGER.warn("Unable to get channel info for id : {}", this.userInfo.getUserId()); + LOGGER.warn("Excpection thrown while trying to get channel info : {}", throwable); + } + } + while ((tryAgain && attempt < MAX_ATTEMPTS) || channelLists != null); + + } catch (Throwable throwable) { + LOGGER.warn(throwable.getMessage()); + } + } + + +} 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeChannelProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeChannelProvider.java b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeChannelProvider.java new file mode 100644 index 0000000..d8a09d5 --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeChannelProvider.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.youtube.provider; + +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; +import org.apache.streams.youtube.YoutubeConfiguration; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.services.youtube.YouTube; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Retrieve recent activity from a list of channels. 
+ */ +public class YoutubeChannelProvider extends YoutubeProvider { + + public YoutubeChannelProvider() { + super(); + } + + public YoutubeChannelProvider(YoutubeConfiguration config) { + super(config); + } + + /** + * To use from command line: + * <p/> + * Supply (at least) the following required configuration in application.conf: + * <p/> + * youtube.oauth.pathToP12KeyFile + * youtube.oauth.serviceAccountEmailAddress + * youtube.apiKey + * youtube.youtubeUsers + * <p/> + * Launch using: + * <p/> + * mvn exec:java -Dexec.mainClass=org.apache.streams.youtube.provider.YoutubeUserActivityProvider -Dexec.args="application.conf tweets.json" + * @param args args + * @throws Exception Exception + */ + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File file = new File(configfile); + assert (file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + YoutubeConfiguration config = new ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe, "youtube"); + YoutubeChannelProvider provider = new YoutubeChannelProvider(config); + + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + for (StreamsDatum datum : provider.readCurrent()) { + String json; + try { + if (datum.getDocument() instanceof String) { + json = (String) datum.getDocument(); + } else { + json = 
mapper.writeValueAsString(datum.getDocument()); + } + outStream.println(json); + } catch (JsonProcessingException ex) { + System.err.println(ex.getMessage()); + } + } + } + while (provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } + + @Override + protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) { + return new YoutubeChannelDataCollector(youtube, queue, strategy, userInfo, this.config); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeDataCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeDataCollector.java b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeDataCollector.java new file mode 100644 index 0000000..b05365c --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeDataCollector.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.youtube.provider; + +import org.apache.streams.util.api.requests.backoff.BackOffException; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; + +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base Collector for Youtube Data. + */ +public abstract class YoutubeDataCollector implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeDataCollector.class); + + /** + * Looks at the status code of the expception. If the code indicates that the request should be retried, + * it executes the back off strategy and returns true. + * @param gjre + * @param backOff + * @return returns true if the error code of the exception indicates the request should be retried. + */ + public boolean backoffAndIdentifyIfRetry(GoogleJsonResponseException gjre, BackOffStrategy backOff) throws BackOffException { + boolean tryAgain = false; + + switch (gjre.getStatusCode()) { + case 400 : + LOGGER.warn("Bad Request : {}", gjre); + break; + case 401 : + LOGGER.warn("Invalid Credentials : {}", gjre); + break; + case 403 : + LOGGER.warn("Possible rate limit exception. Retrying. 
: {}", gjre.getMessage()); + backOff.backOff(); + tryAgain = true; + break; + case 503 : + LOGGER.warn("Google Backend Service Error : {}", gjre); + break; + default: + LOGGER.warn("Google Service returned error : {}", gjre); + tryAgain = true; + backOff.backOff(); + break; + } + + return tryAgain; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeProvider.java b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeProvider.java new file mode 100644 index 0000000..da21722 --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeProvider.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.youtube.provider; + +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.util.ComponentUtils; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; +import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy; +import org.apache.streams.youtube.YoutubeConfiguration; + +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.javanet.NetHttpTransport; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.jackson2.JacksonFactory; +import com.google.api.services.youtube.YouTube; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.commons.lang3.StringUtils; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.math.BigInteger; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +
/**
 * Base {@link StreamsProvider} for YouTube.
 *
 * <p>For every configured user a collector obtained from
 * {@link #getDataCollector(BackOffStrategy, BlockingQueue, YouTube, UserInfo)} is submitted to a
 * fixed-size thread pool; the collectors write into a bounded queue that
 * {@link #readCurrent()} drains in batches.
 */
public abstract class YoutubeProvider implements StreamsProvider {

  private static final String STREAMS_ID = "YoutubeProvider";

  private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeProvider.class);
  /** Upper bound on the number of datums returned by a single {@link #readCurrent()} call. */
  private static final int MAX_BATCH_SIZE = 1000;
  /** Capacity of the queue shared between the collectors and {@link #readCurrent()}. */
  private static final int MAX_QUEUE_SIZE = 1000;
  /**
   * Define a global instance of the HTTP transport.
   */
  private static final HttpTransport HTTP_TRANSPORT = new NetHttpTransport();
  /**
   * Define a global instance of the JSON factory.
   */
  private static final JsonFactory JSON_FACTORY = new JacksonFactory();
  private static final int DEFAULT_THREAD_POOL_SIZE = 5;

  protected YouTube youtube;
  protected YoutubeConfiguration config;

  // This OAuth 2.0 access scope allows for full read/write access to the
  // authenticated user's account.
  private final List<String> scopes = Collections.singletonList("https://www.googleapis.com/auth/youtube");
  // submit(Runnable) yields ListenableFuture<?>; the wildcard avoids a raw-type/unchecked add.
  private final List<ListenableFuture<?>> futures = new ArrayList<>();
  private ListeningExecutorService executor;
  private BlockingQueue<StreamsDatum> datumQueue;
  private AtomicBoolean isComplete;

  /**
   * YoutubeProvider constructor.
   * Resolves config from JVM 'youtube'.
   *
   * @throws NullPointerException if the resolved configuration has no API key
   */
  public YoutubeProvider() {
    this.config = new ComponentConfigurator<>(YoutubeConfiguration.class)
        .detectConfiguration(StreamsConfigurator.getConfig().getConfig("youtube"));

    Objects.requireNonNull(this.config.getApiKey(), "youtube apiKey");
  }

  /**
   * YoutubeProvider constructor - uses supplied YoutubeConfiguration.
   *
   * @param config YoutubeConfiguration
   * @throws NullPointerException if {@code config} or its API key is null
   */
  public YoutubeProvider(YoutubeConfiguration config) {
    this.config = Objects.requireNonNull(config, "config");

    Objects.requireNonNull(this.config.getApiKey(), "youtube apiKey");
  }

  @Override
  public String getId() {
    return STREAMS_ID;
  }

  /**
   * Creates the YouTube client, the collector thread pool and the datum queue.
   *
   * @param configurationObject not used by this implementation
   * @throws RuntimeException wrapping the failure if the YouTube client cannot be created
   */
  @Override
  public void prepare(Object configurationObject) {
    try {
      this.youtube = createYouTubeClient();
    } catch (IOException | GeneralSecurityException ex) {
      // Exception goes last with no placeholder so SLF4J logs it as the throwable.
      LOGGER.error("Failed to create oauth for YouTube", ex);
      throw new RuntimeException(ex);
    }

    this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE));
    this.datumQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);
    this.isComplete = new AtomicBoolean(false);
  }

  /**
   * Submits one collector per configured user and then shuts the executor down so that it
   * terminates once all submitted collectors have finished.
   *
   * <p>Users without their own after/before date receive the configured defaults, when present.
   */
  @Override
  public void startStream() {
    for (UserInfo user : this.config.getYoutubeUsers()) {
      if (this.config.getDefaultAfterDate() != null && user.getAfterDate() == null) {
        user.setAfterDate(this.config.getDefaultAfterDate());
      }
      if (this.config.getDefaultBeforeDate() != null && user.getBeforeDate() == null) {
        user.setBeforeDate(this.config.getDefaultBeforeDate());
      }

      // One strategy per collector: collectors run concurrently on the pool and reset their
      // strategy after a successful request, so a shared instance would let one user's
      // success clear another user's back-off state.
      BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2);

      ListenableFuture<?> future = executor.submit(getDataCollector(backOffStrategy, this.datumQueue, this.youtube, user));
      futures.add(future);
    }

    this.executor.shutdown();
  }

  /**
   * Creates the collector that pulls data for a single user.
   *
   * @param strategy back off strategy handed to the collector
   * @param queue queue the collector writes its datums to
   * @param youtube YouTube client
   * @param userInfo user to collect for
   * @return the task to submit to the executor
   */
  protected abstract Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo);

  /**
   * Drains up to {@value #MAX_BATCH_SIZE} datums from the shared queue.
   *
   * @return result set holding the drained datums (possibly none)
   */
  @Override
  public StreamsResultSet readCurrent() {
    BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>();
    int batchCount = 0;
    while (!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) {
      StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue);
      if (datum != null) {
        ++batchCount;
        ComponentUtils.offerUntilSuccess(datum, batch);
      }
    }
    return new StreamsResultSet(batch);
  }

  /**
   * Not implemented by this provider.
   *
   * @param sequence ignored
   * @return always null
   */
  @Override
  public StreamsResultSet readNew(BigInteger sequence) {
    return null;
  }

  /**
   * Not implemented by this provider.
   *
   * @param start ignored
   * @param end ignored
   * @return always null
   */
  @Override
  public StreamsResultSet readRange(DateTime start, DateTime end) {
    return null;
  }

  /**
   * Builds the YouTube client from the service-account settings in the configuration.
   *
   * @return YouTube client
   * @throws IOException if raised while loading the P12 key file
   * @throws GeneralSecurityException if raised while loading the P12 key file
   */
  @VisibleForTesting
  protected YouTube createYouTubeClient() throws IOException, GeneralSecurityException {
    GoogleCredential.Builder credentialBuilder = new GoogleCredential.Builder()
        .setTransport(HTTP_TRANSPORT)
        .setJsonFactory(JSON_FACTORY)
        .setServiceAccountId(getConfig().getOauth().getServiceAccountEmailAddress())
        .setServiceAccountScopes(scopes);

    String pathToP12KeyFile = getConfig().getOauth().getPathToP12KeyFile();
    if (StringUtils.isNotEmpty(pathToP12KeyFile)) {
      File p12KeyFile = new File(pathToP12KeyFile);
      if (p12KeyFile.exists() && p12KeyFile.isFile() && p12KeyFile.canRead()) {
        credentialBuilder = credentialBuilder.setServiceAccountPrivateKeyFromP12File(p12KeyFile);
      } else {
        // A configured but unusable key file used to be skipped silently.
        LOGGER.warn("P12 key file {} is missing or not readable; building credential without it", pathToP12KeyFile);
      }
    }
    Credential credential = credentialBuilder.build();
    return new YouTube.Builder(HTTP_TRANSPORT, JSON_FACTORY, credential).setApplicationName("Streams Application").build();
  }

  /**
   * Shuts the collector executor down and releases it. Safe to call when the executor was
   * never created or has already been released.
   */
  @Override
  public void cleanUp() {
    if (this.executor != null) {
      ComponentUtils.shutdownExecutor(this.executor, 10, 10);
      this.executor = null;
    }
  }

  public YoutubeConfiguration getConfig() {
    return config;
  }

  public void setConfig(YoutubeConfiguration config) {
    this.config = config;
  }

  /**
   * Set and overwrite the default before date that was read from the configuration file.
   * @param defaultBeforeDate defaultBeforeDate
   */
  public void setDefaultBeforeDate(DateTime defaultBeforeDate) {
    this.config.setDefaultBeforeDate(defaultBeforeDate);
  }

  /**
   * Set and overwrite the default after date that was read from the configuration file.
   * @param defaultAfterDate defaultAfterDate
   */
  public void setDefaultAfterDate(DateTime defaultAfterDate) {
    this.config.setDefaultAfterDate(defaultAfterDate);
  }

  /**
   * Sets and overwrites the user info from the configuration file. Uses the default before and
   * after dates.
   * @param userIds Set of String userIds
   */
  public void setUserInfoWithDefaultDates(Set<String> userIds) {
    List<UserInfo> youtubeUsers = new LinkedList<>();

    for (String userId : userIds) {
      UserInfo user = new UserInfo();
      user.setUserId(userId);
      user.setAfterDate(this.config.getDefaultAfterDate());
      user.setBeforeDate(this.config.getDefaultBeforeDate());
      youtubeUsers.add(user);
    }

    this.config.setYoutubeUsers(youtubeUsers);
  }

  /**
   * Sets and overwrites the user info from the configuration file. Only sets the after date.
   * @param usersAndAfterDates map of userId to the after date for that user
   */
  public void setUserInfoWithAfterDate(Map<String, DateTime> usersAndAfterDates) {
    List<UserInfo> youtubeUsers = new LinkedList<>();

    for (Map.Entry<String, DateTime> userAndAfterDate : usersAndAfterDates.entrySet()) {
      UserInfo user = new UserInfo();
      user.setUserId(userAndAfterDate.getKey());
      user.setAfterDate(userAndAfterDate.getValue());
      youtubeUsers.add(user);
    }

    this.config.setYoutubeUsers(youtubeUsers);
  }

  /**
   * Reports whether the provider still has work outstanding.
   *
   * <p>The provider is marked complete once the queue is empty, the executor has terminated
   * (or has been released by {@link #cleanUp()}) and all submitted futures are done.
   *
   * @return false once the provider has been marked complete, true otherwise
   */
  @Override
  public boolean isRunning() {
    // cleanUp() nulls the executor after shutting it down; treat that as terminated.
    boolean executorTerminated = this.executor == null || this.executor.isTerminated();
    if (datumQueue.isEmpty() && executorTerminated && Futures.allAsList(futures).isDone()) {
      LOGGER.info("Completed");
      isComplete.set(true);
      LOGGER.info("Exiting");
    }
    return !isComplete.get();
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeUserActivityCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeUserActivityCollector.java b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeUserActivityCollector.java new file mode 100644 index 0000000..ba3e10f --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeUserActivityCollector.java @@ -0,0
+1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.youtube.provider; + +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; +import org.apache.streams.youtube.YoutubeConfiguration; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.services.youtube.YouTube; +import com.google.api.services.youtube.model.ActivityListResponse; +import com.google.api.services.youtube.model.Video; +import com.google.api.services.youtube.model.VideoListResponse; +import com.google.gson.Gson; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +
/**
 * YoutubeDataCollector for YoutubeUserActivityProvider.
 *
 * <p>Pages through the activity feed of a single user (channel), resolves the videos referenced
 * by each activity and places the ones inside the user's date window on the datum queue.
 */
public class YoutubeUserActivityCollector extends YoutubeDataCollector {

  /**
   * Max results requested per page.
   * https://developers.google.com/youtube/v3/docs/activities/list
   */
  private static final long MAX_RESULTS = 50;
  /** Number of consecutive failed requests after which collection for the user is abandoned. */
  private static final int MAX_ATTEMPTS = 5;
  private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeUserActivityCollector.class);
  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();

  static { //set up mapper for Google Activity Object
    // NOTE(review): MAPPER is whatever StreamsJacksonMapper.getInstance() returns; if that is a
    // shared instance this configuration applies beyond this class - confirm.
    SimpleModule simpleModule = new SimpleModule();
    MAPPER.registerModule(simpleModule);
    MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
  }

  private final Gson gson = new Gson();
  private final BlockingQueue<StreamsDatum> datumQueue;
  private final BackOffStrategy backOff;
  private final YouTube youtube;
  private final UserInfo userInfo;
  private final YoutubeConfiguration config;

  /**
   * YoutubeUserActivityCollector constructor.
   * @param youtube YouTube
   * @param datumQueue BlockingQueue of StreamsDatum
   * @param backOff BackOffStrategy
   * @param userInfo UserInfo
   * @param config YoutubeConfiguration
   */
  public YoutubeUserActivityCollector(
      YouTube youtube,
      BlockingQueue<StreamsDatum> datumQueue,
      BackOffStrategy backOff,
      UserInfo userInfo,
      YoutubeConfiguration config) {
    this.youtube = youtube;
    this.datumQueue = datumQueue;
    this.backOff = backOff;
    this.userInfo = userInfo;
    this.config = config;
  }

  @Override
  public void run() {
    collectActivityData();
  }

  /**
   * Pages through the activity feed of this collector's user and hands every page to
   * {@link #processActivityFeed(ActivityListResponse, DateTime, DateTime)}.
   *
   * <p>A failed request is retried only when
   * {@link #backoffAndIdentifyIfRetry(GoogleJsonResponseException, BackOffStrategy)} says so and
   * fewer than {@value #MAX_ATTEMPTS} requests have failed in a row; otherwise collection for
   * the user stops. Any other failure is logged and ends the collection.
   */
  protected void collectActivityData() {
    try {
      DateTime afterDate = userInfo.getAfterDate();
      DateTime beforeDate = userInfo.getBeforeDate();

      // Token of the page to request next; null means "first page". Tracked separately from the
      // response so that a retry after a failed first request does not dereference a null feed.
      String pageToken = null;
      int failedAttempts = 0;
      boolean tryAgain;

      do {
        // Cleared on every pass so an earlier retry cannot keep the loop alive after the last
        // page has been fetched.
        tryAgain = false;
        try {
          YouTube.Activities.List request = this.youtube.activities().list("contentDetails")
              .setChannelId(userInfo.getUserId())
              .setMaxResults(MAX_RESULTS)
              .setKey(config.getApiKey());
          if (pageToken != null) {
            request.setPageToken(pageToken);
          }
          ActivityListResponse feed = request.execute();

          this.backOff.reset(); //successful pull reset api.
          failedAttempts = 0;

          processActivityFeed(feed, afterDate, beforeDate);
          // Read after processing: processVideos clears the token to stop paging once videos
          // older than afterDate are reached.
          pageToken = feed.getNextPageToken();
        } catch (GoogleJsonResponseException gjre) {
          ++failedAttempts;
          tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOff) && failedAttempts < MAX_ATTEMPTS;
          if (!tryAgain) {
            LOGGER.warn("Giving up on Activities for user={} after {} failed attempt(s)",
                this.userInfo.getUserId(), failedAttempts);
            pageToken = null;
          }
        }
      }
      while (tryAgain || pageToken != null);
    } catch (Throwable throwable) {
      // Task boundary: nothing inspects the submitted future, so everything is logged here.
      if (throwable instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
      LOGGER.warn("Unable to pull Activities for user={} : {}",
          this.userInfo.getUserId(), throwable.getMessage(), throwable);
    }
  }

  /**
   * Given a feed and an after and before date, fetch all relevant user videos
   * and place them into the datumQueue for post-processing.
   *
   * <p>A failure while handling one activity is logged and does not stop the remaining ones.
   *
   * @param feed ActivityListResponse
   * @param afterDate DateTime
   * @param beforeDate DateTime
   * @throws IOException IOException
   * @throws InterruptedException if the thread was interrupted while processing the feed
   */
  void processActivityFeed(ActivityListResponse feed, DateTime afterDate, DateTime beforeDate) throws IOException, InterruptedException {
    List<com.google.api.services.youtube.model.Activity> activities = feed.getItems();
    if (activities == null) {
      return; // nothing to process on this page
    }

    for (com.google.api.services.youtube.model.Activity activity : activities) {
      try {
        List<Video> videos = new ArrayList<>();

        if (activity.getContentDetails().getUpload() != null) {
          videos.addAll(getVideoList(activity.getContentDetails().getUpload().getVideoId()));
        }
        if (activity.getContentDetails().getPlaylistItem() != null && activity.getContentDetails().getPlaylistItem().getResourceId() != null) {
          videos.addAll(getVideoList(activity.getContentDetails().getPlaylistItem().getResourceId().getVideoId()));
        }

        processVideos(videos, afterDate, beforeDate, activity, feed);
      } catch (Exception ex) {
        LOGGER.error("Error while trying to process activity: {}", activity, ex);
      }

      // processVideos restores the interrupt flag when queueing is interrupted; surface it to
      // the caller instead of carrying on with the remaining activities.
      if (Thread.interrupted()) {
        throw new InterruptedException("Interrupted while processing Youtube activities");
      }
    }
  }

  /**
   * Process a list of Video objects.
   *
   * <p>A video is queued when its publish date lies strictly after {@code afterDate} and
   * strictly before {@code beforeDate}; a null bound is not applied. When a video published
   * before {@code afterDate} is met, the feed's next page token is cleared and the remaining
   * videos are skipped.
   *
   * @param videos List of Video
   * @param afterDate afterDate
   * @param beforeDate beforeDate
   * @param activity com.google.api.services.youtube.model.Activity
   * @param feed ActivityListResponse
   */
  void processVideos(List<Video> videos, DateTime afterDate, DateTime beforeDate, com.google.api.services.youtube.model.Activity activity, ActivityListResponse feed) {
    try {
      for (Video video : videos) {
        if (video == null) {
          continue;
        }

        DateTime published = new DateTime(video.getSnippet().getPublishedAt().getValue());
        boolean publishedAfterLowerBound = afterDate == null || afterDate.isBefore(published);
        boolean publishedBeforeUpperBound = beforeDate == null || beforeDate.isAfter(published);

        if (publishedAfterLowerBound && publishedBeforeUpperBound) {
          if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Providing Youtube Activity: {}", MAPPER.writeValueAsString(video));
          }
          this.datumQueue.put(new StreamsDatum(gson.toJson(video), activity.getId()));
        } else if (afterDate != null && afterDate.isAfter(published)) {
          feed.setNextPageToken(null); // do not fetch next page
          break;
        }
      }
    } catch (InterruptedException ie) {
      // Keep the interrupt visible to the caller; this method has no throws clause.
      Thread.currentThread().interrupt();
      LOGGER.error("Interrupted while trying to process video list: {}", videos, ie);
    } catch (Exception ex) {
      LOGGER.error("Exception while trying to process video list: {}", videos, ex);
    }
  }

  /**
   * Given a Youtube videoId, return the relevant Youtube Video object.
   * @param videoId videoId
   * @return List of Videos; empty when the response carries no items
   * @throws IOException if the request fails
   */
  List<Video> getVideoList(String videoId) throws IOException {
    VideoListResponse videosListResponse = this.youtube.videos().list("snippet,statistics")
        .setId(videoId)
        .setKey(config.getApiKey())
        .execute();

    List<Video> items = videosListResponse.getItems();
    if (items == null || items.isEmpty()) {
      LOGGER.debug("No Youtube videos found for videoId: {}", videoId);
      return new ArrayList<>();
    }

    return items;
  }

  BlockingQueue<StreamsDatum> getDatumQueue() {
    return this.datumQueue;
  }
}
\ No newline at end of file
