Repository: incubator-streams
Updated Branches:
  refs/heads/master 006234fb8 -> 104f29b1e


level up youtube provider

update google client version (STREAMS-413)
use provider without a runtime (STREAMS-403)
add main methods to each Provider (STREAMS-411)
add real integration tests (STREAMS-415)
isolate isRunning and readCurrent (STREAMS-425)


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/762ce8c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/762ce8c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/762ce8c7

Branch: refs/heads/master
Commit: 762ce8c7c58aff54d296afbfbc863e586406f915
Parents: 3234cdb
Author: Steve Blackmon @steveblackmon <[email protected]>
Authored: Thu Oct 13 13:21:59 2016 -0500
Committer: Steve Blackmon @steveblackmon <[email protected]>
Committed: Thu Oct 13 13:21:59 2016 -0500

----------------------------------------------------------------------
 .../streams-provider-youtube/pom.xml            | 21 +++--
 .../provider/YoutubeChannelDataCollector.java   |  5 +-
 .../provider/YoutubeChannelProvider.java        | 84 +++++++++++++++++++
 .../com/youtube/provider/YoutubeProvider.java   | 86 ++++++++++++++------
 .../provider/YoutubeUserActivityCollector.java  |  5 +-
 .../provider/YoutubeUserActivityProvider.java   | 81 ++++++++++++++++++
 .../src/site/markdown/index.md                  | 23 ++++++
 .../providers/YoutubeChannelProviderIT.java     | 55 +++++++++++++
 .../YoutubeUserActivityProviderIT.java          | 55 +++++++++++++
 .../resources/YoutubeChannelProviderIT.conf     | 22 +++++
 .../YoutubeUserActivityProviderIT.conf          | 22 +++++
 11 files changed, 423 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/pom.xml 
b/streams-contrib/streams-provider-youtube/pom.xml
index 8d028af..037d4b6 100644
--- a/streams-contrib/streams-provider-youtube/pom.xml
+++ b/streams-contrib/streams-provider-youtube/pom.xml
@@ -30,10 +30,8 @@
     <artifactId>streams-provider-youtube</artifactId>
 
     <properties>
-        <project.youtube.version>v3-rev107-1.18.0-rc</project.youtube.version>
-        
<project.youtube.analytics.version>v1-rev24-1.17.0-rc</project.youtube.analytics.version>
-        <project.http.version>1.18.0-rc</project.http.version>
-        <project.oauth.version>1.18.0-rc</project.oauth.version>
+        <youtube.client.version>v3-rev178-1.22.0</youtube.client.version>
+        <google.client.version>1.22.0</google.client.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     </properties>
 
@@ -53,7 +51,7 @@
         <dependency>
             <groupId>com.google.http-client</groupId>
             <artifactId>google-http-client-jackson2</artifactId>
-            <version>${project.http.version}</version>
+            <version>${google.client.version}</version>
             <exclusions>
                 <exclusion>
                     <groupId>commons-logging</groupId>
@@ -69,7 +67,7 @@
         <dependency>
             <groupId>com.google.oauth-client</groupId>
             <artifactId>google-oauth-client-jetty</artifactId>
-            <version>${project.oauth.version}</version>
+            <version>${google.client.version}</version>
         </dependency>
 
         <dependency>
@@ -101,7 +99,7 @@
         <dependency>
             <groupId>com.google.api-client</groupId>
             <artifactId>google-api-client</artifactId>
-            <version>1.17.0-rc</version>
+            <version>${google.client.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
@@ -111,7 +109,7 @@
         <dependency>
             <groupId>com.google.apis</groupId>
             <artifactId>google-api-services-youtube</artifactId>
-            <version>v3-rev126-1.19.1</version>
+            <version>${youtube.client.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
@@ -166,6 +164,13 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <configuration>
+                    <skipTests>${skipITs}</skipTests>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java
index e0f9d7b..8e980a7 100644
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelDataCollector.java
@@ -25,6 +25,7 @@ import com.google.api.client.http.HttpRequest;
 import com.google.api.services.youtube.YouTube;
 import com.google.api.services.youtube.model.Channel;
 import com.google.api.services.youtube.model.ChannelListResponse;
+import com.google.gson.Gson;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.google.gplus.configuration.UserInfo;
@@ -63,6 +64,7 @@ public class YoutubeChannelDataCollector extends 
YoutubeDataCollector{
 
     @Override
     public void run() {
+        Gson gson = new Gson();
         try {
             int attempt = 0;
              YouTube.Channels.List channelLists = 
this.youTube.channels().list(CONTENT).setId(this.userInfo.getUserId()).setKey(this.youtubeConfig.getApiKey());
@@ -71,7 +73,8 @@ public class YoutubeChannelDataCollector extends 
YoutubeDataCollector{
                 try {
                     List<Channel> channels = channelLists.execute().getItems();
                     for (Channel channel : channels) {
-                        this.queue.put(new 
StreamsDatum(MAPPER.writeValueAsString(channel), channel.getId()));
+                        String json = gson.toJson(channel);
+                        this.queue.put(new StreamsDatum(json, 
channel.getId()));
                     }
                     if (StringUtils.isEmpty(channelLists.getPageToken())) {
                         channelLists = null;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java
index 6a5fda1..817c98e 100644
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeChannelProvider.java
@@ -19,20 +19,104 @@
 
 package com.youtube.provider;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.api.services.youtube.YouTube;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.youtube.pojo.YoutubeConfiguration;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
+ *  Retrieve recent activity from a list of channels.
  *
+ *  To use from command line:
+ *
+ *  Supply (at least) the following required configuration in application.conf:
+ *
+ *  youtube.oauth.pathToP12KeyFile
+ *  youtube.oauth.serviceAccountEmailAddress
+ *  youtube.apiKey
+ *  youtube.youtubeUsers
+ *
+ *  Launch using:
+ *
+ *  mvn exec:java 
-Dexec.mainClass=com.youtube.provider.YoutubeChannelProvider
 -Dexec.args="application.conf tweets.json"
  */
 public class YoutubeChannelProvider extends YoutubeProvider {
 
+    public YoutubeChannelProvider() {
+        super();
+    }
+
+    public YoutubeChannelProvider(YoutubeConfiguration config) {
+        super(config);
+    }
+
     @Override
     protected Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) {
         return new YoutubeChannelDataCollector(youtube, queue, strategy, 
userInfo, this.config);
     }
+
+    public static void main(String[] args) throws Exception {
+
+        Preconditions.checkArgument(args.length >= 2);
+
+        String configfile = args[0];
+        String outfile = args[1];
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File(configfile);
+        assert(conf_file.exists());
+        Config testResourceConfig = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+
+        Config typesafe  = 
testResourceConfig.withFallback(reference).resolve();
+
+        StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
+        YoutubeConfiguration config = new 
ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe,
 "youtube");
+        YoutubeChannelProvider provider = new YoutubeChannelProvider(config);
+
+        ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+        PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
+        provider.prepare(config);
+        provider.startStream();
+        do {
+            
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
+            Iterator<StreamsDatum> iterator = 
provider.readCurrent().iterator();
+            while(iterator.hasNext()) {
+                StreamsDatum datum = iterator.next();
+                String json;
+                try {
+                    if( datum.getDocument() instanceof String )
+                        json = (String)datum.getDocument();
+                    else
+                        json = mapper.writeValueAsString(datum.getDocument());
+                    outStream.println(json);
+                } catch (JsonProcessingException e) {
+                    System.err.println(e.getMessage());
+                }
+            }
+        } while( provider.isRunning());
+        provider.cleanUp();
+        outStream.flush();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
index b04a781..e16f4bb 100644
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
@@ -19,14 +19,21 @@
 
 package com.youtube.provider;
 
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
 import com.google.api.client.http.HttpTransport;
 import com.google.api.client.http.javanet.NetHttpTransport;
 import com.google.api.client.json.JsonFactory;
 import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.client.repackaged.com.google.common.base.Strings;
 import com.google.api.services.youtube.YouTube;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
@@ -41,9 +48,11 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.security.GeneralSecurityException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -53,6 +62,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+
 public abstract class YoutubeProvider implements StreamsProvider {
 
     public static final String STREAMS_ID = "YoutubeProvider";
@@ -60,6 +71,10 @@ public abstract class YoutubeProvider implements 
StreamsProvider {
     private final static Logger LOGGER = 
LoggerFactory.getLogger(YoutubeProvider.class);
     private final static int MAX_BATCH_SIZE = 1000;
 
+    // This OAuth 2.0 access scope allows for full read/write access to the
+    // authenticated user's account.
+    List<String> scopes = 
Lists.newArrayList("https://www.googleapis.com/auth/youtube");
+
     /**
      * Define a global instance of the HTTP transport.
      */
@@ -72,7 +87,9 @@ public abstract class YoutubeProvider implements 
StreamsProvider {
 
     private static final int DEFAULT_THREAD_POOL_SIZE = 5;
 
-    private ExecutorService executor;
+    List<ListenableFuture<Object>> futures = new ArrayList<>();
+
+    private ListeningExecutorService executor;
     private BlockingQueue<StreamsDatum> datumQueue;
     private AtomicBoolean isComplete;
     private boolean previousPullWasEmpty;
@@ -99,6 +116,21 @@ public abstract class YoutubeProvider implements 
StreamsProvider {
     }
 
     @Override
+    public void prepare(Object configurationObject) {
+        try {
+            this.youtube = createYouTubeClient();
+        } catch (IOException |GeneralSecurityException e) {
+            LOGGER.error("Failed to created oauth for YouTube : {}", e);
+            throw new RuntimeException(e);
+        }
+
+        this.executor = 
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE));
+        this.datumQueue = new LinkedBlockingQueue<>(1000);
+        this.isComplete = new AtomicBoolean(false);
+        this.previousPullWasEmpty = false;
+    }
+
+    @Override
     public void startStream() {
         BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2);
 
@@ -109,7 +141,9 @@ public abstract class YoutubeProvider implements 
StreamsProvider {
             if(this.config.getDefaultBeforeDate() != null && 
user.getBeforeDate() == null) {
                 user.setBeforeDate(this.config.getDefaultBeforeDate());
             }
-            this.executor.submit(getDataCollector(backOffStrategy, 
this.datumQueue, this.youtube, user));
+
+            ListenableFuture future = 
executor.submit(getDataCollector(backOffStrategy, this.datumQueue, 
this.youtube, user));
+            futures.add(future);
         }
 
         this.executor.shutdown();
@@ -128,9 +162,6 @@ public abstract class YoutubeProvider implements 
StreamsProvider {
                 ComponentUtils.offerUntilSuccess(datum, batch);
             }
         }
-        boolean pullIsEmpty = batch.isEmpty() && this.datumQueue.isEmpty() 
&&this.executor.isTerminated();
-        this.isComplete.set(this.previousPullWasEmpty && pullIsEmpty);
-        this.previousPullWasEmpty = pullIsEmpty;
         return new StreamsResultSet(batch);
     }
 
@@ -144,29 +175,22 @@ public abstract class YoutubeProvider implements 
StreamsProvider {
         return null;
     }
 
-    @Override
-    public boolean isRunning() {
-        return !this.isComplete.get();
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        try {
-            this.youtube = createYouTubeClient();
-        } catch (IOException |GeneralSecurityException e) {
-            LOGGER.error("Failed to created oauth for GPlus : {}", e);
-            throw new RuntimeException(e);
-        }
-
-        this.executor = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
-        this.datumQueue = new LinkedBlockingQueue<>(1000);
-        this.isComplete = new AtomicBoolean(false);
-        this.previousPullWasEmpty = false;
-    }
-
     @VisibleForTesting
     protected YouTube createYouTubeClient() throws IOException, 
GeneralSecurityException {
-        return new YouTube.Builder(HTTP_TRANSPORT, JSON_FACTORY, 
null).setApplicationName("Streams Application").build();
+        GoogleCredential.Builder credentialBuilder = new 
GoogleCredential.Builder()
+                .setTransport(HTTP_TRANSPORT)
+                .setJsonFactory(JSON_FACTORY)
+                
.setServiceAccountId(getConfig().getOauth().getServiceAccountEmailAddress())
+                .setServiceAccountScopes(scopes);
+
+        if( 
!Strings.isNullOrEmpty(getConfig().getOauth().getPathToP12KeyFile())) {
+            File p12KeyFile = new 
File(getConfig().getOauth().getPathToP12KeyFile());
+            if( p12KeyFile.exists() && p12KeyFile.isFile() && 
p12KeyFile.canRead()) {
+                credentialBuilder = 
credentialBuilder.setServiceAccountPrivateKeyFromP12File(p12KeyFile);
+            }
+        }
+        Credential credential = credentialBuilder.build();
+        return new YouTube.Builder(HTTP_TRANSPORT, JSON_FACTORY, 
credential).setApplicationName("Streams Application").build();
     }
 
     @Override
@@ -233,4 +257,14 @@ public abstract class YoutubeProvider implements 
StreamsProvider {
 
         this.config.setYoutubeUsers(youtubeUsers);
     }
+
+    @Override
+    public boolean isRunning() {
+        if (datumQueue.isEmpty() && executor.isTerminated() && 
Futures.allAsList(futures).isDone()) {
+            LOGGER.info("Completed");
+            isComplete.set(true);
+            LOGGER.info("Exiting");
+        }
+        return !isComplete.get();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java
index 5e7e9fa..76a69f3 100644
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java
@@ -28,6 +28,7 @@ import com.google.api.services.youtube.YouTube;
 import com.google.api.services.youtube.model.ActivityListResponse;
 import com.google.api.services.youtube.model.Video;
 import com.google.api.services.youtube.model.VideoListResponse;
+import com.google.gson.Gson;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.google.gplus.configuration.UserInfo;
 import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -63,6 +64,8 @@ public class YoutubeUserActivityCollector extends 
YoutubeDataCollector {
     private UserInfo userInfo;
     private YoutubeConfiguration config;
 
+    Gson gson = new Gson();
+
     public YoutubeUserActivityCollector(YouTube youtube, 
BlockingQueue<StreamsDatum> datumQueue, BackOffStrategy backOff, UserInfo 
userInfo, YoutubeConfiguration config) {
         this.youtube = youtube;
         this.datumQueue = datumQueue;
@@ -169,7 +172,7 @@ public class YoutubeUserActivityCollector extends 
YoutubeDataCollector {
                             || (afterDate == null && 
beforeDate.isAfter(published))
                             || ((afterDate != null && beforeDate != null) && 
(afterDate.isAfter(published) && beforeDate.isBefore(published)))) {
                         LOGGER.debug("Providing Youtube Activity: {}", 
MAPPER.writeValueAsString(video));
-                        this.datumQueue.put(new 
StreamsDatum(MAPPER.writeValueAsString(video), activity.getId()));
+                        this.datumQueue.put(new 
StreamsDatum(gson.toJson(video), activity.getId()));
                     } else if (afterDate != null && 
afterDate.isAfter(published)) {
                         feed.setNextPageToken(null); // do not fetch next page
                         break;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java
index 99820c1..ed3dc63 100644
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java
@@ -19,16 +19,53 @@
 
 package com.youtube.provider;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.api.services.youtube.YouTube;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 import org.apache.youtube.pojo.YoutubeConfiguration;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
+/**
+ *  Retrieve recent activity from a list of user ids or names.
+ *
+ *  To use from command line:
+ *
+ *  Supply (at least) the following required configuration in application.conf:
+ *
+ *  youtube.oauth.pathToP12KeyFile
+ *  youtube.oauth.serviceAccountEmailAddress
+ *  youtube.apiKey
+ *  youtube.youtubeUsers
+ *
+ *  Launch using:
+ *
+ *  mvn exec:java 
-Dexec.mainClass=com.youtube.provider.YoutubeUserActivityProvider
 -Dexec.args="application.conf tweets.json"
+ */
 public class YoutubeUserActivityProvider extends YoutubeProvider {
 
+    public YoutubeUserActivityProvider() {
+        super();
+    }
+
     public YoutubeUserActivityProvider(YoutubeConfiguration config) {
         super(config);
     }
@@ -37,4 +74,48 @@ public class YoutubeUserActivityProvider extends 
YoutubeProvider {
     protected Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) {
         return new YoutubeUserActivityCollector(youtube, queue, strategy, 
userInfo, config);
     }
+
+    public static void main(String[] args) throws Exception {
+
+        Preconditions.checkArgument(args.length >= 2);
+
+        String configfile = args[0];
+        String outfile = args[1];
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File(configfile);
+        assert(conf_file.exists());
+        Config testResourceConfig = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+
+        Config typesafe  = 
testResourceConfig.withFallback(reference).resolve();
+
+        StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
+        YoutubeConfiguration config = new 
ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe,
 "youtube");
+        YoutubeUserActivityProvider provider = new 
YoutubeUserActivityProvider(config);
+
+        ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+        PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
+        provider.prepare(config);
+        provider.startStream();
+        do {
+            
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
+            Iterator<StreamsDatum> iterator = 
provider.readCurrent().iterator();
+            while(iterator.hasNext()) {
+                StreamsDatum datum = iterator.next();
+                String json;
+                try {
+                    if( datum.getDocument() instanceof String )
+                        json = (String)datum.getDocument();
+                    else
+                        json = mapper.writeValueAsString(datum.getDocument());
+                    outStream.println(json);
+                } catch (JsonProcessingException e) {
+                    System.err.println(e.getMessage());
+                }
+            }
+        } while( provider.isRunning());
+        provider.cleanUp();
+        outStream.flush();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/site/markdown/index.md 
b/streams-contrib/streams-provider-youtube/src/site/markdown/index.md
index cea90fe..de050e9 100644
--- a/streams-contrib/streams-provider-youtube/src/site/markdown/index.md
+++ b/streams-contrib/streams-provider-youtube/src/site/markdown/index.md
@@ -22,6 +22,29 @@ This module relies on classes from 
com.google.apis:google-api-services-youtube
 | YoutubeChannelProvider 
[YoutubeChannelProvider.html](apidocs/com/youtube/provider/YoutubeChannelProvider.html
 "javadoc") | [YoutubeConfiguration.json](com/youtube/YoutubeConfiguration.json 
"YoutubeConfiguration.json") 
[YoutubeConfiguration.html](apidocs/com/youtube/YoutubeConfiguration.html 
"javadoc")
 | YoutubeUserActivityProvider 
[YoutubeUserActivityProvider.html](apidocs/com/youtube/provider/YoutubeUserActivityProvider.html
 "javadoc") | [YoutubeConfiguration.json](com/youtube/YoutubeConfiguration.json 
"YoutubeConfiguration.json") 
[YoutubeConfiguration.html](apidocs/com/youtube/YoutubeConfiguration.html 
"javadoc")
 
+Test:
+-----
+
+1. Log into admin console
+2. Create project
+3. Enable Data API on project
+4. Create service account
+5. Download p12 file
+
+Create a local file `youtube.conf` with valid youtube credentials
+
+    youtube {
+      apiKey = ""
+      oauth {
+        serviceAccountEmailAddress = ""
+        pathToP12KeyFile = ""
+      }
+    }
+    
+Build with integration testing enabled, using your credentials
+
+    mvn clean test verify -DskipITs=false 
-DargLine="-Dconfig.file=`pwd`/youtube.conf"
+
 [JavaDocs](apidocs/index.html "JavaDocs")
 
 ###### Licensed under Apache License 2.0 - 
http://www.apache.org/licenses/LICENSE-2.0

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
new file mode 100644
index 0000000..cc1e811
--- /dev/null
+++ 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.test.providers;
+
+import com.google.common.collect.Lists;
+import com.youtube.provider.YoutubeChannelProvider;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+/**
+ * Created by sblackmon on 10/13/16.
+ */
+public class YoutubeChannelProviderIT {
+
+    @Test
+    public void testYoutubeChannelProvider() throws Exception {
+
+        String configfile = 
"./target/test-classes/YoutubeChannelProviderIT.conf";
+        String outfile = 
"./target/test-classes/YoutubeChannelProviderIT.stdout.txt";
+
+        YoutubeChannelProvider.main(Lists.newArrayList(configfile, 
outfile).toArray(new String[2]));
+
+        File out = new File(outfile);
+        assert (out.exists());
+        assert (out.canRead());
+        assert (out.isFile());
+
+        FileReader outReader = new FileReader(out);
+        LineNumberReader outCounter = new LineNumberReader(outReader);
+
+        while(outCounter.readLine() != null) {}
+
+        assert (outCounter.getLineNumber() > 1);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java
 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java
new file mode 100644
index 0000000..e71a6d1
--- /dev/null
+++ 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.test.providers;
+
+import com.google.common.collect.Lists;
+import com.youtube.provider.YoutubeUserActivityProvider;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+/**
+ * Integration test for {@link YoutubeUserActivityProvider}.
+ *
+ * Invokes the provider's main method with a configuration file and an
+ * output file path, then inspects the output file.
+ */
+public class YoutubeUserActivityProviderIT {
+
+    /**
+     * Runs the provider and checks that the output file exists, is a
+     * readable regular file, and contains at least 250 lines.
+     *
+     * @throws Exception if the provider fails or the file cannot be read
+     */
+    @Test
+    public void testYoutubeUserActivityProvider() throws Exception {
+
+        String configfile =
+            "./target/test-classes/YoutubeUserActivityProviderIT.conf";
+        String outfile =
+            "./target/test-classes/YoutubeUserActivityProviderIT.stdout.txt";
+
+        YoutubeUserActivityProvider.main(
+            new String[] {configfile, outfile});
+
+        File out = new File(outfile);
+        // JUnit assertions still fail when the JVM runs without -ea,
+        // unlike the bare assert keyword.
+        org.junit.Assert.assertTrue(out.exists());
+        org.junit.Assert.assertTrue(out.canRead());
+        org.junit.Assert.assertTrue(out.isFile());
+
+        // try-with-resources closes the reader even if a check fails.
+        try (LineNumberReader outCounter =
+                 new LineNumberReader(new FileReader(out))) {
+            while (outCounter.readLine() != null) {
+                // drain the file; only the final line count matters
+            }
+            org.junit.Assert.assertTrue(
+                outCounter.getLineNumber() >= 250);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/test/resources/YoutubeChannelProviderIT.conf
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/test/resources/YoutubeChannelProviderIT.conf
 
b/streams-contrib/streams-provider-youtube/src/test/resources/YoutubeChannelProviderIT.conf
new file mode 100644
index 0000000..8565c45
--- /dev/null
+++ 
b/streams-contrib/streams-provider-youtube/src/test/resources/YoutubeChannelProviderIT.conf
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+youtube {
+  # Accounts handed to the provider under test; a single entry here.
+  youtubeUsers = [
+    {
+      # NOTE(review): value looks like a YouTube channel id -- confirm
+      userId = "UCLDJ_V9KUOdOFSbDvPfGBxw"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/762ce8c7/streams-contrib/streams-provider-youtube/src/test/resources/YoutubeUserActivityProviderIT.conf
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/test/resources/YoutubeUserActivityProviderIT.conf
 
b/streams-contrib/streams-provider-youtube/src/test/resources/YoutubeUserActivityProviderIT.conf
new file mode 100644
index 0000000..8565c45
--- /dev/null
+++ 
b/streams-contrib/streams-provider-youtube/src/test/resources/YoutubeUserActivityProviderIT.conf
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+youtube {
+  # Accounts handed to the provider under test; a single entry here.
+  youtubeUsers = [
+    {
+      # NOTE(review): value looks like a YouTube channel id -- confirm
+      userId = "UCLDJ_V9KUOdOFSbDvPfGBxw"
+    }
+  ]
+}
\ No newline at end of file

Reply via email to