http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java
 
b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java
deleted file mode 100644
index 120efe9..0000000
--- 
a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.mongo;
-
-import com.google.common.base.Objects;
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class MongoConfigurator {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(MongoConfigurator.class);
-
-    public static MongoConfiguration detectConfiguration(Config mongo) {
-
-        MongoConfiguration mongoConfiguration = new MongoConfiguration();
-
-        mongoConfiguration.setHost(mongo.getString("host"));
-        mongoConfiguration.setPort(new Long(mongo.getInt("port")));
-        mongoConfiguration.setDb(mongo.getString("db"));
-        mongoConfiguration.setCollection(mongo.getString("collection"));
-
-        if( mongo.hasPath("user"))
-            mongoConfiguration.setUser(mongo.getString("user"));
-        if( mongo.hasPath("password"))
-            mongoConfiguration.setPassword(mongo.getString("password"));
-        return mongoConfiguration;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
 
b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
index 9772f95..ba77ff1 100644
--- 
a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
+++ 
b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
@@ -23,11 +23,19 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
-import com.mongodb.*;
-import com.mongodb.util.JSON;
-import com.typesafe.config.Config;
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -35,11 +43,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.math.BigInteger;
-import java.net.UnknownHostException;
-import java.util.List;
 import java.util.Queue;
-import java.util.Random;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -49,7 +57,6 @@ public class MongoPersistReader implements 
StreamsPersistReader {
     public static final String STREAMS_ID = "MongoPersistReader";
 
     private final static Logger LOGGER = 
LoggerFactory.getLogger(MongoPersistReader.class);
-    private final static long MAX_WRITE_LATENCY = 1000;
 
     protected volatile Queue<StreamsDatum> persistQueue;
 
@@ -59,7 +66,6 @@ public class MongoPersistReader implements 
StreamsPersistReader {
     private ExecutorService executor;
 
     private MongoConfiguration config;
-    private MongoPersistReaderTask readerTask;
 
     protected MongoClient client;
     protected DB db;
@@ -67,13 +73,11 @@ public class MongoPersistReader implements 
StreamsPersistReader {
 
     protected DBCursor cursor;
 
-    protected List<DBObject> insertBatch = Lists.newArrayList();
-
     protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
     public MongoPersistReader() {
-        Config config = StreamsConfigurator.config.getConfig("mongo");
-        this.config = MongoConfigurator.detectConfiguration(config);
+        this.config = new ComponentConfigurator<>(MongoConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo"));
     }
 
     public MongoPersistReader(MongoConfiguration config) {
@@ -81,8 +85,8 @@ public class MongoPersistReader implements 
StreamsPersistReader {
     }
 
     public MongoPersistReader(Queue<StreamsDatum> persistQueue) {
-        Config config = StreamsConfigurator.config.getConfig("mongo");
-        this.config = MongoConfigurator.detectConfiguration(config);
+        this.config = new ComponentConfigurator<>(MongoConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo"));
         this.persistQueue = persistQueue;
     }
 
@@ -95,14 +99,6 @@ public class MongoPersistReader implements 
StreamsPersistReader {
     }
 
     public void stop() {
-
-//        try {
-//            client.st
-//            client.requestDone();
-//        } catch (Exception e) {
-//        } finally {
-//            client.requestDone();
-//        }
     }
 
     @Override
@@ -121,8 +117,7 @@ public class MongoPersistReader implements 
StreamsPersistReader {
 
         cursor = collection.find();
 
-        if( cursor == null ||
-            cursor.hasNext() == false )
+        if( cursor == null || !cursor.hasNext())
             throw new RuntimeException("Collection not present or empty!");
 
         persistQueue = constructQueue();
@@ -149,9 +144,8 @@ public class MongoPersistReader implements 
StreamsPersistReader {
             LOGGER.warn("document isn't valid JSON.");
             return null;
         }
-        StreamsDatum datum = new StreamsDatum(objectNode, id);
 
-        return datum;
+        return new StreamsDatum(objectNode, id);
     }
 
     private synchronized void connectToMongo() {
@@ -161,7 +155,7 @@ public class MongoPersistReader implements 
StreamsPersistReader {
         if (!Strings.isNullOrEmpty(config.getUser()) && 
!Strings.isNullOrEmpty(config.getPassword())) {
             MongoCredential credential =
                     MongoCredential.createCredential(config.getUser(), 
config.getDb(), config.getPassword().toCharArray());
-            client = new MongoClient(serverAddress, 
Lists.<MongoCredential>newArrayList(credential));
+            client = new MongoClient(serverAddress, 
Lists.newArrayList(credential));
         } else {
             client = new MongoClient(serverAddress);
         }
@@ -178,15 +172,12 @@ public class MongoPersistReader implements 
StreamsPersistReader {
     @Override
     public StreamsResultSet readAll() {
 
-        DBCursor cursor = collection.find();
-        try {
-            while(cursor.hasNext()) {
+        try (DBCursor cursor = collection.find()) {
+            while (cursor.hasNext()) {
                 DBObject dbObject = cursor.next();
                 StreamsDatum datum = prepareDatum(dbObject);
                 write(datum);
             }
-        } finally {
-            cursor.close();
         }
 
         return readCurrent();
@@ -196,14 +187,14 @@ public class MongoPersistReader implements 
StreamsPersistReader {
     public void startStream() {
 
         LOGGER.debug("startStream");
-        readerTask = new MongoPersistReaderTask(this);
+        MongoPersistReaderTask readerTask = new MongoPersistReaderTask(this);
         Thread readerTaskThread = new Thread(readerTask);
         Future future = executor.submit(readerTaskThread);
 
         while( !future.isDone() && !future.isCancelled()) {
             try {
                 Thread.sleep(1000);
-            } catch (InterruptedException e) {}
+            } catch (InterruptedException ignored) {}
         }
 
         executor.shutdown();
@@ -219,9 +210,6 @@ public class MongoPersistReader implements 
StreamsPersistReader {
             lock.writeLock().lock();
             current = new StreamsResultSet(persistQueue);
             current.setCounter(new DatumStatusCounter());
-//            current.getCounter().add(countersCurrent);
-//            countersTotal.add(countersCurrent);
-//            countersCurrent = new DatumStatusCounter();
             persistQueue = constructQueue();
         } finally {
             lock.writeLock().unlock();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
 
b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
index 4938dcc..5f6ac1f 100644
--- 
a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
+++ 
b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
@@ -23,30 +23,29 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.mongodb.DB;
-import com.mongodb.DBAddress;
 import com.mongodb.DBCollection;
 import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoCredential;
 import com.mongodb.ServerAddress;
 import com.mongodb.util.JSON;
-import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.bson.types.ObjectId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Queue;
 import java.util.Random;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -71,12 +70,13 @@ public class MongoPersistWriter implements 
StreamsPersistWriter, Runnable {
     protected DB db;
     protected DBCollection collection;
 
-    protected List<DBObject> insertBatch = Lists.newArrayList();
+    protected List<DBObject> insertBatch = new ArrayList<>();
 
     protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
     public MongoPersistWriter() {
-        
this(MongoConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("mongo")));
+        this(new ComponentConfigurator<>(MongoConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo")));
     }
 
     public MongoPersistWriter(MongoConfiguration config) {
@@ -112,7 +112,7 @@ public class MongoPersistWriter implements 
StreamsPersistWriter, Runnable {
             lock.writeLock().lock();
             collection.insert(insertBatch);
             lastWrite.set(System.currentTimeMillis());
-            insertBatch = Lists.newArrayList();
+            insertBatch = new ArrayList<>();
         } finally {
             lock.writeLock().unlock();
         }
@@ -179,7 +179,7 @@ public class MongoPersistWriter implements 
StreamsPersistWriter, Runnable {
             }
             try {
                 Thread.sleep(new Random().nextInt(1));
-            } catch (InterruptedException e) {
+            } catch (InterruptedException ignored) {
             }
         }
 
@@ -187,7 +187,7 @@ public class MongoPersistWriter implements 
StreamsPersistWriter, Runnable {
 
     @Override
     public void prepare(Object configurationObject) {
-        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+        this.persistQueue = new ConcurrentLinkedQueue<>();
         start();
     }
 
@@ -240,7 +240,7 @@ public class MongoPersistWriter implements 
StreamsPersistWriter, Runnable {
         if (!Strings.isNullOrEmpty(config.getUser()) && 
!Strings.isNullOrEmpty(config.getPassword())) {
             MongoCredential credential =
                     MongoCredential.createCredential(config.getUser(), 
config.getDb(), config.getPassword().toCharArray());
-            client = new MongoClient(serverAddress, 
Lists.<MongoCredential>newArrayList(credential));
+            client = new MongoClient(serverAddress, 
Lists.newArrayList(credential));
         } else {
             client = new MongoClient(serverAddress);
         }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
 
b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
index bac96bf..7aa632e 100644
--- 
a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
+++ 
b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
@@ -18,10 +18,9 @@
 
 package org.apache.streams.peoplepattern;
 
-import com.google.common.collect.Maps;
-import org.apache.streams.components.http.HttpConfigurator;
 import org.apache.streams.components.http.HttpProcessorConfiguration;
 import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.pojo.extensions.ExtensionUtil;
@@ -31,6 +30,7 @@ import org.apache.streams.pojo.json.Actor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -43,7 +43,8 @@ public class AccountTypeProcessor extends 
SimpleHTTPGetProcessor {
     private final static Logger LOGGER = 
LoggerFactory.getLogger(AccountTypeProcessor.class);
 
     public AccountTypeProcessor() {
-        
this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("peoplepattern")));
+        this(new ComponentConfigurator<>(HttpProcessorConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("peoplepattern")));
     }
 
     public AccountTypeProcessor(HttpProcessorConfiguration 
peoplePatternConfiguration) {
@@ -65,11 +66,11 @@ public class AccountTypeProcessor extends 
SimpleHTTPGetProcessor {
         Actor actor = activity.getActor();
         ActivityObject actorObject = mapper.convertValue(actor, 
ActivityObject.class);
         String username = (String) 
ExtensionUtil.getInstance().getExtension(actorObject, "screenName");
-        Map<String, String> params = Maps.newHashMap();
+        Map<String, String> params = new HashMap<>();
         params.put("id", actor.getId());
         params.put("name", actor.getDisplayName());
         params.put("username", username);
         params.put("description", actor.getSummary());
         return params;
     }
-};
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
 
b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
index 238685d..4f6af35 100644
--- 
a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
+++ 
b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
@@ -18,10 +18,9 @@
 
 package org.apache.streams.peoplepattern;
 
-import com.google.common.collect.Maps;
-import org.apache.streams.components.http.HttpConfigurator;
 import org.apache.streams.components.http.HttpProcessorConfiguration;
 import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.pojo.extensions.ExtensionUtil;
@@ -31,6 +30,7 @@ import org.apache.streams.pojo.json.Actor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -43,7 +43,8 @@ public class DemographicsProcessor extends 
SimpleHTTPGetProcessor {
     private final static Logger LOGGER = 
LoggerFactory.getLogger(DemographicsProcessor.class);
 
     public DemographicsProcessor() {
-        
this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("peoplepattern")));
+        this(new ComponentConfigurator<>(HttpProcessorConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("peoplepattern")));
     }
 
     public DemographicsProcessor(HttpProcessorConfiguration 
peoplePatternConfiguration) {
@@ -65,7 +66,7 @@ public class DemographicsProcessor extends 
SimpleHTTPGetProcessor {
         Actor actor = activity.getActor();
         ActivityObject actorObject = mapper.convertValue(actor, 
ActivityObject.class);
         String username = (String) 
ExtensionUtil.getInstance().getExtension(actorObject, "screenName");
-        Map<String, String> params = Maps.newHashMap();
+        Map<String, String> params = new HashMap<>();
         params.put("id", actor.getId());
         params.put("name", actor.getDisplayName());
         params.put("username", username);
@@ -73,4 +74,4 @@ public class DemographicsProcessor extends 
SimpleHTTPGetProcessor {
         return params;
     }
 
-};
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookStreamConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookStreamConfigurator.java
 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookStreamConfigurator.java
deleted file mode 100644
index da1b778..0000000
--- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookStreamConfigurator.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.facebook.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigException;
-import org.apache.streams.facebook.FacebookConfiguration;
-import org.apache.streams.facebook.FacebookOAuthConfiguration;
-import org.apache.streams.facebook.FacebookUserInformationConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class FacebookStreamConfigurator {
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(FacebookStreamConfigurator.class);
-    private final static ObjectMapper mapper = new ObjectMapper();
-
-
-    public static FacebookUserInformationConfiguration 
detectFacebookConfiguration(Config config) {
-        FacebookUserInformationConfiguration 
facebookUserInformationConfiguration = new 
FacebookUserInformationConfiguration();
-
-        try {
-            Config oauth = config.getConfig("oauth");
-            FacebookOAuthConfiguration facebookOAuthConfiguration = new 
FacebookOAuthConfiguration();
-            
facebookOAuthConfiguration.setAppAccessToken(oauth.getString("appAccessToken"));
-            
facebookOAuthConfiguration.setAppSecret(oauth.getString("appSecret"));
-            
facebookOAuthConfiguration.setUserAccessToken(oauth.getString("userAccessToken"));
-            facebookOAuthConfiguration.setAppId(oauth.getString("appId"));
-
-            
facebookUserInformationConfiguration.setOauth(facebookOAuthConfiguration);
-        } catch( ConfigException ce ) {
-            LOGGER.error("Exception while extracting Facebook oauth token: 
{}", ce.getMessage());
-        }
-
-        return facebookUserInformationConfiguration;
-    }
-
-    public static FacebookConfiguration detectConfiguration(Config config) {
-
-        FacebookConfiguration facebookConfiguration = 
mapper.convertValue(detectFacebookConfiguration(config), 
FacebookConfiguration.class);
-
-        return facebookConfiguration;
-    }
-
-    public static FacebookUserInformationConfiguration 
detectFacebookUserInformationConfiguration(Config config) {
-        return mapper.convertValue(detectFacebookConfiguration(config), 
FacebookUserInformationConfiguration.class);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java
 
b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java
deleted file mode 100644
index d32bc9c..0000000
--- 
a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gmail;
-
-import com.google.gmail.GMailConfiguration;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigException;
-import org.apache.streams.config.StreamsConfigurator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class GMailConfigurator {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(GMailConfigurator.class);
-
-    public static GMailConfiguration detectConfiguration(Config gmail) {
-
-        GMailConfiguration gmailConfiguration = new GMailConfiguration();
-
-        gmailConfiguration.setUserName(gmail.getString("username"));
-        gmailConfiguration.setPassword(gmail.getString("password"));
-
-        return gmailConfiguration;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
 
b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
index 5872e27..5cc6fe7 100644
--- 
a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
+++ 
b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
@@ -19,32 +19,40 @@
 package com.google.gmail.provider;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gmail.GMailConfiguration;
-import com.google.gmail.GMailConfigurator;
 import com.googlecode.gmail4j.GmailClient;
 import com.googlecode.gmail4j.GmailConnection;
 import com.googlecode.gmail4j.http.HttpGmailConnection;
 import com.googlecode.gmail4j.javamail.ImapGmailClient;
 import com.googlecode.gmail4j.javamail.ImapGmailConnection;
 import com.googlecode.gmail4j.rss.RssGmailClient;
-import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Created by sblackmon on 12/10/13.
  */
-public class GMailProvider implements StreamsProvider, DatumStatusCountable {
+public class GMailProvider implements StreamsProvider, Serializable {
 
     public final static String STREAMS_ID = "GMailProvider";
 
@@ -62,9 +70,9 @@ public class GMailProvider implements StreamsProvider, 
DatumStatusCountable {
         this.config = config;
     }
 
-    protected BlockingQueue inQueue = new LinkedBlockingQueue<String>(10000);
+    protected BlockingQueue inQueue = new LinkedBlockingQueue<>(10000);
 
-    protected volatile Queue<StreamsDatum> providerQueue = new 
ConcurrentLinkedQueue<StreamsDatum>();
+    protected volatile Queue<StreamsDatum> providerQueue = new 
ConcurrentLinkedQueue<>();
     protected Future task;
 
     public BlockingQueue<Object> getInQueue() {
@@ -83,8 +91,8 @@ public class GMailProvider implements StreamsProvider, 
DatumStatusCountable {
     }
 
     public GMailProvider() {
-        Config config = StreamsConfigurator.config.getConfig("gmail");
-        this.config = GMailConfigurator.detectConfiguration(config);
+        this.config = new ComponentConfigurator<>(GMailConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("gmail"));
     }
 
     public GMailProvider(GMailConfiguration config) {
@@ -92,8 +100,8 @@ public class GMailProvider implements StreamsProvider, 
DatumStatusCountable {
     }
 
     public GMailProvider(Class klass) {
-        Config config = StreamsConfigurator.config.getConfig("gmail");
-        this.config = GMailConfigurator.detectConfiguration(config);
+        this.config = new ComponentConfigurator<>(GMailConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("gmail"));
         this.klass = klass;
     }
 
@@ -123,7 +131,7 @@ public class GMailProvider implements StreamsProvider, 
DatumStatusCountable {
         StreamsResultSet current;
 
         synchronized( GMailProvider.class ) {
-            current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+            current = new StreamsResultSet(new 
ConcurrentLinkedQueue<>(providerQueue));
             current.setCounter(new DatumStatusCounter());
             current.getCounter().add(countersCurrent);
             countersTotal.add(countersCurrent);
@@ -179,9 +187,4 @@ public class GMailProvider implements StreamsProvider, 
DatumStatusCountable {
             e.printStackTrace();
         }
     }
-
-    @Override
-    public DatumStatusCounter getDatumStatusCounter() {
-        return null;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
index b9e9b2d..734e711 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
@@ -18,23 +18,20 @@
 
 package com.google.gplus.provider;
 
-import 
com.google.api.client.googleapis.auth.oauth2.GoogleAuthorizationCodeFlow;
 import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets;
 import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
-import com.google.api.client.googleapis.auth.oauth2.GoogleTokenResponse;
 import com.google.api.client.http.HttpTransport;
 import com.google.api.client.http.javanet.NetHttpTransport;
 import com.google.api.client.json.jackson2.JacksonFactory;
 import com.google.api.services.plus.Plus;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gson.Gson;
-import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
@@ -53,14 +50,14 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -94,8 +91,8 @@ public abstract class AbstractGPlusProvider implements 
StreamsProvider {
     protected Plus plus;
 
     public AbstractGPlusProvider() {
-        Config config = StreamsConfigurator.config.getConfig("gplus");
-        this.config = GPlusConfigurator.detectConfiguration(config);
+        this.config = new ComponentConfigurator<>(GPlusConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("gplus"));
     }
 
     public AbstractGPlusProvider(GPlusConfiguration config) {
@@ -221,7 +218,7 @@ public abstract class AbstractGPlusProvider implements 
StreamsProvider {
      * @param userIds
      */
     public void setUserInfoWithDefaultDates(Set<String> userIds) {
-        List<UserInfo> gPlusUsers = Lists.newLinkedList();
+        List<UserInfo> gPlusUsers = new LinkedList<>();
         for(String userId : userIds) {
             UserInfo user = new UserInfo();
             user.setUserId(userId);
@@ -237,7 +234,7 @@ public abstract class AbstractGPlusProvider implements 
StreamsProvider {
      * @param usersAndAfterDates
      */
     public void setUserInfoWithAfterDate(Map<String, DateTime> 
usersAndAfterDates) {
-        List<UserInfo> gPlusUsers = Lists.newLinkedList();
+        List<UserInfo> gPlusUsers = new LinkedList<>();
         for(String userId : usersAndAfterDates.keySet()) {
             UserInfo user = new UserInfo();
             user.setUserId(userId);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusConfigurator.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusConfigurator.java
deleted file mode 100644
index 1999130..0000000
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusConfigurator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.google.gplus.GPlusConfiguration;
-import org.apache.streams.google.gplus.GPlusOAuthConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class GPlusConfigurator {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(GPlusConfigurator.class);
-    private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
-
-    public static GPlusConfiguration detectConfiguration(Config config) {
-        GPlusConfiguration configuration = null;
-        try {
-            configuration = 
MAPPER.readValue(config.root().render(ConfigRenderOptions.concise()), 
GPlusConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        Preconditions.checkNotNull(configuration);
-
-        return configuration;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
deleted file mode 100644
index 11e6d79..0000000
--- 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.instagram;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- *
- */
-public class InstagramConfigurator {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(InstagramConfigurator.class);
-    private final static ObjectMapper mapper = new ObjectMapper();
-
-
-    public static InstagramConfiguration detectInstagramConfiguration(Config 
config) {
-
-        InstagramConfiguration instagramConfiguration = null;
-        try {
-            instagramConfiguration = 
mapper.readValue(config.root().render(ConfigRenderOptions.concise()), 
InstagramConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        Preconditions.checkNotNull(instagramConfiguration);
-
-        return instagramConfiguration;
-    }
-
-    public static InstagramUserInformationConfiguration 
detectInstagramUserInformationConfiguration(Config config) {
-
-        InstagramUserInformationConfiguration instagramConfiguration = null;
-        try {
-            instagramConfiguration = 
mapper.readValue(config.root().render(ConfigRenderOptions.concise()), 
InstagramUserInformationConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        Preconditions.checkNotNull(instagramConfiguration);
-
-        return instagramConfiguration;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
index fe4b8da..fbba2a2 100644
--- 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
+++ 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
@@ -14,18 +14,16 @@ specific language governing permissions and limitations
 under the License. */
 package org.apache.streams.instagram.provider;
 
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.instagram.InstagramConfiguration;
-import org.apache.streams.instagram.InstagramConfigurator;
 import org.apache.streams.instagram.User;
 import org.apache.streams.instagram.UsersInfo;
 import org.apache.streams.util.ComponentUtils;
@@ -37,11 +35,13 @@ import org.slf4j.LoggerFactory;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -60,16 +60,16 @@ public abstract class InstagramAbstractProvider implements 
StreamsProvider {
     private static final int MAX_BATCH_SIZE = 2000;
 
     protected InstagramConfiguration config;
-    private InstagramDataCollector dataCollector;
     protected Queue<StreamsDatum> dataQueue;
     private ListeningExecutorService executorService;
 
-    List<ListenableFuture<Object>> futures = new ArrayList<>();
+    private List<ListenableFuture<Object>> futures = new ArrayList<>();
 
     private AtomicBoolean isCompleted;
 
     public InstagramAbstractProvider() {
-        this.config = 
InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram"));
+        this.config = new ComponentConfigurator<>(InstagramConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("instagram"));
     }
 
     public InstagramAbstractProvider(InstagramConfiguration config) {
@@ -89,22 +89,22 @@ public abstract class InstagramAbstractProvider implements 
StreamsProvider {
 
     @Override
     public void startStream() {
-        this.dataCollector = getInstagramDataCollector();
+        InstagramDataCollector dataCollector = getInstagramDataCollector();
         this.executorService = 
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
-        ListenableFuture future = 
this.executorService.submit(this.dataCollector);
+        ListenableFuture future = this.executorService.submit(dataCollector);
         this.futures.add(future);
     }
 
     /**
      * Return the data collector to use to connect to instagram.
-     * @return
+     * @return {@link InstagramDataCollector}
      */
     protected abstract InstagramDataCollector getInstagramDataCollector();
 
 
     @Override
     public StreamsResultSet readCurrent() {
-        Queue<StreamsDatum> batch = Queues.newConcurrentLinkedQueue();
+        Queue<StreamsDatum> batch = new ConcurrentLinkedQueue<>();
         int count = 0;
         while(!this.dataQueue.isEmpty() && count < MAX_BATCH_SIZE) {
             
ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.dataQueue),
 batch);
@@ -125,7 +125,7 @@ public abstract class InstagramAbstractProvider implements 
StreamsProvider {
 
     @Override
     public void prepare(Object configurationObject) {
-        this.dataQueue = Queues.newConcurrentLinkedQueue();
+        this.dataQueue = new ConcurrentLinkedQueue<>();
         this.isCompleted = new AtomicBoolean(false);
     }
 
@@ -171,7 +171,7 @@ public abstract class InstagramAbstractProvider implements 
StreamsProvider {
      * @param tokenStrings
      */
     public void setAuthorizedUserTokens(Collection<String> tokenStrings) {
-        
ensureUsersInfo(this.config).setAuthorizedTokens(Sets.newHashSet(tokenStrings));
+        ensureUsersInfo(this.config).setAuthorizedTokens(new 
HashSet<>(tokenStrings));
     }
 
     /**
@@ -197,7 +197,7 @@ public abstract class InstagramAbstractProvider implements 
StreamsProvider {
      * @param usersWithAfterDate instagram user id mapped to BeforeDate time
      */
     public void setUsersWithAfterDate(Map<String, DateTime> 
usersWithAfterDate) {
-        Set<User> users = Sets.newHashSet();
+        Set<User> users = new HashSet<>();
         for(String userId : usersWithAfterDate.keySet()) {
             User user = new User();
             user.setUserId(userId);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
index eda9b4a..9a31b5a 100644
--- 
a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
+++ 
b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
@@ -16,10 +16,7 @@ package org.apache.streams.instagram.provider.recentmedia;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -28,18 +25,10 @@ import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.instagram.*;
+import org.apache.streams.instagram.InstagramConfiguration;
 import org.apache.streams.instagram.provider.InstagramAbstractProvider;
 import org.apache.streams.instagram.provider.InstagramDataCollector;
-import 
org.apache.streams.instagram.provider.recentmedia.InstagramRecentMediaCollector;
-import 
org.apache.streams.instagram.provider.userinfo.InstagramUserInfoProvider;
 import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.ComponentUtils;
-import org.apache.streams.util.SerializationUtil;
-import org.jinstagram.entity.users.feed.MediaFeedData;
-import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,16 +36,8 @@ import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
-import java.math.BigInteger;
-import java.util.Collection;
 import java.util.Iterator;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Instagram {@link org.apache.streams.core.StreamsProvider} that provides the 
recent media data for a group of users

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
deleted file mode 100644
index 4c3ba06..0000000
--- 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.moreover;
-
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-import org.apache.streams.moreover.MoreoverConfiguration;
-import org.apache.streams.moreover.MoreoverKeyData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class MoreoverConfigurator {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(MoreoverConfigurator.class);
-
-    public static MoreoverConfiguration detectConfiguration(Config moreover) {
-
-        MoreoverConfiguration moreoverConfiguration = new 
MoreoverConfiguration();
-
-        List<MoreoverKeyData> apiKeys = Lists.newArrayList();
-
-        Config apiKeysConfig = moreover.getConfig("apiKeys");
-
-        if( !apiKeysConfig.isEmpty())
-            for( String apiKeyId : apiKeysConfig.root().keySet() ) {
-                Config apiKeyConfig = apiKeysConfig.getConfig(apiKeyId);
-                apiKeys.add(new MoreoverKeyData()
-                        .withId(apiKeyConfig.getString("key"))
-                        .withKey(apiKeyConfig.getString("key"))
-                        
.withStartingSequence(apiKeyConfig.getString("startingSequence")));
-            }
-        moreoverConfiguration.setApiKeys(apiKeys);
-
-        return moreoverConfiguration;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamConfigurator.java
 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamConfigurator.java
deleted file mode 100644
index e1a7033..0000000
--- 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamConfigurator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.rss.provider;
-
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-import org.apache.streams.rss.FeedDetails;
-import org.apache.streams.rss.RssStreamConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class RssStreamConfigurator {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(RssStreamConfigurator.class);
-
-    public static RssStreamConfiguration detectConfiguration(Config rss) {
-
-        RssStreamConfiguration rssStreamConfiguration = new 
RssStreamConfiguration();
-
-        List<FeedDetails> feeds = Lists.newArrayList();
-        feeds.add(new FeedDetails().withUrl(rss.getString("url")));
-
-        rssStreamConfiguration.setFeeds(feeds);
-        return rssStreamConfiguration;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
index a0e8ea1..d7dc918 100644
--- 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
+++ 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
@@ -22,16 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
-import com.sun.syndication.feed.synd.SyndEntry;
-import com.sun.syndication.feed.synd.SyndFeed;
-import com.sun.syndication.io.FeedException;
-import com.sun.syndication.io.SyndFeedInput;
-import com.sun.syndication.io.XmlReader;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
@@ -53,14 +44,19 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.IOException;
 import java.io.PrintStream;
-import java.io.Serializable;
 import java.math.BigInteger;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -84,21 +80,21 @@ public class RssStreamProvider implements StreamsProvider {
 
     private RssStreamConfiguration config;
     private boolean perpetual;
-    private Set<String> urlFeeds;
     private ExecutorService executor;
     private BlockingQueue<StreamsDatum> dataQueue;
     private AtomicBoolean isComplete;
-    private int consecutiveEmptyReads;
 
     @VisibleForTesting
     protected RssFeedScheduler scheduler;
 
     public RssStreamProvider() {
-        
this(RssStreamConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("rss")),
 false);
+        this(new ComponentConfigurator<>(RssStreamConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("rss")), false);
     }
 
     public RssStreamProvider(boolean perpetual) {
-        
this(RssStreamConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("rss")),
 perpetual);
+        this(new ComponentConfigurator<>(RssStreamConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("rss")), 
perpetual);
     }
 
     public RssStreamProvider(RssStreamConfiguration config) {
@@ -120,14 +116,13 @@ public class RssStreamProvider implements StreamsProvider 
{
     }
 
     public void setRssFeeds(Set<String> urlFeeds) {
-        this.urlFeeds = urlFeeds;
     }
 
     public void setRssFeeds(Map<String, Long> feeds) {
         if(this.config == null) {
             this.config = new RssStreamConfiguration();
         }
-        List<FeedDetails> feedDetails = Lists.newLinkedList();
+        List<FeedDetails> feedDetails = new ArrayList<>();
         for(String feed : feeds.keySet()) {
             Long delay = feeds.get(feed);
             FeedDetails detail = new FeedDetails();
@@ -146,7 +141,7 @@ public class RssStreamProvider implements StreamsProvider {
 
     @Override
     public StreamsResultSet readCurrent() {
-        Queue<StreamsDatum> batch = Queues.newConcurrentLinkedQueue();
+        Queue<StreamsDatum> batch = new ConcurrentLinkedQueue<>();
         int batchSize = 0;
         while(!this.dataQueue.isEmpty() && batchSize < MAX_SIZE) {
             StreamsDatum datum = 
ComponentUtils.pollWhileNotEmpty(this.dataQueue);
@@ -177,10 +172,10 @@ public class RssStreamProvider implements StreamsProvider 
{
     @Override
     public void prepare(Object configurationObject) {
         this.executor = new ThreadPoolExecutor(1, 4, 15L, TimeUnit.SECONDS, 
new LinkedBlockingQueue<Runnable>());
-        this.dataQueue = Queues.newLinkedBlockingQueue();
+        this.dataQueue = new LinkedBlockingQueue<>();
         this.scheduler = getScheduler(this.dataQueue);
         this.isComplete = new AtomicBoolean(false);
-        this.consecutiveEmptyReads = 0;
+        int consecutiveEmptyReads = 0;
     }
 
     @VisibleForTesting
@@ -222,9 +217,7 @@ public class RssStreamProvider implements StreamsProvider {
         provider.startStream();
         do {
             
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
-            Iterator<StreamsDatum> iterator = 
provider.readCurrent().iterator();
-            while(iterator.hasNext()) {
-                StreamsDatum datum = iterator.next();
+            for (StreamsDatum datum : provider.readCurrent()) {
                 String json;
                 try {
                     json = mapper.writeValueAsString(datum.getDocument());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
index efbff46..8b483a1 100644
--- 
a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
+++ 
b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.data.util.RFC3339Utils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -37,8 +36,8 @@ import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
 public class SyndEntryActivitySerializer implements 
ActivitySerializer<ObjectNode> {
 
@@ -57,7 +56,7 @@ public class SyndEntryActivitySerializer implements 
ActivitySerializer<ObjectNod
 
     @Override
     public List<Activity> deserializeAll(List<ObjectNode> objectNodes) {
-        List<Activity> result = Lists.newLinkedList();
+        List<Activity> result = new LinkedList<>();
         for (ObjectNode node : objectNodes) {
             result.add(deserialize(node));
         }
@@ -180,7 +179,7 @@ public class SyndEntryActivitySerializer implements 
ActivitySerializer<ObjectNod
         if (entry.get("uri") != null)
             uri = entry.get("uri").textValue();
 
-        /**
+        /*
          * Order of precedence for resourceLocation selection
          *
          * 1. Valid URI
@@ -209,9 +208,7 @@ public class SyndEntryActivitySerializer implements 
ActivitySerializer<ObjectNod
      * @return boolean of whether or not the resource is valid
      */
     private boolean isValidResource(String resource) {
-        if(resource != null && resource.startsWith("http") || 
resource.startsWith("www"))
-            return true;
-        return false;
+        return resource != null && (resource.startsWith("http") || 
resource.startsWith("www"));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/config/SysomosConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/config/SysomosConfigurator.java
 
b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/config/SysomosConfigurator.java
deleted file mode 100644
index f503258..0000000
--- 
a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/config/SysomosConfigurator.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.sysomos.config;
-
-import com.sysomos.SysomosConfiguration;
-import com.typesafe.config.Config;
-
-/**
- * Creates a {@link com.sysomos.SysomosConfiguration} instance from a {@link 
com.typesafe.config.Config} object.
- */
-public class SysomosConfigurator {
-    public static SysomosConfiguration detectConfiguration(Config config) {
-        SysomosConfiguration sysomos = new SysomosConfiguration();
-        sysomos.setHeartbeatIds(config.getStringList("heartbeatIds"));
-        sysomos.setApiBatchSize(config.getLong("apiBatchSize"));
-        sysomos.setApiKey(config.getString("apiKey"));
-        sysomos.setMinDelayMs(config.getLong("minDelayMs"));
-        sysomos.setScheduledDelayMs(config.getLong("scheduledDelayMs"));
-        sysomos.setMaxBatchSize(config.getLong("maxBatchSize"));
-        return sysomos;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
index 8330167..217d5dc 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
@@ -20,7 +20,7 @@
 package org.apache.streams.twitter.processor;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
@@ -32,7 +32,6 @@ import 
org.apache.streams.twitter.converter.StreamsTwitterMapper;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.provider.TwitterConfigurator;
 import org.apache.streams.twitter.provider.TwitterEventClassifier;
 import org.apache.streams.twitter.provider.TwitterProviderUtil;
 import org.slf4j.Logger;
@@ -44,6 +43,7 @@ import twitter4j.TwitterFactory;
 import twitter4j.TwitterObjectFactory;
 import twitter4j.conf.ConfigurationBuilder;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import static 
org.apache.streams.twitter.converter.util.TwitterActivityUtil.getProvider;
@@ -69,7 +69,8 @@ public class FetchAndReplaceTwitterProcessor implements 
StreamsProcessor {
     private int retryCount;
 
     public FetchAndReplaceTwitterProcessor() {
-        
this(TwitterConfigurator.detectTwitterStreamConfiguration(StreamsConfigurator.config.getConfig("twitter")));
+        this(new ComponentConfigurator<>(TwitterStreamConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter")));
     }
 
     public FetchAndReplaceTwitterProcessor(TwitterStreamConfiguration config) {
@@ -92,7 +93,7 @@ public class FetchAndReplaceTwitterProcessor implements 
StreamsProcessor {
         } else {
             throw new IllegalStateException("Requires an activity document");
         }
-        return Lists.newArrayList(entry);
+        return new ArrayList<>();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index eac1218..3765229 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -107,7 +107,7 @@ public class TwitterStreamProvider implements 
StreamsProvider, Serializable, Dat
 
         ObjectMapper mapper = new 
StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
 
-        PrintStream outStream = null;
+        PrintStream outStream;
         try {
             outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
         } catch (FileNotFoundException e) {
@@ -164,8 +164,8 @@ public class TwitterStreamProvider implements 
StreamsProvider, Serializable, Dat
     }
 
     public TwitterStreamProvider() {
-        Config config = StreamsConfigurator.config.getConfig("twitter");
-        this.config = 
TwitterConfigurator.detectTwitterStreamConfiguration(config);
+        this.config = new 
ComponentConfigurator<>(TwitterStreamConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter"));
     }
 
     public TwitterStreamProvider(TwitterStreamConfiguration config) {
@@ -287,9 +287,9 @@ public class TwitterStreamProvider implements 
StreamsProvider, Serializable, Dat
             return;
         }
 
-        LOGGER.debug("host={}\tendpoint={}\taut={}", new Object[] 
{hosebirdHosts,endpoint,auth});
+        LOGGER.debug("host={}\tendpoint={}\taut={}", hosebirdHosts, endpoint, 
auth);
 
-        providerQueue = new 
LinkedBlockingQueue<Future<List<StreamsDatum>>>(MAX_BATCH);
+        providerQueue = new LinkedBlockingQueue<>(MAX_BATCH);
 
         client = new ClientBuilder()
             .name("apache/streams/streams-contrib/streams-provider-twitter")

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeConfigurator.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeConfigurator.java
deleted file mode 100644
index e6f8ccf..0000000
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeConfigurator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.youtube.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.youtube.pojo.YoutubeConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class YoutubeConfigurator {
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(YoutubeConfigurator.class);
-    private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
-
-    public static YoutubeConfiguration detectConfiguration(Config config) {
-        YoutubeConfiguration configuration = null;
-
-        try {
-            configuration = 
MAPPER.readValue(config.root().render(ConfigRenderOptions.concise()), 
YoutubeConfiguration.class);
-        } catch (IOException e) {
-            LOGGER.error("Exception while trying to use YoutubeConfigurator: 
{}", e);
-        }
-
-        Preconditions.checkNotNull(configuration);
-
-        return configuration;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
index e16f4bb..ab77467 100644
--- 
a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
+++ 
b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
@@ -34,7 +34,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
@@ -53,17 +53,15 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
-
 public abstract class YoutubeProvider implements StreamsProvider {
 
     public static final String STREAMS_ID = "YoutubeProvider";
@@ -73,7 +71,7 @@ public abstract class YoutubeProvider implements 
StreamsProvider {
 
     // This OAuth 2.0 access scope allows for full read/write access to the
     // authenticated user's account.
-    List<String> scopes = 
Lists.newArrayList("https://www.googleapis.com/auth/youtube");
+    private List<String> scopes = 
Lists.newArrayList("https://www.googleapis.com/auth/youtube");
 
     /**
      * Define a global instance of the HTTP transport.
@@ -87,7 +85,7 @@ public abstract class YoutubeProvider implements 
StreamsProvider {
 
     private static final int DEFAULT_THREAD_POOL_SIZE = 5;
 
-    List<ListenableFuture<Object>> futures = new ArrayList<>();
+    private List<ListenableFuture<Object>> futures = new ArrayList<>();
 
     private ListeningExecutorService executor;
     private BlockingQueue<StreamsDatum> datumQueue;
@@ -98,8 +96,8 @@ public abstract class YoutubeProvider implements 
StreamsProvider {
     protected YoutubeConfiguration config;
 
     public YoutubeProvider() {
-        Config config = StreamsConfigurator.config.getConfig("youtube");
-        this.config = YoutubeConfigurator.detectConfiguration(config);
+        this.config = new ComponentConfigurator<>(YoutubeConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("youtube"));
 
         Preconditions.checkNotNull(this.config.getApiKey());
     }
@@ -228,7 +226,7 @@ public abstract class YoutubeProvider implements 
StreamsProvider {
      * @param userIds
      */
     public void setUserInfoWithDefaultDates(Set<String> userIds) {
-        List<UserInfo> youtubeUsers = Lists.newLinkedList();
+        List<UserInfo> youtubeUsers = new LinkedList<>();
 
         for(String userId : userIds) {
             UserInfo user = new UserInfo();
@@ -246,7 +244,7 @@ public abstract class YoutubeProvider implements 
StreamsProvider {
      * @param usersAndAfterDates
      */
     public void setUserInfoWithAfterDate(Map<String, DateTime> 
usersAndAfterDates) {
-        List<UserInfo> youtubeUsers = Lists.newLinkedList();
+        List<UserInfo> youtubeUsers = new LinkedList<>();
 
         for(String userId : usersAndAfterDates.keySet()) {
             UserInfo user = new UserInfo();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
 
b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
deleted file mode 100644
index ccfff3f..0000000
--- 
a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.local.queues;
-
-import com.carrotsearch.randomizedtesting.RandomizedTest;
-import com.carrotsearch.randomizedtesting.annotations.Repeat;
-import org.apache.streams.util.ComponentUtils;
-import org.joda.time.DateTime;
-import org.junit.After;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.InstanceNotFoundException;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.util.concurrent.*;
-
-/**
- * MultiThread unit tests for {@link 
org.apache.streams.local.queues.ThroughputQueue}
- */
-public class ThroughputQueueMulitThreadTest extends RandomizedTest {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(ThroughputQueueMulitThreadTest.class);
-    private static final String MBEAN_ID = "testQueue";
-    private static final String STREAM_ID = "test_stream";
-    private static long STREAM_START_TIME = (new DateTime()).getMillis();
-
-    /**
-     * Remove registered mbeans from previous tests
-     * @throws Exception
-     */
-    @After
-    public void unregisterMXBean() throws Exception {
-        try {
-            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new 
ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, 
STREAM_START_TIME)));
-        } catch (InstanceNotFoundException ife) {
-            //No-op
-        }
-    }
-
-    @After
-    public void removeLocalMBeans() {
-        try {
-            ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
-        } catch (Exception e) {
-            //No op.  proceed to next test
-        }
-    }
-
-
-    /**
-     * Test that queue will block on puts when the queue is full
-     * @throws InterruptedException
-     */
-    @Test
-    public void testBlockOnFullQueue() throws InterruptedException {
-        int queueSize = randomIntBetween(1, 3000);
-        ExecutorService executor = Executors.newSingleThreadExecutor();
-        CountDownLatch full = new CountDownLatch(1);
-        CountDownLatch finished = new CountDownLatch(1);
-        ThroughputQueue queue = new ThroughputQueue(queueSize);
-        BlocksOnFullQueue testThread = new BlocksOnFullQueue(full, finished, 
queue, queueSize);
-        executor.submit(testThread);
-        full.await();
-        assertEquals(queueSize, queue.size());
-        assertEquals(queueSize, queue.getCurrentSize());
-        assertFalse(testThread.isComplete()); //test that it is blocked
-        safeSleep(1000);
-        assertFalse(testThread.isComplete()); //still blocked
-        queue.take();
-        finished.await();
-        assertEquals(queueSize, queue.size());
-        assertEquals(queueSize, queue.getCurrentSize());
-        assertTrue(testThread.isComplete());
-        executor.shutdownNow();
-        executor.awaitTermination(500, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Test that queue will block on Take when queue is empty
-     * @throws InterruptedException
-     */
-    @Test
-    public void testBlockOnEmptyQueue() throws InterruptedException {
-        int queueSize = randomIntBetween(1, 3000);
-        ExecutorService executor = Executors.newSingleThreadExecutor();
-        CountDownLatch empty = new CountDownLatch(1);
-        CountDownLatch finished = new CountDownLatch(1);
-        ThroughputQueue queue = new ThroughputQueue();
-        BlocksOnEmptyQueue testThread = new BlocksOnEmptyQueue(empty, 
finished, queueSize, queue);
-        for(int i=0; i < queueSize; ++i) {
-            queue.put(i);
-        }
-        executor.submit(testThread);
-        empty.await();
-        assertEquals(0, queue.size());
-        assertEquals(0, queue.getCurrentSize());
-        assertFalse(testThread.isComplete());
-        safeSleep(1000);
-        assertFalse(testThread.isComplete());
-        queue.put(1);
-        finished.await();
-        assertEquals(0, queue.size());
-        assertEquals(0, queue.getCurrentSize());
-        assertTrue(testThread.isComplete());
-        executor.shutdownNow();
-        executor.awaitTermination(500, TimeUnit.MILLISECONDS);
-    }
-
-
-    /**
-     * Test multiple threads putting and taking from the queue while
-     * this thread repeatedly calls the MXBean measurement methods.
-     * Should hammer the queue with request from multiple threads
-     * of all request types.  Purpose is to expose current modification 
exceptions
-     * and/or dead locks.
-     */
-    @Test
-    @Repeat(iterations = 3)
-    public void testMultiThreadAccessAndInteruptResponse() throws Exception {
-        int putTakeThreadCount = randomIntBetween(1, 10);
-        int dataCount = randomIntBetween(1, 2000000);
-        int pollCount = randomIntBetween(1, 2000000);
-        int maxSize = randomIntBetween(1, 1000);
-        CountDownLatch finished = new CountDownLatch(putTakeThreadCount);
-        ThroughputQueue queue = new ThroughputQueue(maxSize, MBEAN_ID);
-        ExecutorService executor = 
Executors.newFixedThreadPool(putTakeThreadCount * 2);
-        for(int i=0; i < putTakeThreadCount; ++i) {
-            executor.submit(new PutData(finished, queue, dataCount));
-            executor.submit(new TakeData(queue));
-        }
-        for(int i=0; i < pollCount; ++i) {
-            queue.getAvgWait();
-            queue.getAdded();
-            queue.getCurrentSize();
-            queue.getMaxWait();
-            queue.getRemoved();
-            queue.getThroughput();
-        }
-        finished.await();
-        while(!queue.isEmpty()) {
-            LOGGER.info("Waiting for queue to be emptied...");
-            safeSleep(500);
-        }
-        long totalData = ((long) dataCount) * putTakeThreadCount;
-        assertEquals(totalData, queue.getAdded());
-        assertEquals(totalData, queue.getRemoved());
-        executor.shutdown();
-        executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown puts
-        executor.shutdownNow();
-        executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown 
takes
-        //Randomized should not report thread leak
-    }
-
-
-
-    private void safeSleep(long sleep) {
-        try {
-            Thread.sleep(sleep);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
-
-
-
-    /**
-     * Helper runnable for test {@link 
ThroughputQueueMulitThreadTest#testBlockOnFullQueue()}
-     */
-    private class BlocksOnFullQueue implements Runnable {
-
-        private CountDownLatch full;
-        volatile private boolean complete;
-        private int queueSize;
-        private CountDownLatch finished;
-        private BlockingQueue queue;
-
-        public BlocksOnFullQueue(CountDownLatch latch, CountDownLatch 
finished, BlockingQueue queue, int queueSize) {
-            this.full = latch;
-            this.complete = false;
-            this.queueSize = queueSize;
-            this.finished = finished;
-            this.queue = queue;
-        }
-
-        @Override
-        public void run() {
-            try {
-                for (int i = 0; i < this.queueSize; ++i) {
-                    this.queue.put(i);
-                }
-                this.full.countDown();
-                this.queue.put(0);
-                this.complete = true;
-                this.finished.countDown();
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-            }
-        }
-
-        public boolean isComplete() {
-            return this.complete;
-        }
-    }
-
-
-    /**
-     * Helper runnable class for test {@link 
ThroughputQueueMulitThreadTest#testBlockOnEmptyQueue()}
-     */
-    private class BlocksOnEmptyQueue implements Runnable {
-
-        private CountDownLatch full;
-        volatile private boolean complete;
-        private int queueSize;
-        private CountDownLatch finished;
-        private BlockingQueue queue;
-
-        public BlocksOnEmptyQueue(CountDownLatch full, CountDownLatch 
finished, int queueSize, BlockingQueue queue) {
-            this.full = full;
-            this.finished = finished;
-            this.queueSize = queueSize;
-            this.queue = queue;
-            this.complete = false;
-        }
-
-
-        @Override
-        public void run() {
-            try {
-                for(int i=0; i < this.queueSize; ++i) {
-                    this.queue.take();
-                }
-                this.full.countDown();
-                this.queue.take();
-                this.complete = true;
-                this.finished.countDown();
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-            }
-        }
-
-        public boolean isComplete() {
-            return this.complete;
-        }
-    }
-
-
-    private class PutData implements Runnable {
-
-        private BlockingQueue queue;
-        private int dataCount;
-        private CountDownLatch finished;
-
-        public PutData(CountDownLatch finished, BlockingQueue queue, int 
dataCount) {
-            this.queue = queue;
-            this.dataCount = dataCount;
-            this.finished = finished;
-        }
-
-
-        @Override
-        public void run() {
-            try {
-                for(int i=0; i < this.dataCount; ++i) {
-                    this.queue.put(i);
-                }
-            } catch (InterruptedException ie) {
-                LOGGER.error("PUT DATA interupted !");
-                Thread.currentThread().interrupt();
-            }
-            this.finished.countDown();
-        }
-    }
-
-
-    private class TakeData implements Runnable {
-
-        private BlockingQueue queue;
-
-        public TakeData(BlockingQueue queue) {
-            this.queue = queue;
-        }
-
-
-        @Override
-        public void run() {
-            try {
-                while(true) {
-                    this.queue.take();
-                }
-            } catch (InterruptedException ie) {
-                LOGGER.error("PUT DATA interupted !");
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-}


Reply via email to