Repository: incubator-streams Updated Branches: refs/heads/master 006234fb8 -> 104f29b1e
level up youtube provider update google client version (STREAMS-413) use provider without a runtime (STREAMS-403) add main methods to each Provider (STREAMS-411) add real integration tests (STREAMS-415) isolate isRunning and readCurrent (STREAMS-425) Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/762ce8c7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/762ce8c7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/762ce8c7 Branch: refs/heads/master Commit: 762ce8c7c58aff54d296afbfbc863e586406f915 Parents: 3234cdb Author: Steve Blackmon @steveblackmon <[email protected]> Authored: Thu Oct 13 13:21:59 2016 -0500 Committer: Steve Blackmon @steveblackmon <[email protected]> Committed: Thu Oct 13 13:21:59 2016 -0500 ---------------------------------------------------------------------- .../streams-provider-youtube/pom.xml | 21 +++-- .../provider/YoutubeChannelDataCollector.java | 5 +- .../provider/YoutubeChannelProvider.java | 84 +++++++++++++++++++ .../com/youtube/provider/YoutubeProvider.java | 86 ++++++++++++++------ .../provider/YoutubeUserActivityCollector.java | 5 +- .../provider/YoutubeUserActivityProvider.java | 81 ++++++++++++++++++ .../src/site/markdown/index.md | 23 ++++++ .../providers/YoutubeChannelProviderIT.java | 55 +++++++++++++ .../YoutubeUserActivityProviderIT.java | 55 +++++++++++++ .../resources/YoutubeChannelProviderIT.conf | 22 +++++ .../YoutubeUserActivityProviderIT.conf | 22 +++++ 11 files changed, 423 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/pom.xml b/streams-contrib/streams-provider-youtube/pom.xml index 
8d028af..037d4b6 100644 --- a/streams-contrib/streams-provider-youtube/pom.xml +++ b/streams-contrib/streams-provider-youtube/pom.xml @@ -30,10 +30,8 @@ <artifactId>streams-provider-youtube</artifactId> <properties> - <project.youtube.version>v3-rev107-1.18.0-rc</project.youtube.version> - <project.youtube.analytics.version>v1-rev24-1.17.0-rc</project.youtube.analytics.version> - <project.http.version>1.18.0-rc</project.http.version> - <project.oauth.version>1.18.0-rc</project.oauth.version> + <youtube.client.version>v3-rev178-1.22.0</youtube.client.version> + <google.client.version>1.22.0</google.client.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> @@ -53,7 +51,7 @@ <dependency> <groupId>com.google.http-client</groupId> <artifactId>google-http-client-jackson2</artifactId> - <version>${project.http.version}</version> + <version>${google.client.version}</version> <exclusions> <exclusion> <groupId>commons-logging</groupId> @@ -69,7 +67,7 @@ <dependency> <groupId>com.google.oauth-client</groupId> <artifactId>google-oauth-client-jetty</artifactId> - <version>${project.oauth.version}</version> + <version>${google.client.version}</version> </dependency> <dependency> @@ -101,7 +99,7 @@ <dependency> <groupId>com.google.api-client</groupId> <artifactId>google-api-client</artifactId> - <version>1.17.0-rc</version> + <version>${google.client.version}</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> @@ -111,7 +109,7 @@ <dependency> <groupId>com.google.apis</groupId> <artifactId>google-api-services-youtube</artifactId> - <version>v3-rev126-1.19.1</version> + <version>${youtube.client.version}</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> @@ -166,6 +164,13 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <configuration> + <skipTests>${skipITs}</skipTests> + </configuration> + 
</plugin> </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java index e0f9d7b..8e980a7 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java @@ -25,6 +25,7 @@ import com.google.api.client.http.HttpRequest; import com.google.api.services.youtube.YouTube; import com.google.api.services.youtube.model.Channel; import com.google.api.services.youtube.model.ChannelListResponse; +import com.google.gson.Gson; import org.apache.commons.lang3.StringUtils; import org.apache.streams.core.StreamsDatum; import org.apache.streams.google.gplus.configuration.UserInfo; @@ -63,6 +64,7 @@ public class YoutubeChannelDataCollector extends YoutubeDataCollector{ @Override public void run() { + Gson gson = new Gson(); try { int attempt = 0; YouTube.Channels.List channelLists = this.youTube.channels().list(CONTENT).setId(this.userInfo.getUserId()).setKey(this.youtubeConfig.getApiKey()); @@ -71,7 +73,8 @@ public class YoutubeChannelDataCollector extends YoutubeDataCollector{ try { List<Channel> channels = channelLists.execute().getItems(); for (Channel channel : channels) { - this.queue.put(new StreamsDatum(MAPPER.writeValueAsString(channel), channel.getId())); + String json = gson.toJson(channel); + this.queue.put(new StreamsDatum(json, channel.getId())); } if (StringUtils.isEmpty(channelLists.getPageToken())) { channelLists = null; 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java index 6a5fda1..817c98e 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java @@ -19,20 +19,104 @@ package com.youtube.provider; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.services.youtube.YouTube; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.util.api.requests.backoff.BackOffStrategy; +import org.apache.youtube.pojo.YoutubeConfiguration; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.util.Iterator; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; /** + * Retrieve recent activity from a list of channels. 
* + * To use from command line: + * + * Supply (at least) the following required configuration in application.conf: + * + * youtube.oauth.pathToP12KeyFile + * youtube.oauth.serviceAccountEmailAddress + * youtube.apiKey + * youtube.youtubeUsers + * + * Launch using: + * + * mvn exec:java -Dexec.mainClass=com.youtube.provider.YoutubeChannelProvider -Dexec.args="application.conf tweets.json" */ public class YoutubeChannelProvider extends YoutubeProvider { + public YoutubeChannelProvider() { + super(); + } + + public YoutubeChannelProvider(YoutubeConfiguration config) { + super(config); + } + @Override protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) { return new YoutubeChannelDataCollector(youtube, queue, strategy, userInfo, this.config); } + + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File conf_file = new File(configfile); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + YoutubeConfiguration config = new ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe, "youtube"); + YoutubeChannelProvider provider = new YoutubeChannelProvider(config); + + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + 
Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while(iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + try { + if( datum.getDocument() instanceof String ) + json = (String)datum.getDocument(); + else + json = mapper.writeValueAsString(datum.getDocument()); + outStream.println(json); + } catch (JsonProcessingException e) { + System.err.println(e.getMessage()); + } + } + } while( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java index b04a781..e16f4bb 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java @@ -19,14 +19,21 @@ package com.youtube.provider; +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; import com.google.api.client.http.HttpTransport; import com.google.api.client.http.javanet.NetHttpTransport; import com.google.api.client.json.JsonFactory; import com.google.api.client.json.jackson2.JacksonFactory; +import com.google.api.client.repackaged.com.google.common.base.Strings; import com.google.api.services.youtube.YouTube; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import 
com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.Config; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; @@ -41,9 +48,11 @@ import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; import java.math.BigInteger; import java.security.GeneralSecurityException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -53,6 +62,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import static java.util.concurrent.Executors.newSingleThreadExecutor; + public abstract class YoutubeProvider implements StreamsProvider { public static final String STREAMS_ID = "YoutubeProvider"; @@ -60,6 +71,10 @@ public abstract class YoutubeProvider implements StreamsProvider { private final static Logger LOGGER = LoggerFactory.getLogger(YoutubeProvider.class); private final static int MAX_BATCH_SIZE = 1000; + // This OAuth 2.0 access scope allows for full read/write access to the + // authenticated user's account. + List<String> scopes = Lists.newArrayList("https://www.googleapis.com/auth/youtube"); + /** * Define a global instance of the HTTP transport. 
*/ @@ -72,7 +87,9 @@ public abstract class YoutubeProvider implements StreamsProvider { private static final int DEFAULT_THREAD_POOL_SIZE = 5; - private ExecutorService executor; + List<ListenableFuture<Object>> futures = new ArrayList<>(); + + private ListeningExecutorService executor; private BlockingQueue<StreamsDatum> datumQueue; private AtomicBoolean isComplete; private boolean previousPullWasEmpty; @@ -99,6 +116,21 @@ public abstract class YoutubeProvider implements StreamsProvider { } @Override + public void prepare(Object configurationObject) { + try { + this.youtube = createYouTubeClient(); + } catch (IOException |GeneralSecurityException e) { + LOGGER.error("Failed to created oauth for YouTube : {}", e); + throw new RuntimeException(e); + } + + this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE)); + this.datumQueue = new LinkedBlockingQueue<>(1000); + this.isComplete = new AtomicBoolean(false); + this.previousPullWasEmpty = false; + } + + @Override public void startStream() { BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2); @@ -109,7 +141,9 @@ public abstract class YoutubeProvider implements StreamsProvider { if(this.config.getDefaultBeforeDate() != null && user.getBeforeDate() == null) { user.setBeforeDate(this.config.getDefaultBeforeDate()); } - this.executor.submit(getDataCollector(backOffStrategy, this.datumQueue, this.youtube, user)); + + ListenableFuture future = executor.submit(getDataCollector(backOffStrategy, this.datumQueue, this.youtube, user)); + futures.add(future); } this.executor.shutdown(); @@ -128,9 +162,6 @@ public abstract class YoutubeProvider implements StreamsProvider { ComponentUtils.offerUntilSuccess(datum, batch); } } - boolean pullIsEmpty = batch.isEmpty() && this.datumQueue.isEmpty() &&this.executor.isTerminated(); - this.isComplete.set(this.previousPullWasEmpty && pullIsEmpty); - this.previousPullWasEmpty = pullIsEmpty; return new StreamsResultSet(batch); } 
@@ -144,29 +175,22 @@ public abstract class YoutubeProvider implements StreamsProvider { return null; } - @Override - public boolean isRunning() { - return !this.isComplete.get(); - } - - @Override - public void prepare(Object configurationObject) { - try { - this.youtube = createYouTubeClient(); - } catch (IOException |GeneralSecurityException e) { - LOGGER.error("Failed to created oauth for GPlus : {}", e); - throw new RuntimeException(e); - } - - this.executor = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE); - this.datumQueue = new LinkedBlockingQueue<>(1000); - this.isComplete = new AtomicBoolean(false); - this.previousPullWasEmpty = false; - } - @VisibleForTesting protected YouTube createYouTubeClient() throws IOException, GeneralSecurityException { - return new YouTube.Builder(HTTP_TRANSPORT, JSON_FACTORY, null).setApplicationName("Streams Application").build(); + GoogleCredential.Builder credentialBuilder = new GoogleCredential.Builder() + .setTransport(HTTP_TRANSPORT) + .setJsonFactory(JSON_FACTORY) + .setServiceAccountId(getConfig().getOauth().getServiceAccountEmailAddress()) + .setServiceAccountScopes(scopes); + + if( !Strings.isNullOrEmpty(getConfig().getOauth().getPathToP12KeyFile())) { + File p12KeyFile = new File(getConfig().getOauth().getPathToP12KeyFile()); + if( p12KeyFile.exists() && p12KeyFile.isFile() && p12KeyFile.canRead()) { + credentialBuilder = credentialBuilder.setServiceAccountPrivateKeyFromP12File(p12KeyFile); + } + } + Credential credential = credentialBuilder.build(); + return new YouTube.Builder(HTTP_TRANSPORT, JSON_FACTORY, credential).setApplicationName("Streams Application").build(); } @Override @@ -233,4 +257,14 @@ public abstract class YoutubeProvider implements StreamsProvider { this.config.setYoutubeUsers(youtubeUsers); } + + @Override + public boolean isRunning() { + if (datumQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { + LOGGER.info("Completed"); + isComplete.set(true); + 
LOGGER.info("Exiting"); + } + return !isComplete.get(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java index 5e7e9fa..76a69f3 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java @@ -28,6 +28,7 @@ import com.google.api.services.youtube.YouTube; import com.google.api.services.youtube.model.ActivityListResponse; import com.google.api.services.youtube.model.Video; import com.google.api.services.youtube.model.VideoListResponse; +import com.google.gson.Gson; import org.apache.streams.core.StreamsDatum; import org.apache.streams.google.gplus.configuration.UserInfo; import org.apache.streams.jackson.StreamsJacksonMapper; @@ -63,6 +64,8 @@ public class YoutubeUserActivityCollector extends YoutubeDataCollector { private UserInfo userInfo; private YoutubeConfiguration config; + Gson gson = new Gson(); + public YoutubeUserActivityCollector(YouTube youtube, BlockingQueue<StreamsDatum> datumQueue, BackOffStrategy backOff, UserInfo userInfo, YoutubeConfiguration config) { this.youtube = youtube; this.datumQueue = datumQueue; @@ -169,7 +172,7 @@ public class YoutubeUserActivityCollector extends YoutubeDataCollector { || (afterDate == null && beforeDate.isAfter(published)) || ((afterDate != null && beforeDate != null) && (afterDate.isAfter(published) && beforeDate.isBefore(published)))) { LOGGER.debug("Providing Youtube Activity: 
{}", MAPPER.writeValueAsString(video)); - this.datumQueue.put(new StreamsDatum(MAPPER.writeValueAsString(video), activity.getId())); + this.datumQueue.put(new StreamsDatum(gson.toJson(video), activity.getId())); } else if (afterDate != null && afterDate.isAfter(published)) { feed.setNextPageToken(null); // do not fetch next page break; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java index 99820c1..ed3dc63 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java @@ -19,16 +19,53 @@ package com.youtube.provider; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.services.youtube.YouTube; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.util.api.requests.backoff.BackOffStrategy; import org.apache.youtube.pojo.YoutubeConfiguration; +import 
java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.util.Iterator; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +/** + * Retrieve recent activity from a list of user ids or names. + * + * To use from command line: + * + * Supply (at least) the following required configuration in application.conf: + * + * youtube.oauth.pathToP12KeyFile + * youtube.oauth.serviceAccountEmailAddress + * youtube.apiKey + * youtube.youtubeUsers + * + * Launch using: + * + * mvn exec:java -Dexec.mainClass=com.youtube.provider.YoutubeUserActivityProvider -Dexec.args="application.conf tweets.json" + */ public class YoutubeUserActivityProvider extends YoutubeProvider { + public YoutubeUserActivityProvider() { + super(); + } + public YoutubeUserActivityProvider(YoutubeConfiguration config) { super(config); } @@ -37,4 +74,48 @@ public class YoutubeUserActivityProvider extends YoutubeProvider { protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) { return new YoutubeUserActivityCollector(youtube, queue, strategy, userInfo, config); } + + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File conf_file = new File(configfile); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + YoutubeConfiguration config = new ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe, "youtube"); + YoutubeUserActivityProvider provider = new 
YoutubeUserActivityProvider(config); + + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while(iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + try { + if( datum.getDocument() instanceof String ) + json = (String)datum.getDocument(); + else + json = mapper.writeValueAsString(datum.getDocument()); + outStream.println(json); + } catch (JsonProcessingException e) { + System.err.println(e.getMessage()); + } + } + } while( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/site/markdown/index.md ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/site/markdown/index.md b/streams-contrib/streams-provider-youtube/src/site/markdown/index.md index cea90fe..de050e9 100644 --- a/streams-contrib/streams-provider-youtube/src/site/markdown/index.md +++ b/streams-contrib/streams-provider-youtube/src/site/markdown/index.md @@ -22,6 +22,29 @@ This module relies on classes from com.google.apis:google-api-services-youtube | YoutubeChannelProvider [YoutubeChannelProvider.html](apidocs/com/youtube/provider/YoutubeChannelProvider.html "javadoc") | [YoutubeConfiguration.json](com/youtube/YoutubeConfiguration.json "YoutubeConfiguration.json") [YoutubeConfiguration.html](apidocs/com/youtube/YoutubeConfiguration.html "javadoc") | YoutubeUserActivityProvider [YoutubeUserActivityProvider.html](apidocs/com/youtube/provider/YoutubeUserActivityProvider.html "javadoc") | 
[YoutubeConfiguration.json](com/youtube/YoutubeConfiguration.json "YoutubeConfiguration.json") [YoutubeConfiguration.html](apidocs/com/youtube/YoutubeConfiguration.html "javadoc") +Test: +----- + +Log into admin console +Create project +Enable Data API on project +Create service account +Download p12 file + +Create a local file `youtube.conf` with valid youtube credentials + + youtube { + apiKey = "" + oauth { + serviceAccountEmailAddress = "" + pathToP12KeyFile = "" + } + } + +Build with integration testing enabled, using your credentials + + mvn clean test verify -DskipITs=false -DargLine="-Dconfig.file=`pwd`/youtube.conf" + [JavaDocs](apidocs/index.html "JavaDocs") ###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0 http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java new file mode 100644 index 0000000..cc1e811 --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.youtube.test.providers; + +import com.google.common.collect.Lists; +import com.youtube.provider.YoutubeChannelProvider; +import org.junit.Test; + +import java.io.File; +import java.io.FileReader; +import java.io.LineNumberReader; + +/** + * Created by sblackmon on 10/13/16. + */ +public class YoutubeChannelProviderIT { + + @Test + public void testYoutubeChannelProvider() throws Exception { + + String configfile = "./target/test-classes/YoutubeChannelProviderIT.conf"; + String outfile = "./target/test-classes/YoutubeChannelProviderIT.stdout.txt"; + + YoutubeChannelProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2])); + + File out = new File(outfile); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); + + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); + + while(outCounter.readLine() != null) {} + + assert (outCounter.getLineNumber() > 1); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java new file mode 100644 index 0000000..e71a6d1 --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.youtube.test.providers; + +import com.google.common.collect.Lists; +import com.youtube.provider.YoutubeUserActivityProvider; +import org.junit.Test; + +import java.io.File; +import java.io.FileReader; +import java.io.LineNumberReader; + +/** + * Created by sblackmon on 10/13/16. 
+ */ +public class YoutubeUserActivityProviderIT { + + @Test + public void testYoutubeUserActivityProvider() throws Exception { + + String configfile = "./target/test-classes/YoutubeUserActivityProviderIT.conf"; + String outfile = "./target/test-classes/YoutubeUserActivityProviderIT.stdout.txt"; + + YoutubeUserActivityProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2])); + + File out = new File(outfile); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); + + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); + + while(outCounter.readLine() != null) {} + + assert (outCounter.getLineNumber() >= 250); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/test/resources/YoutubeChannelProviderIT.conf ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/test/resources/YoutubeChannelProviderIT.conf b/streams-contrib/streams-provider-youtube/src/test/resources/YoutubeChannelProviderIT.conf new file mode 100644 index 0000000..8565c45 --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/test/resources/YoutubeChannelProviderIT.conf @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +youtube { + youtubeUsers = [ + { + userId = "UCLDJ_V9KUOdOFSbDvPfGBxw" + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/test/resources/YoutubeUserActivityProviderIT.conf ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/test/resources/YoutubeUserActivityProviderIT.conf b/streams-contrib/streams-provider-youtube/src/test/resources/YoutubeUserActivityProviderIT.conf new file mode 100644 index 0000000..8565c45 --- /dev/null +++ b/streams-contrib/streams-provider-youtube/src/test/resources/YoutubeUserActivityProviderIT.conf @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +youtube { + youtubeUsers = [ + { + userId = "UCLDJ_V9KUOdOFSbDvPfGBxw" + } + ] +} \ No newline at end of file
