http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
deleted file mode 100644
index 401b836..0000000
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.youtube.provider;
-
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.util.ComponentUtils;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-import 
org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.http.javanet.NetHttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.client.repackaged.com.google.common.base.Strings;
-import com.google.api.services.youtube.YouTube;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.youtube.pojo.YoutubeConfiguration;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public abstract class YoutubeProvider implements StreamsProvider {
-
-  private static final String STREAMS_ID = "YoutubeProvider";
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeProvider.class);
-  private static final int MAX_BATCH_SIZE = 1000;
-
-  // This OAuth 2.0 access scope allows for full read/write access to the
-  // authenticated user's account. It is passed to the service-account
-  // credential builder in createYouTubeClient().
-  private final List<String> scopes = Collections.singletonList("https://www.googleapis.com/auth/youtube");
-
-  /**
-   * Define a global instance of the HTTP transport.
-   */
-  private static final HttpTransport HTTP_TRANSPORT = new NetHttpTransport();
-
-  /**
-   * Define a global instance of the JSON factory.
-   */
-  private static final JsonFactory JSON_FACTORY = new JacksonFactory();
-
-  private static final int DEFAULT_THREAD_POOL_SIZE = 5;
-
-  private List<ListenableFuture<Object>> futures = new ArrayList<>();
-
-  private ListeningExecutorService executor;
-  private BlockingQueue<StreamsDatum> datumQueue;
-  private AtomicBoolean isComplete;
-  private boolean previousPullWasEmpty;
-
-  protected YouTube youtube;
-  protected YoutubeConfiguration config;
-
-  /**
-   * YoutubeProvider constructor.
-   * Resolves config from JVM 'youtube'.
-   */
-  public YoutubeProvider() {
-    this.config = new ComponentConfigurator<>(YoutubeConfiguration.class)
-        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("youtube"));
-
-    Objects.requireNonNull(this.config.getApiKey());
-  }
-
-  /**
-   * YoutubeProvider constructor - uses supplied YoutubeConfiguration.
-   * @param config YoutubeConfiguration
-   */
-  public YoutubeProvider(YoutubeConfiguration config) {
-    this.config = config;
-
-    Objects.requireNonNull(this.config.getApiKey());
-  }
-
-  @Override
-  public String getId() {
-    return STREAMS_ID;
-  }
-
-  /**
-   * Builds the YouTube client and allocates the executor, the bounded datum
-   * queue and the completion flags used by the stream.
-   *
-   * @param configurationObject not read by this implementation
-   * @throws RuntimeException wrapping the IOException or GeneralSecurityException
-   *     raised while creating the YouTube client
-   */
-  @Override
-  public void prepare(Object configurationObject) {
-    try {
-      this.youtube = createYouTubeClient();
-    } catch (IOException | GeneralSecurityException ex) {
-      // A trailing Throwable is logged with its stack trace; it takes no placeholder.
-      LOGGER.error("Failed to create oauth for YouTube", ex);
-      throw new RuntimeException(ex);
-    }
-
-    this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE));
-    this.datumQueue = new LinkedBlockingQueue<>(1000);
-    this.isComplete = new AtomicBoolean(false);
-    this.previousPullWasEmpty = false;
-  }
-
-  @Override
-  public void startStream() {
-    BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2);
-
-    for (UserInfo user : this.config.getYoutubeUsers()) {
-      if (this.config.getDefaultAfterDate() != null && user.getAfterDate() == 
null) {
-        user.setAfterDate(this.config.getDefaultAfterDate());
-      }
-      if (this.config.getDefaultBeforeDate() != null && user.getBeforeDate() 
== null) {
-        user.setBeforeDate(this.config.getDefaultBeforeDate());
-      }
-
-      ListenableFuture future = 
executor.submit(getDataCollector(backOffStrategy, this.datumQueue, 
this.youtube, user));
-      futures.add(future);
-    }
-
-    this.executor.shutdown();
-  }
-
-  protected abstract Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo);
-
-  @Override
-  public StreamsResultSet readCurrent() {
-    BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>();
-    int batchCount = 0;
-    while (!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) {
-      StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue);
-      if (datum != null) {
-        ++batchCount;
-        ComponentUtils.offerUntilSuccess(datum, batch);
-      }
-    }
-    return new StreamsResultSet(batch);
-  }
-
-  @Override
-  public StreamsResultSet readNew(BigInteger sequence) {
-    return null;
-  }
-
-  @Override
-  public StreamsResultSet readRange(DateTime start, DateTime end) {
-    return null;
-  }
-
-  @VisibleForTesting
-  protected YouTube createYouTubeClient() throws IOException, 
GeneralSecurityException {
-    GoogleCredential.Builder credentialBuilder = new GoogleCredential.Builder()
-        .setTransport(HTTP_TRANSPORT)
-        .setJsonFactory(JSON_FACTORY)
-        
.setServiceAccountId(getConfig().getOauth().getServiceAccountEmailAddress())
-        .setServiceAccountScopes(scopes);
-
-    if ( !Strings.isNullOrEmpty(getConfig().getOauth().getPathToP12KeyFile())) 
{
-      File p12KeyFile = new File(getConfig().getOauth().getPathToP12KeyFile());
-      if ( p12KeyFile.exists() && p12KeyFile.isFile() && p12KeyFile.canRead()) 
{
-        credentialBuilder = 
credentialBuilder.setServiceAccountPrivateKeyFromP12File(p12KeyFile);
-      }
-    }
-    Credential credential = credentialBuilder.build();
-    return new YouTube.Builder(HTTP_TRANSPORT, JSON_FACTORY, 
credential).setApplicationName("Streams Application").build();
-  }
-
-  @Override
-  public void cleanUp() {
-    ComponentUtils.shutdownExecutor(this.executor, 10, 10);
-    this.executor = null;
-  }
-
-  public YoutubeConfiguration getConfig() {
-    return config;
-  }
-
-  public void setConfig(YoutubeConfiguration config) {
-    this.config = config;
-  }
-
-  /**
-   * Set and overwrite the default before date that was read from the 
configuration file.
-   * @param defaultBeforeDate defaultBeforeDate
-   */
-  public void setDefaultBeforeDate(DateTime defaultBeforeDate) {
-    this.config.setDefaultBeforeDate(defaultBeforeDate);
-  }
-
-  /**
-   * Set and overwrite the default after date that was read from the configuration file.
-   * The value is stored on this provider's configuration object.
-   * @param defaultAfterDate new default after date
-   */
-  public void setDefaultAfterDate(DateTime defaultAfterDate) {
-    this.config.setDefaultAfterDate(defaultAfterDate);
-  }
-
-  /**
-   * Sets and overwrites the user info from the configuration file. Every user is
-   * given the configuration's current default before and after dates.
-   * @param userIds Set of String userIds
-   */
-  public void setUserInfoWithDefaultDates(Set<String> userIds) {
-    List<UserInfo> youtubeUsers = new LinkedList<>();
-
-    for (String userId : userIds) {
-      UserInfo user = new UserInfo();
-      user.setUserId(userId);
-      user.setAfterDate(this.config.getDefaultAfterDate());
-      user.setBeforeDate(this.config.getDefaultBeforeDate());
-      youtubeUsers.add(user);
-    }
-
-    // Replaces whatever user list the configuration held before.
-    this.config.setYoutubeUsers(youtubeUsers);
-  }
-
-  /**
-   * Sets and overwrites the user info from the configuration file. Only the
-   * after date is set for each user; the before date is left untouched.
-   * @param usersAndAfterDates map of user id to the after date for that user
-   */
-  public void setUserInfoWithAfterDate(Map<String, DateTime> usersAndAfterDates) {
-    List<UserInfo> youtubeUsers = new LinkedList<>();
-
-    // Iterate entries so each after date is read without a second map lookup.
-    for (Map.Entry<String, DateTime> entry : usersAndAfterDates.entrySet()) {
-      UserInfo user = new UserInfo();
-      user.setUserId(entry.getKey());
-      user.setAfterDate(entry.getValue());
-      youtubeUsers.add(user);
-    }
-
-    this.config.setYoutubeUsers(youtubeUsers);
-  }
-
-  @Override
-  public boolean isRunning() {
-    if (datumQueue.isEmpty() && executor.isTerminated() && 
Futures.allAsList(futures).isDone()) {
-      LOGGER.info("Completed");
-      isComplete.set(true);
-      LOGGER.info("Exiting");
-    }
-    return !isComplete.get();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java
deleted file mode 100644
index 518a762..0000000
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityCollector.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.youtube.provider;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.services.youtube.YouTube;
-import com.google.api.services.youtube.model.ActivityListResponse;
-import com.google.api.services.youtube.model.Video;
-import com.google.api.services.youtube.model.VideoListResponse;
-import com.google.gson.Gson;
-import org.apache.youtube.pojo.YoutubeConfiguration;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * YoutubeDataCollector for YoutubeUserActivityProvider.
- */
-public class YoutubeUserActivityCollector extends YoutubeDataCollector {
-
-  /**
-   * Max results allowed per request
-   * https://developers.google.com/+/api/latest/activities/list
-   */
-  private static final long MAX_RESULTS = 50;
-  private static final int MAX_ATTEMPTS = 5;
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeUserActivityCollector.class);
-  private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
-
-  static { //set up mapper for Google Activity Object
-    SimpleModule simpleModule = new SimpleModule();
-    MAPPER.registerModule(simpleModule);
-    MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-  }
-
-  private BlockingQueue<StreamsDatum> datumQueue;
-  private BackOffStrategy backOff;
-  private YouTube youtube;
-  private UserInfo userInfo;
-  private YoutubeConfiguration config;
-
-  Gson gson = new Gson();
-
-  /**
-   * YoutubeUserActivityCollector constructor.
-   * @param youtube YouTube
-   * @param datumQueue BlockingQueue of StreamsDatum
-   * @param backOff BackOffStrategy
-   * @param userInfo UserInfo
-   * @param config YoutubeConfiguration
-   */
-  public YoutubeUserActivityCollector(
-      YouTube youtube,
-      BlockingQueue<StreamsDatum> datumQueue,
-      BackOffStrategy backOff,
-      UserInfo userInfo,
-      YoutubeConfiguration config) {
-    this.youtube = youtube;
-    this.datumQueue = datumQueue;
-    this.backOff = backOff;
-    this.userInfo = userInfo;
-    this.config = config;
-  }
-
-  @Override
-  public void run() {
-    collectActivityData();
-  }
-
-  /**
-   * Pages through the activities of the configured user's channel and hands
-   * each page to {@link #processActivityFeed}.
-   *
-   * <p>A request that fails with a GoogleJsonResponseException is retried only
-   * when backoffAndIdentifyIfRetry says so, and the loop stops once MAX_ATTEMPTS
-   * failures have been counted. Any other failure is logged and ends collection
-   * for this user.
-   */
-  protected void collectActivityData() {
-    try {
-      ActivityListResponse feed = null;
-      boolean tryAgain;
-      int attempt = 0;
-      DateTime afterDate = userInfo.getAfterDate();
-      DateTime beforeDate = userInfo.getBeforeDate();
-
-      do {
-        // Reset on every pass: a stale 'true' left over from an earlier failure
-        // would keep the loop running after the last page has been read.
-        tryAgain = false;
-        try {
-          YouTube.Activities.List request = this.youtube.activities().list("contentDetails")
-              .setChannelId(userInfo.getUserId())
-              .setMaxResults(MAX_RESULTS)
-              .setKey(config.getApiKey());
-          // feed is still null while the very first request is being (re)tried,
-          // so only continue from a page token once a page has been fetched.
-          if (feed != null) {
-            request.setPageToken(feed.getNextPageToken());
-          }
-          feed = request.execute();
-          this.backOff.reset(); //successful pull reset api.
-
-          processActivityFeed(feed, afterDate, beforeDate);
-        } catch (GoogleJsonResponseException gjre) {
-          tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOff);
-          ++attempt;
-        }
-      }
-      while ((tryAgain || (feed != null && feed.getNextPageToken() != null)) && attempt < MAX_ATTEMPTS);
-    } catch (Throwable throwable) {
-      if (throwable instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
-      // The trailing Throwable is logged with its stack trace, replacing printStackTrace().
-      LOGGER.warn("Unable to pull Activities for user={}", this.userInfo.getUserId(), throwable);
-    }
-  }
-
-  /**
-   * Given a feed and an after and before date, fetch the videos referenced by
-   * each activity (uploads and playlist items) and pass them to processVideos,
-   * which places the matching ones into the datumQueue.
-   *
-   * <p>A failure while handling one activity is logged and does not stop the
-   * remaining activities from being processed.
-   * @param feed ActivityListResponse
-   * @param afterDate DateTime
-   * @param beforeDate DateTime
-   * @throws IOException IOException
-   * @throws InterruptedException InterruptedException
-   */
-  void processActivityFeed(ActivityListResponse feed, DateTime afterDate, DateTime beforeDate) throws IOException, InterruptedException {
-    List<com.google.api.services.youtube.model.Activity> activities = feed.getItems();
-    // NOTE(review): assumes the client model reports a page without items as null - confirm.
-    if (activities == null) {
-      return;
-    }
-
-    for (com.google.api.services.youtube.model.Activity activity : activities) {
-      try {
-        List<Video> videos = new ArrayList<>();
-
-        if (activity.getContentDetails().getUpload() != null) {
-          videos.addAll(getVideoList(activity.getContentDetails().getUpload().getVideoId()));
-        }
-        if (activity.getContentDetails().getPlaylistItem() != null
-            && activity.getContentDetails().getPlaylistItem().getResourceId() != null) {
-          videos.addAll(getVideoList(activity.getContentDetails().getPlaylistItem().getResourceId().getVideoId()));
-        }
-
-        processVideos(videos, afterDate, beforeDate, activity, feed);
-      } catch (Exception ex) {
-        // The trailing Throwable is logged with its stack trace; it takes no placeholder.
-        LOGGER.error("Error while trying to process activity: {}", activity, ex);
-      }
-    }
-  }
-
-  /**
-   * Process a list of Video objects: every video published strictly after
-   * afterDate and strictly before beforeDate (a null bound is open-ended) is
-   * serialized with Gson and put on the datumQueue under the activity's id.
-   *
-   * <p>When a video is older than afterDate the feed's next page token is
-   * cleared, so the caller stops paging, and the rest of the list is skipped.
-   * @param videos List of Video
-   * @param afterDate afterDate
-   * @param beforeDate beforeDate
-   * @param activity com.google.api.services.youtube.model.Activity
-   * @param feed ActivityListResponse
-   */
-  void processVideos(List<Video> videos, DateTime afterDate, DateTime beforeDate, com.google.api.services.youtube.model.Activity activity, ActivityListResponse feed) {
-    try {
-      for (Video video : videos) {
-        if (video == null) {
-          continue;
-        }
-        DateTime published = new DateTime(video.getSnippet().getPublishedAt().getValue());
-        // Each bound is checked on its own; the original combined check was
-        // inverted when both dates were set and rejected in-range videos.
-        boolean afterLowerBound = afterDate == null || afterDate.isBefore(published);
-        boolean beforeUpperBound = beforeDate == null || beforeDate.isAfter(published);
-        if (afterLowerBound && beforeUpperBound) {
-          // Serialize for the log only when it will be written, so a logging
-          // failure cannot drop the datum while debug is disabled.
-          if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Providing Youtube Activity: {}", MAPPER.writeValueAsString(video));
-          }
-          this.datumQueue.put(new StreamsDatum(gson.toJson(video), activity.getId()));
-        } else if (afterDate != null && afterDate.isAfter(published)) {
-          feed.setNextPageToken(null); // do not fetch next page
-          break;
-        }
-      }
-    } catch (InterruptedException ex) {
-      // Restore the flag so the owning executor thread can observe the interrupt.
-      Thread.currentThread().interrupt();
-      LOGGER.error("Interrupted while trying to process video list: {}", videos, ex);
-    } catch (Exception ex) {
-      LOGGER.error("Exception while trying to process video list: {}", videos, ex);
-    }
-  }
-
-  /**
-   * Given a Youtube videoId, request its "snippet,statistics" parts and return
-   * the matching Video objects.
-   * @param videoId videoId
-   * @return List of Video; a new empty list when the response carries no items
-   * @throws IOException if the videos.list request fails
-   */
-  List<Video> getVideoList(String videoId) throws IOException {
-    VideoListResponse videosListResponse = this.youtube.videos().list("snippet,statistics")
-        .setId(videoId)
-        .setKey(config.getApiKey())
-        .execute();
-
-    List<Video> items = videosListResponse.getItems();
-    // NOTE(review): assumes a response without items may report null rather than an empty list - confirm.
-    if (items == null || items.isEmpty()) {
-      LOGGER.debug("No Youtube videos found for videoId: {}", videoId);
-      return new ArrayList<>();
-    }
-
-    return items;
-  }
-
-  BlockingQueue<StreamsDatum> getDatumQueue() {
-    return this.datumQueue;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java
deleted file mode 100644
index 934a0e5..0000000
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeUserActivityProvider.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.youtube.provider;
-
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.api.services.youtube.YouTube;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.youtube.pojo.YoutubeConfiguration;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.PrintStream;
-import java.util.Iterator;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-/**
- *  Retrieve recent activity from a list of user ids or names.
- */
-public class YoutubeUserActivityProvider extends YoutubeProvider {
-
-  public YoutubeUserActivityProvider() {
-    super();
-  }
-
-  public YoutubeUserActivityProvider(YoutubeConfiguration config) {
-    super(config);
-  }
-
-  @Override
-  protected Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) {
-    return new YoutubeUserActivityCollector(youtube, queue, strategy, 
userInfo, config);
-  }
-
-  /**
-   * To use from command line:
-   *
-   * <p/>
-   * Supply (at least) the following required configuration in application.conf:
-   *
-   * <p/>
-   * youtube.oauth.pathToP12KeyFile
-   * youtube.oauth.serviceAccountEmailAddress
-   * youtube.apiKey
-   * youtube.youtubeUsers
-   *
-   * <p/>
-   * Launch using:
-   *
-   * <p/>
-   * mvn exec:java -Dexec.mainClass=com.youtube.provider.YoutubeUserActivityProvider -Dexec.args="application.conf youtube.json"
-   *
-   * <p/>
-   * Each datum is written to the output file as one line of JSON. The output
-   * stream is closed and the provider cleaned up even when the run fails.
-   *
-   * @param args args[0] is the configuration file, args[1] is the output file
-   * @throws Exception Exception
-   */
-  public static void main(String[] args) throws Exception {
-
-    Preconditions.checkArgument(args.length >= 2);
-
-    String configfile = args[0];
-    String outfile = args[1];
-
-    Config reference = ConfigFactory.load();
-    File file = new File(configfile);
-    assert (file.exists());
-    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));
-
-    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-
-    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
-    YoutubeConfiguration config = new ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe, "youtube");
-    YoutubeUserActivityProvider provider = new YoutubeUserActivityProvider(config);
-
-    ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    // try-with-resources flushes and closes the file on every exit path.
-    try (PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)))) {
-      provider.prepare(config);
-      provider.startStream();
-      try {
-        do {
-          Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-          Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
-          while (iterator.hasNext()) {
-            StreamsDatum datum = iterator.next();
-            String json;
-            try {
-              if ( datum.getDocument() instanceof String ) {
-                json = (String) datum.getDocument();
-              } else {
-                json = mapper.writeValueAsString(datum.getDocument());
-              }
-              outStream.println(json);
-            } catch (JsonProcessingException ex) {
-              System.err.println(ex.getMessage());
-            }
-          }
-        }
-        while ( provider.isRunning());
-      } finally {
-        // Runs only after prepare() succeeded, so the executor exists.
-        provider.cleanUp();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java
deleted file mode 100644
index 32a011f..0000000
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.youtube.serializer;
-
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.pojo.extensions.ExtensionUtil;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Image;
-import org.apache.streams.pojo.json.Provider;
-
-import com.google.api.services.youtube.YouTube;
-import com.google.api.services.youtube.model.Channel;
-import com.google.api.services.youtube.model.Thumbnail;
-import com.google.api.services.youtube.model.ThumbnailDetails;
-import com.google.api.services.youtube.model.Video;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-public class YoutubeActivityUtil {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeActivityUtil.class);
-
-  /**
-   * Given a {@link Video} object and an
-   * {@link Activity} object, fill out the appropriate details: actor, verb,
-   * id, published date, title, content, url, provider, object and extensions.
-   *
-   * @param video Video
-   * @param activity Activity
-   * @param channelId not read by this method; the actor id is taken from the video snippet
-   * @throws ActivitySerializerException ActivitySerializerException
-   */
-  public static void updateActivity(Video video, Activity activity, String channelId) throws ActivitySerializerException {
-    activity.setActor(buildActor(video, video.getSnippet().getChannelId()));
-    activity.setVerb("post");
-
-    // The verb must be set before the id, which is built from it.
-    activity.setId(formatId(activity.getVerb(), video.getId()));
-
-    activity.setPublished(new DateTime(video.getSnippet().getPublishedAt().getValue()));
-    activity.setTitle(video.getSnippet().getTitle());
-    activity.setContent(video.getSnippet().getDescription());
-    activity.setUrl("https://www.youtube.com/watch?v=" + video.getId());
-
-    activity.setProvider(getProvider());
-
-    activity.setObject(buildActivityObject(video));
-
-    addYoutubeExtensions(activity, video);
-  }
-
-
-  /**
-   * Given a {@link Channel} object and an
-   * {@link Activity} object, fill out the appropriate details
-   *
-   * @param channel Channel
-   * @param activity Activity
-   * @throws ActivitySerializerException ActivitySerializerException
-   */
-  public static void updateActivity(Channel channel, Activity activity, String 
channelId) throws ActivitySerializerException {
-    try {
-      activity.setProvider(getProvider());
-      activity.setVerb("post");
-      activity.setActor(createActorForChannel(channel));
-      Map<String, Object> extensions = new HashMap<>();
-      extensions.put("youtube", channel);
-      activity.setAdditionalProperty("extensions", extensions);
-    } catch (Throwable throwable) {
-      throw new ActivitySerializerException(throwable);
-    }
-  }
-
-  /**
-   * Builds the actor for a channel: id, summary, display name, high-resolution
-   * thumbnail image and url, plus "followers" and "posts" extensions taken from
-   * the channel statistics.
-   * @param channel Channel; its snippet, thumbnails and statistics are dereferenced without null checks
-   * @return $.actor
-   */
-  public static ActivityObject createActorForChannel(Channel channel) {
-    ActivityObject actor = new ActivityObject();
-    // TODO: use generic provider id concatenator
-    actor.setId("id:youtube:" + channel.getId());
-    actor.setSummary(channel.getSnippet().getDescription());
-    actor.setDisplayName(channel.getSnippet().getTitle());
-    Image image = new Image();
-    image.setUrl(channel.getSnippet().getThumbnails().getHigh().getUrl());
-    actor.setImage(image);
-    actor.setUrl("https://youtube.com/user/" + channel.getId());
-    Map<String, Object> actorExtensions = new HashMap<>();
-    actorExtensions.put("followers", channel.getStatistics().getSubscriberCount());
-    actorExtensions.put("posts", channel.getStatistics().getVideoCount());
-    actor.setAdditionalProperty("extensions", actorExtensions);
-    return actor;
-  }
-
-  /**
-   * Given a video object, create the appropriate activity object with the video
-   * URL and, when the snippet carries a default thumbnail, an image built from it.
-   * @param video Video
-   * @return Activity Object of type "video" with the video URL and an optional thumbnail image
-   */
-  private static ActivityObject buildActivityObject(Video video) {
-    ActivityObject activityObject = new ActivityObject();
-
-    ThumbnailDetails thumbnailDetails = video.getSnippet().getThumbnails();
-    // A snippet without thumbnail details simply yields an object without an image.
-    Thumbnail thumbnail = thumbnailDetails == null ? null : thumbnailDetails.getDefault();
-
-    if (thumbnail != null) {
-      Image image = new Image();
-      image.setUrl(thumbnail.getUrl());
-      image.setHeight(thumbnail.getHeight());
-      image.setWidth(thumbnail.getWidth());
-
-      activityObject.setImage(image);
-    }
-
-    activityObject.setUrl("https://www.youtube.com/watch?v=" + video.getId());
-    activityObject.setObjectType("video");
-
-    return activityObject;
-  }
-
-  /**
-   * Add the Youtube extensions to the Activity object that we're building: the
-   * raw video under "youtube" and, when statistics are present, a "likes"
-   * entry holding the video's like count.
-   * @param activity Activity
-   * @param video Video
-   */
-  private static void addYoutubeExtensions(Activity activity, Video video) {
-    Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
-
-    extensions.put("youtube", video);
-
-    if (video.getStatistics() != null) {
-      Map<String, Object> likes = new HashMap<>();
-      // "likes" must carry the like count; the comment count was used by mistake.
-      likes.put("count", video.getStatistics().getLikeCount());
-      extensions.put("likes", likes);
-    }
-  }
-
-  /**
-   * Build an {@link ActivityObject} actor given the video object
-   * @param video Video
-   * @param id id
-   * @return Actor object
-   */
-  private static ActivityObject buildActor(Video video, String id) {
-    ActivityObject actor = new ActivityObject();
-
-    actor.setId("id:youtube:" + id);
-    actor.setDisplayName(video.getSnippet().getChannelTitle());
-    actor.setSummary(video.getSnippet().getDescription());
-    actor.setAdditionalProperty("handle", 
video.getSnippet().getChannelTitle());
-
-    return actor;
-  }
-
-  /**
-   * Gets the common youtube {@link Provider} object
-   * @return a provider object representing YouTube
-   */
-  public static Provider getProvider() {
-    Provider provider = new Provider();
-    provider.setId("id:providers:youtube");
-    provider.setDisplayName("YouTube");
-    return provider;
-  }
-
-  /**
-   * Formats the ID to conform with the Apache Streams activity ID convention
-   * @param idparts the parts of the ID to join
-   * @return a valid Activity ID in format "id:youtube:part1:part2:...partN"
-   */
-  public static String formatId(String... idparts) {
-    return String.join(":",
-        Stream.concat(Arrays.stream(new String[]{"id:youtube"}), 
Arrays.stream(idparts)).collect(Collectors.toList()));
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java
deleted file mode 100644
index e28b4a1..0000000
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.youtube.serializer;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.api.client.util.DateTime;
-import com.google.api.services.youtube.model.Channel;
-import com.google.api.services.youtube.model.ChannelContentDetails;
-import com.google.api.services.youtube.model.ChannelLocalization;
-import com.google.api.services.youtube.model.ChannelSnippet;
-import com.google.api.services.youtube.model.ChannelStatistics;
-import com.google.api.services.youtube.model.ChannelTopicDetails;
-import com.google.api.services.youtube.model.Thumbnail;
-import com.google.api.services.youtube.model.ThumbnailDetails;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * YoutubeChannelDeserializer is a JsonDeserializer for Channel.
- */
-public class YoutubeChannelDeserializer extends JsonDeserializer<Channel> {
-
-  @Override
-  public Channel deserialize(JsonParser jp, DeserializationContext ctxt) 
throws IOException {
-    JsonNode node = jp.getCodec().readTree(jp);
-    try {
-      Channel channel = new Channel();
-      if (node.findPath("etag") != null) {
-        channel.setEtag(node.get("etag").asText());
-      }
-      if (node.findPath("kind") != null) {
-        channel.setKind(node.get("kind").asText());
-      }
-      channel.setId(node.get("id").asText());
-      channel.setTopicDetails(setTopicDetails(node.findValue("topicDetails")));
-      
channel.setStatistics(setChannelStatistics(node.findValue("statistics")));
-      
channel.setContentDetails(setContentDetails(node.findValue("contentDetails")));
-      channel.setSnippet(setChannelSnippet(node.findValue("snippet")));
-      return channel;
-    } catch (Throwable throwable) {
-      throw new IOException(throwable);
-    }
-  }
-
-  protected ChannelSnippet setChannelSnippet(JsonNode node) {
-    ChannelSnippet snippet = new ChannelSnippet();
-    snippet.setTitle(node.get("title").asText());
-    snippet.setDescription(node.get("description").asText());
-    snippet.setPublishedAt(new 
DateTime(node.get("publishedAt").get("value").longValue()));
-    snippet.setLocalized(setLocalized(node.findValue("localized")));
-    snippet.setThumbnails(setThumbnails(node.findValue("thumbnails")));
-    return snippet;
-  }
-
-  protected ThumbnailDetails setThumbnails(JsonNode node) {
-    ThumbnailDetails details = new ThumbnailDetails();
-    if (node == null) {
-      return details;
-    }
-    details.setDefault(new 
Thumbnail().setUrl(node.get("default").get("url").asText()));
-    details.setHigh(new 
Thumbnail().setUrl(node.get("high").get("url").asText()));
-    details.setMedium(new 
Thumbnail().setUrl(node.get("medium").get("url").asText()));
-    return details;
-  }
-
-  protected ChannelLocalization setLocalized(JsonNode node) {
-    if (node == null) {
-      return new ChannelLocalization();
-    }
-    ChannelLocalization localization = new ChannelLocalization();
-    localization.setDescription(node.get("description").asText());
-    localization.setTitle(node.get("title").asText());
-    return localization;
-  }
-
-  protected ChannelContentDetails setContentDetails(JsonNode node) {
-    ChannelContentDetails contentDetails = new ChannelContentDetails();
-    if (node == null) {
-      return contentDetails;
-    }
-    if (node.findValue("googlePlusUserId") != null) {
-      
contentDetails.setGooglePlusUserId(node.get("googlePlusUserId").asText());
-    }
-    
contentDetails.setRelatedPlaylists(setRelatedPlaylists(node.findValue("relatedPlaylists")));
-    return contentDetails;
-  }
-
-  protected ChannelContentDetails.RelatedPlaylists 
setRelatedPlaylists(JsonNode node) {
-    ChannelContentDetails.RelatedPlaylists playlists = new 
ChannelContentDetails.RelatedPlaylists();
-    if (node == null) {
-      return playlists;
-    }
-    if (node.findValue("favorites") != null) {
-      playlists.setFavorites(node.get("favorites").asText());
-    }
-    if (node.findValue("likes") != null) {
-      playlists.setLikes(node.get("likes").asText());
-    }
-    if (node.findValue("uploads") != null) {
-      playlists.setUploads(node.get("uploads").asText());
-    }
-    return playlists;
-  }
-
-  protected ChannelStatistics setChannelStatistics(JsonNode node) {
-    ChannelStatistics stats = new ChannelStatistics();
-    if (node == null) {
-      return stats;
-    }
-    stats.setCommentCount(node.get("commentCount").bigIntegerValue());
-    
stats.setHiddenSubscriberCount(node.get("hiddenSubscriberCount").asBoolean());
-    stats.setSubscriberCount(node.get("subscriberCount").bigIntegerValue());
-    stats.setVideoCount(node.get("videoCount").bigIntegerValue());
-    stats.setViewCount(node.get("viewCount").bigIntegerValue());
-    return stats;
-  }
-
-  protected ChannelTopicDetails setTopicDetails(JsonNode node) {
-    ChannelTopicDetails details = new ChannelTopicDetails();
-    if (node == null) {
-      return details;
-    }
-    List<String> topicIds = new LinkedList<>();
-    for (JsonNode jsonNode : node.get("topicIds")) {
-      topicIds.add(jsonNode.asText());
-    }
-    details.setTopicIds(topicIds);
-    return  details;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java
deleted file mode 100644
index e7645bd..0000000
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.youtube.serializer;
-
-import org.apache.streams.jackson.StreamsJacksonMapper;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.api.services.youtube.model.Video;
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang.StringUtils;
-
-import java.io.IOException;
-import java.util.Objects;
-
-/**
- * Guesses which YouTube model class a raw json document represents, based on the
- * document's top-level "kind" field.
- */
-public class YoutubeEventClassifier {
-  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-  private static final String VIDEO_IDENTIFIER = "\"youtube#video\"";
-  private static final String CHANNEL_IDENTIFIER = "youtube#channel";
-
-  /**
-   * detect probable Class of a json String from YouTube.
-   * @param json json, must be neither null nor empty
-   * @return {@code Video.class} or {@code Channel.class} when the top-level "kind" matches,
-   *     {@code ObjectNode.class} for any other parsed document, or null when the json
-   *     cannot be parsed
-   */
-  public static Class detectClass(String json) {
-    Objects.requireNonNull(json);
-    Preconditions.checkArgument(StringUtils.isNotEmpty(json));
-
-    // Keep the generic node type: casting to ObjectNode fails on arrays and scalars.
-    com.fasterxml.jackson.databind.JsonNode root;
-    try {
-      root = mapper.readTree(json);
-    } catch (IOException ex) {
-      ex.printStackTrace();
-      return null;
-    }
-    if (root == null) {
-      return null;
-    }
-
-    // Look at the direct "kind" field only. findValue() also matches nested fields, which
-    // get() cannot read, so mixing the two could dereference null.
-    com.fasterxml.jackson.databind.JsonNode kindNode = root.get("kind");
-    String kind = kindNode == null ? null : kindNode.toString();
-
-    if (kind != null && kind.equals(VIDEO_IDENTIFIER)) {
-      return Video.class;
-    } else if (kind != null && kind.contains(CHANNEL_IDENTIFIER)) {
-      return com.google.api.services.youtube.model.Channel.class;
-    } else {
-      return ObjectNode.class;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeVideoDeserializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeVideoDeserializer.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeVideoDeserializer.java
deleted file mode 100644
index 43fe8c6..0000000
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeVideoDeserializer.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.youtube.serializer;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.api.client.util.DateTime;
-import com.google.api.services.youtube.model.Thumbnail;
-import com.google.api.services.youtube.model.ThumbnailDetails;
-import com.google.api.services.youtube.model.Video;
-import com.google.api.services.youtube.model.VideoSnippet;
-import com.google.api.services.youtube.model.VideoStatistics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class YoutubeVideoDeserializer extends JsonDeserializer<Video> {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeVideoDeserializer.class);
-
-  /**
-   * Because the Youtube Video object contains complex objects within its 
hierarchy, we have to use
-   * a custom deserializer
-   *
-   * @param jsonParser jsonParser
-   * @param deserializationContext deserializationContext
-   * @return The deserialized {@link 
com.google.api.services.youtube.YouTube.Videos} object
-   * @throws java.io.IOException IOException
-   * @throws com.fasterxml.jackson.core.JsonProcessingException 
JsonProcessingException
-   */
-  @Override
-  public Video deserialize(JsonParser jsonParser, DeserializationContext 
deserializationContext) throws IOException, JsonProcessingException {
-    JsonNode node = jsonParser.getCodec().readTree(jsonParser);
-    Video video = new Video();
-
-    try {
-      video.setId(node.get("id").asText());
-      video.setEtag(node.get("etag").asText());
-      video.setKind(node.get("kind").asText());
-
-      video.setSnippet(buildSnippet(node));
-      video.setStatistics(buildStatistics(node));
-    } catch (Exception ex) {
-      LOGGER.error("Exception while trying to deserialize a Video object: {}", 
ex);
-    }
-
-    return video;
-  }
-
-  /**
-   * Given the raw JsonNode, construct a video snippet object.
-   * @param node JsonNode
-   * @return VideoSnippet
-   */
-  private VideoSnippet buildSnippet(JsonNode node) {
-    VideoSnippet snippet = new VideoSnippet();
-    JsonNode snippetNode = node.get("snippet");
-
-    snippet.setChannelId(snippetNode.get("channelId").asText());
-    snippet.setChannelTitle(snippetNode.get("channelTitle").asText());
-    snippet.setDescription(snippetNode.get("description").asText());
-    snippet.setTitle(snippetNode.get("title").asText());
-    snippet.setPublishedAt(new 
DateTime(snippetNode.get("publishedAt").get("value").asLong()));
-
-    ThumbnailDetails thumbnailDetails = new ThumbnailDetails();
-    for (JsonNode t : snippetNode.get("thumbnails")) {
-      Thumbnail thumbnail = new Thumbnail();
-
-      thumbnail.setHeight(t.get("height").asLong());
-      thumbnail.setUrl(t.get("url").asText());
-      thumbnail.setWidth(t.get("width").asLong());
-
-      thumbnailDetails.setDefault(thumbnail);
-    }
-
-    snippet.setThumbnails(thumbnailDetails);
-
-    return snippet;
-  }
-
-  /**
-   * Given the raw JsonNode, construct a statistics object.
-   * @param node JsonNode
-   * @return VideoStatistics
-   */
-  private VideoStatistics buildStatistics(JsonNode node) {
-    VideoStatistics statistics = new VideoStatistics();
-    JsonNode statisticsNode = node.get("statistics");
-
-    
statistics.setCommentCount(statisticsNode.get("commentCount").bigIntegerValue());
-    
statistics.setDislikeCount(statisticsNode.get("dislikeCount").bigIntegerValue());
-    
statistics.setFavoriteCount(statisticsNode.get("favoriteCount").bigIntegerValue());
-    statistics.setLikeCount(statisticsNode.get("likeCount").bigIntegerValue());
-    statistics.setViewCount(statisticsNode.get("viewCount").bigIntegerValue());
-
-    return statistics;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/processor/YoutubeTypeConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/processor/YoutubeTypeConverter.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/processor/YoutubeTypeConverter.java
new file mode 100644
index 0000000..ee9ca18
--- /dev/null
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/processor/YoutubeTypeConverter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.processor;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.youtube.serializer.YoutubeActivityUtil;
+import org.apache.streams.youtube.serializer.YoutubeChannelDeserializer;
+import org.apache.streams.youtube.serializer.YoutubeEventClassifier;
+import org.apache.streams.youtube.serializer.YoutubeVideoDeserializer;
+
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.api.services.youtube.model.Channel;
+import com.google.api.services.youtube.model.Video;
+import org.apache.commons.lang.NotImplementedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * StreamsProcessor that turns YouTube {@link Video} and {@link Channel} documents, or
+ * their json String form, into {@link Activity} datums.
+ */
+public class YoutubeTypeConverter implements StreamsProcessor {
+
+  public static final String STREAMS_ID = "YoutubeTypeConverter";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeTypeConverter.class);
+
+  private StreamsJacksonMapper mapper;
+  private Queue<Video> inQueue;
+  private Queue<StreamsDatum> outQueue;
+  private int count = 0;
+
+  public YoutubeTypeConverter() {}
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  /**
+   * Converts the datum's document into an Activity.
+   *
+   * @param streamsDatum datum whose document is a Video, a Channel or a json String
+   * @return a list holding the converted datum, or an empty list when conversion failed
+   */
+  @Override
+  public List<StreamsDatum> process(StreamsDatum streamsDatum) {
+    List<StreamsDatum> converted = new ArrayList<>();
+
+    try {
+      Object document = streamsDatum.getDocument();
+
+      LOGGER.debug("{} processing {}", STREAMS_ID, document.getClass());
+
+      // json Strings are first turned into the model class they describe.
+      if (document instanceof String) {
+        document = deserializeItem(document);
+      }
+
+      Activity activity;
+      if (document instanceof Video) {
+        activity = new Activity();
+        YoutubeActivityUtil.updateActivity((Video) document, activity, streamsDatum.getId());
+      } else if (document instanceof Channel) {
+        activity = new Activity();
+        YoutubeActivityUtil.updateActivity((Channel) document, activity, null);
+      } else {
+        throw new NotImplementedException("Type conversion not implement for type : " + document.getClass().getName());
+      }
+
+      converted.add(new StreamsDatum(activity));
+      count++;
+    } catch (Exception ex) {
+      LOGGER.error("Exception while converting Video to Activity: {}", ex);
+    }
+
+    return converted;
+  }
+
+  /**
+   * Parses a json String into a Video or Channel when the classifier recognises it.
+   *
+   * @param item the json String
+   * @return the parsed model object, or the original item when it is not recognised or
+   *     parsing fails
+   */
+  private Object deserializeItem(Object item) {
+    Object parsed = item;
+    try {
+      String json = (String) item;
+      Class klass = YoutubeEventClassifier.detectClass(json);
+      if (klass.equals(Video.class)) {
+        parsed = mapper.readValue(json, Video.class);
+      } else if (klass.equals(Channel.class)) {
+        parsed = mapper.readValue(json, Channel.class);
+      }
+    } catch (Exception ex) {
+      LOGGER.error("Exception while trying to deserializeItem: {}", ex);
+    }
+
+    return parsed;
+  }
+
+  /**
+   * Obtains the shared mapper and registers the Video and Channel deserializers on it.
+   *
+   * @param configurationObject unused
+   */
+  @Override
+  public void prepare(Object configurationObject) {
+    mapper = StreamsJacksonMapper.getInstance();
+
+    SimpleModule videoModule = new SimpleModule();
+    videoModule.addDeserializer(Video.class, new YoutubeVideoDeserializer());
+    mapper.registerModule(videoModule);
+
+    SimpleModule channelModule = new SimpleModule();
+    channelModule.addDeserializer(Channel.class, new YoutubeChannelDeserializer());
+    mapper.registerModule(channelModule);
+  }
+
+  @Override
+  public void cleanUp() {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeChannelDataCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeChannelDataCollector.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeChannelDataCollector.java
new file mode 100644
index 0000000..332b1a7
--- /dev/null
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeChannelDataCollector.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.youtube.YoutubeConfiguration;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.services.youtube.YouTube;
+import com.google.api.services.youtube.model.Channel;
+import com.google.gson.Gson;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Collects YoutubeChannelData on behalf of YoutubeChannelProvider.
+ *
+ * <p>Each run requests the channel identified by the supplied {@link UserInfo}, serializes
+ * every returned {@link Channel} to json with Gson and puts it on the queue.
+ */
+public class YoutubeChannelDataCollector extends YoutubeDataCollector {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeChannelDataCollector.class);
+  private static final String CONTENT = "snippet,contentDetails,statistics,topicDetails";
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  private static final int MAX_ATTEMPTS = 5;
+
+  private YouTube youTube;
+  private BlockingQueue<StreamsDatum> queue;
+  private BackOffStrategy strategy;
+  private UserInfo userInfo;
+  private YoutubeConfiguration youtubeConfig;
+
+  /**
+   * YoutubeChannelDataCollector constructor.
+   * @param youTube       YouTube
+   * @param queue         BlockingQueue of StreamsDatum
+   * @param strategy      BackOffStrategy
+   * @param userInfo      UserInfo
+   * @param youtubeConfig YoutubeConfiguration
+   */
+  public YoutubeChannelDataCollector(
+      YouTube youTube,
+      BlockingQueue<StreamsDatum> queue,
+      BackOffStrategy strategy,
+      UserInfo userInfo,
+      YoutubeConfiguration youtubeConfig) {
+    this.youTube = youTube;
+    this.queue = queue;
+    this.strategy = strategy;
+    this.userInfo = userInfo;
+    this.youtubeConfig = youtubeConfig;
+  }
+
+  /**
+   * Requests the channel pages and queues one datum per channel.
+   *
+   * <p>A {@link GoogleJsonResponseException} is retried through the back off strategy for at
+   * most {@code MAX_ATTEMPTS} attempts in total; any other failure ends the run. Nothing is
+   * thrown to the caller, failures are only logged.
+   */
+  @Override
+  public void run() {
+    Gson gson = new Gson();
+    try {
+      int attempt = 0;
+      String pageToken = null;
+      boolean keepGoing = true;
+      while (keepGoing) {
+        try {
+          // Every page is requested with the API key, including follow-up pages.
+          YouTube.Channels.List request = this.youTube.channels().list(CONTENT)
+              .setId(this.userInfo.getUserId())
+              .setKey(this.youtubeConfig.getApiKey());
+          if (pageToken != null) {
+            request.setPageToken(pageToken);
+          }
+          com.google.api.services.youtube.model.ChannelListResponse response = request.execute();
+          List<Channel> channels = response.getItems();
+          if (channels != null) {
+            for (Channel channel : channels) {
+              String json = gson.toJson(channel);
+              this.queue.put(new StreamsDatum(json, channel.getId()));
+            }
+          }
+          // The next page comes from the response; the request's own token never advances.
+          pageToken = response.getNextPageToken();
+          keepGoing = !StringUtils.isEmpty(pageToken);
+        } catch (GoogleJsonResponseException gjre) {
+          LOGGER.warn("GoogleJsonResposneException caught : {}", gjre);
+          boolean tryAgain = backoffAndIdentifyIfRetry(gjre, this.strategy);
+          ++attempt;
+          // Stop once the error is not retryable or the attempts are used up.
+          keepGoing = tryAgain && attempt < MAX_ATTEMPTS;
+        } catch (InterruptedException ie) {
+          // Restore the interrupt flag so the owning executor can see the request to stop.
+          Thread.currentThread().interrupt();
+          LOGGER.warn("Interrupted while queueing channel info for id : {}", this.userInfo.getUserId());
+          keepGoing = false;
+        } catch (Throwable throwable) {
+          LOGGER.warn("Unable to get channel info for id : {}", this.userInfo.getUserId());
+          LOGGER.warn("Excpection thrown while trying to get channel info : {}", throwable);
+          // Repeating the same failing request without back off would spin forever.
+          keepGoing = false;
+        }
+      }
+
+    } catch (Throwable throwable) {
+      LOGGER.warn(throwable.getMessage());
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeChannelProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeChannelProvider.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeChannelProvider.java
new file mode 100644
index 0000000..d8a09d5
--- /dev/null
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeChannelProvider.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.provider;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.youtube.YoutubeConfiguration;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.youtube.YouTube;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Retrieve recent activity from a list of channels.
+ */
+public class YoutubeChannelProvider extends YoutubeProvider {
+
+  public YoutubeChannelProvider() {
+    super();
+  }
+
+  public YoutubeChannelProvider(YoutubeConfiguration config) {
+    super(config);
+  }
+
+  /**
+   * To use from command line:
+   *
+   * <p>Supply (at least) the following required configuration in application.conf:
+   *
+   * <p>youtube.oauth.pathToP12KeyFile
+   * youtube.oauth.serviceAccountEmailAddress
+   * youtube.apiKey
+   * youtube.youtubeUsers
+   *
+   * <p>Launch using:
+   *
+   * <p>mvn exec:java -Dexec.mainClass=org.apache.streams.youtube.provider.YoutubeChannelProvider
+   * -Dexec.args="application.conf channels.json"
+   *
+   * <p>Every datum read from the provider is written to the output file as one json line.
+   *
+   * @param args args[0] is the configuration file, args[1] is the output file
+   * @throws Exception Exception
+   */
+  public static void main(String[] args) throws Exception {
+
+    Preconditions.checkArgument(args.length >= 2);
+
+    String configfile = args[0];
+    String outfile = args[1];
+
+    Config reference = ConfigFactory.load();
+    File file = new File(configfile);
+    // A plain assert is skipped unless the JVM runs with -ea, so check explicitly.
+    Preconditions.checkArgument(file.exists(), "configuration file does not exist: %s", configfile);
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    Config typesafe = testResourceConfig.withFallback(reference).resolve();
+
+    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+    YoutubeConfiguration config = new ComponentConfigurator<>(YoutubeConfiguration.class).detectConfiguration(typesafe, "youtube");
+    YoutubeChannelProvider provider = new YoutubeChannelProvider(config);
+
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    // try-with-resources closes the output file even when the provider fails; the json is
+    // written as UTF-8 instead of the platform default charset.
+    try (PrintStream outStream = new PrintStream(
+        new BufferedOutputStream(new FileOutputStream(outfile)), false, "UTF-8")) {
+      provider.prepare(config);
+      provider.startStream();
+      do {
+        Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+        for (StreamsDatum datum : provider.readCurrent()) {
+          String json;
+          try {
+            if (datum.getDocument() instanceof String) {
+              json = (String) datum.getDocument();
+            } else {
+              json = mapper.writeValueAsString(datum.getDocument());
+            }
+            outStream.println(json);
+          } catch (JsonProcessingException ex) {
+            System.err.println(ex.getMessage());
+          }
+        }
+      }
+      while (provider.isRunning());
+      provider.cleanUp();
+      outStream.flush();
+    }
+  }
+
+  /**
+   * Creates the collector that fetches channel data for one user.
+   *
+   * @param strategy back off strategy handed to the collector
+   * @param queue queue the collector writes its datums to
+   * @param youtube YouTube client
+   * @param userInfo user whose channel is collected
+   * @return a new {@link YoutubeChannelDataCollector}
+   */
+  @Override
+  protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) {
+    return new YoutubeChannelDataCollector(youtube, queue, strategy, userInfo, this.config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeDataCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeDataCollector.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeDataCollector.java
new file mode 100644
index 0000000..b05365c
--- /dev/null
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeDataCollector.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.provider;
+
+import org.apache.streams.util.api.requests.backoff.BackOffException;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base Collector for Youtube Data.
+ */
+public abstract class YoutubeDataCollector implements Runnable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeDataCollector.class);
+
+  /**
+   * Looks at the status code of the exception.  If the code indicates that 
the request should be retried,
+   * it executes the back off strategy and returns true.
+   * @param gjre
+   * @param backOff
+   * @return returns true if the error code of the exception indicates the 
request should be retried.
+   */
+  public boolean backoffAndIdentifyIfRetry(GoogleJsonResponseException gjre, 
BackOffStrategy backOff) throws BackOffException {
+    boolean tryAgain = false;
+
+    switch (gjre.getStatusCode()) {
+      case 400 :
+        LOGGER.warn("Bad Request  : {}",  gjre);
+        break;
+      case 401 :
+        LOGGER.warn("Invalid Credentials : {}", gjre);
+        break;
+      case 403 :
+        LOGGER.warn("Possible rate limit exception. Retrying. : {}", 
gjre.getMessage());
+        backOff.backOff();
+        tryAgain = true;
+        break;
+      case 503 :
+        LOGGER.warn("Google Backend Service Error : {}", gjre);
+        break;
+      default:
+        LOGGER.warn("Google Service returned error : {}", gjre);
+        tryAgain = true;
+        backOff.backOff();
+        break;
+    }
+
+    return tryAgain;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeProvider.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeProvider.java
new file mode 100644
index 0000000..da21722
--- /dev/null
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeProvider.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.provider;
+
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.util.ComponentUtils;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import 
org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+import org.apache.streams.youtube.YoutubeConfiguration;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.youtube.YouTube;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class YoutubeProvider implements StreamsProvider {
+
+  private static final String STREAMS_ID = "YoutubeProvider";
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeProvider.class);
+  private static final int MAX_BATCH_SIZE = 1000;
+  /**
+   * Define a global instance of the HTTP transport.
+   */
+  private static final HttpTransport HTTP_TRANSPORT = new NetHttpTransport();
+  /**
+   * Define a global instance of the JSON factory.
+   */
+  private static final JsonFactory JSON_FACTORY = new JacksonFactory();
+  private static final int DEFAULT_THREAD_POOL_SIZE = 5;
+  protected YouTube youtube;
+  protected YoutubeConfiguration config;
+  // This OAuth 2.0 access scope allows for full read/write access to the
+  // authenticated user's account.
+  private List<String> scopes = 
Collections.singletonList("https://www.googleapis.com/auth/youtube");
+  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+  private ListeningExecutorService executor;
+  private BlockingQueue<StreamsDatum> datumQueue;
+  private AtomicBoolean isComplete;
+  private boolean previousPullWasEmpty;
+
+  /**
+   * YoutubeProvider constructor.
+   * Resolves config from JVM 'youtube'.
+   */
+  public YoutubeProvider() {
+    this.config = new ComponentConfigurator<>(YoutubeConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("youtube"));
+
+    Objects.requireNonNull(this.config.getApiKey());
+  }
+
+  /**
+   * YoutubeProvider constructor - uses supplied YoutubeConfiguration.
+   * @param config YoutubeConfiguration
+   */
+  public YoutubeProvider(YoutubeConfiguration config) {
+    this.config = config;
+
+    Objects.requireNonNull(this.config.getApiKey());
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    try {
+      this.youtube = createYouTubeClient();
+    } catch (IOException | GeneralSecurityException ex) {
+      LOGGER.error("Failed to created oauth for YouTube : {}", ex);
+      throw new RuntimeException(ex);
+    }
+
+    this.executor = 
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE));
+    this.datumQueue = new LinkedBlockingQueue<>(1000);
+    this.isComplete = new AtomicBoolean(false);
+    this.previousPullWasEmpty = false;
+  }
+
+  @Override
+  public void startStream() {
+    BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2);
+
+    for (UserInfo user : this.config.getYoutubeUsers()) {
+      if (this.config.getDefaultAfterDate() != null && user.getAfterDate() == 
null) {
+        user.setAfterDate(this.config.getDefaultAfterDate());
+      }
+      if (this.config.getDefaultBeforeDate() != null && user.getBeforeDate() 
== null) {
+        user.setBeforeDate(this.config.getDefaultBeforeDate());
+      }
+
+      ListenableFuture future = 
executor.submit(getDataCollector(backOffStrategy, this.datumQueue, 
this.youtube, user));
+      futures.add(future);
+    }
+
+    this.executor.shutdown();
+  }
+
+  protected abstract Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo);
+
+  @Override
+  public StreamsResultSet readCurrent() {
+    BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>();
+    int batchCount = 0;
+    while (!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) {
+      StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue);
+      if (datum != null) {
+        ++batchCount;
+        ComponentUtils.offerUntilSuccess(datum, batch);
+      }
+    }
+    return new StreamsResultSet(batch);
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return null;
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return null;
+  }
+
+  @VisibleForTesting
+  protected YouTube createYouTubeClient() throws IOException, 
GeneralSecurityException {
+    GoogleCredential.Builder credentialBuilder = new GoogleCredential.Builder()
+        .setTransport(HTTP_TRANSPORT)
+        .setJsonFactory(JSON_FACTORY)
+        
.setServiceAccountId(getConfig().getOauth().getServiceAccountEmailAddress())
+        .setServiceAccountScopes(scopes);
+
+    if (StringUtils.isNotEmpty(getConfig().getOauth().getPathToP12KeyFile())) {
+      File p12KeyFile = new File(getConfig().getOauth().getPathToP12KeyFile());
+      if (p12KeyFile.exists() && p12KeyFile.isFile() && p12KeyFile.canRead()) {
+        credentialBuilder = 
credentialBuilder.setServiceAccountPrivateKeyFromP12File(p12KeyFile);
+      }
+    }
+    Credential credential = credentialBuilder.build();
+    return new YouTube.Builder(HTTP_TRANSPORT, JSON_FACTORY, 
credential).setApplicationName("Streams Application").build();
+  }
+
+  @Override
+  public void cleanUp() {
+    ComponentUtils.shutdownExecutor(this.executor, 10, 10);
+    this.executor = null;
+  }
+
+  public YoutubeConfiguration getConfig() {
+    return config;
+  }
+
+  public void setConfig(YoutubeConfiguration config) {
+    this.config = config;
+  }
+
+  /**
+   * Set and overwrite the default before date that was read from the 
configuration file.
+   * @param defaultBeforeDate defaultBeforeDate
+   */
+  public void setDefaultBeforeDate(DateTime defaultBeforeDate) {
+    this.config.setDefaultBeforeDate(defaultBeforeDate);
+  }
+
+  /**
+   * Set and overwrite the default after date that was read from the 
configuration file.
+   * @param defaultAfterDate defaultAfterDate
+   */
+  public void setDefaultAfterDate(DateTime defaultAfterDate) {
+    this.config.setDefaultAfterDate(defaultAfterDate);
+  }
+
+  /**
+   * Sets and overwrites the user info from the configuration file.  Uses the 
default before and after dates.
+   * @param userIds Set of String userIds
+   */
+  public void setUserInfoWithDefaultDates(Set<String> userIds) {
+    List<UserInfo> youtubeUsers = new LinkedList<>();
+
+    for (String userId : userIds) {
+      UserInfo user = new UserInfo();
+      user.setUserId(userId);
+      user.setAfterDate(this.config.getDefaultAfterDate());
+      user.setBeforeDate(this.config.getDefaultBeforeDate());
+      youtubeUsers.add(user);
+    }
+
+    this.config.setYoutubeUsers(youtubeUsers);
+  }
+
+  /**
+   * Set and overwrite user info from the configuration file. Only sets after 
date.
+   * @param usersAndAfterDates usersAndAfterDates
+   */
+  public void setUserInfoWithAfterDate(Map<String, DateTime> 
usersAndAfterDates) {
+    List<UserInfo> youtubeUsers = new LinkedList<>();
+
+    for (String userId : usersAndAfterDates.keySet()) {
+      UserInfo user = new UserInfo();
+      user.setUserId(userId);
+      user.setAfterDate(usersAndAfterDates.get(userId));
+      youtubeUsers.add(user);
+    }
+
+    this.config.setYoutubeUsers(youtubeUsers);
+  }
+
+  @Override
+  public boolean isRunning() {
+    if (datumQueue.isEmpty() && executor.isTerminated() && 
Futures.allAsList(futures).isDone()) {
+      LOGGER.info("Completed");
+      isComplete.set(true);
+      LOGGER.info("Exiting");
+    }
+    return !isComplete.get();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeUserActivityCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeUserActivityCollector.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeUserActivityCollector.java
new file mode 100644
index 0000000..ba3e10f
--- /dev/null
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/org/apache/streams/youtube/provider/YoutubeUserActivityCollector.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.youtube.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.youtube.YoutubeConfiguration;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.services.youtube.YouTube;
+import com.google.api.services.youtube.model.ActivityListResponse;
+import com.google.api.services.youtube.model.Video;
+import com.google.api.services.youtube.model.VideoListResponse;
+import com.google.gson.Gson;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * YoutubeDataCollector for YoutubeUserActivityProvider.
+ */
+public class YoutubeUserActivityCollector extends YoutubeDataCollector {
+
+  /**
+   * Max results allowed per request
+   * https://developers.google.com/+/api/latest/activities/list
+   */
+  private static final long MAX_RESULTS = 50;
+  private static final int MAX_ATTEMPTS = 5;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeUserActivityCollector.class);
+  private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
+
+  static { //set up mapper for Google Activity Object
+    SimpleModule simpleModule = new SimpleModule();
+    MAPPER.registerModule(simpleModule);
+    MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  }
+
+  private Gson gson = new Gson();
+  private BlockingQueue<StreamsDatum> datumQueue;
+  private BackOffStrategy backOff;
+  private YouTube youtube;
+  private UserInfo userInfo;
+  private YoutubeConfiguration config;
+
+  /**
+   * YoutubeUserActivityCollector constructor.
+   * @param youtube YouTube
+   * @param datumQueue BlockingQueue of StreamsDatum
+   * @param backOff BackOffStrategy
+   * @param userInfo UserInfo
+   * @param config YoutubeConfiguration
+   */
+  public YoutubeUserActivityCollector(
+      YouTube youtube,
+      BlockingQueue<StreamsDatum> datumQueue,
+      BackOffStrategy backOff,
+      UserInfo userInfo,
+      YoutubeConfiguration config) {
+    this.youtube = youtube;
+    this.datumQueue = datumQueue;
+    this.backOff = backOff;
+    this.userInfo = userInfo;
+    this.config = config;
+  }
+
+  @Override
+  public void run() {
+    collectActivityData();
+  }
+
+  /**
+   * Iterate through all users in the Youtube configuration and collect all 
videos
+   * associated with their accounts.
+   */
+  protected void collectActivityData() {
+    try {
+      YouTube.Activities.List request = null;
+      ActivityListResponse feed = null;
+
+      boolean tryAgain = false;
+      int attempt = 0;
+      DateTime afterDate = userInfo.getAfterDate();
+      DateTime beforeDate = userInfo.getBeforeDate();
+
+      do {
+        try {
+          if (request == null) {
+            request = this.youtube.activities().list("contentDetails")
+                .setChannelId(userInfo.getUserId())
+                .setMaxResults(MAX_RESULTS)
+                .setKey(config.getApiKey());
+            feed = request.execute();
+          } else {
+            request = this.youtube.activities().list("contentDetails")
+                .setChannelId(userInfo.getUserId())
+                .setMaxResults(MAX_RESULTS)
+                .setPageToken(feed.getNextPageToken())
+                .setKey(config.getApiKey());
+            feed = request.execute();
+          }
+          this.backOff.reset(); //successful pull reset api.
+
+          processActivityFeed(feed, afterDate, beforeDate);
+        } catch (GoogleJsonResponseException gjre) {
+          tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOff);
+          ++attempt;
+        }
+      }
+      while ((tryAgain || (feed != null && feed.getNextPageToken() != null)) 
&& attempt < MAX_ATTEMPTS);
+    } catch (Throwable throwable) {
+      if (throwable instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      throwable.printStackTrace();
+      LOGGER.warn("Unable to pull Activities for user={} : {}", 
this.userInfo.getUserId(), throwable);
+    }
+  }
+
+  /**
+   * Given a feed and an after and before date, fetch all relevant user videos
+   * and place them into the datumQueue for post-processing.
+   * @param feed ActivityListResponse
+   * @param afterDate DateTime
+   * @param beforeDate DateTime
+   * @throws IOException IOException
+   * @throws InterruptedException InterruptedException
+   */
+  void processActivityFeed(ActivityListResponse feed, DateTime afterDate, 
DateTime beforeDate) throws IOException, InterruptedException {
+    for (com.google.api.services.youtube.model.Activity activity : 
feed.getItems()) {
+      try {
+        List<Video> videos = new ArrayList<>();
+
+        if (activity.getContentDetails().getUpload() != null) {
+          
videos.addAll(getVideoList(activity.getContentDetails().getUpload().getVideoId()));
+        }
+        if (activity.getContentDetails().getPlaylistItem() != null && 
activity.getContentDetails().getPlaylistItem().getResourceId() != null) {
+          
videos.addAll(getVideoList(activity.getContentDetails().getPlaylistItem().getResourceId().getVideoId()));
+        }
+
+        processVideos(videos, afterDate, beforeDate, activity, feed);
+      } catch (Exception ex) {
+        LOGGER.error("Error while trying to process activity: {}, {}", 
activity, ex);
+      }
+    }
+  }
+
+  /**
+   * Process a list of Video objects.
+   * @param videos     List of Video
+   * @param afterDate  afterDate
+   * @param beforeDate beforeDate
+   * @param activity com.google.api.services.youtube.model.Activity
+   * @param feed ActivityListResponse
+   */
+  void processVideos(List<Video> videos, DateTime afterDate, DateTime 
beforeDate, com.google.api.services.youtube.model.Activity activity, 
ActivityListResponse feed) {
+    try {
+      for (Video video : videos) {
+        if (video != null) {
+          org.joda.time.DateTime published = new 
org.joda.time.DateTime(video.getSnippet().getPublishedAt().getValue());
+          if ((afterDate == null && beforeDate == null)
+              || (beforeDate == null && afterDate.isBefore(published))
+              || (afterDate == null && beforeDate.isAfter(published))
+              || ((afterDate != null && beforeDate != null) && 
(afterDate.isAfter(published) && beforeDate.isBefore(published)))) {
+            LOGGER.debug("Providing Youtube Activity: {}", 
MAPPER.writeValueAsString(video));
+            this.datumQueue.put(new StreamsDatum(gson.toJson(video), 
activity.getId()));
+          } else if (afterDate != null && afterDate.isAfter(published)) {
+            feed.setNextPageToken(null); // do not fetch next page
+            break;
+          }
+        }
+      }
+    } catch (Exception ex) {
+      LOGGER.error("Exception while trying to process video list: {}, {}", 
videos, ex);
+    }
+  }
+
+  /**
+   * Given a Youtube videoId, return the relevant Youtube Video object.
+   * @param videoId videoId
+   * @return List of Videos
+   * @throws IOException
+   */
+  List<Video> getVideoList(String videoId) throws IOException {
+    VideoListResponse videosListResponse = 
this.youtube.videos().list("snippet,statistics")
+        .setId(videoId)
+        .setKey(config.getApiKey())
+        .execute();
+
+    if (videosListResponse.getItems().size() == 0) {
+      LOGGER.debug("No Youtube videos found for videoId: {}", videoId);
+      return new ArrayList<>();
+    }
+
+    return videosListResponse.getItems();
+  }
+
+  BlockingQueue<StreamsDatum> getDatumQueue() {
+    return this.datumQueue;
+  }
+}
\ No newline at end of file


Reply via email to